In [None]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import pandas as pd
from tqdm import tqdm
from pathlib import Path
from torch.utils.data import Dataset, DataLoader
from transformers import (
    Wav2Vec2Processor, Wav2Vec2Model,
    AutoFeatureExtractor, HubertModel,
    WhisperFeatureExtractor, WhisperModel
)
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import (
    confusion_matrix, accuracy_score, precision_score, 
    recall_score, f1_score
)
from sklearn.model_selection import StratifiedShuffleSplit
import torchaudio
import json
import warnings
warnings.filterwarnings('ignore')

# ============================================================
# DEVICE & CONSTANTS
# ============================================================
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"DEVICE = {DEVICE}")

# Dataset root (read-only Kaggle input) and the folder that receives
# checkpoints, figures and result files.
DATA_DIR = "/kaggle/input/vietnamese-speech-emotion-recognition-dataset"
OUTPUT_DIR = "models"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Class names; a class's integer id is its position in this list.
EMOTION_CLASSES = ['happy', 'neutral', 'sad', 'angry']
EMOTION_MAP = dict(zip(EMOTION_CLASSES, range(len(EMOTION_CLASSES))))

# Config for training
CONFIG = dict(
    min_duration=0.5,      # seconds; shorter clips are dropped while loading
    max_duration=15.0,     # seconds; longer clips are dropped while loading
    sample_rate=16000,     # Hz; target rate for every waveform
    batch_size=16,
    epochs=5,
    warmup_steps=500,      # NOTE(review): not referenced by any visible cell
    learning_rate=1e-5,
    weight_decay=0.01,
    patience=5,            # early-stopping patience, in epochs
    random_seed=42,
)

# Seed both RNGs that the notebook draws from.
np.random.seed(CONFIG['random_seed'])
torch.manual_seed(CONFIG['random_seed'])

In [None]:
print("\n" + "="*60)
print("STEP 1: LOAD DATA FROM FOLDER STRUCTURE")
print("="*60)

def load_audio_dataset(data_dir):
    """Index the ``.wav`` files under ``data_dir`` into one DataFrame per split.

    The expected layout is ``<data_dir>/<split>/<emotion>/<file>.wav`` for the
    splits ``train/phase1``, ``train/phase2``, ``val`` and ``test`` and the
    emotions listed in ``EMOTION_CLASSES``.

    Args:
        data_dir: Root directory of the dataset.

    Returns:
        dict mapping the split key (e.g. ``'train/phase1'``) to a DataFrame
        with the columns ``audio_path``, ``emotion``, ``emotion_id``,
        ``duration`` (seconds) and ``split`` (last path component of the split
        key, e.g. ``'phase1'``). Splits whose folder is missing, or that
        contain no accepted file, are absent from the dict.

    Files that fail to load are reported on stdout and skipped; files whose
    duration is outside ``[CONFIG['min_duration'], CONFIG['max_duration']]``
    are skipped silently.
    """
    datasets = {}

    for split in ['train/phase1', 'train/phase2', 'val', 'test']:
        split_path = os.path.join(data_dir, split)

        if not os.path.exists(split_path):
            print(f"Warning: {split_path} not found!")
            continue

        # Store a plain string in the 'split' column. The previous code stored
        # the *list* produced by str.split for nested splits.
        split_name = split.split('/')[-1]
        data = []

        for emotion in EMOTION_CLASSES:
            emotion_dir = os.path.join(split_path, emotion)

            if not os.path.exists(emotion_dir):
                continue

            # os.listdir order is arbitrary; sort so that row order (and hence
            # the seeded train/eval split built from it) is reproducible.
            for filename in sorted(os.listdir(emotion_dir)):
                if not filename.lower().endswith('.wav'):
                    continue

                filepath = os.path.join(emotion_dir, filename)
                try:
                    waveform, sr = torchaudio.load(filepath)

                    # Duration is frames / native rate; resampling the whole
                    # clip first only to measure it is unnecessary work.
                    duration = waveform.shape[1] / sr

                    # Filter by duration
                    if CONFIG['min_duration'] <= duration <= CONFIG['max_duration']:
                        data.append({
                            'audio_path': filepath,
                            'emotion': emotion,
                            'emotion_id': EMOTION_MAP[emotion],
                            'duration': duration,
                            'split': split_name
                        })
                except Exception as e:
                    # Best effort: one unreadable file must not abort indexing.
                    print(f"Error loading {filepath}: {e}")

        if data:
            datasets[split] = pd.DataFrame(data)
            print(f"{split}: {len(data)} files")
            print(f"  Emotion distribution: {datasets[split]['emotion'].value_counts().to_dict()}")

    return datasets

datasets = load_audio_dataset(DATA_DIR)

# Pull out one frame per split; a split that was not loaded becomes an
# empty DataFrame so the length report below still works.
df_phase1, df_phase2, df_val, df_test = (
    datasets.get(split_key, pd.DataFrame())
    for split_key in ('train/phase1', 'train/phase2', 'val', 'test')
)

print(f"\nTotal samples:")
for split_title, split_frame in (
    ("Phase1", df_phase1),
    ("Phase2", df_phase2),
    ("Val", df_val),
    ("Test", df_test),
):
    print(f"  {split_title}: {len(split_frame)}")

In [None]:
print("\n" + "="*60)
print("STEP 2: DATASET CLASS")
print("="*60)

