# Bimodal MOSEI (Text + Audio) Trainer — **No MMSDK**, Colab-Ready

This notebook:
- Reads `.csd` (HDF5) files **directly via `h5py`** (no `mmsdk`).
- Supports **acoustic file in chunks** and merges them before use.
- Trains a small PyTorch regressor on **text + audio** features (mean-pooled per segment).
- Uses a simple 80/10/10 split.

**Setup tips**
- Put your dataset under `/content/dataset` or mount Google Drive.
- Expected paths (customize later):
  - `dataset/languages/CMU_MOSEI_TimestampedWordVectors.csd`
  - `dataset/acoustics/CMU_MOSEI_COVAREP.csd` (or chunks in that folder)
  - `dataset/labels/CMU_MOSEI_Labels.csd`

In [1]:
#@title 1) Environment check & installs
# IPython shell escape: prints the NVIDIA driver/GPU table for the attached runtime.
!nvidia-smi
import torch
# Report whether PyTorch can reach a CUDA device, and the value of torch.version.cuda.
print("CUDA available:", torch.cuda.is_available())
print("PyTorch CUDA version:", torch.version.cuda)

Thu Nov  6 08:03:11 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.54.15              Driver Version: 550.54.15      CUDA Version: 12.4     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  Tesla T4                       Off |   00000000:00:04.0 Off |                    0 |
| N/A   60C    P8             11W /   70W |       0MiB /  15360MiB |      0%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
                                                

In [2]:
#@title 2) (Optional) Mount Google Drive
# The import lives inside the try block so that this optional cell degrades to
# a printed message (instead of an uncaught ImportError) when the notebook is
# run outside Google Colab, where the `google.colab` package is unavailable.
try:
    from google.colab import drive
    drive.mount('/content/drive')
    print('Drive mounted. Example path: /content/drive/MyDrive/Colab Notebooks/cmu_mosei_dataset')
except Exception as e:
    # Best-effort: report the reason and let the rest of the notebook continue.
    print('Drive mount skipped or failed:', e)


Mounted at /content/drive
Drive mounted. Example path: /content/drive/MyDrive/Colab Notebooks/cmu_mosei_dataset


In [3]:
#@title 3) Paths — set your dataset location here
from pathlib import Path

# Root folder holding the three .csd files; edit this to match your own layout.
DATA_ROOT = Path('/content/drive/MyDrive/Colab Notebooks/cmu_mosei_dataset')  # e.g., Path('/content/drive/MyDrive/mosei/dataset')

# Individual CSD files, resolved relative to DATA_ROOT.
LANGUAGE = DATA_ROOT / 'languages/CMU_MOSEI_TimestampedWordVectors.csd'
ACOUSTIC = DATA_ROOT / 'acoustics/CMU_MOSEI_COVAREP.csd'
LABELS   = DATA_ROOT / 'labels/CMU_MOSEI_Labels.csd'

# Folder searched for acoustic chunk files: the same folder as the acoustic .csd.
ACOUSTIC_CHUNKS_DIR = ACOUSTIC.parent

# Echo the resolved paths so a wrong location is visible before any file is opened.
print('DATA_ROOT =', DATA_ROOT)
print('LANGUAGE  =', LANGUAGE)
print('ACOUSTIC  =', ACOUSTIC)
print('LABELS    =', LABELS)


DATA_ROOT = /content/drive/MyDrive/Colab Notebooks/cmu_mosei_dataset
LANGUAGE  = /content/drive/MyDrive/Colab Notebooks/cmu_mosei_dataset/languages/CMU_MOSEI_TimestampedWordVectors.csd
ACOUSTIC  = /content/drive/MyDrive/Colab Notebooks/cmu_mosei_dataset/acoustics/CMU_MOSEI_COVAREP.csd
LABELS    = /content/drive/MyDrive/Colab Notebooks/cmu_mosei_dataset/labels/CMU_MOSEI_Labels.csd


In [4]:
#@title 4) Utils: chunk merge, HDF5 readers, dataset
import os, time, random, shutil
import h5py
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

# Default HDF5 file locking to 'FALSE' unless the variable is already set
# (setdefault never overrides an existing value).
# NOTE(review): h5py is imported above this line; if HDF5 reads this variable
# when the library is loaded, it would need to be set before `import h5py` — confirm.
os.environ.setdefault('HDF5_USE_FILE_LOCKING', 'FALSE')

def log(msg: str):
    """Print ``msg`` on one line, prefixed with the local time in square brackets."""
    stamp = time.strftime('%Y-%m-%d %H:%M:%S')
    print("[{}] {}".format(stamp, msg))

