# 🧠 L-MAC Evaluation (SpeechOcean762)

Questo notebook calcola **AI/AD** e genera esempi ascoltabili per L-MAC.

**Supporta ambienti:**
- 🖥️ Local
- ☁️ Google Colab  
- 📊 Kaggle (Dataset + Modelli da input)

**Dataset:** SpeechOcean762 (full)  
**Backbone:** HuBERT Large o Early Fusion

In [None]:
import sys, subprocess
from pathlib import Path
import os

# Fix audio decoding: monkey-patch prima di qualsiasi uso di datasets
import soundfile as sf
import io
import datasets
import datasets.features.audio as audio_module

def decode_audio_with_soundfile(self, value, token_per_repo_id=None):
    """Decode a stored audio example with soundfile.

    Intended to be installed in place of ``Audio.decode_example``.

    Args:
        self: The feature instance the function is bound to (unused).
        value: The stored example. A dict carrying ``"bytes"`` and/or
            ``"path"`` is decoded; anything else is returned untouched.
        token_per_repo_id: Accepted for signature compatibility; ignored.

    Returns:
        ``{"array", "sampling_rate", "path"}`` when the example could be
        decoded, otherwise ``value`` unchanged.
    """
    if not isinstance(value, dict):
        return value

    audio_bytes = value.get("bytes")
    path = value.get("path")

    # Path-backed examples carry the "bytes" key with a None value, so the
    # mere presence of the key must not select the in-memory branch.
    if audio_bytes:
        audio, sr = sf.read(io.BytesIO(audio_bytes))
        return {"array": audio, "sampling_rate": sr, "path": value.get("path", "")}

    if path:
        audio, sr = sf.read(path)
        return {"array": audio, "sampling_rate": sr, "path": path}

    # Nothing decodable (no bytes, no path): hand the value back as-is.
    return value

# Replace Audio.decode_example at class level, so the soundfile-based decoder
# defined above is used for every Audio feature from this point on.
audio_module.Audio.decode_example = decode_audio_with_soundfile
print("‚úì Audio decoder patched to use soundfile")

def detect_environment():
    """Return ``'colab'``, ``'kaggle'`` or ``'local'`` for the current runtime.

    Colab is recognised by the ``COLAB_GPU`` environment variable or an
    already-imported ``google.colab`` module; Kaggle by a working directory
    containing ``/kaggle`` or the ``KAGGLE_KERNEL_RUN_TYPE`` variable.
    Colab is checked first; anything else is reported as local.
    """
    on_colab = 'COLAB_GPU' in os.environ or 'google.colab' in sys.modules
    if on_colab:
        return 'colab'

    on_kaggle = '/kaggle' in os.getcwd() or 'KAGGLE_KERNEL_RUN_TYPE' in os.environ
    return 'kaggle' if on_kaggle else 'local'

# Detected once at start-up; the cleanup cell at the end of the notebook
# reads ENV to decide whether to purge the Hugging Face cache.
ENV = detect_environment()
print(f'üñ•Ô∏è Ambiente: {ENV.upper()}')

In [None]:
# Install dependencies + clone repo
pkgs = [
    'transformers>=4.38',
    'datasets>=2.18',
    'evaluate',
    'jiwer',
    'soundfile',
    'librosa',
    'safetensors',
    'accelerate',
    'tqdm',
    'pyyaml',
    'pandas',
]

# Best-effort install: a failure must not abort the notebook, but it should be
# visible instead of surfacing later as an unrelated ImportError.
pip_result = subprocess.run([sys.executable, '-m', 'pip', 'install', '-q', *pkgs], check=False)
if pip_result.returncode != 0:
    print(f'WARNING: pip install exited with code {pip_result.returncode}; later imports may fail')

# Imported only after the install step so a freshly installed stack is picked up.
import torch
print(f'üî• PyTorch {torch.__version__}, CUDA: {torch.cuda.is_available()}')
if torch.cuda.is_available():
    print(f'üìä GPU: {torch.cuda.get_device_name(0)}')
    print(f'üíæ VRAM: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB')

