In [18]:
# creare csv con file path e path audio corrispondente CON BABELE

import os
import glob
import subprocess
import torch



os.environ["ACCELERATE_MIXED_PRECISION"] = "no"
os.environ.pop("USE_FP16", None)
os.environ.setdefault("PYTORCH_MPS_HIGH_WATERMARK_RATIO", "0.0")
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
print(f"Device: {device}")

dataset_dir = "/Users/ludovicagenovese/Documents/GitHub/mothertongueVSspoken/BABELE"
audio_dir   = os.path.join(dataset_dir, "audio_wav")
csv_path    = os.path.join(dataset_dir, "dataset.csv")
os.makedirs(audio_dir, exist_ok=True)

video_paths = glob.glob(os.path.join(dataset_dir, "**/*.mp4"), recursive=True)

rows = []
for vp in video_paths:
    if   "English" in vp: label = 0
    elif "Italian" in vp: label = 1
    #elif "french"  in vp: label = 3
    elif "Spanish" in vp: label = 2
    else:
        print("Lingua non riconosciuta → salto:", vp)
        continue

    fname = os.path.splitext(os.path.basename(vp))[0]
    wav   = os.path.join(audio_dir, f"{fname}{label}language.wav")

    if not os.path.exists(wav):
        res = subprocess.run(
            ["ffmpeg", "-i", vp, "-ar", "16000", "-ac", "1",
             "-f", "wav", "-vn", wav],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
        )
        if res.returncode != 0:
            print("ffmpeg errore → salto:", vp)
            continue        # non appendere se fallita conversione

    rows.append({"audio_path": wav, "video_path": vp, "label": label})


if not os.path.exists(csv_path):
    import pandas as pd
    pd.DataFrame(rows).to_csv(csv_path, index=False)
    print(f"CSV creato: {csv_path}")




Device: mps
Lingua non riconosciuta → salto: /Users/ludovicagenovese/Documents/GitHub/mothertongueVSspoken/BABELE/French/8_1_2_3_25.mp4
Lingua non riconosciuta → salto: /Users/ludovicagenovese/Documents/GitHub/mothertongueVSspoken/BABELE/French/8_2_1_5_25.mp4
Lingua non riconosciuta → salto: /Users/ludovicagenovese/Documents/GitHub/mothertongueVSspoken/BABELE/French/8_2_1_7_25.mp4
Lingua non riconosciuta → salto: /Users/ludovicagenovese/Documents/GitHub/mothertongueVSspoken/BABELE/French/8_1_2_1_25.mp4
Lingua non riconosciuta → salto: /Users/ludovicagenovese/Documents/GitHub/mothertongueVSspoken/BABELE/French/8_2_2_2_50.mp4
Lingua non riconosciuta → salto: /Users/ludovicagenovese/Documents/GitHub/mothertongueVSspoken/BABELE/French/8_1_1_2_25.mp4
Lingua non riconosciuta → salto: /Users/ludovicagenovese/Documents/GitHub/mothertongueVSspoken/BABELE/French/8_2_2_4_25.mp4
Lingua non riconosciuta → salto: /Users/ludovicagenovese/Documents/GitHub/mothertongueVSspoken/BABELE/French/8_1_2_5_25.

In [None]:
import whisper, pandas as pd, torch, soundfile as sf
from sklearn.metrics import classification_report, confusion_matrix
from tqdm import tqdm
import webrtcvad
import numpy as np
import torch.nn.functional as F

MODEL_NAME   = "small"
CSV          = "/Users/ludovicagenovese/.../dataset.csv"
id2lang      = {0: "en", 1: "it", 2: "es"}
SR           = 16000
MAX_SECONDS  = 10         # 10 secondi di parlato
MAX_SAMPLES  = MAX_SECONDS * SR
vad          = webrtcvad.Vad(1)

# 1) estrai l’audio “speech-only” come numpy array (float32, mono)
def extract_speech_np(audio: np.ndarray, sr: int, vad: webrtcvad.Vad, max_samples: int) -> np.ndarray:
    frame_ms     = 30
    frame_len    = int(sr * frame_ms / 1000)
    audio_int16  = (audio * 32767).astype(np.int16)
    speech_bytes = bytearray()
    speech_on    = False

    for i in range(0, len(audio_int16), frame_len):
        chunk = audio_int16[i:i+frame_len]
        if len(chunk) < frame_len:
            break
        b = chunk.tobytes()
        if vad.is_speech(b, sr):
            speech_on = True
            speech_bytes += b
        elif speech_on:
            speech_bytes += b
        if len(speech_bytes) >= max_samples * 2:
            break

    speech = np.frombuffer(speech_bytes, dtype=np.int16).astype(np.float32) / 32767
    return speech