def detect_and_merge_chunks(chunks_dir: Path, base_filename: str, output_dir: Path) -> Path:
    """Concatenate chunk files found in ``chunks_dir`` into ``output_dir / base_filename``.

    Chunk discovery tries the glob patterns ``*.csdchunk``, ``part_*`` and
    ``*.chunk`` in that order and uses the first pattern that matches any
    regular file (files with a ``.csd`` suffix are never treated as chunks).

    Args:
        chunks_dir: Directory to search for chunk files.
        base_filename: File name of the merged output.
        output_dir: Directory for the merged output; created if missing.

    Returns:
        The output path. When no chunks are found the path is returned
        unchanged and may not exist — callers should check ``.exists()``.

    Notes:
        * Chunks are ordered with a natural (numeric-aware) sort so that
          ``part_2`` precedes ``part_10``; a plain lexicographic sort would
          interleave un-padded chunk numbers and corrupt the merged file.
        * Data is written to a sibling ``*.partial`` file and renamed over the
          target only after every chunk was copied, so an interrupted merge
          never leaves a truncated file under the final name.
    """
    import re  # local import: only needed for the natural-sort key

    def _natural_key(path):
        # re.split with a capturing group alternates text / digit runs, so odd
        # positions are always digit runs and same positions compare same types.
        parts = re.split(r'(\d+)', path.name)
        return [int(tok) if i % 2 else tok for i, tok in enumerate(parts)]

    output_dir.mkdir(parents=True, exist_ok=True)
    out_path = output_dir / base_filename

    chunk_files = []
    for pat in ('*.csdchunk', 'part_*', '*.chunk'):
        matches = sorted(
            (m for m in chunks_dir.glob(pat) if m.suffix != '.csd' and m.is_file()),
            key=_natural_key,
        )
        if matches:
            chunk_files = matches
            break

    if not chunk_files:
        if out_path.exists():
            log(f'No chunks found; using existing {out_path}')
        else:
            log('No chunk files found. Skipping merge.')
        return out_path

    # Skip the copy when a previous merge already produced a file of the expected size.
    total_size = sum(cf.stat().st_size for cf in chunk_files)
    if out_path.exists() and out_path.stat().st_size == total_size:
        log(f'Merged file already present with matching size: {out_path}')
        return out_path

    log(f'Merging {len(chunk_files)} chunks from {chunks_dir} -> {out_path}')
    tmp_path = out_path.with_name(out_path.name + '.partial')
    try:
        with open(tmp_path, 'wb') as w:
            for idx, cf in enumerate(chunk_files, start=1):
                with open(cf, 'rb') as r:
                    shutil.copyfileobj(r, w, length=1024*1024)
                if idx % 10 == 0 or idx == len(chunk_files):
                    log(f'  merged {idx}/{len(chunk_files)}')
        # Publish the finished file under its final name in a single rename.
        tmp_path.replace(out_path)
    except BaseException:
        # Do not leave a half-written temp file behind on error or interrupt.
        if tmp_path.exists():
            tmp_path.unlink()
        raise
    log('Merge complete.')
    return out_path

def choose_overlap_keys(lang_path: Path, acou_path: Path, labl_path: Path):
    """Return the sorted HDF5 group paths that hold a 'features' member in all three files.

    A group qualifies when it is an ``h5py.Group`` and contains a child named
    'features'. Paths are compared as full group names, so the three files
    must use identical hierarchies for a segment to be kept.
    """
    def _feature_group_names(handle):
        found = set()

        def _collect(name, obj):
            if isinstance(obj, h5py.Group) and 'features' in obj:
                found.add(name)

        handle.visititems(_collect)
        return found

    with h5py.File(lang_path, 'r') as fl, h5py.File(acou_path, 'r') as fa, h5py.File(labl_path, 'r') as fb:
        per_file = [_feature_group_names(handle) for handle in (fl, fa, fb)]
    return sorted(set.intersection(*per_file))

def read_feature_mean(h5path: Path, seg: str) -> np.ndarray:
    """Load ``<seg>/features`` from the HDF5 file and pool it to one float32 vector.

    A 1-D dataset is returned as-is (cast to float32); anything else is
    averaged along axis 0. The file is opened and closed on every call.
    """
    with h5py.File(h5path, 'r') as f:
        raw = f[seg]['features'][()]
    feats = np.asarray(raw).astype(np.float32)
    if feats.ndim == 1:
        return feats
    return feats.mean(axis=0)

class MoseiBimodalH5(Dataset):
    """Text + audio dataset over three CSD (HDF5) files, aligned by full group path.

    Keys shared by all three files are shuffled with ``seed`` and cut 80/10/10;
    ``split`` selects 'train', 'valid', or (any other value) the remaining tail.
    Each item is ``(features, label)`` where features is the concatenation of
    the pooled language and acoustic vectors and label is the mean of the
    pooled label vector, shaped ``(1,)``.
    """

    def __init__(self, lang_csd: Path, acou_csd: Path, labl_csd: Path, split: str, seed: int = 1337):
        self.lang = Path(lang_csd)
        self.acou = Path(acou_csd)
        self.labl = Path(labl_csd)

        all_keys = choose_overlap_keys(self.lang, self.acou, self.labl)
        if not all_keys:
            raise RuntimeError('No overlapping segments with features across the three CSD files.')

        # Deterministic shuffle, then contiguous 80/10/10 cut points.
        random.Random(seed).shuffle(all_keys)
        total = len(all_keys)
        train_end = int(0.8*total)
        valid_end = train_end + int(0.1*total)
        if split == 'train':
            self.keys = all_keys[:train_end]
        elif split == 'valid':
            self.keys = all_keys[train_end:valid_end]
        else:
            self.keys = all_keys[valid_end:]

        # Feature widths are inferred from the first key of the selected split.
        first_key = self.keys[0]
        self.lang_dim = int(read_feature_mean(self.lang, first_key).shape[-1])
        self.acou_dim = int(read_feature_mean(self.acou, first_key).shape[-1])

    def __len__(self):
        return len(self.keys)

    def __getitem__(self, idx):
        key = self.keys[idx]
        text_vec = read_feature_mean(self.lang, key)
        audio_vec = read_feature_mean(self.acou, key)
        label = read_feature_mean(self.labl, key).mean().astype(np.float32)
        features = np.concatenate([text_vec, audio_vec], axis=-1).astype(np.float32)
        return torch.from_numpy(features), torch.tensor([label], dtype=torch.float32)