# Clone repo
IS_KAGGLE = Path('/kaggle').exists()
# Set DL_PHONEME_SKIP_CLONE to 1/true/yes to reuse an already prepared checkout.
SKIP_CLONE = str(os.environ.get('DL_PHONEME_SKIP_CLONE', '')).strip().lower() in ('1', 'true', 'yes')
REPO_URL = 'https://github.com/maurocarlu/pronuncIAtion.git'
# NOTE(review): outside Kaggle the project root is derived from the current
# working directory; since this cell later chdir()s into PROJECT_DIR,
# re-running it locally resolves two levels further up. Confirm whether the
# cell is expected to be idempotent.
PROJECT_DIR = Path('/kaggle/working/pronuncIAtion') if IS_KAGGLE else Path.cwd().parent.parent

if IS_KAGGLE and (not SKIP_CLONE) and REPO_URL:
    if not PROJECT_DIR.exists():
        print('Cloning repo:', REPO_URL)
        clone_result = subprocess.run(['git', 'clone', REPO_URL, str(PROJECT_DIR)], check=False)
        if clone_result.returncode != 0:
            print(f'WARNING: git clone exited with code {clone_result.returncode}')
    else:
        print('Repo gi√† presente:', PROJECT_DIR)

if PROJECT_DIR.exists():
    os.chdir(PROJECT_DIR)
    # Avoid stacking duplicate entries when the cell is executed more than once.
    if str(PROJECT_DIR) not in sys.path:
        sys.path.insert(0, str(PROJECT_DIR))
print('CWD:', os.getcwd())
print('PROJECT_DIR:', PROJECT_DIR)

In [None]:
# ====== Kaggle: data symlink and model locations ======
DATA_INPUT = Path('/kaggle/input/pronunciation-data/data')
DATA_TARGET = Path(PROJECT_DIR) / 'data'

# On Kaggle, expose the attached input dataset as <project>/data.
# Any failure is reported and otherwise ignored (best effort).
if Path('/kaggle').exists() and DATA_INPUT.exists():
    try:
        if not DATA_TARGET.exists():
            os.symlink(str(DATA_INPUT), str(DATA_TARGET))
            print('‚úì data symlink creato')
    except Exception as e:
        print('‚ö†Ô∏è Symlink fallito:', e)

# ====== Model locations ======
# The Kaggle input dataset wins when it is attached; otherwise use the
# project's local backup directory.
KAGGLE_MODELS_PATH = Path('/kaggle/input/late-fusion/LateFusion')
LOCAL_MODELS_PATH = PROJECT_DIR / 'outputs' / 'backup'

MODELS_ROOT = KAGGLE_MODELS_PATH if KAGGLE_MODELS_PATH.exists() else LOCAL_MODELS_PATH
if MODELS_ROOT is KAGGLE_MODELS_PATH:
    print(f'‚úì Using Kaggle models: {MODELS_ROOT}')
else:
    print(f'‚úì Using local models: {MODELS_ROOT}')

# Show up to ten directories that contain a config.json.
print('\nModelli disponibili:')
for p in sorted(MODELS_ROOT.glob('**/config.json'))[:10]:
    print(f'  ‚úì {p.parent.name}')

## ⚙️ Configuration

Configura:
- `BACKBONE`: tipo di backbone (`hubert` o `early_fusion`)
- `TARGET_PHONEME`: fonema IPA target per L-MAC
- `MODEL_PATH`: path al modello fine-tuned

In [None]:
# === CONFIG ===
BACKBONE = "hubert"  # or "early_fusion"
TARGET_PHONEME = None  # None = random phoneme sampling (multi-phoneme)

# Auto-detect the fine-tuned model directory, trying the known layouts first.
# NOTE(review): both preferred locations are HuBERT-specific even when
# BACKBONE is "early_fusion" — confirm this is intended.
if (MODELS_ROOT / 'final_model_hubert').exists():
    MODEL_PATH = str(MODELS_ROOT / 'final_model_hubert')