class AudioEmotionDataset(Dataset):
    """Map-style dataset that yields a mono waveform and its label.

    Each item is read from the ``audio_path`` / ``emotion_id`` columns of the
    frame given at construction. ``processor``, ``feature_extractor`` and
    ``mode`` are stored on the instance but not used by this class itself.
    """

    def __init__(self, df, processor=None, feature_extractor=None, mode='wav2vec'):
        # Positional indexing below relies on a clean 0..N-1 index.
        self.df = df.reset_index(drop=True)
        self.processor = processor
        self.feature_extractor = feature_extractor
        self.mode = mode

    def __len__(self):
        """Number of rows (audio clips) in the underlying frame."""
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``{'waveform': float32 1-D ndarray, 'labels': emotion id}``."""
        record = self.df.iloc[idx]
        path = record['audio_path']
        target = record['emotion_id']

        signal, native_sr = torchaudio.load(path)

        # Collapse multi-channel audio to mono by averaging the channels.
        if signal.shape[0] > 1:
            signal = signal.mean(0, keepdim=True)

        samples = signal.squeeze(0).numpy().astype(np.float32)

        # Bring the clip to the configured sampling rate when it differs.
        wanted_sr = CONFIG['sample_rate']
        if native_sr != wanted_sr:
            resampled = torchaudio.functional.resample(
                torch.from_numpy(samples), native_sr, wanted_sr
            )
            samples = resampled.numpy()

        return {'waveform': samples, 'labels': target}

In [None]:
print("\nSetting up collate functions...")

# WAV2VEC2 COLLATE
wav2vec_processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base")

def collate_wav2vec(batch):
    """Collate dataset items into a padded Wav2Vec2 batch.

    Args:
        batch: list of ``{'waveform': ..., 'labels': ...}`` items.

    Returns:
        dict with ``input_values`` (the processor output, padded across the
        batch via ``padding=True``) and ``labels`` (long tensor). Only these
        two keys are forwarded; anything else the processor returns is dropped.
    """
    waveforms, labels = [], []
    for item in batch:
        waveforms.append(item['waveform'])
        labels.append(item['labels'])

    encoded = wav2vec_processor(
        waveforms,
        sampling_rate=CONFIG['sample_rate'],
        return_tensors="pt",
        padding=True,
    )

    label_tensor = torch.tensor(labels, dtype=torch.long)
    return {'input_values': encoded['input_values'], 'labels': label_tensor}

# HUBERT COLLATE
hubert_extractor = AutoFeatureExtractor.from_pretrained("facebook/hubert-base-ls960")

def collate_hubert(batch):
    """Collate dataset items into a padded HuBERT batch.

    Args:
        batch: list of ``{'waveform': ..., 'labels': ...}`` items.

    Returns:
        dict with ``input_values`` (feature-extractor output, padded across
        the batch via ``padding=True``) and ``labels`` (long tensor).
    """
    waveforms, labels = [], []
    for item in batch:
        waveforms.append(item['waveform'])
        labels.append(item['labels'])

    extracted = hubert_extractor(
        waveforms,
        sampling_rate=CONFIG['sample_rate'],
        return_tensors="pt",
        padding=True,
    )

    label_tensor = torch.tensor(labels, dtype=torch.long)
    return {'input_values': extracted['input_values'], 'labels': label_tensor}

# WHISPER COLLATE
whisper_extractor = WhisperFeatureExtractor.from_pretrained("openai/whisper-tiny")

def collate_whisper(batch):
    """Collate dataset items into a fixed-width Whisper mel-spectrogram batch.

    Args:
        batch: list of ``{'waveform': ..., 'labels': ...}`` items.

    Returns:
        dict with ``input_features`` whose last (frame) axis is exactly 3000
        wide (zero-padded on the right or truncated) and ``labels`` (long
        tensor).
    """
    waveforms = [sample['waveform'] for sample in batch]
    labels = [sample['labels'] for sample in batch]

    # Extract mel spectrogram
    mel = whisper_extractor(
        waveforms,
        sampling_rate=CONFIG['sample_rate'],
        return_tensors="pt"
    )['input_features']

    # Every item in the stacked tensor shares the same frame count, so the
    # pad-or-truncate decision can be taken once for the whole batch.
    max_len = 3000
    n_frames = mel.shape[-1]
    if n_frames < max_len:
        mel = torch.nn.functional.pad(
            mel, (0, max_len - n_frames), mode='constant', value=0
        )
    else:
        mel = mel[:, :, :max_len]

    return {
        'input_features': mel,
        'labels': torch.tensor(labels, dtype=torch.long)
    }

In [None]:
print("\nLoading model architectures...")

# --- MODEL 1: Wav2Vec2 + MLP ---
class Wav2VecClassifier(nn.Module):
    """Wav2Vec2 encoder followed by mean pooling and a 3-layer MLP head.

    Args:
        num_labels: Number of output classes.
    """

    def __init__(self, num_labels=4):
        super().__init__()
        self.encoder = Wav2Vec2Model.from_pretrained("facebook/wav2vec2-base")
        hidden_size = self.encoder.config.hidden_size

        self.classifier = nn.Sequential(
            nn.Linear(hidden_size, 256),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(128, num_labels)
        )

    def forward(self, input_values, attention_mask=None, labels=None):
        """Classify a batch of raw waveforms.

        Args:
            input_values: Waveform batch passed straight to the encoder.
            attention_mask: Optional sample-level padding mask (same time axis
                as ``input_values``). When given, padded frames are excluded
                from the mean pooling.
            labels: Optional class ids; when given, cross-entropy is returned.

        Returns:
            dict with ``logits`` and ``loss`` (``None`` without ``labels``).
        """
        out = self.encoder(input_values=input_values, attention_mask=attention_mask)
        hidden = out.last_hidden_state  # [B, T_frames, H]

        if attention_mask is not None:
            # The mask is defined per input *sample*, while `hidden` has one
            # row per encoder *frame*; the two time axes differ in length, so
            # the mask has to be converted to frame resolution before it can
            # be multiplied with `hidden`.
            frame_mask = self.encoder._get_feature_vector_attention_mask(
                hidden.shape[1], attention_mask
            )
            mask = frame_mask.unsqueeze(-1).to(hidden.dtype)  # [B, T_frames, 1]
            # clamp: an all-padding row must not divide by zero
            pooled = (hidden * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1.0)
        else:
            pooled = hidden.mean(dim=1)

        logits = self.classifier(pooled)

        loss = None
        if labels is not None:
            loss = nn.CrossEntropyLoss()(logits, labels)

        return {'logits': logits, 'loss': loss}

# --- MODEL 2: HuBERT + CNN ---
class HubertCNNClassifier(nn.Module):
    """HuBERT encoder followed by two Conv1d/BatchNorm/ReLU blocks, global
    max pooling over time and a small fully connected head.

    Args:
        num_labels: Number of output classes.
    """

    def __init__(self, num_labels=4):
        super().__init__()
        self.encoder = HubertModel.from_pretrained("facebook/hubert-base-ls960")
        encoder_dim = self.encoder.config.hidden_size

        # 1-D convolutional head applied along the time axis
        self.conv1 = nn.Conv1d(encoder_dim, 256, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm1d(256)
        self.conv2 = nn.Conv1d(256, 128, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm1d(128)

        self.fc = nn.Sequential(
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(64, num_labels),
        )

    def forward(self, input_values, attention_mask=None, labels=None):
        """Return ``{'logits', 'loss'}``; ``loss`` is ``None`` without labels."""
        encoded = self.encoder(input_values=input_values, attention_mask=attention_mask)

        # Conv1d wants channels first: [B, T, H] -> [B, H, T]
        feats = encoded.last_hidden_state.transpose(1, 2)

        for conv, norm in ((self.conv1, self.bn1), (self.conv2, self.bn2)):
            feats = F.relu(norm(conv(feats)))

        # Global max pooling over the time axis -> [B, 128]
        pooled = feats.max(dim=2).values
        logits = self.fc(pooled)

        loss = F.cross_entropy(logits, labels) if labels is not None else None
        return {'logits': logits, 'loss': loss}

# --- MODEL 3: Whisper + Attention ---
class WhisperAttentionClassifier(nn.Module):
    """Whisper encoder followed by learned attention pooling and an MLP head.

    Args:
        num_labels: Number of output classes.
        model_name: Hugging Face checkpoint whose encoder is used. The head
            width is read from that checkpoint's config, so other Whisper
            sizes work without editing the class.
    """

    def __init__(self, num_labels=4, model_name="openai/whisper-tiny"):
        super().__init__()
        self.encoder = WhisperModel.from_pretrained(model_name).encoder
        # Read the width from the config instead of hard-coding 384, as the
        # other two classifiers in this notebook do for their encoders.
        hidden_size = self.encoder.config.d_model

        # One scalar attention score per frame
        self.attn_query = nn.Linear(hidden_size, 1, bias=False)

        # Classification head
        self.fc = nn.Sequential(
            nn.Linear(hidden_size, 128),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(128, num_labels)
        )

    def forward(self, input_features, labels=None):
        """Classify a batch of Whisper input features.

        Args:
            input_features: Batch passed straight to the Whisper encoder.
            labels: Optional class ids; when given, cross-entropy is returned.

        Returns:
            dict with ``logits`` and ``loss`` (``None`` without ``labels``).
        """
        out = self.encoder(input_features=input_features)
        hidden = out.last_hidden_state  # [B, T, H]

        # Softmax over the time axis turns the scores into pooling weights.
        attn_scores = self.attn_query(hidden)         # [B, T, 1]
        attn_weights = F.softmax(attn_scores, dim=1)  # [B, T, 1]

        # Weighted sum of frames -> one context vector per clip
        context = (attn_weights * hidden).sum(dim=1)  # [B, H]

        logits = self.fc(context)

        loss = None
        if labels is not None:
            loss = nn.CrossEntropyLoss()(logits, labels)

        return {'logits': logits, 'loss': loss}



In [None]:
def plot_confusion_matrix(self, labels, preds, normalize=True):
    """Save a confusion-matrix heatmap for one model.

    Written in method form: ``self`` only needs a ``model_name`` attribute,
    which is used for the title and the output file name.

    Args:
        self: Object exposing ``model_name``.
        labels: True class ids.
        preds: Predicted class ids.
        normalize: When True, each row is divided by its total so cells show
            per-class rates; otherwise raw counts are shown.

    The figure is written to
    ``{OUTPUT_DIR}/{model_name}_confusion_matrix.png`` and then closed.
    """
    # Pin the label set so the matrix always has one row/column per entry of
    # EMOTION_CLASSES. Without it, a class missing from both `labels` and
    # `preds` shrinks the matrix and the tick labels no longer line up.
    class_ids = list(range(len(EMOTION_CLASSES)))
    cm = confusion_matrix(labels, preds, labels=class_ids)

    if normalize:
        row_totals = cm.sum(axis=1, keepdims=True)
        # Rows with no true samples stay at 0 instead of becoming NaN.
        cm = np.divide(
            cm.astype("float"),
            row_totals,
            out=np.zeros(cm.shape, dtype=float),
            where=row_totals != 0,
        )

    plt.figure(figsize=(6, 5))
    sns.heatmap(
        cm,
        annot=True,
        fmt=".2f" if normalize else "d",
        cmap="Blues",
        xticklabels=EMOTION_CLASSES,
        yticklabels=EMOTION_CLASSES
    )
    plt.xlabel("Predicted Label")
    plt.ylabel("True Label")
    plt.title(f"{self.model_name} - Confusion Matrix")

    plt.tight_layout()
    plt.savefig(f"{OUTPUT_DIR}/{self.model_name}_confusion_matrix.png")
    plt.close()


def plot_learning_curves(self):
    """Save a 1x3 figure of train/val loss, accuracy and F1 per epoch.

    Written in method form: ``self`` must expose ``history`` (a dict with the
    keys ``train_*`` / ``val_*`` for ``loss``, ``acc`` and ``f1``) and
    ``model_name``. The figure goes to
    ``{OUTPUT_DIR}/{model_name}_learning_curve.png`` and is then closed.
    """
    epochs = range(1, len(self.history['train_loss']) + 1)

    plt.figure(figsize=(14, 4))

    # (history key suffix, text used for both the y-label and the title)
    panels = (('loss', 'Loss'), ('acc', 'Accuracy'), ('f1', 'F1-score'))
    for position, (key, caption) in enumerate(panels, start=1):
        plt.subplot(1, 3, position)
        plt.plot(epochs, self.history[f'train_{key}'], marker='o', label='Train')
        plt.plot(epochs, self.history[f'val_{key}'], marker='o', label='Val')
        plt.xlabel('Epoch')
        plt.ylabel(caption)
        plt.title(caption)
        plt.legend()

    plt.tight_layout()
    plt.savefig(f"{OUTPUT_DIR}/{self.model_name}_learning_curve.png")
    plt.close()


In [None]:
print("\nDefining training function...")

# Redundant re-import: matplotlib.pyplot is already imported as `plt` in the
# notebook's first cell, so this line has no additional effect.
import matplotlib.pyplot as plt

class Trainer:
    """Minimal train/validate loop with early stopping on validation accuracy.

    Args:
        model: Module whose forward accepts the batch dict as keyword
            arguments and returns a dict with ``'logits'`` and ``'loss'``.
        train_loader: Iterable of training batches (dicts of tensors).
        val_loader: Iterable of validation batches (dicts of tensors).
        optimizer: Optimizer stepping the model parameters.
        device: Device the batches are moved to.
        model_name: Prefix for the checkpoint and figure file names.
        patience: Consecutive non-improving epochs tolerated before stopping.

    Attributes:
        history: Per-epoch lists for train/val loss, accuracy and macro F1.
        best_val_acc: Best validation accuracy seen so far.
        patience_counter: Consecutive epochs without improvement.
        last_val_preds / last_val_labels: Predictions and labels of the most
            recent validation pass run by ``train`` (``None`` before that).
    """

    def __init__(self, model, train_loader, val_loader, optimizer, device, model_name, patience=5):
        self.model = model
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.optimizer = optimizer
        self.device = device
        self.model_name = model_name
        self.patience = patience
        self.history = {
            'train_loss': [], 'train_acc': [], 'train_f1': [],
            'val_loss': [], 'val_acc': [], 'val_f1': []
        }
        self.best_val_acc = 0
        self.patience_counter = 0
        self.last_val_preds = None
        self.last_val_labels = None

    def train_epoch(self):
        """Run one optimisation pass over ``train_loader``.

        Returns:
            ``(mean batch loss, accuracy, macro F1)`` over the epoch.
        """
        self.model.train()
        train_loss = 0
        all_preds = []
        all_labels = []

        for batch in tqdm(self.train_loader, desc="Training", leave=False):
            # 'attention_mask' is deliberately not forwarded to the model.
            batch = {k: v.to(self.device) for k, v in batch.items() if k != 'attention_mask'}

            self.optimizer.zero_grad()
            outputs = self.model(**batch)
            loss = outputs['loss']

            loss.backward()
            # Clip the global gradient norm to 1.0 before stepping.
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
            self.optimizer.step()

            train_loss += loss.item()
            preds = torch.argmax(outputs['logits'], dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(batch['labels'].cpu().numpy())

        train_acc = accuracy_score(all_labels, all_preds)
        train_f1 = f1_score(all_labels, all_preds, average='macro', zero_division=0)

        return train_loss / len(self.train_loader), train_acc, train_f1

    def eval_epoch(self):
        """Run one gradient-free pass over ``val_loader``.

        Returns:
            ``(mean batch loss, accuracy, macro F1, predictions, labels)``.
        """
        self.model.eval()
        val_loss = 0
        all_preds = []
        all_labels = []

        with torch.no_grad():
            for batch in tqdm(self.val_loader, desc="Evaluating", leave=False):
                batch = {k: v.to(self.device) for k, v in batch.items() if k != 'attention_mask'}

                outputs = self.model(**batch)
                loss = outputs['loss']

                val_loss += loss.item()
                preds = torch.argmax(outputs['logits'], dim=1)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(batch['labels'].cpu().numpy())

        val_acc = accuracy_score(all_labels, all_preds)
        val_f1 = f1_score(all_labels, all_preds, average='macro', zero_division=0)

        return val_loss / len(self.val_loader), val_acc, val_f1, all_preds, all_labels

    def plot_learning_curves(self):
        """Save loss / accuracy / F1 curves from ``history`` as a PNG.

        The file is ``{OUTPUT_DIR}/{model_name}_learning_curve.png``.
        """
        epochs = range(1, len(self.history['train_loss']) + 1)

        plt.figure(figsize=(12, 4))

        # --------- Loss ----------
        plt.subplot(1, 3, 1)
        plt.plot(epochs, self.history['train_loss'], label='Train Loss')
        plt.plot(epochs, self.history['val_loss'], label='Val Loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.title(f'{self.model_name} - Loss')
        plt.legend()

        # --------- Accuracy ----------
        plt.subplot(1, 3, 2)
        plt.plot(epochs, self.history['train_acc'], label='Train Acc')
        plt.plot(epochs, self.history['val_acc'], label='Val Acc')
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy')
        plt.title(f'{self.model_name} - Accuracy')
        plt.legend()

        # --------- F1 ----------
        plt.subplot(1, 3, 3)
        plt.plot(epochs, self.history['train_f1'], label='Train F1')
        plt.plot(epochs, self.history['val_f1'], label='Val F1')
        plt.xlabel('Epoch')
        plt.ylabel('F1-score')
        plt.title(f'{self.model_name} - F1')
        plt.legend()

        plt.tight_layout()
        plt.savefig(f"{OUTPUT_DIR}/{self.model_name}_learning_curve.png")
        plt.close()

    def train(self, epochs):
        """Train for up to ``epochs`` epochs with early stopping.

        After each epoch the metrics are appended to ``history`` and printed.
        Whenever validation accuracy improves on ``best_val_acc`` the weights
        are saved to ``{OUTPUT_DIR}/{model_name}_best.pt``; after ``patience``
        consecutive epochs without improvement the loop stops.

        ``history``, ``best_val_acc`` and ``patience_counter`` are NOT reset
        here, so calling ``train`` again continues from the earlier state.

        Returns:
            The ``history`` dict.
        """
        for epoch in range(epochs):
            train_loss, train_acc, train_f1 = self.train_epoch()
            val_loss, val_acc, val_f1, val_preds, val_labels = self.eval_epoch()

            # Keep the latest validation outputs available to callers; these
            # attributes were initialised in __init__ but never filled in.
            self.last_val_preds = val_preds
            self.last_val_labels = val_labels

            self.history['train_loss'].append(train_loss)
            self.history['train_acc'].append(train_acc)
            self.history['train_f1'].append(train_f1)
            self.history['val_loss'].append(val_loss)
            self.history['val_acc'].append(val_acc)
            self.history['val_f1'].append(val_f1)

            print(f"Epoch {epoch+1}/{epochs} | "
                  f"Train Loss: {train_loss:.4f} Acc: {train_acc:.4f} F1: {train_f1:.4f} | "
                  f"Val Loss: {val_loss:.4f} Acc: {val_acc:.4f} F1: {val_f1:.4f}")

            # Early stopping
            if val_acc > self.best_val_acc:
                self.best_val_acc = val_acc
                self.patience_counter = 0
                torch.save(self.model.state_dict(), f"{OUTPUT_DIR}/{self.model_name}_best.pt")
            else:
                self.patience_counter += 1
                if self.patience_counter >= self.patience:
                    print(f"Early stopping at epoch {epoch+1}")
                    break

        # Draw the learning curves once training has finished.
        self.plot_learning_curves()

        return self.history


In [None]:
# --- Freeze the backbone and unfreeze only the last n encoder layers ---

def freeze_backbone_except_last_n(model, n=3, model_type='wav2vec'):
    """Freeze ``model.encoder`` except its last ``n`` transformer layers.

    All encoder parameters get ``requires_grad = False``; the parameters of
    the last ``n`` layers and of the classification head (``model.classifier``
    for ``'wav2vec'``, ``model.fc`` otherwise) get ``requires_grad = True``.
    Parameters outside the encoder and that head are left untouched.

    Args:
        model: Wav2VecClassifier / HubertCNNClassifier /
            WhisperAttentionClassifier instance.
        n: Number of trailing encoder layers to keep trainable; ``0`` keeps
            the whole encoder frozen.
        model_type: ``'wav2vec'``, ``'hubert'`` or ``'whisper'``.

    Raises:
        ValueError: If ``n`` is negative or ``model_type`` is not recognised
            (previously an unknown type silently did nothing).
    """
    if n < 0:
        raise ValueError(f"n must be >= 0, got {n}")

    if model_type in ('wav2vec', 'hubert'):
        # Transformer layers sit one level deeper for these two encoders.
        layers = model.encoder.encoder.layers
    elif model_type == 'whisper':
        layers = model.encoder.layers
    else:
        raise ValueError(
            f"Unknown model_type {model_type!r}; expected 'wav2vec', 'hubert' or 'whisper'"
        )

    head = model.classifier if model_type == 'wav2vec' else model.fc

    for param in model.encoder.parameters():
        param.requires_grad = False

    # layers[-0:] is the *whole* list, so n == 0 needs explicit handling;
    # otherwise asking for zero trainable layers would unfreeze all of them.
    trainable_layers = layers[-n:] if n > 0 else []
    for layer in trainable_layers:
        for param in layer.parameters():
            param.requires_grad = True

    for param in head.parameters():
        param.requires_grad = True

In [None]:
print("\n" + "="*60)
print("STEP 6: SINGLE RANDOM SHUFFLE SPLIT ON PHASE1 (NO STRATIFY)")
print("="*60)

# ============================================================
# SINGLE RANDOM SPLIT (ONE TIME ONLY)
# ============================================================
test_size = 0.2  # 20% for evaluation
n_samples = len(df_phase1)
n_eval = int(n_samples * test_size)

# Fail here with a clear message rather than later inside the training loop,
# where an empty evaluation loader ends in a division by zero.
if n_eval == 0:
    raise RuntimeError(
        f"Phase1 has {n_samples} samples, which leaves no evaluation samples "
        f"at test_size={test_size}; check that the dataset was loaded."
    )

# Seeded permutation of row positions: the first n_eval go to evaluation,
# the remainder to training.
rng = np.random.RandomState(CONFIG['random_seed'])
indices = rng.permutation(n_samples)

eval_idx = indices[:n_eval]
train_idx = indices[n_eval:]

df_train_split = df_phase1.iloc[train_idx].reset_index(drop=True)
df_eval_split = df_phase1.iloc[eval_idx].reset_index(drop=True)

print(f"Train: {len(df_train_split)} samples")
print(f"Eval: {len(df_eval_split)} samples")

# ============================================================
# INIT RESULT CONTAINER (NO LISTS)
# ============================================================
# One metrics dict per model, filled in by the sections below.
results = {
    'wav2vec': {},
    'hubert': {},
    'whisper': {}
}

# ============================================================
# MODEL 1: WAV2VEC 2.0
# ============================================================
print(f"\n{'*'*40}")
print("MODEL 1: WAV2VEC 2.0")
print(f"{'*'*40}")

dataset_train = AudioEmotionDataset(
    df_train_split, processor=wav2vec_processor, mode='wav2vec'
)
dataset_eval = AudioEmotionDataset(
    df_eval_split, processor=wav2vec_processor, mode='wav2vec'
)

loader_train = DataLoader(
    dataset_train,
    batch_size=CONFIG['batch_size'],
    shuffle=True,
    collate_fn=collate_wav2vec
)
loader_eval = DataLoader(
    dataset_eval,
    batch_size=CONFIG['batch_size'],
    shuffle=False,
    collate_fn=collate_wav2vec
)

model1 = Wav2VecClassifier(num_labels=4).to(DEVICE)
# Only the last encoder layer and the classifier head stay trainable.
freeze_backbone_except_last_n(model1, n=1, model_type='wav2vec')

optimizer1 = torch.optim.AdamW(
    model1.parameters(),
    lr=CONFIG['learning_rate'],
    weight_decay=CONFIG['weight_decay']
)

trainer1 = Trainer(
    model1,
    loader_train,
    loader_eval,
    optimizer1,
    DEVICE,
    "wav2vec_single_split",
    patience=CONFIG['patience']
)
trainer1.train(CONFIG['epochs'])

# Hold-out predictions of the model as it stands after training. The
# trainer's own evaluation pass (eval mode, no gradients, over loader_eval)
# is reused instead of a hand-copied loop; elements 3 and 4 of its return
# value are the predictions and the labels.
all_preds, all_labels = trainer1.eval_epoch()[3:]

results['wav2vec']['accuracy'] = accuracy_score(all_labels, all_preds)
results['wav2vec']['precision'] = precision_score(all_labels, all_preds, average='macro', zero_division=0)
results['wav2vec']['recall'] = recall_score(all_labels, all_preds, average='macro', zero_division=0)
results['wav2vec']['f1'] = f1_score(all_labels, all_preds, average='macro', zero_division=0)

print(f"WAV2VEC Acc: {results['wav2vec']['accuracy']:.4f} | "
      f"F1: {results['wav2vec']['f1']:.4f}")

# ============================================================
# MODEL 2: HUBERT + CNN
# ============================================================
print(f"\n{'*'*40}")
print("MODEL 2: HUBERT + CNN")
print(f"{'*'*40}")

dataset_train = AudioEmotionDataset(
    df_train_split, feature_extractor=hubert_extractor, mode='hubert'
)
dataset_eval = AudioEmotionDataset(
    df_eval_split, feature_extractor=hubert_extractor, mode='hubert'
)

loader_train = DataLoader(
    dataset_train,
    batch_size=CONFIG['batch_size'],
    shuffle=True,
    collate_fn=collate_hubert
)
loader_eval = DataLoader(
    dataset_eval,
    batch_size=CONFIG['batch_size'],
    shuffle=False,
    collate_fn=collate_hubert
)

model2 = HubertCNNClassifier(num_labels=4).to(DEVICE)
# The last five encoder layers stay trainable for this model.
freeze_backbone_except_last_n(model2, n=5, model_type='hubert')

optimizer2 = torch.optim.AdamW(
    model2.parameters(),
    lr=CONFIG['learning_rate'],
    weight_decay=CONFIG['weight_decay']
)

trainer2 = Trainer(
    model2,
    loader_train,
    loader_eval,
    optimizer2,
    DEVICE,
    "hubert_single_split",
    patience=CONFIG['patience']
)
trainer2.train(CONFIG['epochs'])

# Hold-out predictions of the model as it stands after training. The
# trainer's own evaluation pass (eval mode, no gradients, over loader_eval)
# is reused instead of a hand-copied loop; elements 3 and 4 of its return
# value are the predictions and the labels.
all_preds, all_labels = trainer2.eval_epoch()[3:]

results['hubert']['accuracy'] = accuracy_score(all_labels, all_preds)
results['hubert']['precision'] = precision_score(all_labels, all_preds, average='macro', zero_division=0)
results['hubert']['recall'] = recall_score(all_labels, all_preds, average='macro', zero_division=0)
results['hubert']['f1'] = f1_score(all_labels, all_preds, average='macro', zero_division=0)

print(f"HUBERT Acc: {results['hubert']['accuracy']:.4f} | "
      f"F1: {results['hubert']['f1']:.4f}")

# ============================================================
# MODEL 3: WHISPER + ATTENTION
# ============================================================
print(f"\n{'*'*40}")
print("MODEL 3: WHISPER + ATTENTION")
print(f"{'*'*40}")

dataset_train = AudioEmotionDataset(
    df_train_split, feature_extractor=whisper_extractor, mode='whisper'
)
dataset_eval = AudioEmotionDataset(
    df_eval_split, feature_extractor=whisper_extractor, mode='whisper'
)

loader_train = DataLoader(
    dataset_train,
    batch_size=CONFIG['batch_size'],
    shuffle=True,
    collate_fn=collate_whisper
)
loader_eval = DataLoader(
    dataset_eval,
    batch_size=CONFIG['batch_size'],
    shuffle=False,
    collate_fn=collate_whisper
)

model3 = WhisperAttentionClassifier(num_labels=4).to(DEVICE)
# The last three encoder layers stay trainable for this model.
freeze_backbone_except_last_n(model3, n=3, model_type='whisper')

optimizer3 = torch.optim.AdamW(
    model3.parameters(),
    lr=CONFIG['learning_rate'],
    weight_decay=CONFIG['weight_decay']
)

trainer3 = Trainer(
    model3,
    loader_train,
    loader_eval,
    optimizer3,
    DEVICE,
    "whisper_single_split",
    patience=CONFIG['patience']
)
trainer3.train(CONFIG['epochs'])

# Hold-out predictions of the model as it stands after training. The
# trainer's own evaluation pass (eval mode, no gradients, over loader_eval)
# is reused instead of a hand-copied loop; elements 3 and 4 of its return
# value are the predictions and the labels.
all_preds, all_labels = trainer3.eval_epoch()[3:]

results['whisper']['accuracy'] = accuracy_score(all_labels, all_preds)
results['whisper']['precision'] = precision_score(all_labels, all_preds, average='macro', zero_division=0)
results['whisper']['recall'] = recall_score(all_labels, all_preds, average='macro', zero_division=0)
results['whisper']['f1'] = f1_score(all_labels, all_preds, average='macro', zero_division=0)

print(f"WHISPER Acc: {results['whisper']['accuracy']:.4f} | "
      f"F1: {results['whisper']['f1']:.4f}")

# ============================================================
# SUMMARY + SAVE
# ============================================================
print("\n" + "="*60)
print("SUMMARY: SINGLE RANDOM HOLD-OUT RESULTS")
print("="*60)

# One heading per model, then one aligned "name: value" line per metric.
for model_key in results:
    print(f"\n{model_key.upper()}:")
    for metric_name, metric_value in results[model_key].items():
        print(f"  {metric_name:10s}: {metric_value:.4f}")

# Persist the same numbers as indented JSON next to the checkpoints.
with open(f"{OUTPUT_DIR}/single_random_holdout_results.json", "w") as f:
    f.write(json.dumps(results, indent=2))

print(f"\nResults saved to: {OUTPUT_DIR}/single_random_holdout_results.json")


In [None]:

# ============================================================
# SUMMARY + SAVE
# ============================================================
# This cell prints the per-model metrics held in `results` and rewrites the
# JSON results file from that same dict.
print("\n" + "="*60)
print("SUMMARY: SINGLE RANDOM HOLD-OUT RESULTS")
print("="*60)

for model_key, model_metrics in results.items():
    print(f"\n{model_key.upper()}:")
    for metric_name in model_metrics:
        print(f"  {metric_name:10s}: {model_metrics[metric_name]:.4f}")

with open(f"{OUTPUT_DIR}/single_random_holdout_results.json", "w") as f:
    f.write(json.dumps(results, indent=2))

print(f"\nResults saved to: {OUTPUT_DIR}/single_random_holdout_results.json")


In [None]:
# Long-format table: one row per (model, metric) pair, ready for seaborn.
vis_data = [
    {
        'Model': key.upper(),
        'Metric': metric_name,
        'Value': results[key][metric_name],
    }
    for key in ['wav2vec', 'hubert', 'whisper']
    for metric_name in ['accuracy', 'precision', 'recall', 'f1']
]

df_vis = pd.DataFrame(vis_data)

# Grouped bars: metrics on the x-axis, one colour per model.
plt.figure(figsize=(10, 6))
sns.barplot(data=df_vis, x='Metric', y='Value', hue='Model')
plt.ylim(0, 1)
plt.title("Evaluation Metrics Comparison Across Models (Single Hold-out)")
plt.xlabel("Metric")
plt.ylabel("Score")
plt.legend(title="Model")
plt.tight_layout()
plt.show()


In [None]:
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
import torch

def make_test_loader(df, processor, collate_fn, batch_size=16, mode=None):
    """Build a non-shuffling DataLoader over ``df`` for evaluation.

    Args:
        df: Frame with the columns AudioEmotionDataset reads.
        processor: Processor object stored on the dataset.
        collate_fn: Batch collation function matching the model.
        batch_size: Batch size of the returned loader.
        mode: Dataset mode string. When omitted, ``processor.mode`` is used
            if the processor has such an attribute, else ``'wav2vec'``. The
            function used to read ``processor.mode`` unconditionally, which
            raises AttributeError for objects that lack that attribute.

    Returns:
        DataLoader yielding collated batches in dataset order.
    """
    if mode is None:
        mode = getattr(processor, 'mode', 'wav2vec')
    dataset = AudioEmotionDataset(df, processor=processor, mode=mode)
    return DataLoader(dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

# One non-shuffling loader per model, each with the collate function that
# produces the inputs that model expects.
test_loaders = {
    'Wav2Vec2': DataLoader(AudioEmotionDataset(df_test, processor=wav2vec_processor, mode='wav2vec'),
                            batch_size=CONFIG['batch_size'], shuffle=False, collate_fn=collate_wav2vec),
    'HubertCNN': DataLoader(AudioEmotionDataset(df_test, feature_extractor=hubert_extractor, mode='hubert'),
                            batch_size=CONFIG['batch_size'], shuffle=False, collate_fn=collate_hubert),
    'WhisperAttention': DataLoader(AudioEmotionDataset(df_test, feature_extractor=whisper_extractor, mode='whisper'),
                                   batch_size=CONFIG['batch_size'], shuffle=False, collate_fn=collate_whisper)
}

models = {
    'Wav2Vec2': model1,
    'HubertCNN': model2,
    'WhisperAttention': model3
}

# Fixed label set so every confusion matrix has one row/column per entry of
# EMOTION_CLASSES, even when a class is missing from the test predictions.
class_ids = list(range(len(EMOTION_CLASSES)))

# Evaluate on the test split
for model_name, model in models.items():
    model.eval()
    all_preds, all_labels = [], []

    loader = test_loaders[model_name]
    with torch.no_grad():
        for batch in loader:
            batch_device = {k: v.to(DEVICE) for k, v in batch.items() if k != 'attention_mask'}
            outputs = model(**batch_device)
            preds = torch.argmax(outputs['logits'], dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(batch['labels'].cpu().numpy())

    if not all_labels:
        # df_test is an empty frame when the test folder was not loaded.
        print(f"{model_name}: no test samples, skipping confusion matrix")
        continue

    # Confusion matrix, row-normalised; rows with no true samples stay at 0
    # instead of turning into NaN through a division by zero.
    cm = confusion_matrix(all_labels, all_preds, labels=class_ids)
    row_totals = cm.sum(axis=1, keepdims=True)
    cm_norm = np.divide(
        cm.astype("float"),
        row_totals,
        out=np.zeros(cm.shape, dtype=float),
        where=row_totals != 0,
    )

    plt.figure(figsize=(6,5))
    sns.heatmap(cm_norm, annot=True, fmt=".2f", cmap="Blues",
                xticklabels=EMOTION_CLASSES, yticklabels=EMOTION_CLASSES)
    plt.xlabel("Predicted Label")
    plt.ylabel("True Label")
    plt.title(f"{model_name} - Test Confusion Matrix")
    plt.tight_layout()
    plt.savefig(f"{OUTPUT_DIR}/{model_name}_test_confusion_matrix.png")
    plt.show()
    plt.close()


In [None]:
from IPython.display import Image, display

# Show the saved learning-curve image of each single-split run.
for model_name in ('wav2vec_single_split', 'hubert_single_split', 'whisper_single_split'):
    print(f"=== {model_name} ===")
    curve_png = f"{OUTPUT_DIR}/{model_name}_learning_curve.png"
    display(Image(filename=curve_png))
    # Confusion-matrix display is intentionally disabled:
    # display(Image(filename=f"{OUTPUT_DIR}/{model_name}_confusion_matrix.png"))


In [None]:
# Phase-2 settings. This mutates the shared CONFIG dict, so every later cell
# that reads CONFIG['batch_size'] / CONFIG['epochs'] sees these values.
CONFIG.update({'batch_size': 2, 'epochs': 6})

print("\n" + "="*60)
print("PHASE 2: CONTINUED TRAINING ON DF_PHASE2 (IN-MEMORY)")
print("="*60)

# Positional train/eval split of the phase-2 frame.
# NOTE(review): train_idx / eval_idx are defined in an earlier cell; confirm
# they were computed on df_phase2 itself, since .iloc uses them as positions.
df_train_split, df_eval_split = (
    df_phase2.iloc[positions].reset_index(drop=True)
    for positions in (train_idx, eval_idx)
)

print(f"Phase2 Train: {len(df_train_split)} samples")
print(f"Phase2 Eval : {len(df_eval_split)} samples")

print("\n***** PHASE 2: WAV2VEC *****")

dataset_train, dataset_eval = (
    AudioEmotionDataset(split_df, processor=wav2vec_processor, mode='wav2vec')
    for split_df in (df_train_split, df_eval_split)
)

# Re-point the existing trainer at the phase-2 data; only the training loader shuffles.
trainer1.train_loader = DataLoader(
    dataset_train, batch_size=CONFIG['batch_size'], shuffle=True, collate_fn=collate_wav2vec
)
trainer1.val_loader = DataLoader(
    dataset_eval, batch_size=CONFIG['batch_size'], shuffle=False, collate_fn=collate_wav2vec
)

# New run name, then train for half of the configured epoch count (6 // 2 = 3).
trainer1.model_name = "wav2vec_phase2_continued"
trainer1.train(CONFIG['epochs'] // 2)


In [None]:

# Phase-2 settings for the two remaining models (mutates the shared CONFIG dict).
CONFIG.update({'epochs': 4, 'batch_size': 2})

# Continue training HuBERT, then Whisper, on the phase-2 split. Each row gives:
# banner text, trainer, feature extractor, dataset mode, collate function, run name.
for phase2_banner, phase2_trainer, phase2_extractor, phase2_mode, phase2_collate, phase2_run in (
    ("\n***** PHASE 2: HUBERT *****", trainer2, hubert_extractor, 'hubert',
     collate_hubert, "hubert_phase2_continued"),
    ("\n***** PHASE 2: WHISPER *****", trainer3, whisper_extractor, 'whisper',
     collate_whisper, "whisper_phase2_continued"),
):
    print(phase2_banner)

    dataset_train = AudioEmotionDataset(
        df_train_split, feature_extractor=phase2_extractor, mode=phase2_mode
    )
    dataset_eval = AudioEmotionDataset(
        df_eval_split, feature_extractor=phase2_extractor, mode=phase2_mode
    )

    # Re-point the existing trainer at the phase-2 data; only the training loader shuffles.
    phase2_trainer.train_loader = DataLoader(
        dataset_train, batch_size=CONFIG['batch_size'], shuffle=True, collate_fn=phase2_collate
    )
    phase2_trainer.val_loader = DataLoader(
        dataset_eval, batch_size=CONFIG['batch_size'], shuffle=False, collate_fn=phase2_collate
    )

    # New run name, then train for half of the configured epoch count (4 // 2 = 2).
    phase2_trainer.model_name = phase2_run
    phase2_trainer.train(CONFIG['epochs'] // 2)



In [None]:
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
import torch

def make_test_loader(df, processor, collate_fn, batch_size=16, mode=None):
    """Build a non-shuffling DataLoader over ``df`` for evaluation.

    Args:
        df: Frame of samples handed to ``AudioEmotionDataset``.
        processor: Audio front-end passed to the dataset as ``processor=``.
        collate_fn: Batch collate function for the DataLoader.
        batch_size: Number of samples per batch.
        mode: Dataset mode string (the notebook uses 'wav2vec', 'hubert' and
            'whisper'). When omitted, ``processor.mode`` is read as before;
            the Hugging Face processors/extractors used in this notebook do
            not define that attribute, so pass ``mode`` explicitly for them.

    Returns:
        A ``DataLoader`` with ``shuffle=False``.

    Raises:
        AttributeError: If ``mode`` is omitted and ``processor`` has no
            ``mode`` attribute.

    NOTE(review): the HuBERT/Whisper datasets elsewhere in the notebook are
    built with ``feature_extractor=`` rather than ``processor=`` -- confirm
    this helper is only meant for the ``processor=`` path.
    """
    dataset_mode = processor.mode if mode is None else mode
    dataset = AudioEmotionDataset(df, processor=processor, mode=dataset_mode)
    return DataLoader(dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

# Rebuild the per-model test DataLoaders (picks up the current CONFIG['batch_size']).
# Each spec holds the loader key, the keyword/front-end pair handed to
# AudioEmotionDataset, the dataset mode and the collate function.
test_loaders = {
    spec['name']: DataLoader(
        AudioEmotionDataset(df_test, **spec['frontend'], mode=spec['mode']),
        batch_size=CONFIG['batch_size'],
        shuffle=False,
        collate_fn=spec['collate'],
    )
    for spec in (
        {'name': 'Wav2Vec2', 'frontend': {'processor': wav2vec_processor},
         'mode': 'wav2vec', 'collate': collate_wav2vec},
        {'name': 'HubertCNN', 'frontend': {'feature_extractor': hubert_extractor},
         'mode': 'hubert', 'collate': collate_hubert},
        {'name': 'WhisperAttention', 'frontend': {'feature_extractor': whisper_extractor},
         'mode': 'whisper', 'collate': collate_whisper},
    )
}

# Models to evaluate, keyed by the same names as test_loaders.
models = dict(Wav2Vec2=model1, HubertCNN=model2, WhisperAttention=model3)

# Evaluate every model on the test set and plot a row-normalised confusion matrix.
# Fixing the label set keeps the matrix square (one row/column per emotion) even
# when a class never shows up in the test labels or the predictions, so the
# EMOTION_CLASSES tick labels always line up with the right rows/columns.
class_ids = list(range(len(EMOTION_CLASSES)))

for model_name, model in models.items():
    model.eval()
    all_preds, all_labels = [], []

    loader = test_loaders[model_name]
    with torch.no_grad():
        for batch in loader:
            # Every tensor except 'attention_mask' is moved to DEVICE and forwarded.
            # NOTE(review): presumably the models' forward() does not accept
            # attention_mask -- confirm before changing this filter.
            batch_device = {k: v.to(DEVICE) for k, v in batch.items() if k != 'attention_mask'}
            outputs = model(**batch_device)
            preds = torch.argmax(outputs['logits'], dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(batch['labels'].cpu().numpy())

    # Confusion matrix, normalised per true class (each non-empty row sums to 1).
    cm = confusion_matrix(all_labels, all_preds, labels=class_ids)
    row_totals = cm.sum(axis=1, keepdims=True)
    # A class with no true samples keeps an all-zero row instead of becoming NaN.
    cm_norm = np.divide(
        cm.astype("float"),
        row_totals,
        out=np.zeros(cm.shape, dtype=float),
        where=row_totals != 0,
    )

    fig = plt.figure(figsize=(6,5))
    sns.heatmap(cm_norm, annot=True, fmt=".2f", cmap="Blues",
                xticklabels=EMOTION_CLASSES, yticklabels=EMOTION_CLASSES)
    plt.xlabel("Predicted Label")
    plt.ylabel("True Label")
    plt.title(f"{model_name} - Test Confusion Matrix")
    plt.tight_layout()
    plt.savefig(f"{OUTPUT_DIR}/{model_name}_phase2_test_confusion_matrix.png")
    plt.show()
    # Release the figure explicitly so figures do not pile up across iterations.
    plt.close(fig)


In [None]:
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score
import matplotlib.pyplot as plt
import seaborn as sns
import torch
from torch.utils.data import DataLoader

# --- Helper that builds a test DataLoader ---
def make_test_loader(df, processor, collate_fn, batch_size=16, mode=None):
    """Build a non-shuffling DataLoader over ``df`` for evaluation.

    Args:
        df: Frame of samples handed to ``AudioEmotionDataset``.
        processor: Audio front-end passed to the dataset as ``processor=``.
        collate_fn: Batch collate function for the DataLoader.
        batch_size: Number of samples per batch.
        mode: Dataset mode string (the notebook uses 'wav2vec', 'hubert' and
            'whisper'). When omitted, ``processor.mode`` is read as before;
            the Hugging Face processors/extractors used in this notebook do
            not define that attribute, so pass ``mode`` explicitly for them.

    Returns:
        A ``DataLoader`` with ``shuffle=False``.

    Raises:
        AttributeError: If ``mode`` is omitted and ``processor`` has no
            ``mode`` attribute.

    NOTE(review): the HuBERT/Whisper datasets elsewhere in the notebook are
    built with ``feature_extractor=`` rather than ``processor=`` -- confirm
    this helper is only meant for the ``processor=`` path.
    """
    dataset_mode = processor.mode if mode is None else mode
    dataset = AudioEmotionDataset(df, processor=processor, mode=dataset_mode)
    return DataLoader(dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

# --- Test loaders for the three models ---
# Rows: (loader key, dataset mode, keyword arguments carrying the audio
# front-end into AudioEmotionDataset, collate function).
test_loaders = {
    key: DataLoader(
        AudioEmotionDataset(df_test, **frontend_kwargs, mode=dataset_mode),
        batch_size=CONFIG['batch_size'],
        shuffle=False,
        collate_fn=batch_collate,
    )
    for key, dataset_mode, frontend_kwargs, batch_collate in (
        ('Wav2Vec2', 'wav2vec', {'processor': wav2vec_processor}, collate_wav2vec),
        ('HubertCNN', 'hubert', {'feature_extractor': hubert_extractor}, collate_hubert),
        ('WhisperAttention', 'whisper', {'feature_extractor': whisper_extractor}, collate_whisper),
    )
}

# --- The models, keyed by the same names as test_loaders ---
models = dict(
    Wav2Vec2=model1,
    HubertCNN=model2,
    WhisperAttention=model3,
)

# --- Print the summary header ---
print("SUMMARY: SINGLE RANDOM HOLD-OUT RESULTS")
print("="*60)

# Fixing the label set keeps the confusion matrix square (one row/column per
# emotion) even when a class never shows up in the test labels or predictions,
# so the EMOTION_CLASSES tick labels always line up with the right rows/columns.
class_ids = list(range(len(EMOTION_CLASSES)))

# --- Evaluation: macro metrics plus a row-normalised confusion matrix per model ---
for model_name, model in models.items():
    model.eval()
    all_preds, all_labels = [], []

    loader = test_loaders[model_name]
    with torch.no_grad():
        for batch in loader:
            # Move tensors to the device; 'attention_mask' is not forwarded.
            # NOTE(review): presumably the models' forward() does not accept
            # attention_mask -- confirm before changing this filter.
            batch_device = {k: v.to(DEVICE) for k, v in batch.items() if k != 'attention_mask'}
            outputs = model(**batch_device)
            preds = torch.argmax(outputs['logits'], dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(batch['labels'].cpu().numpy())

    # Macro-averaged metrics; zero_division=0 scores undefined ratios as 0.
    acc = accuracy_score(all_labels, all_preds)
    prec = precision_score(all_labels, all_preds, average='macro', zero_division=0)
    rec = recall_score(all_labels, all_preds, average='macro', zero_division=0)
    f1 = f1_score(all_labels, all_preds, average='macro', zero_division=0)

    # Print the results
    print(f"{model_name}:")
    print(f"  accuracy  : {acc:.4f}")
    print(f"  precision : {prec:.4f}")
    print(f"  recall    : {rec:.4f}")
    print(f"  f1        : {f1:.4f}")
    print()

    # Confusion matrix, normalised per true class (each non-empty row sums to 1).
    cm = confusion_matrix(all_labels, all_preds, labels=class_ids)
    row_totals = cm.sum(axis=1, keepdims=True)
    # A class with no true samples keeps an all-zero row instead of becoming NaN.
    cm_norm = np.divide(
        cm.astype("float"),
        row_totals,
        out=np.zeros(cm.shape, dtype=float),
        where=row_totals != 0,
    )

    fig = plt.figure(figsize=(6,5))
    sns.heatmap(cm_norm, annot=True, fmt=".2f", cmap="Blues",
                xticklabels=EMOTION_CLASSES, yticklabels=EMOTION_CLASSES)
    plt.xlabel("Predicted Label")
    plt.ylabel("True Label")
    plt.title(f"{model_name} - Test Confusion Matrix")
    plt.tight_layout()
    plt.savefig(f"{OUTPUT_DIR}/{model_name}_phase2_test_confusion_matrix.png")
    plt.show()
    # Release the figure explicitly so figures do not pile up across iterations.
    plt.close(fig)


In [None]:
from IPython.display import Image, display

# Show the saved learning-curve image of each phase-2 continuation run.
for model_name in ('wav2vec_phase2_continued', 'hubert_phase2_continued', 'whisper_phase2_continued'):
    print(f"=== {model_name} ===")
    curve_png = f"{OUTPUT_DIR}/{model_name}_learning_curve.png"
    display(Image(filename=curve_png))
    # Confusion-matrix display is intentionally disabled:
    # display(Image(filename=f"{OUTPUT_DIR}/{model_name}_confusion_matrix.png"))


In [None]:
from IPython.display import Image, display

# For each trainer: switch to the short name (dropping "_phase2_continued"),
# redraw the learning curves from the trainer's existing history, then show the
# image stored under the new name.
# NOTE(review): assumes plot_learning_curves() writes
# "{OUTPUT_DIR}/{model_name}_learning_curve.png" -- confirm in the trainer class.
for renamed_trainer, short_name in (
    (trainer1, "Wav2Vec2"),
    (trainer2, "HubertCNN"),
    (trainer3, "WhisperAttention"),
):
    renamed_trainer.model_name = short_name
    renamed_trainer.plot_learning_curves()
    display(Image(filename=f"{OUTPUT_DIR}/{renamed_trainer.model_name}_learning_curve.png"))



In [None]:
import matplotlib.pyplot as plt

# Assumes a `trainer` object with its training history is still in memory.
# NOTE(review): only trainer1/trainer2/trainer3 are assigned in the nearby
# cells -- confirm `trainer` exists on a fresh Restart & Run All.
trainer.model_name = "model1"  # new name shown in the plot titles and file name

epochs = range(1, len(trainer.history['train_loss']) + 1)

plt.figure(figsize=(12, 4))

# One panel per metric: (history key suffix, legend suffix, y-axis label, title suffix).
for panel_no, (metric_key, legend_tag, y_label, title_tag) in enumerate(
    (
        ('loss', 'Loss', 'Loss', 'Loss'),
        ('acc', 'Acc', 'Accuracy', 'Accuracy'),
        ('f1', 'F1', 'F1-score', 'F1'),
    ),
    start=1,
):
    plt.subplot(1, 3, panel_no)
    plt.plot(epochs, trainer.history[f'train_{metric_key}'], label=f'Train {legend_tag}')
    plt.plot(epochs, trainer.history[f'val_{metric_key}'], label=f'Val {legend_tag}')
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.title(f'{trainer.model_name} - {title_tag}')
    plt.legend()

plt.tight_layout()
plt.savefig(f"{OUTPUT_DIR}/{trainer.model_name}_learning_curve.png")  # saved under the new name
plt.show()


In [None]:
from sklearn.metrics import classification_report

# --- Evaluate each model: macro metrics plus a per-class report ---
# Passing the full label set keeps target_names aligned with the class ids;
# without it classification_report raises ValueError whenever a class is absent
# from both the test labels and the predictions.
class_ids = list(range(len(EMOTION_CLASSES)))

for model_name, model in models.items():
    model.eval()
    all_preds, all_labels = [], []

    loader = test_loaders[model_name]
    with torch.no_grad():
        for batch in loader:
            # Every tensor except 'attention_mask' is moved to DEVICE and forwarded.
            # NOTE(review): presumably the models' forward() does not accept
            # attention_mask -- confirm before changing this filter.
            batch_device = {k: v.to(DEVICE) for k, v in batch.items() if k != 'attention_mask'}
            outputs = model(**batch_device)
            preds = torch.argmax(outputs['logits'], dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(batch['labels'].cpu().numpy())

    # --- Overall metrics (macro-averaged; undefined ratios count as 0) ---
    acc = accuracy_score(all_labels, all_preds)
    prec = precision_score(all_labels, all_preds, average='macro', zero_division=0)
    rec = recall_score(all_labels, all_preds, average='macro', zero_division=0)
    f1 = f1_score(all_labels, all_preds, average='macro', zero_division=0)

    print("="*40)
    print(f"{model_name} - SUMMARY: SINGLE RANDOM HOLD-OUT RESULTS")
    print("="*40)
    print(f"accuracy  : {acc:.4f}")
    print(f"precision : {prec:.4f}")
    print(f"recall    : {rec:.4f}")
    print(f"f1        : {f1:.4f}")
    print()

    # --- Detailed per-class classification report ---
    print("="*40)
    print(f"{model_name} - BÁO CÁO CHI TIẾT TỪNG LỚP")
    print("="*40)
    print(classification_report(
        all_labels,
        all_preds,
        labels=class_ids,
        target_names=EMOTION_CLASSES,
        digits=2,
        zero_division=0,  # same convention as the macro metrics above
    ))
    print()


In [None]:
import os
import torch

# Create the output folder for the final weights if it does not exist yet.
FINAL_DIR = "final"
os.makedirs(FINAL_DIR, exist_ok=True)

# Save each model's state_dict under its phase-2 checkpoint name.
for final_model, checkpoint_name in (
    (model1, "wav2vec_phase2_final.pt"),
    (model2, "hubert_phase2_final.pt"),
    (model3, "whisper_phase2_final.pt"),
):
    checkpoint_path = os.path.join(FINAL_DIR, checkpoint_name)
    torch.save(final_model.state_dict(), checkpoint_path)

print(f"Models saved to {FINAL_DIR}/")