class SimpleRegressor(nn.Module):
    """MLP regressor: two hidden Linear+ReLU+Dropout stages followed by a 1-unit output.

    The layers live in ``self.net`` (an ``nn.Sequential``), so state-dict keys
    are ``net.0.*``, ``net.3.*`` and ``net.6.*``.
    """

    def __init__(self, in_dim, hidden=256, dropout=0.1):
        super().__init__()
        layers = [
            nn.Linear(in_dim, hidden),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden, hidden),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden, 1),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        out = self.net(x)
        return out

def train_epoch(model, loader, device, optim, loss_fn):
    """Run one optimisation pass over ``loader`` and return the mean training loss.

    The model is put in train mode. The returned value is the sum of each
    batch loss times its batch size, divided by ``len(loader.dataset)``.
    """
    model.train()
    running = 0.0
    for features, targets in loader:
        features = features.to(device)
        targets = targets.to(device)
        optim.zero_grad()
        batch_loss = loss_fn(model(features), targets)
        batch_loss.backward()
        optim.step()
        running += batch_loss.item() * features.size(0)
    return running / len(loader.dataset)

@torch.no_grad()
def eval_epoch(model, loader, device, loss_fn):
    """Evaluate ``model`` over ``loader`` without gradients and return the mean loss.

    The model is put in eval mode. The returned value is the sum of each batch
    loss times its batch size, divided by ``len(loader.dataset)``.
    """
    model.eval()
    running = 0.0
    for features, targets in loader:
        features = features.to(device)
        targets = targets.to(device)
        batch_loss = loss_fn(model(features), targets)
        running += batch_loss.item() * features.size(0)
    return running / len(loader.dataset)


In [5]:
#@title 5) Merge acoustic chunks (if any) and check files
# Only attempt a chunk merge when the acoustic file is missing.
if ACOUSTIC.exists():
    print('Acoustic .csd already exists:', ACOUSTIC)
else:
    print('Acoustic .csd not found. Attempting to merge chunks...')
    merged = detect_and_merge_chunks(ACOUSTIC_CHUNKS_DIR, ACOUSTIC.name, ACOUSTIC.parent)
    print('Merged path:', merged)

# Report presence of each input file so missing data is caught before training.
for p in (LANGUAGE, ACOUSTIC, LABELS):
    print(p, 'exists =', p.exists())


Acoustic .csd already exists: /content/drive/MyDrive/Colab Notebooks/cmu_mosei_dataset/acoustics/CMU_MOSEI_COVAREP.csd
/content/drive/MyDrive/Colab Notebooks/cmu_mosei_dataset/languages/CMU_MOSEI_TimestampedWordVectors.csd exists = True
/content/drive/MyDrive/Colab Notebooks/cmu_mosei_dataset/acoustics/CMU_MOSEI_COVAREP.csd exists = True
/content/drive/MyDrive/Colab Notebooks/cmu_mosei_dataset/labels/CMU_MOSEI_Labels.csd exists = True


In [7]:
from pathlib import Path
import h5py, itertools

def feature_groups(h5path: Path):
    """Collect every HDF5 group that directly contains a 'features' member.

    Returns:
        ``(full_paths, leaf_names)`` — two sets: the full group paths, and the
        last path component of each of those paths.
    """
    full_paths = set()
    leaf_names = set()

    def _record(name, obj):
        if not isinstance(obj, h5py.Group) or "features" not in obj:
            return
        full_paths.add(name)
        leaf_names.add(name.rsplit("/", 1)[-1])

    with h5py.File(h5path, "r") as f:
        f.visititems(_record)
    return full_paths, leaf_names

# Scan each CSD once and compare how many feature groups it holds, both by
# full path and by leaf name.
(lang_full, lang_leaf), (acou_full, acou_leaf), (labl_full, labl_leaf) = (
    feature_groups(csd) for csd in (LANGUAGE, ACOUSTIC, LABELS)
)

print("Counts (full-path):", *(len(names) for names in (lang_full, acou_full, labl_full)))
print("Counts (leaf-name):", *(len(names) for names in (lang_leaf, acou_leaf, labl_leaf)))

# Leaf names present in all three files are the segments usable for training.
print("\nLeaf-name intersections:")
leaf_inter = set.intersection(lang_leaf, acou_leaf, labl_leaf)
print("  intersection size:", len(leaf_inter))
print("  examples:", sorted(leaf_inter)[:10])

Counts (full-path): 3837 3836 3293
Counts (leaf-name): 3837 3836 3293

Leaf-name intersections:
  intersection size: 3292
  examples: ['--qXJuDtHPw', '-3g5yACwYnA', '-3nNcZdcdvU', '-571d8cVauQ', '-6rXp3zJ3kc', '-9YyBTjo1zo', '-9y-fZ3swSY', '-AUZQgSxyPQ', '-Alixo7euuU', '-Eqdz5y4pEY']


In [8]:
import os, time, random, shutil
import h5py, numpy as np, torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn

def _collect_leaf_to_full(h5path):
    """Map leaf segment name -> full HDF5 path containing 'features'.

    When the same leaf name appears under several parents, the first path
    visited wins.
    """
    leaf_to_full = {}

    def _remember(name, obj):
        is_feature_group = isinstance(obj, h5py.Group) and "features" in obj
        if is_feature_group:
            leaf = name.split("/")[-1]
            # keep first occurrence; assuming leaf names are unique enough across files
            if leaf not in leaf_to_full:
                leaf_to_full[leaf] = name

    with h5py.File(h5path, "r") as f:
        f.visititems(_remember)
    return leaf_to_full

def _read_feature_mean(h5path: Path, fullpath: str):
    """Read ``<fullpath>/features`` and pool it to a single float32 vector.

    1-D data is returned unchanged (as float32); otherwise the mean along
    axis 0 is returned.
    """
    with h5py.File(h5path, "r") as f:
        raw = f[fullpath]["features"][()]
    feats = np.asarray(raw).astype(np.float32)
    if feats.ndim == 1:
        return feats
    return feats.mean(axis=0)

class MoseiBimodalH5(Dataset):
    """
    Aligns by LEAF group name (the final path component that holds 'features'),
    which is typically consistent across LANGUAGE / ACOUSTIC / LABELS CSDs.

    Shared leaf names are sorted, shuffled with ``seed`` and cut 80/10/10;
    ``split`` picks "train", "valid", or (for any other value) the remainder.
    """

    def __init__(self, lang_csd, acou_csd, labl_csd, split="train", seed=1337):
        self.lang_p = Path(lang_csd)
        self.acou_p = Path(acou_csd)
        self.labl_p = Path(labl_csd)

        # leaf name -> full group path, one mapping per file (kept for item access)
        self.lang_map = _collect_leaf_to_full(self.lang_p)
        self.acou_map = _collect_leaf_to_full(self.acou_p)
        self.labl_map = _collect_leaf_to_full(self.labl_p)

        shared = self.lang_map.keys() & self.acou_map.keys() & self.labl_map.keys()
        if len(shared) == 0:
            raise RuntimeError("Still no overlap after leaf-name alignment. Check that the three CSDs correspond to the same MOSEI split/version.")

        # Sort first so the seeded shuffle is independent of set iteration order.
        keys = sorted(shared)
        random.Random(seed).shuffle(keys)
        total = len(keys)
        train_end = int(0.8*total)
        valid_end = train_end + int(0.1*total)
        if split == "train":
            self.keys = keys[:train_end]
        elif split == "valid":
            self.keys = keys[train_end:valid_end]
        else:
            self.keys = keys[valid_end:]

        # infer dims from the first key of the selected split
        first_leaf = self.keys[0]
        self.lang_dim = int(_read_feature_mean(self.lang_p, self.lang_map[first_leaf]).shape[-1])
        self.acou_dim = int(_read_feature_mean(self.acou_p, self.acou_map[first_leaf]).shape[-1])

    def __len__(self):
        return len(self.keys)

    def __getitem__(self, idx):
        leaf = self.keys[idx]
        text_vec = _read_feature_mean(self.lang_p, self.lang_map[leaf])
        audio_vec = _read_feature_mean(self.acou_p, self.acou_map[leaf])
        label = _read_feature_mean(self.labl_p, self.labl_map[leaf]).mean().astype(np.float32)
        features = np.concatenate([text_vec, audio_vec], axis=-1).astype(np.float32)
        return torch.from_numpy(features), torch.tensor([label], dtype=torch.float32)

class SimpleRegressor(nn.Module):
    """Small MLP mapping an ``in_dim`` feature vector to one regression output.

    Two hidden stages (Linear -> ReLU -> Dropout) of width ``hidden`` are
    followed by a final Linear to a single unit, all inside ``self.net``.
    """

    def __init__(self, in_dim, hidden=256, dropout=0.1):
        super().__init__()
        stack = []
        width = in_dim
        for _ in range(2):
            stack += [nn.Linear(width, hidden), nn.ReLU(), nn.Dropout(dropout)]
            width = hidden
        stack.append(nn.Linear(width, 1))
        self.net = nn.Sequential(*stack)

    def forward(self, x):
        return self.net(x)

In [11]:
# === NaN/empty-safe dataset + training guards ===
import h5py, numpy as np, torch, torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from pathlib import Path
import time, random

def log(msg):
    """Print ``msg`` prefixed with the current local time as ``[YYYY-MM-DD HH:MM:SS]``."""
    now = time.strftime('%Y-%m-%d %H:%M:%S')
    print(f"[{now}] {msg}")

def _collect_leaf_to_full(h5path: Path):
    """Return ``{leaf_name: full_group_path}`` for every group holding 'features'.

    The leaf name is the last '/'-separated component of the group path; if a
    leaf name occurs more than once, the first path visited is kept.
    """
    index = {}

    def _visit(path, node):
        if not isinstance(node, h5py.Group):
            return None
        if "features" not in node:
            return None
        index.setdefault(path.rpartition("/")[2], path)
        return None

    with h5py.File(h5path, "r") as f:
        f.visititems(_visit)
    return index