# carica modello e dataset
model  = whisper.load_model(MODEL_NAME).to("cuda" if torch.cuda.is_available() else "cpu")
df     = pd.read_csv(CSV, usecols=["audio_path", "label"])
df["label"] = df["label"].map(id2lang)

preds, golds = [], []
for wav, true_lang in tqdm(df.itertuples(index=False), total=len(df), desc="LID"):
    audio, sr = sf.read(wav)
    if sr != SR:
        raise ValueError(f"{wav}: campionamento ≠16 kHz")

    # 2) mono e float32
    if audio.ndim > 1:
        audio = audio.mean(axis=1)
    audio = audio.astype(np.float32)

    # 3) VAD → numpy speech-only
    speech_np = extract_speech_np(audio, SR, vad, MAX_SAMPLES)

    # 4) torch.Tensor 1D + pad/trim manuale
    x = torch.from_numpy(speech_np)
    if x.dim() > 1:
        x = x.squeeze()
    if x.numel() < MAX_SAMPLES:
        x = F.pad(x, (0, MAX_SAMPLES - x.numel()))
    else:
        x = x[:MAX_SAMPLES]

    # 5) mel + detect_language
    mel       = whisper.log_mel_spectrogram(x).to(model.device)
    _, probs  = model.detect_language(mel)
    pred_lang = max(probs, key=probs.get)

    preds.append(pred_lang)
    golds.append(true_lang)

# report finale
labels = ["en","it","es"]
print(classification_report(golds, preds, labels=labels, target_names=labels, digits=3))
print("Confusion matrix\n", confusion_matrix(golds, preds, labels=labels))


LID:   0%|          | 0/95 [00:00<?, ?it/s]


AssertionError: incorrect audio shape

In [None]:
#estrazione delle feature dal teacher
import whisper, torch, soundfile as sf, json, numpy as np, pandas as pd
from tqdm import tqdm
import webrtcvad

dataset_dir  = "/Users/ludovicagenovese/Documents/GitHub/mothertongueVSspoken/BABELE/dataset.csv"                 
out_dir = "/Users/ludovicagenovese/Documents/GitHub/mothertongueVSspoken/BABELE/distillation_dataset.csv"

model = whisper.load_model("small")
device = model.device
id2lang = {0: "en", 1: "it", 2: "es"}



vad = webrtcvad.Vad(3)   # 0–3 aggressività

def frame_generator(frame_duration_ms, audio, sample_rate):
    n = int(sample_rate * (frame_duration_ms / 1000.0))
    offset = 0
    while offset + n < len(audio):
        yield audio[offset:offset + n]
        offset += n
# num di secondi presi in considerazione - per adesso cambia son 10
def vad_collector(audio, sample_rate, max_speech_s=10):
    
    frames = list(frame_generator(10, audio, sample_rate))
    speech_segments, curr_start, speech_time = [], None, 0.0
    t0 = 0.0
    for f in frames:
        is_speech = vad.is_speech((f*32768).astype('int16').tobytes(), sample_rate)
        if is_speech:
            if curr_start is None:
                curr_start = t0
        else:
            if curr_start is not None:
                seg_dur = t0 - curr_start
                speech_segments.append((curr_start, t0))
                speech_time += seg_dur
                if speech_time >= max_speech_s:
                    break
                curr_start = None
        t0 += 0.03
    # se termina parlando
    if curr_start is not None and speech_time < max_speech_s:
        speech_segments.append((curr_start, curr_start + min(30 - speech_time, t0 - curr_start)))
    # tronca all’esatto 30 s se serve
    total = 0.0
    out = []
    for s,e in speech_segments:
        dur = e-s
        if total+dur <= max_speech_s:
            out.append((s,e))
            total += dur
        else:
            out.append((s, s + (max_speech_s-total)))
            break
    return out  # list of (start_s, end_s)


rows = []
for vid, wav, lbl in tqdm(
        pd.read_csv(dataset_dir)[["video_path", "audio_path", "label"]].values):

    audio, sr = sf.read(wav)                 
    if sr != 16000: # whisper vuole i 16 hkz
        raise ValueError(f"{wav}: sample-rate ≠ 16 kHz")
    if audio.ndim == 2:                      #Se l’audio ha due canali, ne fa la media per ottenere un vettore 1-D.
        audio = audio.mean(axis=1)

    audio = torch.from_numpy(audio).to(device).float()

    # primi 30 secondi di solo parlato
    raw = audio.cpu().numpy()        
    segs = vad_collector(raw, sr, max_speech_s=10.0)

    clipped = np.concatenate([raw[int(s*sr):int(e*sr)] for s,e in segs])
    audio   = torch.from_numpy(clipped).to(device).float()   
                  
    # roba di tensori e di come tradformarli nella forma che vuole whisper 
    mel   = whisper.log_mel_spectrogram(audio)          
    mel   = mel.to(device).float()                      


    mel = mel.unsqueeze(0)  
                            
    #detect della lingua
    with torch.no_grad():
        enc   = model.encoder(mel)                      
        probs = model.detect_language(mel)[1]

    emb = enc.mean(1).squeeze(0).cpu().numpy()
    # salva le soft labels
    hid_path   = wav + ".hid.npy"
    probs_path = wav + ".probs.json"
    np.save(hid_path, emb)
    json.dump(probs, open(probs_path, "w"))

    rows.append([vid, wav, json.dumps(segs), probs_path, hid_path, id2lang[lbl]])