elif (MODELS_ROOT / 'hubert_large' / 'final_model_hubert').exists():
    MODEL_PATH = str(MODELS_ROOT / 'hubert_large' / 'final_model_hubert')
else:
    # Fallback: first directory holding a config.json. Sorted so the choice
    # does not depend on filesystem enumeration order.
    candidates = sorted(MODELS_ROOT.glob('**/config.json'))
    MODEL_PATH = str(candidates[0].parent) if candidates else ""

if not MODEL_PATH:
    print(f'WARNING: no model directory found under {MODELS_ROOT}')

print(f'BACKBONE: {BACKBONE}')
print(f'MODEL_PATH: {MODEL_PATH}')
print(f'TARGET_PHONEME: {TARGET_PHONEME or "None (multi-phoneme)"}')

# Auto-find a decoder checkpoint (if one was already trained).
# Multi-phoneme runs are stored under a "multi" folder instead of the
# phoneme name.
phoneme_folder = TARGET_PHONEME if TARGET_PHONEME else "multi"
decoder_root = PROJECT_DIR / "outputs" / "lmac" / BACKBONE / phoneme_folder
# The lexicographically last match is taken as the checkpoint to use.
candidates = sorted(decoder_root.glob("decoder_*.pt"))
DECODER_CKPT = str(candidates[-1]) if candidates else ""
print(f'DECODER_CKPT: {DECODER_CKPT or "Not found (will train)"}')

In [None]:
# === IMPORTS ===
from torch.utils.data import DataLoader

# Import dal progetto (già in sys.path)
from scripts.analysis.lmac_core import (
    LMACBackboneConfig,
    LMACSpeechOceanDataset,
    LMACWrapper,
    collate_audio_batch,
    compute_ai_ad,
    generate_listenable_map,
)

print('‚úì L-MAC imports loaded')

## 🎯 Train Decoder (se non presente)

Se il decoder L-MAC non è già stato trainato per il fonema target, lo alleniamo qui.

In [None]:
# Fix audio decoding: use soundfile instead of torchcodec
import datasets
datasets.config.TORCHCODEC_AVAILABLE = False

# If that still does not work, force soundfile:
# NOTE(review): confirm the installed `datasets` release actually reads this
# environment variable.
import os
os.environ["HF_DATASETS_AUDIO_DECODER"] = "soundfile"

In [None]:
# === TRAIN (only when no decoder checkpoint is available) ===
from types import SimpleNamespace
from importlib import reload
from scripts.analysis import train_lmac_decoder
reload(train_lmac_decoder)  # pick up the latest edits to the training module

if DECODER_CKPT:
    print(f'‚úì Using existing decoder: {DECODER_CKPT}')
else:
    print('üèãÔ∏è Training L-MAC decoder...')

    # No fixed target phoneme means multi-phoneme mode: switch on
    # conditioning so the model can learn it.
    use_cond = TARGET_PHONEME is None

    args = SimpleNamespace(
        model_path=MODEL_PATH,
        backbone=BACKBONE,
        target_phoneme=TARGET_PHONEME,
        layer_ids="6,12,18,24",
        epochs=10,
        batch_size=2,
        lr=2e-4,
        lambda_out=1.0,
        lambda_reg=1e-4,
        max_samples=None,
        log_interval=50,
        output_dir=str(PROJECT_DIR / "outputs" / "lmac"),
        use_conditioning=use_cond,
    )
    train_lmac_decoder.train_lmac(args)

    # Look the checkpoint up again now that training has finished.
    candidates = sorted(decoder_root.glob("decoder_*.pt"))
    DECODER_CKPT = str(candidates[-1]) if candidates else ""
    print(f'‚úì Decoder trained: {DECODER_CKPT}')

