# Hindi EmoKnob — Demo (v15)

Safe, local-model-first notebook. Fixes XTTS wrapper mismatch and adds robust helpers.

Run cells in order (Environment → XTTS download/load → Indic load → Helpers → GUI).

In [11]:
# Environment & paths (run first)
import os, sys, shutil, traceback
from pathlib import Path
import torch

# All working folders are resolved relative to the directory the notebook runs in.
PROJECT_ROOT = Path.cwd()
MODELS_DIR = PROJECT_ROOT.joinpath("models")
XTTS_LOCAL_DIR = MODELS_DIR.joinpath("xtts_v2")  # local XTTS folder
INDIC_LOCAL_DIR = MODELS_DIR.joinpath("ai4bharat_indicwav2vec_hindi")  # local Indic wav2vec

# Create the model folders up front so later cells can rely on them existing.
for p in (MODELS_DIR, XTTS_LOCAL_DIR, INDIC_LOCAL_DIR):
    p.mkdir(parents=True, exist_ok=True)

# Safe thread config: only set_num_threads is attempted here (best-effort);
# set_num_interop_threads is avoided because it fails after parallel work started.
try:
    torch.set_num_threads(2)
except Exception as e:
    print("Warning: torch.set_num_threads failed:", e)

# Sampling rates
SR_XTTS = 22050   # XTTS default sample rate used by get_conditioning_latents
SR_INDIC = 16000  # ai4bharat/indicwav2vec-hindi expects 16k

# Echo the resolved configuration for the notebook user.
print(f'PROJECT_ROOT: {PROJECT_ROOT}')
print(f'MODELS_DIR: {MODELS_DIR}')
print(f'XTTS_LOCAL_DIR: {XTTS_LOCAL_DIR}')
print(f'INDIC_LOCAL_DIR: {INDIC_LOCAL_DIR}')
print(f'SR_XTTS: {SR_XTTS} SR_INDIC: {SR_INDIC}')


PROJECT_ROOT: D:\Downloads\Projects\Hindi Emotion controlled TTS with Voice cloning
MODELS_DIR: D:\Downloads\Projects\Hindi Emotion controlled TTS with Voice cloning\models
XTTS_LOCAL_DIR: D:\Downloads\Projects\Hindi Emotion controlled TTS with Voice cloning\models\xtts_v2
INDIC_LOCAL_DIR: D:\Downloads\Projects\Hindi Emotion controlled TTS with Voice cloning\models\ai4bharat_indicwav2vec_hindi
SR_XTTS: 22050 SR_INDIC: 16000


In [13]:
# Utilities: ffmpeg-based conversion + normalization + unique path
import subprocess
import librosa, soundfile as sf
from pathlib import Path
import numpy as np

def run_ffmpeg_convert_to_wav(in_path, out_path, sr):
    """Convert an audio file to a mono WAV at ``sr`` Hz by invoking ``ffmpeg``.

    Args:
        in_path: source audio file (anything ffmpeg can decode).
        out_path: destination file; overwritten if present (``-y``).
        sr: target sample rate in Hz.

    Raises:
        RuntimeError: if ffmpeg cannot be started or exits with a non-zero
            status. The underlying error is attached as ``__cause__``.
    """
    in_path = str(in_path)
    out_path = str(out_path)
    # -ac 1: mono, -ar: resample, -vn: drop any video stream.
    cmd = [
        "ffmpeg", "-y", "-i", in_path, "-ac", "1", "-ar", str(sr), "-vn",
        "-hide_banner", "-loglevel", "error", out_path,
    ]
    try:
        subprocess.run(cmd, check=True)
    except Exception as e:
        # Chain explicitly so the real failure (missing binary, exit status)
        # stays visible in the traceback.
        raise RuntimeError(f"ffmpeg conversion failed for {in_path}: {e}") from e

def preprocess_audio(in_path, out_wav_path, sr, normalize=True):
    """Convert any audio file to a mono WAV at ``sr`` Hz, optionally peak-normalized.

    The input is first converted with ffmpeg into a temporary file next to the
    destination, loaded with librosa, optionally scaled so that its absolute
    peak is 0.95, and written as float32 with soundfile.

    Args:
        in_path: source audio file.
        out_wav_path: destination WAV path; parent folders are created.
        sr: target sample rate in Hz.
        normalize: when true, scale the signal to a peak of 0.95.

    Returns:
        ``out_wav_path`` as a ``Path``.

    Raises:
        RuntimeError: propagated from ``run_ffmpeg_convert_to_wav``.
    """
    in_path = Path(in_path)
    out_wav_path = Path(out_wav_path)
    out_wav_path.parent.mkdir(parents=True, exist_ok=True)
    tmp = out_wav_path.with_suffix('.tmp.wav')
    try:
        run_ffmpeg_convert_to_wav(in_path, tmp, sr)
        y, _ = librosa.load(str(tmp), sr=sr, mono=True)
    finally:
        # Always remove the intermediate file, including when conversion or
        # loading fails; a missing file is not an error here.
        try:
            tmp.unlink()
        except OSError:
            pass
    # An empty signal has no peak; skip normalization instead of crashing.
    if normalize and y.size:
        peak = max(1e-9, float(np.max(np.abs(y))))  # floor avoids divide-by-zero on silence
        y = y / peak * 0.95
    sf.write(str(out_wav_path), y.astype(np.float32), sr)
    return out_wav_path

def unique_path(path: Path):
    """Return ``path`` if it is free, otherwise the first free ``<stem>_<n><suffix>`` sibling.

    ``n`` starts at 1 and increases until a name that does not exist on disk
    is found. Nothing is created; only existence is checked.
    """
    target = Path(path)
    if not target.exists():
        return target

    stem, ext, folder = target.stem, target.suffix, target.parent
    counter = 1
    candidate = folder / f"{stem}_{counter}{ext}"
    while candidate.exists():
        counter += 1
        candidate = folder / f"{stem}_{counter}{ext}"
    return candidate

# --- MISSING HELPERS ADDED BY FIX ---
def list_emotions():
    """Return the sorted names of the sub-folders of ``data/emotion_samples``.

    An empty list is returned when that folder does not exist.
    """
    samples_root = PROJECT_ROOT / 'data' / 'emotion_samples'
    if samples_root.exists():
        return sorted(entry.name for entry in samples_root.iterdir() if entry.is_dir())
    return []

def list_speakers():
    """Return the sorted file names of the audio files in ``data/speakers``.

    Only regular files with a .wav/.mp3/.m4a/.flac suffix (case-insensitive)
    are listed; an empty list is returned when the folder does not exist.
    """
    speakers_root = PROJECT_ROOT / 'data' / 'speakers'
    if not speakers_root.exists():
        return []
    names = []
    for entry in speakers_root.iterdir():
        if entry.is_file() and entry.suffix.lower() in ('.wav', '.mp3', '.m4a', '.flac'):
            names.append(entry.name)
    names.sort()
    return names

def ensure_speaker_clean(speaker_path, sr=SR_XTTS):
    """Return the path of the cleaned version of a speaker file, creating it if needed.

    A path whose stem already ends in ``_clean`` is returned unchanged.
    Otherwise ``<stem>_clean.wav`` next to the input is returned, and it is
    produced with ``preprocess_audio`` at sample rate ``sr`` when missing.
    """
    source = Path(speaker_path)
    if source.stem.endswith('_clean'):
        return source

    cleaned = source.with_name(f'{source.stem}_clean.wav')
    if not cleaned.exists():
        preprocess_audio(source, cleaned, sr=sr)
    return cleaned

def gui_alpha_to_internal(gui_val):
    """Convert a GUI strength value to the internal alpha by halving it.

    The value is coerced with ``float`` first, so numeric strings are accepted.
    """
    # Map 0.0-1.0 to 0.0-0.5 roughly
    as_float = float(gui_val)
    return as_float * 0.5

print('‚úì Utilities & Helpers Ready: list_emotions, list_speakers, ensure_speaker_clean, gui_alpha_to_internal defined.')

‚úì Utilities & Helpers Ready: list_emotions, list_speakers, ensure_speaker_clean, gui_alpha_to_internal defined.


In [59]:
# # XTTS local download & loader (local-first)
# from TTS.api import TTS
# from huggingface_hub import snapshot_download
# import shutil, os, traceback
# from pathlib import Path

# def ensure_xtts_local(target_dir: Path):
#     target_dir.mkdir(parents=True, exist_ok=True)
#     ck = target_dir / 'model.pth'
#     cfg = target_dir / 'config.json'
#     if ck.exists() and cfg.exists():
#         print('XTTS local present:', target_dir)
#         return True
#     print('Attempting snapshot_download of coqui/xtts-v2 into models folder (best-effort)...')
#     try:
#         tmp = snapshot_download(repo_id='coqui/xtts-v2', cache_dir=str(target_dir), repo_type='model', allow_patterns=['*'])
#         print('snapshot_download result:', tmp)
#     except Exception as e:
#         print('snapshot_download failed (this is OK if huggingface auth required). Error:', e)
#         traceback.print_exc()
#     ck = target_dir / 'model.pth'
#     cfg = target_dir / 'config.json'
#     if ck.exists() and cfg.exists():
#         return True
#     print('XTTS not available locally. You can allow TTS to download to cache once, then move folder to models/xtts_v2.')
#     return False