pd.DataFrame(rows,
    columns=["video_path","audio_path","segments","probs_path","hid_path","label"]
).to_csv(out_dir, index=False)



100%|██████████| 95/95 [01:37<00:00,  1.02s/it]


In [3]:
import os
import cv2
import json
import numpy as np
import pandas as pd
from tqdm import tqdm
from sklearn.model_selection import train_test_split
import face_alignment
from face_alignment import FaceAlignment

# --- Configurazione ---
distill_csv = "/Users/ludovicagenovese/Documents/GitHub/mothertongueVSspoken/BABELE/distillation_dataset.csv"
out_dir     = "video_mouth"
os.makedirs(out_dir, exist_ok=True)

# Inizializza FaceAlignment su CPU (2D landmarks di default)
fa = FaceAlignment(face_alignment.LandmarksType.TWO_D, device="cpu")

def mouth_roi(frame, landmarks, size=96, scale=1.4):
    pts = landmarks[48:68]
    cx, cy = pts.mean(axis=0)
    w = max(np.ptp(pts[:,0]), np.ptp(pts[:,1])) * scale
    x1, y1 = int(cx - w/2), int(cy - w/2)
    crop = frame[y1:y1+int(w), x1:x1+int(w)]
    return cv2.resize(crop, (size, size))

# Leggi il manifest
mani = pd.read_csv(distill_csv)
mouth_paths = []

# Processa ogni video
for idx, row in tqdm(mani.iterrows(), total=len(mani), desc="Video"):
    video_path = row["video_path"]
    print(f"\n▶️ Processing video [{idx+1}/{len(mani)}]: {video_path}")
    segments = json.loads(row.get("segments", "[]"))
    
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS) or 25
    frames = []

    # Scorri i segmenti parlati
    for seg_i, (start_s, end_s) in enumerate(segments, start=1):
        start_f = int(start_s * fps)
        end_f   = int(end_s   * fps)
        cap.set(cv2.CAP_PROP_POS_FRAMES, start_f)
        print(f"  Segment {seg_i}/{len(segments)} → frames {start_f}–{end_f}")

        for f in range(start_f, end_f):
            ok, frame = cap.read()
            if not ok:
                print(f"    ⚠️ Unexpected end at frame {f}")
                break

            lm = fa.get_landmarks(frame)
            if lm is None or len(lm) == 0:
                continue

            roi = mouth_roi(frame, lm[0])
            frames.append(roi)
            print(f"    Estratto frame {f}")

    cap.release()

    if not frames:
        print(f"⚠️ Nessun mouth ROI estratto per {video_path}")
        mouth_paths.append("")
        continue

    arr = np.stack(frames).astype("uint8")
    out_path = os.path.join(out_dir, os.path.basename(video_path) + ".npy")
    np.save(out_path, arr)
    mouth_paths.append(out_path)
    print(f"✅ Salvati {len(frames)} frame in {out_path}")

# Aggiorna il manifest con i percorsi dei .npy
mani["mouth_path"] = mouth_paths
mani.to_csv(distill_csv, index=False)

# Split stratificato in train/val/test
t_train, temp = train_test_split(mani, test_size=0.30, stratify=mani.label, random_state=42)
t_val,   t_test = train_test_split(temp,   test_size=0.50, stratify=temp.label, random_state=42)

t_train.to_csv("dist_train.csv", index=False)
t_val.  to_csv("dist_val.csv",   index=False)
t_test. to_csv("dist_test.csv",  index=False)



Video:   0%|          | 0/95 [00:00<?, ?it/s]


▶️ Processing video [1/95]: /Users/ludovicagenovese/Documents/GitHub/mothertongueVSspoken/BABELE/Italian/1_1_2_21_25.mp4
  Segment 1/17 → frames 29–31
    Estratto frame 29
    Estratto frame 30
  Segment 2/17 → frames 45–154
    Estratto frame 45
    Estratto frame 46
    Estratto frame 47
    Estratto frame 48
    Estratto frame 49
    Estratto frame 50
    Estratto frame 51
    Estratto frame 52
    Estratto frame 53
    Estratto frame 54
    Estratto frame 55
    Estratto frame 56
    Estratto frame 57
    Estratto frame 58
    Estratto frame 59
    Estratto frame 60
    Estratto frame 61
    Estratto frame 62
    Estratto frame 63
    Estratto frame 64
    Estratto frame 65
    Estratto frame 66
    Estratto frame 67
    Estratto frame 68
    Estratto frame 69
    Estratto frame 70
    Estratto frame 71
    Estratto frame 72
    Estratto frame 73
    Estratto frame 74
    Estratto frame 75
    Estratto frame 76
    Estratto frame 77
    Estratto frame 78
    Estratto frame 79
   