In [None]:
# === Load decoder + backbone ===
# Stop here with a clear error when no checkpoint path is set, i.e. neither a
# pre-existing nor a freshly trained decoder file was found.
if not DECODER_CKPT:
    raise FileNotFoundError(f"Decoder checkpoint non trovato in {decoder_root}")

print(f"Loading decoder from {DECODER_CKPT}...")
# Tensors are mapped to CPU at load time.
ckpt = torch.load(DECODER_CKPT, map_location="cpu")

# Extract config from checkpoint if available (backward compatibility):
# checkpoints without these keys fall back to no conditioning / vocab size 0.
use_conditioning = ckpt.get("use_conditioning", False)
vocab_size = ckpt.get("vocab_size", 0)

print(f"Configuration: Conditioning={use_conditioning}, VocabSize={vocab_size}")

# The layer ids mirror the "6,12,18,24" string passed to training in this
# notebook.
config = LMACBackboneConfig(
    backbone_type=BACKBONE,
    model_path=MODEL_PATH,
    layer_ids=(6, 12, 18, 24),
    use_conditioning=use_conditioning,
    vocab_size=vocab_size,
)
wrapper = LMACWrapper(config=config, target_phoneme=TARGET_PHONEME)
# Only the decoder weights come from the checkpoint.
wrapper.decoder.load_state_dict(ckpt["decoder_state"])
wrapper.eval()
print('‚úì L-MAC Wrapper loaded')

## 📊 Evaluation: AI / AD Metrics

Calcola le metriche **Average Increase (AI)** e **Average Drop (AD)** su SpeechOcean762.

In [None]:
# === AI / AD on SpeechOcean762 (test split) ===
print('üìä Computing AI/AD metrics on SpeechOcean762...')
test_ds = LMACSpeechOceanDataset(
    split="test",
    target_phoneme=TARGET_PHONEME,
    full=True,
)
test_loader = DataLoader(
    test_ds,
    batch_size=2,
    shuffle=False,
    collate_fn=collate_audio_batch,
)
# max_batches=None: no cap on the number of batches evaluated.
metrics = compute_ai_ad(wrapper, test_loader, max_batches=None)

print('\nüìà Global Results:')
print(f'  AI: {metrics["AI"]:.4f}%')
print(f'  AD: {metrics["AD"]:.4f}')

if "per_phoneme" in metrics and metrics["per_phoneme"]:
    print('\nüî¨ Per-Phoneme Breakdown (Top 10 by count):')

    # Most frequent phonemes first.
    sorted_ph = sorted(
        metrics["per_phoneme"].items(),
        key=lambda item: item[1]['count'],
        reverse=True,
    )

    print(f"{'Phoneme':<10} {'AI (%)':<10} {'AD':<10} {'Count':<10}")
    print("-" * 40)
    for ph, stats in sorted_ph[:10]:
        print("{:<10} {:<10.2f} {:<10.4f} {:<10}".format(ph, stats['AI'], stats['AD'], stats['count']))

## 🔊 Listenable Maps

Genera audio modificati per visualizzare/ascoltare le aree attribuite al fonema target.

In [None]:
# === Listenable example ===
import random

# Take one clip from the dataset.
sample = test_ds[0]

# Prefer the on-disk path when the sample exposes one; otherwise write the
# waveform to a temporary WAV file and use that.
if isinstance(sample.get("audio"), dict) and sample["audio"].get("path"):
    audio_path = sample["audio"]["path"]
else:
    import soundfile as sf
    tmp_path = Path(PROJECT_DIR) / "outputs" / "lmac" / "tmp_audio.wav"
    tmp_path.parent.mkdir(parents=True, exist_ok=True)
    sf.write(tmp_path, sample["audio"], 16000)
    audio_path = str(tmp_path)