def _read_feat(h5path: Path, fullpath: str):
    """Read ``<fullpath>/features`` and pool it to one finite float32 vector.

    Args:
        h5path: Path of the CSD/HDF5 file.
        fullpath: Full HDF5 group path that contains the 'features' dataset.

    Returns:
        ``None`` when the dataset is empty. Otherwise a float32 vector:
        1-D (or scalar) data is returned with non-finite entries set to 0;
        higher-rank data is averaged along axis 0 over the *finite* entries
        only, and columns with no finite entry become 0.

    Notes:
        NaN and +/-inf are both treated as missing before pooling. Averaging
        first and sanitising afterwards would let a single inf frame turn the
        whole column mean into inf, which would then be replaced by 0 and
        discard every valid frame in that column.
    """
    with h5py.File(h5path, "r") as f:
        arr = f[fullpath]["features"][()]
    arr = np.asarray(arr, dtype=np.float32)
    if arr.size == 0:
        return None
    # Promote a 0-d scalar to a 1-element vector so downstream shape[-1] works.
    arr = np.atleast_1d(arr)
    finite = np.isfinite(arr)
    if arr.ndim == 1:
        return np.where(finite, arr, np.float32(0.0)).astype(np.float32)
    # Mean over finite frames only; accumulate in float64 to limit rounding error.
    counts = finite.sum(axis=0)
    sums = np.where(finite, arr, 0.0).sum(axis=0, dtype=np.float64)
    pooled = np.divide(sums, counts, out=np.zeros_like(sums), where=counts > 0)
    return pooled.astype(np.float32)

def _read_label_scalar(h5path: Path, fullpath: str):
    """Read ``<fullpath>/features`` from the label file and reduce it to one float.

    Returns the NaN-ignoring mean over *all* values in the dataset, or ``None``
    when the dataset is empty or that mean is not finite.

    NOTE(review): every label value is averaged into a single target; if the
    label dataset has several columns with different meanings, confirm that
    collapsing them is intended.
    """
    with h5py.File(h5path, "r") as f:
        raw = f[fullpath]["features"][()]
    labels = np.asarray(raw, dtype=np.float32)
    if labels.size == 0:
        return None
    mean_label = float(np.nanmean(labels))
    return mean_label if np.isfinite(mean_label) else None

class MoseiBimodalH5Safe(Dataset):
    """Text + audio dataset over three CSD (HDF5) files that skips unusable segments.

    Segments are matched across the files by leaf group name. Every candidate
    is read once; it is kept only if its language vector, acoustic vector and
    label are all available (``_read_feat`` / ``_read_label_scalar`` return
    non-None) and its feature widths equal those of the first kept segment.
    The kept list (in seeded-shuffle order) is cut 80/10/10.

    Args:
        lang_csd, acou_csd, labl_csd: Paths of the language, acoustic and
            label CSD files.
        split: One of "train", "valid" or "test".
        seed: Seed for the shuffle that precedes the split.
        max_scan: Optional cap on how many shuffled candidates are scanned.

    Raises:
        ValueError: ``split`` is not one of the three supported names. (An
            unrecognised name would otherwise silently select the test split.)
        RuntimeError: no common leaf keys, or every candidate was filtered out.

    Notes:
        The validity scan reads every candidate from disk, so its result is
        memoised per (files, seed, max_scan). Building the train/valid/test
        instances therefore scans the files once instead of three times.
    """

    _SPLITS = ("train", "valid", "test")
    # (file signatures..., seed, max_scan) -> (valid, lang_dim, acou_dim, Lmap, Amap, Bmap)
    _scan_cache = {}

    def __init__(self, lang_csd, acou_csd, labl_csd, split="train", seed=1337, max_scan=None):
        if split not in self._SPLITS:
            raise ValueError(f"split must be one of {self._SPLITS}, got {split!r}")
        self.lang_p, self.acou_p, self.labl_p = Path(lang_csd), Path(acou_csd), Path(labl_csd)

        cache = type(self)._scan_cache
        cache_key = tuple(
            self._file_signature(p) for p in (self.lang_p, self.acou_p, self.labl_p)
        ) + (seed, max_scan)
        if cache_key not in cache:
            cache[cache_key] = self._scan(seed, max_scan)
        valid, lang_dim, acou_dim, Lmap, Amap, Bmap = cache[cache_key]

        n = len(valid); n_tr = int(0.8*n); n_va = int(0.1*n)
        if split == "train":
            self.keys = valid[:n_tr]
        elif split == "valid":
            self.keys = valid[n_tr:n_tr+n_va]
        else:
            self.keys = valid[n_tr+n_va:]

        self.Lmap, self.Amap, self.Bmap = Lmap, Amap, Bmap
        # Save dims
        self.lang_dim, self.acou_dim = lang_dim, acou_dim
        log(f"Dataset({split}) ‚Äî kept {len(self.keys)} samples | lang_dim={self.lang_dim}, acou_dim={self.acou_dim}")

    @staticmethod
    def _file_signature(path):
        """Identify a file by path, mtime and size so a rewritten file misses the cache."""
        try:
            st = path.stat()
        except OSError:
            return (str(path), None, None)
        return (str(path), st.st_mtime_ns, st.st_size)

    def _scan(self, seed, max_scan):
        """Read every candidate once and return the usable leaves plus dims and maps."""
        Lmap = _collect_leaf_to_full(self.lang_p)
        Amap = _collect_leaf_to_full(self.acou_p)
        Bmap = _collect_leaf_to_full(self.labl_p)
        common = sorted(set(Lmap) & set(Amap) & set(Bmap))
        if not common:
            raise RuntimeError("No common leaf keys across CSDs. Did you run the leaf-name version earlier?")

        # Shuffle before the optional cap so max_scan takes a random subset.
        random.Random(seed).shuffle(common)
        if max_scan:  # speed limiter for massive scans (optional)
            common = common[:max_scan]

        valid = []
        lang_dim = acou_dim = None
        for leaf in common:
            xL = _read_feat(self.lang_p, Lmap[leaf])
            xA = _read_feat(self.acou_p, Amap[leaf])
            y  = _read_label_scalar(self.labl_p, Bmap[leaf])
            if xL is None or xA is None or y is None:
                continue
            # Dims are fixed by the first fully valid segment.
            if lang_dim is None: lang_dim = int(xL.shape[-1])
            if acou_dim is None: acou_dim = int(xA.shape[-1])
            # Final safety: enforce dims
            if xL.shape[-1] != lang_dim or xA.shape[-1] != acou_dim:
                continue
            valid.append(leaf)

        if not valid:
            raise RuntimeError("All samples filtered out as non-finite/empty.")
        return valid, lang_dim, acou_dim, Lmap, Amap, Bmap

    def __len__(self): return len(self.keys)

    def __getitem__(self, idx):
        """Return ``(features, label)``: concatenated float32 vector and a ``(1,)`` target."""
        leaf = self.keys[idx]
        xL = _read_feat(self.lang_p, self.Lmap[leaf])
        xA = _read_feat(self.acou_p, self.Amap[leaf])
        y  = _read_label_scalar(self.labl_p, self.Bmap[leaf])
        # After filtering these should be valid; still guard:
        if xL is None or xA is None or y is None:
            # Return zeroed fallback to avoid crashing batch; label 0
            # (rare; mainly if file changed after scan)
            xL = np.zeros(self.lang_dim, np.float32)
            xA = np.zeros(self.acou_dim, np.float32)
            y  = 0.0
        x = np.concatenate([xL, xA], axis=-1).astype(np.float32)
        x = np.nan_to_num(x, nan=0.0, posinf=0.0, neginf=0.0)
        return torch.from_numpy(x), torch.tensor([y], dtype=torch.float32)