Video:   0%|          | 0/95 [13:29<?, ?it/s]


KeyboardInterrupt: 

In [None]:
#dataset.py non ho capito ma okay

import torch, json, numpy as np, pandas as pd

class LipKD(torch.utils.data.Dataset):
    def __init__(self, manifest):
        self.df = pd.read_csv(manifest)
        self.lang2id = {"en":0,"it":1,"es":2}
    def __len__(self): return len(self.df)
    def __getitem__(self, idx):
        row   = self.df.iloc[idx]
        video = np.load(row.mouth_path) / 255.0           # T,H,W,3
        video = torch.from_numpy(video).permute(0,3,1,2)  # T,C,H,W
        logits_t = torch.tensor(
            [json.load(open(row.probs_path))[k] for k in ["en","it","es"]])
        emb_t   = torch.from_numpy(np.load(row.hid_path)) # 1280
        label   = self.lang2id[row.label]
        return video.float(), logits_t.float(), emb_t.float(), label


In [None]:
#creazione student

import torch, torch.nn as nn
class LipStudent(nn.Module):
    def __init__(self, d_emb=256, n_classes=3):
        super().__init__()
        self.backbone = nn.Sequential(      # 3D-Conv minimale
            nn.Conv3d(3, 32, (3,5,5), (1,2,2), 1), nn.ReLU(),
            nn.MaxPool3d((1,2,2)),
            nn.Conv3d(32, 64, (3,3,3), 1, 1), nn.ReLU(),
            nn.AdaptiveAvgPool3d((1,1,1)))
        self.fc_emb = nn.Linear(64, d_emb)          # eᵥ
        self.fc_cls = nn.Linear(d_emb, n_classes)   # logitsᵥ
        # proiezioni
        self.Pa = nn.Linear(1280, d_emb, bias=False)   # freeze dopo init
        torch.nn.init.eye_(self.Pa.weight)             # identity trim
    def forward(self, x, emb_t):
        z = self.backbone(x).flatten(1)
        e_v = self.fc_emb(z)
        logits_v = self.fc_cls(e_v)
        proj_a   = self.Pa(emb_t)
        return logits_v, e_v, proj_a


In [None]:
# train_student.py
import torch, torch.nn.functional as F, torch.optim as optim
from dataset import LipKD
from model   import LipStudent

train_set = LipKD("manifest_train.csv")
val_set   = LipKD("manifest_val.csv") 
loader    = torch.utils.data.DataLoader(train_set, batch_size=16, shuffle=True)

net = LipStudent().cuda()
opt = optim.AdamW(net.parameters(), 3e-4)
T, λ_KL, λ_MSE, λ_CE = 2.0, 1.0, 0.5, 0.2

for epoch in range(30):
    for vid, log_t, emb_t, y in loader:
        vid, log_t, emb_t, y = vid.cuda(), log_t.cuda(), emb_t.cuda(), y.cuda()
        log_s, e_v, p_a = net(vid, emb_t)

        kl  = F.kl_div(F.log_softmax(log_s/T, -1),
                       (log_t/T), reduction="batchmean")
        mse = F.mse_loss(e_v, p_a)
        ce  = F.cross_entropy(log_s, y)

        loss = λ_KL*kl + λ_MSE*mse + λ_CE*ce
        opt.zero_grad(); loss.backward(); opt.step()
    print(f"epoch {epoch:02d}  loss {loss.item():.4f}")


In [None]:
# test_student.py
import torch, pandas as pd
from sklearn.metrics import classification_report, confusion_matrix
from dataset import LipKD
from model   import LipStudent

LANGS = ["en","it","es"]
labels_id = {l:i for i,l in enumerate(LANGS)}

net = LipStudent().cuda().eval()
net.load_state_dict(torch.load("best_student.pth"))

test_loader = torch.utils.data.DataLoader(
        LipKD("manifest_test.csv"), batch_size=16, shuffle=False)

y_true, y_pred = [], []

with torch.no_grad():
    for vid, _, _, lbl in test_loader:      # _ = teacher info, inutili ora
        vid = vid.cuda()
        logits, _, _ = net(vid, torch.zeros(len(vid),1280).cuda())
        y_pred += logits.argmax(-1).cpu().tolist()
        y_true += lbl.tolist()

print(classification_report(
        y_true, y_pred, target_names=LANGS, digits=3))
print(confusion_matrix(y_true, y_pred))