# Target selection:
# 1. A fixed TARGET_PHONEME (single mode) is used as-is.
# 2. With no fixed target (multi mode) and conditioning enabled, pick at
#    random one symbol from this sample's reference that is in the vocab.
vis_target = TARGET_PHONEME
if vis_target is None and wrapper.config.use_conditioning:
    ref_ipa = sample.get("reference_ipa", "")
    if ref_ipa:
        # Unique characters of the reference (spaces removed), vocab only.
        candidates = [c for c in set(ref_ipa.replace(' ', '')) if c in wrapper.vocab]
        if candidates:
            vis_target = random.choice(candidates)
            print(f"‚ÑπÔ∏è Multi-Phoneme Mode: Auto-selected target '/{vis_target}/' for visualization")

out_dir = str(PROJECT_DIR / "outputs" / "lmac" / "listenable_maps")

# Build the map, passing the chosen target explicitly.
out = generate_listenable_map(
    wrapper,
    audio_path,
    out_dir=out_dir,
    target_phoneme=vis_target,
)
print(f'\nüîä Generated listenable map: {out}')

In [None]:
# === Esempio ascoltabile ===
import matplotlib.pyplot as plt
from PIL import Image
from IPython.display import Audio, display

# Pick one clip from the dataset
sample_idx = 0  # change to inspect other examples
sample = test_ds[sample_idx]

# Print the sample's reference annotations, falling back to "N/A" when a
# field is missing, together with the configured target phoneme.
print(f'\nüìù === Sample {sample_idx} ===\n')
print(f'üìÑ Reference IPA: {sample.get("reference_ipa", "N/A")}')
print(f'üìÑ Text: {sample.get("text", "N/A")}')
print(f'üéØ Target Phoneme: {TARGET_PHONEME}')

# Get the model's greedy prediction for the selected sample
audio_arr = sample["audio"]
# A leading batch axis is added; the attention mask marks every position valid.
input_tensor = torch.tensor(audio_arr[None, :], dtype=torch.float32).to(wrapper.device)
attn_mask = torch.ones_like(input_tensor, dtype=torch.long).to(wrapper.device)

with torch.no_grad():
    # The two backbones expose their logits differently (attribute vs. key).
    if wrapper.backbone_type == 'hubert':
        out = wrapper.backbone(input_tensor, attention_mask=attn_mask, return_dict=True)
        logits = out.logits
    else:
        out = wrapper.backbone(input_tensor, attention_mask=attn_mask)
        logits = out['logits']

    pred_ids = torch.argmax(logits, dim=-1)[0]

    # CTC greedy decoding: merge consecutive repeats FIRST, then drop blanks.
    # Dropping blanks first would fuse genuine double tokens that the model
    # separated with a blank (e.g. "l <blank> l" must stay "ll").
    # NOTE(review): id 0 is treated as the blank token — confirm it matches
    # the tokenizer's blank/pad id.
    merged_ids = torch.unique_consecutive(pred_ids).tolist()
    pred_tokens = [wrapper.tokenizer.decode([tid]) for tid in merged_ids if tid != 0]
    collapsed = pred_tokens
    pred_text = ''.join(collapsed).replace('|', ' ').strip()

print(f'\nüîÆ Model Prediction: {pred_text}')

# Save the waveform to a temporary WAV file (written at 16 kHz) so it can be
# passed to generate_listenable_map by path.
import soundfile as sf
tmp_path = Path(PROJECT_DIR) / 'outputs' / 'lmac' / 'tmp_audio.wav'
tmp_path.parent.mkdir(parents=True, exist_ok=True)
sf.write(tmp_path, audio_arr, 16000)
audio_path = str(tmp_path)

# Generate the listenable map
# NOTE(review): unlike the previous cell, no target_phoneme is passed here —
# confirm the wrapper's default target is what should be shown in
# multi-phoneme mode.
out_dir = str(PROJECT_DIR / 'outputs' / 'lmac' / 'listenable_maps')
result = generate_listenable_map(wrapper, audio_path, out_dir=out_dir, prefix=f'sample_{sample_idx}')

print(f'\nüîä Generated files:')
print(f'   - Masked audio: {result["masked_audio"]}')
print(f'   - Plot: {result["plot"]}')