# Replace your dataset references:
# train_ds = MoseiBimodalH5Safe(LANGUAGE, ACOUSTIC, LABELS, split='train')
# valid_ds = MoseiBimodalH5Safe(LANGUAGE, ACOUSTIC, LABELS, split='valid')

# Optional training-loop guards (paste into your train cell if you like):
def safe_train_epoch(model, loader, device, optim, loss_fn, clip=1.0):
    """Train for one epoch while guarding against non-finite data and losses.

    Args:
        model, loader, device, optim, loss_fn: The usual training objects.
        clip: Max gradient norm passed to ``clip_grad_norm_``; falsy disables clipping.

    Returns:
        The sample-weighted mean loss over the samples actually used, or NaN
        when no sample was usable.

    Notes:
        Non-finite *feature* values are replaced with 0. Rows whose *label* is
        non-finite are dropped from the batch: replacing a missing label with
        0 would train on a fabricated target, and sanitising labels before the
        finiteness check would make that check unable to drop anything.
    """
    model.train(); tot=0.0; nobs=0
    for xb, yb in loader:
        xb = torch.nan_to_num(xb, nan=0.0, posinf=0.0, neginf=0.0).to(device)
        yb = yb.to(device)
        # Keep only rows whose label entries are all finite (features are already sanitised).
        mask = torch.isfinite(yb).reshape(yb.size(0), -1).all(dim=1)
        if not mask.any():
            continue
        xb, yb = xb[mask], yb[mask]
        optim.zero_grad()
        pred = model(xb)
        loss = loss_fn(pred, yb)
        if not torch.isfinite(loss):  # guard explode
            continue
        loss.backward()
        if clip: nn.utils.clip_grad_norm_(model.parameters(), clip)
        optim.step()
        bs = xb.size(0); tot += loss.item()*bs; nobs += bs
    return (tot/nobs) if nobs else float('nan')

@torch.no_grad()
def safe_eval_epoch(model, loader, device, loss_fn):
    """Evaluate for one epoch while guarding against non-finite data and losses.

    Returns:
        The sample-weighted mean loss over the samples actually used, or NaN
        when no sample was usable.

    Notes:
        Non-finite feature values are replaced with 0. Rows with a non-finite
        label are excluded rather than scored against a substituted 0.0, so
        the reported loss only reflects real targets.
    """
    model.eval(); tot=0.0; nobs=0
    for xb, yb in loader:
        xb = torch.nan_to_num(xb, nan=0.0, posinf=0.0, neginf=0.0).to(device)
        yb = yb.to(device)
        # Keep only rows whose label entries are all finite (features are already sanitised).
        mask = torch.isfinite(yb).reshape(yb.size(0), -1).all(dim=1)
        if not mask.any():
            continue
        xb, yb = xb[mask], yb[mask]
        pred = model(xb)
        loss = loss_fn(pred, yb)
        if not torch.isfinite(loss):
            continue
        bs = xb.size(0); tot += loss.item()*bs; nobs += bs
    return (tot/nobs) if nobs else float('nan')

# Emit a timestamped confirmation once every definition in this cell has been executed.
log("Patched dataset & guards loaded. Now re-run the Train cell using MoseiBimodalH5Safe and safe_*_epoch.")


[2025-11-06 08:24:03] Patched dataset & guards loaded. Now re-run the Train cell using MoseiBimodalH5Safe and safe_*_epoch.


In [12]:
#@title 6) Train (configure hyperparameters here)
EPOCHS = 3 #@param {type:"integer"}
BATCH_SIZE = 64 #@param {type:"integer"}
LR = 1e-3 #@param {type:"number"}
NUM_WORKERS = 2 #@param {type:"integer"}
OUTDIR = 'runs_bimodal_h5' #@param {type:"string"}