# def load_xtts_local_or_remote(gpu=False):
#     ok = ensure_xtts_local(Path('models') / 'xtts_v2')
#     try:
#         if ok:
#             print('Trying to load XTTS from local models/xtts_v2 ...')
#             t = TTS(model_path=str(Path('models') / 'xtts_v2' / 'model.pth'),
#                     config_path=str(Path('models') / 'xtts_v2' / 'config.json'),
#                     gpu=gpu)
#             print('Loaded XTTS from local files.')
#             return t
#     except Exception as e:
#         print('Failed to load local XTTS (will try model_name). Error:', e)
#         traceback.print_exc()
#     print('Loading XTTS via model_name (this will download to user cache if not present)...')
#     t = TTS(model_name='tts_models/multilingual/multi-dataset/xtts_v2', gpu=gpu)
#     print('XTTS loaded via model_name.')
#     return t

# # Load XTTS (CPU first)
# XTTS = None
# try:
#     XTTS = load_xtts_local_or_remote(gpu=False)
# except Exception as e:
#     print('XTTS load error:', e)
#     import traceback; traceback.print_exc()


In [14]:
# XTTS local download & loader (local-first)
from TTS.api import TTS
from huggingface_hub import snapshot_download
import shutil, os, traceback
from pathlib import Path
def ensure_xtts_local(target_dir: Path):
    """Make sure ``model.pth`` and ``config.json`` are present directly in ``target_dir``.

    If both files already exist nothing is downloaded. Otherwise a best-effort
    ``snapshot_download`` of ``coqui/xtts-v2`` into ``target_dir`` is attempted;
    download errors are printed, not raised.

    Args:
        target_dir: folder that should contain the XTTS checkpoint and config.

    Returns:
        True when both files are present after the call, False otherwise.
    """
    target_dir.mkdir(parents=True, exist_ok=True)

    def _has_model_files():
        # The loader needs both the checkpoint and its config side by side.
        return (target_dir / 'model.pth').exists() and (target_dir / 'config.json').exists()

    if _has_model_files():
        print('XTTS local present:', target_dir)
        return True
    print('Attempting snapshot_download of coqui/xtts-v2 into models folder (best-effort)...')
    try:
        # local_dir (not cache_dir) places the repo files directly in target_dir;
        # cache_dir would create the nested hub cache layout and the check
        # below could never succeed.
        tmp = snapshot_download(repo_id='coqui/xtts-v2', local_dir=str(target_dir), repo_type='model', allow_patterns=['*'])
        print('snapshot_download result:', tmp)
    except Exception as e:
        print('snapshot_download failed (this is OK if huggingface auth required). Error:', e)
        traceback.print_exc()
    if _has_model_files():
        return True
    print('XTTS not available locally. You can allow TTS to download to cache once, then move folder to models/xtts_v2.')
    return False
def load_xtts_local_or_remote(gpu=False):
    """Build and return a ``TTS`` wrapper for XTTS-v2, selected by model name.

    Despite the name, no local folder is consulted here: the model is always
    requested via ``model_name``. ``gpu`` is forwarded to ``TTS`` unchanged.
    """
    # DIRECT LOAD: Skipping local check and going straight to model_name
    xtts_model_name = 'tts_models/multilingual/multi-dataset/xtts_v2'
    print('Loading XTTS via model_name (this will download to user cache if not present)...')
    engine = TTS(model_name=xtts_model_name, gpu=gpu)
    print('XTTS loaded via model_name.')
    return engine
# Load XTTS on CPU. XTTS stays None when loading fails so that the notebook
# can keep running; the error and its traceback are printed instead.
XTTS = None
try:
    XTTS = load_xtts_local_or_remote(gpu=False)
except Exception as e:
    print('XTTS load error:', e)
    import traceback
    traceback.print_exc()

Loading XTTS via model_name (this will download to user cache if not present)...
 > tts_models/multilingual/multi-dataset/xtts_v2 is already downloaded.
 > Using model: xtts
XTTS loaded via model_name.


In [15]:
# Load ai4bharat/indicwav2vec-hindi from local models folder
from transformers import Wav2Vec2Processor, Wav2Vec2Model
import torch
from pathlib import Path

# Folder (relative to the current working directory) holding the Indic wav2vec files.
INDIC_LOCAL = Path('models') / 'ai4bharat_indicwav2vec_hindi'
if INDIC_LOCAL.exists():
    print('Loading Indic wav2vec from:', INDIC_LOCAL)
    processor = Wav2Vec2Processor.from_pretrained(str(INDIC_LOCAL))
    indic_enc = Wav2Vec2Model.from_pretrained(str(INDIC_LOCAL))
    indic_enc.eval()  # inference only
    print('Indic encoder loaded.')
else:
    # Nothing is defined in this case; get_indic_embedding reports the missing encoder.
    print('Warning: Indic model folder not found at:', INDIC_LOCAL)


Loading Indic wav2vec from: models\ai4bharat_indicwav2vec_hindi