# Show the saved plot inline
print(f'\nüìä L-MAC Visualization:')
img = Image.open(result['plot'])
plt.figure(figsize=(14, 8))
plt.imshow(img)
plt.axis('off')
# NOTE(review): when TARGET_PHONEME is None the title reads "/None/".
plt.title(f'L-MAC Mask for phoneme /{TARGET_PHONEME}/ - Sample {sample_idx}')
plt.tight_layout()
plt.show()

# Audio players (work in Jupyter/Kaggle)
print('\nüîà Original Audio:')
display(Audio(audio_arr, rate=16000))

print('\nüîà Masked Audio (regions relevant for target phoneme):')
# The masked audio is read back from the file written by generate_listenable_map.
masked_audio, _ = sf.read(result['masked_audio'])
display(Audio(masked_audio, rate=16000))

In [None]:
# === Genera video animato con playhead E AUDIO sincronizzato ===
!pip install moviepy -q

from matplotlib.animation import FuncAnimation
import matplotlib.pyplot as plt
import numpy as np
import tempfile
from moviepy.editor import VideoFileClip, AudioFileClip, CompositeAudioClip

def create_lmac_video_with_audio(audio_arr, mask, output_path, fps=30, sr=16000):
    """Render an MP4 with a moving playhead over waveform, mask and spectrogram.

    The figure is animated without sound, the waveform is written to a WAV
    file, and the two are combined into ``output_path``. Intermediate files
    live in a private temporary directory that is removed even on failure.

    Args:
        audio_arr: Waveform samples (array-like supporting ``len``).
        mask: Mask values plotted against time; assumed 1-D with values in
            [0, 1], since the y-axis is fixed to that range.
        output_path: Destination path of the final MP4.
        fps: Frames per second of the animation.
        sr: Sample rate of ``audio_arr`` in Hz (defaults to 16000).

    Returns:
        ``output_path``.

    Raises:
        ValueError: If ``audio_arr`` is empty.
    """
    if len(audio_arr) == 0:
        raise ValueError("audio_arr is empty; nothing to render")

    duration = len(audio_arr) / sr
    # Very short clips would otherwise yield zero frames.
    n_frames = max(1, int(duration * fps))

    fig, axes = plt.subplots(3, 1, figsize=(12, 8))
    try:
        # Plot 1: waveform
        time_axis = np.linspace(0, duration, len(audio_arr))
        axes[0].plot(time_axis, audio_arr, color='#2c3e50', linewidth=0.5)
        axes[0].set_xlim(0, duration)
        axes[0].set_ylabel('Amplitude')
        axes[0].set_title('Waveform')
        line1 = axes[0].axvline(x=0, color='red', linewidth=2)

        # Plot 2: L-MAC mask, stretched over the clip duration
        mask_time = np.linspace(0, duration, len(mask))
        axes[1].fill_between(mask_time, 0, mask, color='#e74c3c', alpha=0.7)
        axes[1].plot(mask_time, mask, color='#c0392b', linewidth=1)
        axes[1].set_xlim(0, duration)
        axes[1].set_ylim(0, 1)
        axes[1].set_ylabel('Mask Value')
        axes[1].set_title(f'L-MAC Mask for /{TARGET_PHONEME or "multi"}/')
        line2 = axes[1].axvline(x=0, color='red', linewidth=2)

        # Plot 3: spectrogram — librosa when usable, scipy otherwise
        try:
            import librosa
            import librosa.display
            S = np.abs(librosa.stft(audio_arr, n_fft=512, hop_length=160))
            S_db = librosa.amplitude_to_db(S, ref=np.max)
            librosa.display.specshow(S_db, sr=sr, hop_length=160, x_axis='time', y_axis='linear', ax=axes[2])
        except Exception:
            # Deliberate best effort: any librosa problem falls back to scipy,
            # but KeyboardInterrupt/SystemExit are no longer swallowed.
            from scipy.signal import spectrogram
            f, t, Sxx = spectrogram(audio_arr, fs=sr)
            axes[2].pcolormesh(t, f, 10*np.log10(Sxx+1e-9), shading='gouraud')
        axes[2].set_title('Spectrogram')
        line3 = axes[2].axvline(x=0, color='red', linewidth=2)

        plt.tight_layout()

        def update(frame):
            # Move all three playheads to the time of this frame.
            t = frame / fps
            line1.set_xdata([t, t])
            line2.set_xdata([t, t])
            line3.set_xdata([t, t])
            return line1, line2, line3

        with tempfile.TemporaryDirectory() as tmp_dir:
            temp_video = os.path.join(tmp_dir, "temp_video.mp4")
            temp_audio = os.path.join(tmp_dir, "temp_audio.wav")

            # Silent video first
            anim = FuncAnimation(fig, update, frames=n_frames, interval=1000/fps, blit=True)
            anim.save(temp_video, writer='ffmpeg', fps=fps, dpi=100)

            # Audio track
            import soundfile as sf
            sf.write(temp_audio, audio_arr, sr)

            # Combine video and audio with moviepy; close both clips so their
            # file handles are released before the directory is removed.
            video_clip = VideoFileClip(temp_video)
            try:
                audio_clip = AudioFileClip(temp_audio)
                try:
                    final_clip = video_clip.set_audio(audio_clip)
                    final_clip.write_videofile(output_path, codec='libx264', audio_codec='aac')
                finally:
                    audio_clip.close()
            finally:
                video_clip.close()
    finally:
        # Always release the figure, including on failure.
        plt.close(fig)

    return output_path