import os
os.makedirs(OUTDIR, exist_ok=True)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Device:', device)

# Seed PyTorch so weight initialisation and the shuffled batch order are
# repeatable from run to run (same value as the dataset's default split seed).
torch.manual_seed(1337)

train_ds = MoseiBimodalH5Safe(LANGUAGE, ACOUSTIC, LABELS, split='train')
valid_ds = MoseiBimodalH5Safe(LANGUAGE, ACOUSTIC, LABELS, split='valid')

# Pinned host memory only helps host->GPU copies; request it only when training on CUDA.
pin = device.type == 'cuda'
train_ld = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS, pin_memory=pin)
valid_ld = DataLoader(valid_ds, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS, pin_memory=pin)

# Model input is the language vector concatenated with the acoustic vector.
in_dim = train_ds.lang_dim + train_ds.acou_dim
model = SimpleRegressor(in_dim).to(device)
optim = torch.optim.AdamW(model.parameters(), lr=LR)
loss_fn = nn.MSELoss()

# Keep the checkpoint with the lowest validation loss seen so far.
# (A NaN validation loss never compares below `best`, so it is never saved.)
best = float('inf')
for ep in range(1, EPOCHS+1):
    tr = safe_train_epoch(model, train_ld, device, optim, loss_fn, clip=1.0)
    va = safe_eval_epoch(model, valid_ld, device, loss_fn)
    log(f'Epoch {ep}: train_loss={tr:.4f}  valid_loss={va:.4f}')
    if va < best:
        best = va
        torch.save({'model': model.state_dict(), 'in_dim': in_dim}, f'{OUTDIR}/best.pt')
        log('  saved best.pt')
print('Done.')


Device: cuda
[2025-11-06 08:30:06] Dataset(train) ‚Äî kept 2633 samples | lang_dim=300, acou_dim=74
[2025-11-06 08:34:14] Dataset(valid) ‚Äî kept 329 samples | lang_dim=300, acou_dim=74
[2025-11-06 08:37:04] Epoch 1: train_loss=0.1612  valid_loss=0.0432
[2025-11-06 08:37:04]   saved best.pt
[2025-11-06 08:39:50] Epoch 2: train_loss=0.0217  valid_loss=0.0213
[2025-11-06 08:39:50]   saved best.pt
[2025-11-06 08:42:38] Epoch 3: train_loss=0.0178  valid_loss=0.0218
Done.


In [13]:
#@title 7) (Optional) Quick inference on a few samples
# Print the first three predictions and targets for each of the first three validation batches.
model.eval()
from itertools import islice
with torch.no_grad():
    for i, (xb, yb) in enumerate(islice(valid_ld, 3)):
        pred = model(xb.to(device)).cpu().numpy().ravel()
        print(f'Batch {i}: pred[0:3]={pred[:3]}  y[0:3]={yb.numpy().ravel()[:3]}')


Batch 0: pred[0:3]=[0.14582844 0.17016882 0.14452246]  y[0:3]=[0.52380955 0.15238096 0.09047619]
Batch 1: pred[0:3]=[0.11185965 0.23540294 0.10852221]  y[0:3]=[0.14285715 0.2190476  0.3904762 ]
Batch 2: pred[0:3]=[0.17836097 0.09962033 0.12599298]  y[0:3]=[0.12698413 0.07142857 0.21428572]


In [14]:
#@title 8) Testing

In [15]:
# One-cell test evaluation (MSE + Pearson) ‚Äî run after training
import torch, numpy as np
from torch.utils.data import DataLoader
try:
    from scipy.stats import pearsonr
except Exception:
    %pip -q install scipy
    from scipy.stats import pearsonr

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
loss_fn = torch.nn.MSELoss()

# Load best checkpoint into your existing `model`
ckpt = torch.load('runs_bimodal_h5/best.pt', map_location=device)
model.load_state_dict(ckpt['model'])
model.to(device).eval()

# Build test dataset/loader (uses the NaN-safe dataset you added)
test_ds = MoseiBimodalH5Safe(LANGUAGE, ACOUSTIC, LABELS, split='test')
test_ld = DataLoader(test_ds, batch_size=64, shuffle=False, num_workers=0, pin_memory=True)

# Eval loop with NaN/Inf guards
def _eval_loop(m, ld):
    """Evaluate ``m`` over loader ``ld`` and collect predictions and targets.

    Uses the cell-level ``device`` and ``loss_fn``.

    Returns:
        ``(mean_loss, preds, trues)`` — the sample-weighted mean loss (NaN if
        no sample was usable) and two flat NumPy arrays of equal length.

    Notes:
        Non-finite feature values are replaced with 0. Rows with a non-finite
        label are excluded instead of being scored against a substituted 0.0,
        so both the loss and the returned arrays contain real targets only.
    """
    m.eval(); tot=0.0; n=0; preds=[]; trues=[]
    with torch.no_grad():
        for xb, yb in ld:
            xb = torch.nan_to_num(xb, nan=0.0, posinf=0.0, neginf=0.0).to(device)
            yb = yb.to(device)
            # Keep only rows whose label entries are all finite (features are already sanitised).
            mask = torch.isfinite(yb).reshape(yb.size(0), -1).all(dim=1)
            if not mask.any():
                continue
            xb, yb = xb[mask], yb[mask]
            out = m(xb)
            loss = loss_fn(out, yb)
            bs = xb.size(0)
            tot += loss.item() * bs; n += bs
            preds.append(out.detach().cpu().numpy().ravel())
            trues.append(yb.detach().cpu().numpy().ravel())
    preds = np.concatenate(preds) if preds else np.array([])
    trues = np.concatenate(trues) if trues else np.array([])
    return (tot/n if n else float('nan')), preds, trues