Some weights of Wav2Vec2Model were not initialized from the model checkpoint at models\ai4bharat_indicwav2vec_hindi and are newly initialized: ['wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original0', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original1']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Indic encoder loaded.


In [16]:
# Embedding helpers: safe XTTS resolver and embedding extraction (UPDATED for GPT Latent)
import numpy as np
import torch
import librosa
from pathlib import Path

def resolve_xtts_internal_model(tts_obj):
    '''Return the model object held inside a TTS wrapper.

    Looks for ``tts_obj.synthesizer.tts_model`` first and ``tts_obj.tts_model``
    second. Raises RuntimeError when ``tts_obj`` is None or exposes neither.
    '''
    if tts_obj is None:
        raise RuntimeError('Provided tts_obj is None')

    # Preferred location: the synthesizer owned by the wrapper.
    try:
        return tts_obj.synthesizer.tts_model
    except AttributeError:
        pass

    # Fallback: the model attached directly to the object.
    try:
        return tts_obj.tts_model
    except AttributeError:
        pass

    raise RuntimeError('Could not resolve internal XTTS model. Ensure you loaded native XTTS-v2 via TTS API.')

def get_indic_embedding(wav_path, sr_source=SR_XTTS, sr_indic=SR_INDIC):
    '''Embed an audio file with the Indic wav2vec encoder.

    The file is loaded as mono at ``sr_source``, resampled to ``sr_indic`` when
    the two rates differ, run through the global ``processor`` and
    ``indic_enc``, and ``last_hidden_state`` is averaged over dim 1. The
    squeezed result is returned as a numpy array.

    Raises RuntimeError when the Indic load cell has not defined ``processor``
    and ``indic_enc``.
    '''
    global processor, indic_enc
    module_names = globals()
    if not ('processor' in module_names and 'indic_enc' in module_names):
        raise RuntimeError('Indic encoder not loaded. Run the Indic load cell.')

    samples, loaded_sr = librosa.load(str(wav_path), sr=sr_source, mono=True)
    if loaded_sr != sr_indic:
        samples = librosa.resample(samples, orig_sr=loaded_sr, target_sr=sr_indic)

    features = processor(samples, sampling_rate=sr_indic, return_tensors='pt', padding=True)
    with torch.no_grad():
        hidden = indic_enc(**features).last_hidden_state

    pooled = hidden.mean(dim=1)
    return pooled.squeeze().detach().cpu().numpy()

def get_xtts_speaker_latent(tts_obj, wav_path, load_sr=SR_XTTS):
    '''Extract the GPT conditioning latent for an audio file and flatten it.

    Args:
        tts_obj: TTS wrapper; its internal model is resolved with
            ``resolve_xtts_internal_model``.
        wav_path: reference audio file.
        load_sr: sample rate passed to ``get_conditioning_latents`` when the
            model accepts a ``load_sr`` keyword.

    Returns:
        1-D numpy array (the latent, raveled).

    Raises:
        RuntimeError: if the model cannot be resolved or the latent cannot be
            converted to numpy (original error chained as ``__cause__``).
    '''
    model = resolve_xtts_internal_model(tts_obj)
    try:
        res = model.get_conditioning_latents(str(wav_path), load_sr=load_sr)
    except TypeError:
        # Signature without a load_sr keyword: retry with the path only.
        res = model.get_conditioning_latents(str(wav_path))

    if isinstance(res, (list, tuple)) and len(res) >= 2:
        # Per the original author's notes: res[0] is gpt_cond_latent
        # (1, 32, 1024) and res[1] is speaker_embedding (1, 512, 1).
        # NOTE(review): shapes are not checked here - confirm against XTTS.
        gpt_lat = res[0]
    else:
        # Fallback if structure is different
        gpt_lat = res

    try:
        if hasattr(gpt_lat, 'detach'):
            # detach() first: numpy() refuses tensors that still track gradients.
            sp = gpt_lat.detach().cpu().numpy()
        elif hasattr(gpt_lat, 'cpu'):
            sp = gpt_lat.cpu().numpy()
        else:
            sp = np.array(gpt_lat)
        return sp.ravel()
    except Exception as e:
        raise RuntimeError('Failed to convert GPT latent to numpy: ' + str(e)) from e

print('Helpers ready (UPDATED): resolve_xtts_internal_model, get_indic_embedding, get_xtts_speaker_latent (GPT target)')


Helpers ready (UPDATED): resolve_xtts_internal_model, get_indic_embedding, get_xtts_speaker_latent (GPT target)


In [17]:
# Run this cell once to fix the function in memory!
def compute_emotion_vector_xtts_multi(emotion_dir, method='cca', n_comp=32, mode='average', sample_id=1,
                                      save_single_dir=None, save_avg_dir=None):
    """Compute an emotion direction from paired neutral/emotion recordings.

    Every sub-folder of ``emotion_dir`` that contains ``neutral_clean.wav`` and
    ``<emotion>_clean.wav`` contributes one pair; for each pair the difference
    (emotion minus neutral) is taken both in Indic-embedding space and in
    XTTS-latent space.

    Args:
        emotion_dir: folder named after the emotion, one sub-folder per sample.
        method: 'xtts_native' uses the raw XTTS deltas; any other value is
            passed to ``fit_cca_or_pls``. The method is forced to
            'xtts_native' when the latent has more than 1024 dims or fewer
            than 5 pairs are available.
        n_comp: upper bound on the number of CCA/PLS components.
        mode: 'single' returns the delta of one pair (xtts_native only);
            anything else averages over all pairs.
        sample_id: 1-based index into the usable pairs for ``mode='single'``.
        save_single_dir: accepted for compatibility; not used by this function.
        save_avg_dir: when set and ``mode == 'average'``, the result is cached
            there as ``<emotion>_avg_<method>.npy``.

    Returns:
        numpy array holding the emotion vector.

    Raises:
        ValueError: no sample folders, or no complete pairs.
        IndexError: ``sample_id`` outside ``1..number of usable pairs``.
    """
    # FIX: Import numpy specifically at the top to avoid UnboundLocalError scopes
    import numpy as np
    from pathlib import Path

    emotion_dir = Path(emotion_dir)
    emotion_name = emotion_dir.name
    # Remember what the caller asked for: `method` may be overridden below,
    # but the cache lookup always uses the requested name.
    requested_method = method
    use_cache = mode == 'average' and bool(save_avg_dir)

    # CACHE CHECK: If mode is average and save_avg_dir is provided
    if use_cache:
        save_avg_dir = Path(save_avg_dir)
        save_avg_dir.mkdir(parents=True, exist_ok=True)
        avg_file = save_avg_dir / f"{emotion_name}_avg_{requested_method}.npy"

        if avg_file.exists():
            print(f'‚ö° Loading cached average vector for {emotion_name}...')
            return np.load(avg_file)

    sample_dirs = [d for d in sorted(emotion_dir.iterdir()) if d.is_dir()]
    if len(sample_dirs) == 0:
        raise ValueError('No sample subfolders found in: ' + str(emotion_dir))

    X = []
    Y = []
    single_vectors = []

    for sd in sample_dirs:
        n_clean = sd / 'neutral_clean.wav'
        e_clean = sd / f'{emotion_name}_clean.wav'

        # Only complete pairs can produce a delta.
        if not (n_clean.exists() and e_clean.exists()):
            continue

        xi = get_indic_embedding(n_clean, sr_source=SR_XTTS, sr_indic=SR_INDIC)
        xe = get_indic_embedding(e_clean, sr_source=SR_XTTS, sr_indic=SR_INDIC)
        yi = get_xtts_speaker_latent(XTTS, n_clean, load_sr=SR_XTTS)
        ye = get_xtts_speaker_latent(XTTS, e_clean, load_sr=SR_XTTS)

        # Store deltas
        X.append(xe - xi)
        Y.append(ye - yi)
        single_vectors.append((sd.name, xe - xi, ye - yi))

    if len(X) == 0:
        raise ValueError('No matched pairs extracted for emotion: ' + str(emotion_dir))

    X = np.stack(X)
    Y = np.stack(Y)

    # Check dimensions
    dim_y = Y.shape[1]
    if dim_y > 1024 and method != 'xtts_native':
        print(f'‚ö†Ô∏è Detected high-dimensional latent ({dim_y} dims). Forcing method="xtts_native".')
        method = 'xtts_native'

    if len(X) < 5 and method != 'xtts_native':
        print(f'‚ö†Ô∏è Only {len(X)} samples; Switch to "xtts_native" for stability.')
        method = 'xtts_native'

    result_vec = None

    if method == 'xtts_native':
        if mode == 'single':
            # Reject 0/negative ids explicitly: plain indexing would silently
            # wrap around to the last sample.
            if not 1 <= sample_id <= len(single_vectors):
                raise IndexError(
                    f'sample_id {sample_id} out of range; {len(single_vectors)} usable sample(s) '
                    f'for emotion {emotion_name}'
                )
            result_vec = single_vectors[sample_id - 1][2]
        else:
            # Average raw emotion deltas
            result_vec = np.mean([v for (_, _, v) in single_vectors], axis=0)

    else:
        # CCA/PLS logic
        max_comp = min(X.shape[0], X.shape[1], Y.shape[1])
        actual_n_comp = min(n_comp, max_comp)
        mapper = fit_cca_or_pls(X, Y, method=method, n_comp=actual_n_comp)
        v_indic = np.mean([xi for (_, xi, _) in single_vectors], axis=0)
        v_indic = v_indic / (np.linalg.norm(v_indic) + 1e-12)
        result_vec = map_indic_vector_to_xtts(mapper, v_indic)

    # CACHE SAVE: If mode is average and save_avg_dir is provided
    if use_cache and result_vec is not None:
        save_avg_dir.mkdir(parents=True, exist_ok=True)
        # Save under the effective method (as before) and, when the method was
        # overridden, also under the requested one so the next call with the
        # same arguments hits the cache check above.
        for method_tag in dict.fromkeys((method, requested_method)):
            avg_file_name = f"{emotion_name}_avg_{method_tag}.npy"
            print(f'üíæ Saving average vector to {save_avg_dir / avg_file_name}...')
            np.save(save_avg_dir / avg_file_name, result_vec)

    return result_vec
print("Function patched in memory. Try synthesizing now!")

Function patched in memory. Try synthesizing now!


In [18]:
# [FIXED v2] Apply emotion vector using DIRECT INFERENCE + LANGUAGE FIX
# Handles KeyError: 'hi' by patching char_limits or disabling splitting.
import numpy as np
import torch
from pathlib import Path
import soundfile as sf

# Folder that receives synthesized audio; created eagerly, including parents.
OUTPUT_GEN_DIR = PROJECT_ROOT.joinpath('data', 'outputs', 'generated')
OUTPUT_GEN_DIR.mkdir(parents=True, exist_ok=True)

def apply_emotion_and_synthesize(text, speaker_wav, emotion_vec, alpha=0.1, out_path=None, language='hi', scale_to_speaker=False):
    """Synthesize ``text`` in the reference speaker's voice with an emotion offset.

    The GPT conditioning latent extracted from ``speaker_wav`` is shifted by
    ``alpha * emotion_vec`` and passed to ``model.inference`` together with
    the unmodified speaker embedding.

    Args:
        text: text to speak.
        speaker_wav: reference audio used to extract the base latents.
        emotion_vec: flat emotion vector. Vectors with more than 2000 values
            are reshaped to the base latent's shape and applied; smaller ones
            are ignored with a warning.
        alpha: strength of the emotion offset.
        out_path: destination WAV; defaults to ``test_hindi_emotional.wav`` in
            ``OUTPUT_GEN_DIR``. An existing file is never overwritten (see
            ``unique_path``).
        language: language code passed to the model.
        scale_to_speaker: accepted for compatibility; not used by this function.

    Returns:
        Path of the written WAV file.

    Raises:
        RuntimeError: the emotion vector does not match the latent size, or
            inference / writing failed (original error chained as ``__cause__``).
    """
    if out_path is None:
        out_path = OUTPUT_GEN_DIR / 'test_hindi_emotional.wav'
    out_path = unique_path(Path(out_path))

    # 1. Resolve Model
    model = resolve_xtts_internal_model(XTTS)

    # 2. Get Base Latents
    print(f'Extracting base latents from {Path(speaker_wav).name}...')
    latents = model.get_conditioning_latents(str(speaker_wav), load_sr=SR_XTTS)
    gpt_cond_latent = latents[0]
    speaker_embedding = latents[1]

    # 3. Prepare Emotion Vector
    ev = np.asarray(emotion_vec).astype(np.float32)

    # 4. Modify GPT Latent (High-Dim)
    if ev.size > 2000:
        # Match the base latent's own shape instead of assuming (1, 32, 1024).
        if ev.size != gpt_cond_latent.numel():
            raise RuntimeError(
                f'Emotion vector has {ev.size} values but the GPT latent has '
                f'{gpt_cond_latent.numel()} (shape {tuple(gpt_cond_latent.shape)})'
            )
        ev_tensor = torch.tensor(ev).float().reshape(gpt_cond_latent.shape).to(gpt_cond_latent.device)

        # Diagnostic output
        base_norm = torch.norm(gpt_cond_latent).item()
        delta_norm = torch.norm(ev_tensor).item()
        ratio = delta_norm / (base_norm + 1e-9)
        print(f"[Direct] Base Norm: {base_norm:.2f}, Delta Norm: {delta_norm:.2f}, Ratio: {ratio:.2%}")

        # Apply modification
        new_gpt_cond = gpt_cond_latent + alpha * ev_tensor
        print(f" -> Modified GPT latent with alpha={alpha}")
    else:
        print("Warning: Low-dim vector ignored in this fixed version.")
        new_gpt_cond = gpt_cond_latent

    # 5. LANGUAGE SUPPORT FIX
    # Ensure tokenizer has limits for the language to avoid KeyError
    if hasattr(model, 'tokenizer') and hasattr(model.tokenizer, 'char_limits'):
        if language not in model.tokenizer.char_limits:
            print(f"Patching missing char_limit for '{language}'...")
            model.tokenizer.char_limits[language] = 200 # Default safe limit

    # 6. DIRECT INFERENCE
    print("Synthesizing via direct model.inference()...")
    try:
        out = model.inference(
            text=text,
            language=language,
            gpt_cond_latent=new_gpt_cond,
            speaker_embedding=speaker_embedding,
            temperature=0.7,
            length_penalty=1.0,
            repetition_penalty=2.0,
            top_k=50,
            top_p=0.8,
            enable_text_splitting=True
        )

        # 7. Save Output
        wav = out['wav']
        # NOTE(review): 24000 Hz is assumed to be the model's output rate - confirm.
        sf.write(str(out_path), wav, 24000)
        print(f'‚úì Synthesis complete -> {out_path}')
        return out_path

    except Exception as e:
        print(f'Direct synthesis failed: {e}')
        raise RuntimeError(str(e)) from e

print('‚úì apply_emotion_and_synthesize() FIXED v2 (Direct + LangPatch)')

‚úì apply_emotion_and_synthesize() FIXED v2 (Direct + LangPatch)


## Samples Batch Pre-processing

In [19]:
# Preprocessing: Batch clean all emotion and speaker samples
import os
from pathlib import Path

# Lower-case file suffixes (with leading dot) that the cleaning helpers treat as audio.
AUDIO_EXTS = ['.wav', '.mp3', '.m4a', '.flac']

def _is_raw_audio(path):
    """Return True for a regular audio file that is not one of our own outputs."""
    stem = path.stem.lower()
    return (
        path.is_file()
        and path.suffix.lower() in AUDIO_EXTS
        # *_clean.wav and *.tmp.wav are produced by preprocess_audio itself.
        and not stem.endswith('_clean')
        and not stem.endswith('.tmp')
    )


def _pick_raw_pair(raw_files, emotion_name):
    """Choose (neutral, emotion) source files by name, filling gaps from the rest."""
    neutral_raw = None
    emotion_raw = None
    emotion_key = emotion_name.lower()
    for f in raw_files:
        f_lower = f.stem.lower()
        if 'neutral' in f_lower:
            neutral_raw = f
        elif emotion_key in f_lower:
            emotion_raw = f

    # Fill only the role that is still missing, preferring a file that is not
    # already used for the other role. With no name match at all this gives
    # the first two files (or the same file twice when only one exists).
    if neutral_raw is None:
        neutral_raw = next((f for f in raw_files if f != emotion_raw), emotion_raw)
    if emotion_raw is None:
        emotion_raw = next((f for f in raw_files if f != neutral_raw), neutral_raw)
    return neutral_raw, emotion_raw


def clean_emotion_samples(emotion_dir, sr=SR_XTTS):
    """Clean all audio files in emotion sample folders.

    For each sample folder under emotion_dir:
    - Finds raw audio files (neutral and emotion); previously generated
      ``*_clean`` / ``.tmp`` files are never used as sources
    - Creates *_clean.wav versions using preprocess_audio
    - Skips if cleaned version already exists

    Failures of individual files are printed and do not stop the batch.

    Returns count of cleaned files.
    """
    emotion_dir = Path(emotion_dir)
    emotion_name = emotion_dir.name
    sample_dirs = sorted([d for d in emotion_dir.iterdir() if d.is_dir()])

    if not sample_dirs:
        print(f'No sample folders found in {emotion_dir}')
        return 0

    cleaned_count = 0
    print(f'\nüéµ Cleaning emotion samples for "{emotion_name}" ({len(sample_dirs)} samples)')
    print('‚îÄ' * 70)

    for sd in sample_dirs:
        n_clean = sd / 'neutral_clean.wav'
        e_clean = sd / f'{emotion_name}_clean.wav'

        # Find raw audio files
        raw_files = sorted(f for f in sd.iterdir() if _is_raw_audio(f))
        if not raw_files and not (n_clean.exists() or e_clean.exists()):
            print(f'  {sd.name}: ‚ö†Ô∏è No audio files found')
            continue

        # Match neutral and emotion files
        neutral_raw, emotion_raw = _pick_raw_pair(raw_files, emotion_name)

        for label, raw, clean in (('neutral', neutral_raw, n_clean),
                                  (emotion_name, emotion_raw, e_clean)):
            if clean.exists():
                print(f'  {sd.name}: ‚äò {label}_clean.wav (exists)')
            elif raw is None:
                # Only cleaned output of the other role is left in this folder.
                print(f'  {sd.name}: ‚ö†Ô∏è no raw source for {label}_clean.wav')
            else:
                try:
                    preprocess_audio(raw, clean, sr=sr)
                    cleaned_count += 1
                    print(f'  {sd.name}: ‚úì {label}_clean.wav')
                except Exception as e:
                    print(f'  {sd.name}: ‚úó {label} failed - {str(e)[:40]}')

    print('‚îÄ' * 70)
    print(f'‚úì Emotion cleaning complete: {cleaned_count} new files created\n')
    return cleaned_count


def clean_speaker_samples(sr=SR_XTTS):
    """Produce ``<stem>_clean.wav`` for every audio file in data/speakers.

    The folder is created when missing. Files whose stem already ends in
    ``_clean`` and files whose cleaned counterpart exists are reported and
    skipped. Per-file failures are printed and do not stop the batch.

    Returns the number of files newly created.
    """
    speakers_root = PROJECT_ROOT / 'data' / 'speakers'
    speakers_root.mkdir(parents=True, exist_ok=True)

    candidates = sorted(
        entry for entry in speakers_root.iterdir()
        if entry.suffix.lower() in AUDIO_EXTS and entry.is_file()
    )
    if len(candidates) == 0:
        print('No speaker files found in data/speakers')
        return 0

    created = 0
    rule = '‚îÄ' * 70
    print(f'\nüé§ Cleaning speaker samples ({len(candidates)} files)')
    print(rule)

    for source in candidates:
        if source.stem.endswith('_clean'):
            # Output of an earlier run; nothing to do.
            print(f'  {source.name}: ‚äò (already clean)')
        else:
            target = source.with_name(source.stem + '_clean.wav')
            if target.exists():
                print(f'  {source.name}: ‚äò {target.name} (exists)')
            else:
                try:
                    preprocess_audio(source, target, sr=sr)
                    created += 1
                    print(f'  {source.name}: ‚úì {target.name}')
                except Exception as e:
                    print(f'  {source.name}: ‚úó failed - {str(e)[:40]}')

    print(rule)
    print(f'‚úì Speaker cleaning complete: {created} new files created\n')
    return created


def preprocess_all(emotion_names=None):
    """Run emotion cleaning for the given emotions, then speaker cleaning.

    Args:
        emotion_names: names of folders under data/emotion_samples to clean;
            None means every sub-folder found there. Names without a matching
            folder are skipped silently.

    Returns:
        dict ``{'emotion': {name: new_file_count}, 'speaker': new_file_count}``.
    """
    samples_root = PROJECT_ROOT / 'data' / 'emotion_samples'
    samples_root.mkdir(parents=True, exist_ok=True)

    if emotion_names is None:
        emotion_names = [d.name for d in samples_root.iterdir() if d.is_dir()]

    banner = '=' * 70
    stats = {'emotion': {}, 'speaker': 0}

    print('\n' + banner)
    print('BATCH PREPROCESSING: EMOTION + SPEAKER SAMPLES')
    print(banner)

    for emotion in emotion_names:
        folder = samples_root / emotion
        if not folder.exists():
            continue
        stats['emotion'][emotion] = clean_emotion_samples(folder)

    stats['speaker'] = clean_speaker_samples()

    print(banner)
    print('SUMMARY:')
    for emotion in stats['emotion']:
        print(f'  {emotion}: {stats["emotion"][emotion]} files cleaned')
    print(f'  speakers: {stats["speaker"]} files cleaned')
    print(banner + '\n')

    return stats


print('Preprocessing functions ready: preprocess_all(emotion_names=[...])')

Preprocessing functions ready: preprocess_all(emotion_names=[...])


In [None]:
# Interactive Preprocessing GUI
import ipywidgets as widgets
from IPython.display import display, clear_output

# Get available emotions. A missing samples folder yields an empty list (and
# therefore no checkboxes) instead of raising FileNotFoundError.
emotion_dir = PROJECT_ROOT / 'data' / 'emotion_samples'
available_emotions = (
    sorted(d.name for d in emotion_dir.iterdir() if d.is_dir())
    if emotion_dir.exists() else []
)

# Create checkboxes for emotion selection (all pre-selected), keyed by emotion name
emotion_checkboxes = {
    emotion: widgets.Checkbox(value=True, description=emotion, indent=False)
    for emotion in available_emotions
}

# Output area that the button callbacks print into
output = widgets.Output()

def on_clean_emotions(button):
    """Button callback: run preprocess_all for the emotions whose checkbox is ticked.

    All printing goes to the shared ``output`` widget, which is cleared first.
    When nothing is ticked only a warning is printed.
    """
    with output:
        clear_output()
        chosen = [name for name in emotion_checkboxes if emotion_checkboxes[name].value]
        if chosen:
            preprocess_all(emotion_names=chosen)
        else:
            print('‚ö†Ô∏è No emotions selected')

def on_clean_speakers(button):
    """Button callback: clean the speaker samples.

    The ``button`` argument is not used. Output is routed into the shared
    ``output`` widget, which is cleared before ``clean_speaker_samples`` runs.
    """
    with output:
        clear_output()
        clean_speaker_samples()

def on_clean_all(button):
    """Button callback: clean all emotion and speaker samples.

    The ``button`` argument is not used. Output is routed into the shared
    ``output`` widget, which is cleared before ``preprocess_all`` runs with
    its default arguments.
    """
    with output:
        clear_output()
        preprocess_all()

# Buttons, each wired to its callback right after creation
btn_emotions = widgets.Button(
    description='üéµ Clean Selected Emotions',
    button_style='info',
    tooltip='Clean checked emotion samples',
)
btn_emotions.on_click(on_clean_emotions)

btn_speakers = widgets.Button(
    description='üé§ Clean Speaker Samples',
    button_style='warning',
    tooltip='Clean speaker audio files',
)
btn_speakers.on_click(on_clean_speakers)

btn_all = widgets.Button(
    description='üîÑ Clean All Samples',
    button_style='danger',
    tooltip='Clean all emotions and speakers',
)
btn_all.on_click(on_clean_all)

# Layout: bordered checkbox list on top, button row below, shared output last
emotion_box = widgets.VBox(
    [
        widgets.HTML('<b>Select Emotions to Clean:</b>'),
        *[emotion_checkboxes[e] for e in available_emotions],
    ],
    layout=widgets.Layout(border='1px solid #ccc', padding='10px', margin='10px 0'),
)

button_box = widgets.HBox(
    [btn_emotions, btn_speakers, btn_all],
    layout=widgets.Layout(margin='10px 0'),
)

panel = widgets.VBox([emotion_box, button_box, output])
display(panel)

print('‚úì Preprocessing GUI ready. Use buttons above to clean samples.')

VBox(children=(VBox(children=(HTML(value='<b>Select Emotions to Clean:</b>'), Checkbox(value=True, description‚Ä¶

‚úì Preprocessing GUI ready. Use buttons above to clean samples.


In [21]:
# Sample Stats: View cleaned files and their properties
from pathlib import Path

def show_sample_stats():
    """Print an inventory of cleaned emotion samples and speaker files.

    For every emotion folder under data/emotion_samples, counts the sample
    sub-folders that contain both ``neutral_clean.wav`` and
    ``<emotion>_clean.wav`` and those that contain only one of them. For
    data/speakers, prints the number of audio files and lists each
    ``*_clean.wav`` with its size. A missing directory is reported instead of
    raising. Returns None.
    """
    emotion_dir = PROJECT_ROOT / 'data' / 'emotion_samples'
    speaker_dir = PROJECT_ROOT / 'data' / 'speakers'

    print('\n' + '=' * 70)
    print('üìä CLEANED SAMPLES INVENTORY')
    print('=' * 70)

    # Check emotions
    total_emotion_samples = 0
    if emotion_dir.exists():
        emotion_folders = sorted(emotion_dir.iterdir())
    else:
        # Same tolerance as the speaker section: report, do not crash.
        emotion_folders = []
        print(f'\nEmotion samples directory not found: {emotion_dir}')

    for emotion_folder in emotion_folders:
        if not emotion_folder.is_dir():
            continue
        emotion_name = emotion_folder.name
        samples_with_both = 0
        samples_with_partial = 0

        for sample_folder in sorted(emotion_folder.iterdir()):
            if not sample_folder.is_dir():
                continue
            n_clean = sample_folder / 'neutral_clean.wav'
            e_clean = sample_folder / f'{emotion_name}_clean.wav'

            if n_clean.exists() and e_clean.exists():
                samples_with_both += 1
            elif n_clean.exists() or e_clean.exists():
                samples_with_partial += 1

        total_emotion_samples += samples_with_both

        status = '‚úì' if samples_with_both > 0 else '‚úó'
        print(f'\n{status} {emotion_name.upper()}:')
        print(f'   Ready for visualization: {samples_with_both} samples (with both neutral + emotion clean files)')
        if samples_with_partial > 0:
            print(f'   Partial: {samples_with_partial} samples (only one file cleaned)')

    # Check speakers
    print(f'\n{"‚îÄ" * 70}')
    print('\nüé§ SPEAKER SAMPLES:')
    if speaker_dir.exists():
        # Note: the total includes the *_clean.wav files themselves.
        all_files = list(speaker_dir.glob('*.wav')) + list(speaker_dir.glob('*.mp3')) + list(speaker_dir.glob('*.m4a')) + list(speaker_dir.glob('*.flac'))
        clean_files = list(speaker_dir.glob('*_clean.wav'))
        if all_files:
            print(f'   Total files: {len(all_files)}')
            print(f'   Cleaned files: {len(clean_files)}')
            for cf in sorted(clean_files):
                size_kb = cf.stat().st_size / 1024
                print(f'      ‚úì {cf.name} ({size_kb:.1f} KB)')
        else:
            print('   No speaker files found')
    else:
        print('   Directory not found')

    print('\n' + '=' * 70)
    print(f'Summary: {total_emotion_samples} emotion samples ready')
    print('=' * 70 + '\n')


# Display stats
show_sample_stats()
print('Tip: Run "Clean All Samples" or "Clean Selected Emotions" above to prepare files.')


üìä CLEANED SAMPLES INVENTORY

‚úì ANGRY:
   Ready for visualization: 4 samples (with both neutral + emotion clean files)

‚úì DEMO:
   Ready for visualization: 2 samples (with both neutral + emotion clean files)

‚úì HAPPY:
   Ready for visualization: 4 samples (with both neutral + emotion clean files)

‚úì SAD:
   Ready for visualization: 4 samples (with both neutral + emotion clean files)

‚úì SINGING:
   Ready for visualization: 1 samples (with both neutral + emotion clean files)

‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ

üé§ SPEAKER SAMPLES:
   Total files: 3
   Cleaned files: 3
      ‚úì amir_khan_clean.wav (687.4 KB)
      ‚úì character_1_clean.wav (281.4 KB)
      ‚úì lata_mangeskar_clean.wav (626.6 KB)

Summary: 15 emotion samples ready

Tip: Run "Clean All Samples" or "Clean Selected Emotions" above to pre

In [22]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from pathlib import Path

def plot_emotion_samples_pca(emotion_name):
    """Plot emotion samples in 2D PCA space showing neutral vs emotion shifts.

    Every sub-folder of ``data/emotion_samples/<emotion_name>`` that contains
    both ``neutral_clean.wav`` and ``<emotion_name>_clean.wav`` contributes one
    neutral and one emotional latent (via ``get_xtts_speaker_latent``). The
    latents, plus the mean of each group, are projected to 2D with PCA and
    drawn as a scatter plot with an arrow from the neutral mean to the emotion
    mean. PCA and vector statistics are printed afterwards.

    Args:
        emotion_name: name of emotion folder (e.g., 'happy', 'sad')

    Returns:
        None. Prints a message and returns early when the folder is missing
        or no sample could be loaded.
    """
    emotion_dir = PROJECT_ROOT / 'data' / 'emotion_samples' / emotion_name
    
    if not emotion_dir.exists():
        print(f'Emotion folder not found: {emotion_dir}')
        return
    
    neutral_vecs = []
    emotion_vecs = []
    sample_names = []
    
    sample_dirs = sorted([d for d in emotion_dir.iterdir() if d.is_dir()])
    
    print(f'üéµ Loading {emotion_name} samples...')
    
    for sd in sample_dirs:
        n_clean = sd / 'neutral_clean.wav'
        e_clean = sd / f'{emotion_name}_clean.wav'
        
        # Skip if clean files don't exist
        if not (n_clean.exists() and e_clean.exists()):
            print(f'  ‚ö†Ô∏è {sd.name}: missing clean files, skipping')
            continue
        
        try:
            # Extract speaker latents
            n_emb = get_xtts_speaker_latent(XTTS, n_clean, load_sr=SR_XTTS)
            e_emb = get_xtts_speaker_latent(XTTS, e_clean, load_sr=SR_XTTS)
            
            # Append only after both extractions succeeded so the neutral and
            # emotion lists stay index-aligned.
            neutral_vecs.append(n_emb)
            emotion_vecs.append(e_emb)
            sample_names.append(sd.name)
            print(f'  ‚úì {sd.name}')
        except Exception as e:
            # Best-effort: one broken sample must not abort the whole plot.
            print(f'  ‚úó {sd.name}: {str(e)[:40]}')
    
    if not neutral_vecs:
        print(f'No valid samples found in {emotion_dir}')
        return
    
    # presumably (n_samples, 512) each -- depends on what
    # get_xtts_speaker_latent returns; TODO confirm
    neutral_vecs = np.array(neutral_vecs)
    emotion_vecs = np.array(emotion_vecs)
    
    # Compute averages
    neutral_avg = neutral_vecs.mean(axis=0)
    emotion_avg = emotion_vecs.mean(axis=0)
    
    # Stack all vectors for PCA; the two averages go last so they can be
    # sliced back out by position below.
    all_vecs = np.vstack([neutral_vecs, emotion_vecs, neutral_avg.reshape(1, -1), emotion_avg.reshape(1, -1)])
    
    # Apply PCA to 2D
    pca = PCA(n_components=2)
    vecs_2d = pca.fit_transform(all_vecs)
    
    # Split back
    n_samples = len(neutral_vecs)
    neutral_2d = vecs_2d[:n_samples]
    emotion_2d = vecs_2d[n_samples:2*n_samples]
    neutral_avg_2d = vecs_2d[2*n_samples]
    emotion_avg_2d = vecs_2d[2*n_samples + 1]
    
    # Plot
    plt.figure(figsize=(12, 8))
    
    # Plot individual samples
    plt.scatter(neutral_2d[:, 0], neutral_2d[:, 1], c='blue', s=100, alpha=0.6, 
               label='Neutral samples', edgecolors='darkblue', linewidth=1.5)
    plt.scatter(emotion_2d[:, 0], emotion_2d[:, 1], c='red', s=100, alpha=0.6, 
               label=f'{emotion_name.capitalize()} samples', edgecolors='darkred', linewidth=1.5)
    
    # Annotate sample names
    for i, name in enumerate(sample_names):
        plt.annotate(name, (neutral_2d[i, 0], neutral_2d[i, 1]), 
                    fontsize=8, alpha=0.7, xytext=(5, 5), textcoords='offset points')
        plt.annotate(name, (emotion_2d[i, 0], emotion_2d[i, 1]), 
                    fontsize=8, alpha=0.7, xytext=(5, 5), textcoords='offset points')
    
    # Plot averages (larger markers)
    plt.scatter(neutral_avg_2d[0], neutral_avg_2d[1], c='blue', s=400, marker='X', 
               edgecolors='darkblue', linewidth=2, label='Neutral avg', zorder=10)
    plt.scatter(emotion_avg_2d[0], emotion_avg_2d[1], c='red', s=400, marker='X', 
               edgecolors='darkred', linewidth=2, label=f'{emotion_name.capitalize()} avg', zorder=10)
    
    # Draw arrow from neutral to emotion average (emotion shift).
    # annotate() sizes the arrow head in points, so it stays proportionate
    # whatever the scale of the PCA axes is and ends at the target marker;
    # plt.arrow() takes head sizes in data units and draws the head past the
    # end point.
    plt.annotate(
        '',
        xy=(emotion_avg_2d[0], emotion_avg_2d[1]),
        xytext=(neutral_avg_2d[0], neutral_avg_2d[1]),
        arrowprops=dict(arrowstyle='-|>', color='green', alpha=0.7,
                        linewidth=2.5, mutation_scale=20),
        zorder=5,
    )
    
    # Labels and formatting
    explained_var = pca.explained_variance_ratio_
    cumsum_var = np.cumsum(explained_var)
    
    plt.xlabel(f'PC1 ({explained_var[0]:.1%})', fontsize=12)
    plt.ylabel(f'PC2 ({explained_var[1]:.1%})', fontsize=12)
    plt.title(f'Emotion Vectors: {emotion_name.capitalize()}\nPCA 2D Projection (Cumulative: {cumsum_var[1]:.1%})',
             fontsize=14, fontweight='bold')
    plt.legend(fontsize=11, loc='best')
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()
    
    # Print statistics
    print(f'\nüìä PCA Statistics:')
    print(f'   PC1 variance: {explained_var[0]:.2%}')
    print(f'   PC2 variance: {explained_var[1]:.2%}')
    print(f'   Cumulative: {cumsum_var[1]:.2%}')
    
    print(f'\nüìà Vector Statistics ({n_samples} samples):')
    print(f'   Neutral avg norm: {np.linalg.norm(neutral_avg):.3f}')
    print(f'   {emotion_name.capitalize()} avg norm: {np.linalg.norm(emotion_avg):.3f}')
    emotion_diff = emotion_avg - neutral_avg
    print(f'   Difference norm: {np.linalg.norm(emotion_diff):.3f}')
    # 1e-12 keeps the division defined if either mean vector is all zeros.
    cosine_sim = np.dot(neutral_avg, emotion_avg) / (np.linalg.norm(neutral_avg) * np.linalg.norm(emotion_avg) + 1e-12)
    print(f'   Cosine similarity: {cosine_sim:.3f}')


# Interactive dropdown to select emotion and plot.
# The option list is taken from list_emotions() once, when this cell runs;
# re-run the cell to pick up emotion folders added later.
emotion_plot_dropdown = widgets.Dropdown(
    options=list_emotions(),
    description='Emotion:',
    style={'description_width': '100px'}
)

# Button that triggers the plot, and the output area the plot is rendered into.
plot_button = widgets.Button(description='Plot Samples', button_style='info')
plot_output = widgets.Output()

def on_plot_clicked(b):
    """Button callback: redraw the PCA plot for the selected emotion.

    Everything is rendered inside ``plot_output``, which is cleared first.
    The placeholder entry '(no emotions found)' produces a hint instead of
    a plot.

    Args:
        b: the clicked button (unused).
    """
    with plot_output:
        clear_output()
        selected = emotion_plot_dropdown.value
        if selected != '(no emotions found)':
            plot_emotion_samples_pca(selected)
        else:
            print('No emotions available. Run preprocessing first.')

# Wire the button to its callback.
plot_button.on_click(on_plot_clicked)

# Stack the caption, the controls row and the output area vertically.
plot_panel = widgets.VBox([
    widgets.HTML('<b>Select emotion and click Plot:</b>'),
    widgets.HBox([emotion_plot_dropdown, plot_button]),
    plot_output
])

# Render the panel as this cell's output and confirm it is ready.
display(plot_panel)
print('‚úì Interactive emotion sample plotter ready')

VBox(children=(HTML(value='<b>Select emotion and click Plot:</b>'), HBox(children=(Dropdown(description='Emoti‚Ä¶

‚úì Interactive emotion sample plotter ready


## Caching Latents to speed up repeated runs

In [23]:
# [OPTIMIZED] Caching Latents to speed up inference
import torch
import numpy as np
from pathlib import Path
import soundfile as sf

# Module-level memo of conditioning latents, keyed by str(speaker_wav).
# Each value is whatever model.get_conditioning_latents() returned for that
# path (callers index it as [0] = GPT conditioning latent, [1] = speaker
# embedding). Re-running this cell empties the cache.
SPEAKER_CACHE = {}


def get_cached_latents(model, speaker_wav):
    """Return the conditioning latents for ``speaker_wav``, computing them at most once.

    Args:
        model: object exposing ``get_conditioning_latents(path, load_sr=...)``.
        speaker_wav: path (str or Path) of the reference audio; its string
            form is the cache key.

    Returns:
        The cached object itself (not a copy) -- callers that modify the
        latents must clone them first.
    """
    cache_key = str(speaker_wav)

    try:
        return SPEAKER_CACHE[cache_key]
    except KeyError:
        pass  # first request for this path: fall through and compute

    print(f"Computing new latents for {Path(cache_key).name} ...")
    computed = model.get_conditioning_latents(cache_key, load_sr=SR_XTTS)
    SPEAKER_CACHE[cache_key] = computed
    return computed

def apply_emotion_and_synthesize(text, speaker_wav, emotion_vec, alpha=0.1, out_path=None, language='hi'):
    """Synthesize ``text`` in the reference speaker's voice, shifted by an emotion vector.

    The speaker's cached conditioning latents are cloned, ``alpha * emotion_vec``
    is added to the GPT conditioning latent, and the XTTS model's ``inference``
    is called directly with the modified latent.

    Args:
        text: text to speak.
        speaker_wav: path of the reference speaker audio.
        emotion_vec: array-like emotion direction. It is applied only when it
            has more than 2000 elements, in which case it must reshape to
            (1, 32, 1024); smaller vectors are ignored with a warning.
        alpha: scale factor applied to the emotion vector.
        out_path: desired output file; defaults to
            ``OUTPUT_GEN_DIR / 'test_hindi_emotional.wav'``. The final name
            is whatever ``unique_path`` returns for it.
        language: language code passed to the model.

    Returns:
        The path the audio was written to.

    Raises:
        RuntimeError: if inference or writing the file fails (the original
            exception is chained as ``__cause__``).
    """
    if out_path is None:
        out_path = OUTPUT_GEN_DIR / 'test_hindi_emotional.wav'
    out_path = unique_path(Path(out_path))
    # sf.write() does not create directories, so make sure the target exists.
    Path(out_path).parent.mkdir(parents=True, exist_ok=True)

    # 1. Resolve Model
    model = resolve_xtts_internal_model(XTTS)
    
    # 2. Get Base Latents (FROM CACHE)
    latents = get_cached_latents(model, speaker_wav)
    
    # Clone to avoid modifying the cached version in place (very important!)
    gpt_cond_latent = latents[0].clone().detach() 
    speaker_embedding = latents[1].clone().detach()
    
    # 3. Prepare Emotion Vector
    ev = np.asarray(emotion_vec).astype(np.float32)
    
    # 4. Modify GPT Latent (High-Dim)
    if ev.size > 2000:
        ev_tensor = torch.tensor(ev).float().reshape(1, 32, 1024).to(gpt_cond_latent.device)
        
        # The sum is a new tensor, so the cached latents stay untouched.
        new_gpt_cond = gpt_cond_latent + alpha * ev_tensor
    else:
        # Fallback (shouldn't happen with xtts_native). Say so explicitly:
        # otherwise the caller gets plain, un-emotional speech with no hint why.
        print(f'Warning: emotion vector has only {ev.size} values; '
              f'it is not applied and the output will be neutral.')
        new_gpt_cond = gpt_cond_latent

    # 5. LANGUAGE SUPPORT FIX
    # Register a character limit for the language if the tokenizer has none.
    # NOTE: this mutates the shared tokenizer object.
    if hasattr(model, 'tokenizer') and hasattr(model.tokenizer, 'char_limits'):
        if language not in model.tokenizer.char_limits:
            model.tokenizer.char_limits[language] = 200 

    # 6. DIRECT INFERENCE
    try:
        out = model.inference(
            text=text,
            language=language,
            gpt_cond_latent=new_gpt_cond,
            speaker_embedding=speaker_embedding,
            temperature=0.7,
            length_penalty=1.0,
            repetition_penalty=2.0,
            top_k=50,
            top_p=0.8,
            enable_text_splitting=True
        )
        
        # 7. Save Output
        wav = out['wav']
        # 24000 is written as the file's sample rate -- presumably the XTTS
        # output rate; TODO confirm against the model config.
        sf.write(str(out_path), wav, 24000)
        return out_path
        
    except Exception as e:
        print(f'Direct synthesis failed: {e}')
        raise RuntimeError(str(e)) from e

print("‚úì apply_emotion_and_synthesize() updated with CACHING.")

‚úì apply_emotion_and_synthesize() updated with CACHING.


## TTS GUI

---
---

## 1. Shared State (Run Once)

In [24]:
# [1] SHARED STATE INITIALIZATION
# Describes the most recent synthesis. The synthesis GUI overwrites these
# fields after each successful run and the metrics GUI reads them.
# "audio_path" stays None until something has been generated.
LAST_GEN_STATE = {
    "audio_path": None,
    "ref_speaker": None,
    "text": "",
    "emotion": "",
    "mode": "",
    "alpha": 0.0,
    "speaker_name": "",
    # Written by the synthesis GUI as well; declared here so the key exists
    # (and can be read safely) before the first synthesis.
    "sample": "",
    "timestamp": ""
}
print("üîó Shared state initialized.")

üîó Shared state initialized.


## 2. Helper Functions (Metric Logic)

In [25]:
# [2] METRIC HELPERS
import torch
import numpy as np
import whisper
from torch.nn.functional import cosine_similarity
import time

# Attempt to load Whisper for WER (ignore if on CPU and it's too slow).
# The model is loaded only once per kernel session: re-running the cell keeps
# an already loaded `whisper_model`. If loading fails, `whisper_model` stays
# undefined and get_wer() reports -1.0.
try:
    if 'whisper_model' not in globals():
        print("‚è≥ Loading Whisper (tiny) for WER...")
        whisper_model = whisper.load_model("tiny")
        print("‚úì Whisper loaded.")
except Exception as e:
    # `Exception` rather than a bare except, so interrupting a slow load with
    # Ctrl-C / kernel interrupt still works; the reason is shown for debugging.
    print("‚ö†Ô∏è Whisper load failed/skipped. WER will be disabled.")
    print(f"   Reason: {type(e).__name__}: {e}")

def get_cosine_sim(path1, path2):
    """Calculate speaker similarity between two audio files.

    Element [1] of ``get_conditioning_latents`` (used elsewhere in this
    notebook as the speaker embedding) is computed for both files and the
    cosine similarity of the flattened embeddings is returned.

    Args:
        path1: path of the first audio file.
        path2: path of the second audio file.

    Returns:
        The cosine similarity as a float, or 0.0 if anything fails (the
        failure is printed, since 0.0 is also a legitimate score).
    """
    def _flat_embedding(latent):
        # as_tensor() passes tensors through and converts arrays; detach()
        # and cpu() make this work for embeddings that live on a GPU or carry
        # grad, where np.array(latent) would raise.
        return torch.as_tensor(latent).detach().cpu().flatten().float().unsqueeze(0)

    try:
        model = resolve_xtts_internal_model(XTTS)
        lat1 = model.get_conditioning_latents(str(path1), load_sr=SR_XTTS)[1]
        lat2 = model.get_conditioning_latents(str(path2), load_sr=SR_XTTS)[1]
        emb1 = _flat_embedding(lat1)
        emb2 = _flat_embedding(lat2)
        return cosine_similarity(emb1, emb2).item()
    except Exception as e:
        # Keep the 0.0 sentinel callers format and log, but do not hide why.
        print(f"Speaker similarity failed ({type(e).__name__}: {e}); returning 0.0")
        return 0.0

def get_wer(ref_text, audio_path):
    """Calculate the word error rate of the Whisper transcript of an audio file.

    The audio is transcribed with the global ``whisper_model`` (language 'hi').
    Reference and transcript are case-folded, stripped of punctuation and
    split on whitespace; the result is the word-level edit distance
    (substitutions + insertions + deletions) divided by the number of
    reference words.

    Args:
        ref_text: the text that was supposed to be spoken.
        audio_path: path of the audio file to transcribe.

    Returns:
        The word error rate (0.0 = identical; may exceed 1.0), or -1.0 when
        Whisper is not loaded, the reference has no words, or transcription
        fails.
    """
    if 'whisper_model' not in globals(): return -1.0

    import unicodedata

    def _words(text):
        # Replace every punctuation character (Unicode category P*) with a
        # space so that punctuation differences are not counted as word errors.
        cleaned = ''.join(
            ' ' if unicodedata.category(ch).startswith('P') else ch
            for ch in text
        )
        return cleaned.casefold().split()

    try:
        res = whisper_model.transcribe(str(audio_path), language='hi')
        ref_words = _words(ref_text)
        hyp_words = _words(res['text'])
        if not ref_words:
            # WER is undefined without reference words.
            return -1.0

        # Word-level Levenshtein distance, keeping only the previous DP row.
        prev_row = list(range(len(hyp_words) + 1))
        for i, ref_word in enumerate(ref_words, start=1):
            row = [i]
            for j, hyp_word in enumerate(hyp_words, start=1):
                row.append(min(
                    prev_row[j] + 1,                          # deletion
                    row[j - 1] + 1,                           # insertion
                    prev_row[j - 1] + (ref_word != hyp_word)  # substitution
                ))
            prev_row = row
        return prev_row[-1] / len(ref_words)
    except Exception as e:
        print(f"WER calculation failed ({type(e).__name__}: {e}); returning -1.0")
        return -1.0
print("‚úì Metric helpers ready.")

‚è≥ Loading Whisper (tiny) for WER...
‚úì Whisper loaded.
‚úì Metric helpers ready.


## 3. Synthesis GUI

In [26]:
# [3] SYNTHESIS GUI
import ipywidgets as widgets
from IPython.display import display, Audio
import traceback

# --- Widgets ---
# The emotion, speaker and sample dropdowns are created without options and
# are filled in further down in this cell.
emotion_dropdown = widgets.Dropdown(description='Emotion:')
speaker_dropdown = widgets.Dropdown(description='Speaker:')
# 'average' / 'single' is forwarded as `mode` to the emotion-vector computation.
mode_dropdown = widgets.Dropdown(options=['average','single'], value='average', description='Mode:')
sample_dropdown = widgets.Dropdown(description='Sample:')
# Scale factor for the emotion vector during synthesis.
alpha_input = widgets.FloatText(value=1.5, step=0.1, description='Alpha:')
# NOTE(review): the default sentence below looks mis-encoded in this export
# (presumably Devanagari text) -- confirm the notebook source holds the intended characters.
text_in = widgets.Text(value='‡§Ü‡§ú ‡§ï‡§æ ‡§¶‡§ø‡§® ‡§®‡§à ‡§∏‡§Ç‡§≠‡§æ‡§µ‡§®‡§æ‡§ì‡§Ç ‡§∏‡•á ‡§≠‡§∞‡§æ ‡§π‡•Å‡§Ü ‡§π‡•à, ‡§ú‡•à‡§∏‡•á ‡§ï‡•ã‡§à ‡§∂‡§æ‡§®‡§¶‡§æ‡§∞ ‡§∂‡•Å‡§∞‡•Å‡§Ü‡§§ ‡§π‡•ã ‡§∞‡§π‡•Ä ‡§π‡•ã! ‡§π‡§∞ ‡§™‡§≤ ‡§ú‡•Ä‡§µ‡§® ‡§ï‡•á ‡§∞‡§Ç‡§ó‡•ã‡§Ç ‡§î‡§∞ ‡§Ü‡§∂‡•ç‡§ö‡§∞‡•ç‡§Ø‡•ã‡§Ç ‡§ï‡§æ ‡§â‡§§‡•ç‡§∏‡§µ ‡§π‡•à‡•§', description='Text:') 
run_button = widgets.Button(description='Synthesize', button_style='success')
# All messages, tracebacks and the audio player of a run are shown here.
run_output = widgets.Output()

# --- Helpers ---
def safe_get_samples(emotion_name):
    """Return the sorted sample-folder names of an emotion, or [] if unavailable.

    Looks at the sub-directories of ``data/emotion_samples/<emotion_name>``
    under ``PROJECT_ROOT``.

    Args:
        emotion_name: emotion folder name; empty/None yields [].

    Returns:
        Sorted list of sub-directory names. An empty list when the name is
        empty, the folder does not exist, or the folder cannot be read.
    """
    try:
        if not emotion_name:
            return []
        emotion_dir = PROJECT_ROOT / 'data' / 'emotion_samples' / emotion_name
        if not emotion_dir.exists():
            return []
        return sorted(d.name for d in emotion_dir.iterdir() if d.is_dir())
    except (OSError, TypeError, ValueError):
        # Best-effort for the GUI: unreadable folders or unusable names count
        # as "no samples". Anything else (e.g. KeyboardInterrupt or a genuine
        # programming error) is allowed to surface.
        return []

# --- Layout ---
# Three rows of controls, then the Synthesize button, then the output area.
ui_syn = widgets.VBox([
    widgets.HBox([emotion_dropdown, mode_dropdown]),
    widgets.HBox([speaker_dropdown, sample_dropdown]),
    widgets.HBox([alpha_input, text_in]),
    run_button,
    run_output
])
# The panel is displayed before the dropdowns are populated below.
display(ui_syn)

# --- Init Data ---
def on_emotion_change(change):
    """Refill the sample dropdown with the samples of the newly selected emotion.

    Args:
        change: traitlets change dict; only 'value' changes are acted upon.
    """
    if change['type'] == 'change' and change['name'] == 'value':
        new_samples = safe_get_samples(change['new'])
        sample_dropdown.options = new_samples
        if new_samples: sample_dropdown.value = new_samples[0]

# Populate the emotion and speaker dropdowns. A failure here must not prevent
# the GUI from being shown, but it is reported rather than silently ignored.
try:
    eml = list_emotions()
    emotion_dropdown.options = eml
    if eml: emotion_dropdown.value = eml[0]
    spl = list_speakers()
    speaker_dropdown.options = spl
    if spl: speaker_dropdown.value = spl[0]
except Exception as e:
    print(f'Warning: could not populate emotion/speaker dropdowns: {e}')

emotion_dropdown.observe(on_emotion_change, names='value')

# The observer is attached only after the initial emotion was set above, so
# it has not fired yet: fill the sample dropdown once for the initial
# selection, otherwise it stays empty until the user changes the emotion.
on_emotion_change({'type': 'change', 'name': 'value', 'new': emotion_dropdown.value})

def on_run_clicked(b):
    """Button callback: compute the emotion vector, synthesize, and record the run.

    Reads the current widget values, computes the emotion vector for the
    selected emotion, makes sure a cleaned speaker file exists, synthesizes
    the text and shows an audio player. On success ``LAST_GEN_STATE`` is
    updated with the details of the run. All output, including tracebacks of
    failures, goes to ``run_output``.

    Args:
        b: the clicked button (unused).
    """
    # Imported here so the callback does not depend on another cell having
    # imported `time` first.
    import time

    with run_output:
        run_output.clear_output()
        try:
            emotion = emotion_dropdown.value
            sample_name = sample_dropdown.value
            speaker_name = speaker_dropdown.value
            mode = mode_dropdown.value
            alpha = float(alpha_input.value)
            txt = text_in.value

            # Guard clauses: give a readable hint instead of a traceback when
            # a required selection is missing.
            if not emotion or emotion == '(no emotions found)':
                print('No emotions available. Run preprocessing first.')
                return
            if not speaker_name:
                print('No speaker selected. Add a speaker file under data/speakers first.')
                return
            if not txt or not txt.strip():
                print('Please enter some text to synthesize.')
                return
            
            print(f"Synthesizing... (Alpha={alpha}, Mode={mode})")
            
            # 1. Compute Vector
            # sample_id is the 1-based position of the selected sample in the
            # sorted sample list; 1 is used when nothing valid is selected.
            sid = 1
            all_samples = safe_get_samples(emotion)
            if sample_name in all_samples: sid = all_samples.index(sample_name) + 1
            
            ed = compute_emotion_vector_xtts_multi(
                PROJECT_ROOT/'data'/'emotion_samples'/emotion, 
                method='xtts_native', mode=mode, sample_id=sid,
                save_avg_dir=PROJECT_ROOT/'data'/'outputs'/'emotion_vectors'/'average'
            )
            
            # 2. Get Speaker
            speaker_path = PROJECT_ROOT/'data'/'speakers'/speaker_name
            speaker_clean = ensure_speaker_clean(speaker_path, sr=SR_XTTS)
            
            # 3. Synthesize
            out_name = f'gen_{emotion}_{mode}_{sample_name}_a{alpha:.1f}.wav'
            out_file = PROJECT_ROOT/'data'/'outputs'/'generated'/out_name
            
            out = apply_emotion_and_synthesize(
                txt, speaker_clean, ed, alpha=alpha, out_path=out_file, language='hi'
            )
            display(Audio(str(out), rate=24000))
            
            # UPDATE STATE (only reached when synthesis succeeded)
            LAST_GEN_STATE.update({
                "audio_path": str(out),
                "ref_speaker": str(speaker_clean),
                "text": txt,
                "emotion": emotion,
                "mode": mode,
                "alpha": alpha,
                "speaker_name": speaker_name,
                "sample": sample_name,
                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
            })
            print("‚úÖ Ready for Evaluation below.")
            
        except Exception:
            # GUI boundary: show the full traceback in the output widget.
            traceback.print_exc()

run_button.on_click(on_run_clicked)

VBox(children=(HBox(children=(Dropdown(description='Emotion:', options=(), value=None), Dropdown(description='‚Ä¶

## 4. Metrics GUI

In [None]:
# [4] METRICS & LOGGING GUI
import ipywidgets as widgets
from IPython.display import display
import pandas as pd
from pathlib import Path

# --- Log File ---
# CSV file that "Save to Log" appends one row per evaluated clip to.
LOG_FILE = PROJECT_ROOT / "data" / "outputs" / "comprehensive_metrics_log.csv"

# --- Widgets ---
# The buttons are meant to be used in order: "Calc Objective" is enabled by
# loading a clip, "Save to Log" by calculating the objective metrics.
lbl_info = widgets.HTML("<h3>Evaluation Panel</h3>Waiting for Synthesis...")
btn_load = widgets.Button(description="1. Load Audio", button_style='warning')
btn_calc = widgets.Button(description="2. Calc Objective", button_style='info', disabled=True)

# Subjective Sliders (listener ratings from 1 to 5, default 3)
slider_intensity = widgets.IntSlider(value=3, min=1, max=5, description='Emotion Strength:', style={'description_width': 'initial'})
slider_natural = widgets.IntSlider(value=3, min=1, max=5, description='Naturalness:', style={'description_width': 'initial'})

btn_save = widgets.Button(description="3. Save to Log", button_style='success', disabled=True)
# Shared output area for the messages of all three button callbacks.
out_met = widgets.Output()

# Holding logic: the objective metrics ("sim", "wer") of the last calculation,
# kept here until they are written to the log.
current_obj_metrics = {}

def on_load_click(b):
    """Button callback: bind the evaluation panel to the most recent synthesis.

    Shows the file name, alpha and emotion of the last generated clip and
    enables the "Calc Objective" button. Metrics of a previously evaluated
    clip are discarded and "Save to Log" is disabled again, so stale scores
    cannot be logged against the newly loaded clip.

    Args:
        b: the clicked button (unused).
    """
    with out_met:
        out_met.clear_output()
        if not LAST_GEN_STATE["audio_path"]:
            print("‚ö†Ô∏è Synthesis not run yet.")
            return
        
        # Reset anything left over from the previously evaluated clip.
        current_obj_metrics.clear()
        btn_save.disabled = True

        fname = Path(LAST_GEN_STATE["audio_path"]).name
        lbl_info.value = f"<h3>Evaluating: {fname}</h3>(Alpha: {LAST_GEN_STATE['alpha']} | Emotion: {LAST_GEN_STATE['emotion']})"
        btn_calc.disabled = False
        print(f"Loaded: {fname}")

def on_calc_click(b):
    """Button callback: compute the objective metrics of the last generated clip.

    Speaker similarity is measured between the reference speaker file and the
    generated audio; the WER score compares the input text with the generated
    audio. Both values are stored in the global ``current_obj_metrics``,
    printed, and the "Save to Log" button is enabled.

    Args:
        b: the clicked button (unused).
    """
    global current_obj_metrics

    with out_met:
        print("‚è≥ Calculating... (Sim + WER)")
        generated = LAST_GEN_STATE["audio_path"]
        similarity = get_cosine_sim(LAST_GEN_STATE["ref_speaker"], generated)
        error_rate = get_wer(LAST_GEN_STATE["text"], generated)

        current_obj_metrics = dict(sim=similarity, wer=error_rate)

        print(f"Speaker Sim: {similarity:.3f}")
        print(f"WER Score:   {error_rate:.3f}")

        # Saving only makes sense once there is something to save.
        btn_save.disabled = False

def on_save_click(b):
    """Button callback: append the current evaluation as one row to the CSV log.

    The row combines the description of the last synthesis
    (``LAST_GEN_STATE``), the objective metrics of the last calculation
    (``current_obj_metrics``) and the two subjective slider ratings. The
    header row is written when the log file is new or empty. Afterwards the
    save button is disabled to prevent duplicate rows.

    Args:
        b: the clicked button (unused).
    """
    with out_met:
        if not LAST_GEN_STATE["audio_path"]:
            print("‚ö†Ô∏è Synthesis not run yet.")
            return

        # Combine everything
        entry = {
            "Timestamp": LAST_GEN_STATE["timestamp"],
            "Speaker": LAST_GEN_STATE["speaker_name"],
            "Emotion": LAST_GEN_STATE["emotion"],
            "Alpha": LAST_GEN_STATE["alpha"],
            "Mode": LAST_GEN_STATE["mode"],
            "Audio_File": Path(LAST_GEN_STATE["audio_path"]).name,
            # Objective
            "SIM_Score": round(current_obj_metrics.get("sim", 0), 4),
            "WER_Score": round(current_obj_metrics.get("wer", 0), 4),
            # Subjective
            "Subj_Intensity": slider_intensity.value,
            "Subj_Naturalness": slider_natural.value
        }
        
        # to_csv() does not create missing directories.
        LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
        # Write the header for a new file and also for an existing empty one.
        header = not LOG_FILE.exists() or LOG_FILE.stat().st_size == 0
        df = pd.DataFrame([entry])
        df.to_csv(LOG_FILE, mode='a', header=header, index=False, encoding='utf-8')
        print(f"‚úÖ Data Saved to {LOG_FILE.name}")
        btn_save.disabled = True

# Wire each button to its callback.
btn_load.on_click(on_load_click)
btn_calc.on_click(on_calc_click)
btn_save.on_click(on_save_click)

# Vertical layout in the order the controls are meant to be used:
# load -> calculate -> rate -> save, with the shared output area at the bottom.
ui_met = widgets.VBox([
    lbl_info,
    btn_load,
    btn_calc,
    widgets.HTML("<b>Subjective Ratings (1-5):</b>"),
    slider_intensity,
    slider_natural,
    btn_save,
    out_met
])
display(ui_met)