# Generate the video
video_dir = Path(PROJECT_DIR) / 'outputs' / 'lmac' / 'videos'
video_dir.mkdir(parents=True, exist_ok=True)

# Target selection for the video (same idea as for the listenable maps)
vid_target = TARGET_PHONEME
if vid_target is None and wrapper.config.use_conditioning:
    # Look for symbols of the current sample's reference that are in the vocab
    ref_ipa = sample.get("reference_ipa", "")
    if ref_ipa:
        # Sorted so that "take the first" is reproducible: iteration order of
        # a set of strings changes between interpreter runs.
        candidates = sorted(c for c in set(ref_ipa.replace(' ', '')) if c in wrapper.vocab)
        if candidates:
            vid_target = candidates[0]

print(f"üé¨ Generating video for target: /{vid_target or 'multi'}/...")
with torch.no_grad():
    # Reuse the tensors built for this sample in the prediction cell.
    input_values = input_tensor
    attention_mask = attn_mask

    target_ids = None
    if wrapper.config.use_conditioning:
        # Id 0 is used when no target is set or the target is not in the vocab.
        tid = wrapper.vocab.get(vid_target, 0) if vid_target else 0
        target_ids = torch.tensor([tid], device=wrapper.device, dtype=torch.long)

    out = wrapper.forward(input_values, attention_mask, target_ids=target_ids)
    # First (only) item of the batch.
    mask_arr = out["mask"].cpu().numpy()[0]
output_video = str(video_dir / f'lmac_sample_{sample_idx}.mp4')
create_lmac_video_with_audio(audio_arr, mask_arr, output_video)
# Show it in the notebook
from IPython.display import Video
display(Video(output_video, embed=True, width=800))

## 🧹 Cleanup (Kaggle)

Libera spazio disco rimuovendo cache HuggingFace.

In [None]:
# Cleanup disk (Kaggle)
# Runs only when the environment was detected as Kaggle.
import shutil
if ENV == 'kaggle':
    for f in ['/root/.cache/huggingface']:
        # Symlinks are skipped; only real, existing paths are deleted.
        if os.path.exists(f) and not os.path.islink(f):
            shutil.rmtree(f)
            print(f'üóëÔ∏è Cleaned: {f}')
    # Check disk space
    !df -h /kaggle/working