mse, preds, trues = _eval_loop(model, test_ld)
mask = np.isfinite(preds) & np.isfinite(trues)
# Pearson's r needs at least two points (scipy raises otherwise), so report NaN below that.
r = pearsonr(preds[mask], trues[mask])[0] if mask.sum() >= 2 else float('nan')

print(f"Test MSE: {mse:.4f} | Pearson r: {r:.3f} | Samples: {mask.sum()}/{len(test_ds)}")
print("Sample preds vs gold:", list(zip(preds[:5].round(3), trues[:5].round(3))))

[2025-11-06 08:50:38] Dataset(test) ‚Äî kept 330 samples | lang_dim=300, acou_dim=74
Test MSE: 0.0205 | Pearson r: 0.370 | Samples: 330/330
Sample preds vs gold: [(np.float32(0.141), np.float32(0.286)), (np.float32(0.263), np.float32(0.231)), (np.float32(0.146), np.float32(0.238)), (np.float32(0.17), np.float32(0.017)), (np.float32(0.232), np.float32(0.196))]


In [18]:
# Install the demo-only dependencies via a shell escape.
# NOTE(review): `torch` is in the install list while a trained `model` is already
# live in this kernel; presumably a reinstall here could conflict with the loaded build — confirm.
!pip install -q gradio librosa transformers soundfile torch
import gradio as gr, torch, librosa, numpy as np
from transformers import AutoTokenizer, AutoModel

# Load text embedding model
# NOTE(review): the demo function zero-fills a 300-wide text vector when the
# transcript is blank; verify that this encoder's hidden size matches that width.
tok = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
txt_model = AutoModel.from_pretrained("sentence-transformers/all-MiniLM-L6-v2").eval().to("cuda" if torch.cuda.is_available() else "cpu")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Reuse the regressor trained earlier in the notebook, switched to inference mode.
model.eval().to(device)

def voice_to_emotion(audio, text):
    """Score an audio clip (plus optional transcript) with the trained regressor.

    Args:
        audio: File path of the recorded/uploaded clip, or ``None``.
        text: Optional transcript; ``None`` or blank means "no text".

    Returns:
        A display string with a coarse label and the raw score, or a prompt
        asking for audio when none (or an empty clip) was supplied.

    Notes:
        The regressor expects a 300-wide text vector followed by a 74-wide
        audio vector (the dims logged when the datasets were built). Both
        demo feature vectors are truncated or zero-padded to those widths so
        a transcript embedding of a different size no longer causes a shape
        error in the first Linear layer.

        NOTE(review): the demo features (MFCC means, sentence-encoder
        embeddings) are presumably from a different feature space than the
        CSD features the model was trained on, so scores are illustrative
        only — confirm before relying on them.
    """
    n_audio, n_text = 74, 300

    def _fit(vec, width):
        # Truncate or right-pad with zeros to exactly `width` float32 values.
        flat = np.asarray(vec, dtype=np.float32).ravel()[:width]
        return np.pad(flat, (0, width - flat.shape[0]))

    # 1) Extract audio features (MFCC mean)
    if audio is None:
        return "Please record or upload an audio clip."
    y, sr = librosa.load(audio, sr=16000)
    if y.size == 0:
        # A zero-length recording has no frames to average.
        return "Please record or upload an audio clip."
    mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=n_audio)
    audio_feat = _fit(np.nan_to_num(np.mean(mfcc, axis=1)), n_audio)

    # 2) Get text embedding (optional)
    if (text or "").strip():
        inputs = tok(text, return_tensors="pt", truncation=True, padding=True).to(device)
        with torch.no_grad():
            text_emb = txt_model(**inputs).last_hidden_state.mean(dim=1).cpu().numpy()[0]
        text_emb = _fit(text_emb, n_text)
    else:
        text_emb = np.zeros(n_text, dtype=np.float32)  # same dim as language feature space

    # 3) Concatenate & predict
    x = np.concatenate([text_emb, audio_feat]).astype(np.float32)
    x = torch.tensor(x).unsqueeze(0).to(device)
    with torch.no_grad():
        pred = model(x).cpu().item()

    # 4) Interpret score
    # NOTE(review): the validation targets printed earlier in this notebook are
    # mostly below 0.5, so these cut-offs may label nearly everything Negative — confirm.
    emo = "üòä Positive" if pred > 0.6 else "üòê Neutral" if pred > 0.4 else "üòû Negative"
    return f"{emo}  (score = {pred:.2f})"

# Two inputs (audio file path + optional transcript) feed voice_to_emotion; the result is shown as text.
_demo_inputs = [
    gr.Audio(label="üéôÔ∏è Speak or Upload Audio", type="filepath"),
    gr.Textbox(label="‚úçÔ∏è Transcript (optional)"),
]
demo = gr.Interface(
    fn=voice_to_emotion,
    inputs=_demo_inputs,
    outputs="text",
    title="Bimodal Voice + Text Emotion Demo",
)

# share=True requests a public share link in addition to the local server.
demo.launch(share=True)


Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://97b4199f180a99de50.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


