In [30]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import librosa
import numpy as np
import pandas as pd

from transformers import AutoTokenizer, AutoModel
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

import os

In [31]:
# # Set your speech audio folder
# audio_dir = 'dataset/speech'
# data = []

# # Go through each WAV file
# for file in os.listdir(audio_dir):
#     if file.endswith('.wav'):
#         parts = file.split('_')
#         if len(parts) == 3:
#             word = parts[1]
#             emotion = parts[2].replace('.wav', '')
#             data.append({
#                 'word': word,
#                 'emotion': emotion,
#                 'speech_path': os.path.join(audio_dir, file)
#             })

# # Create and save the DataFrame
# df = pd.DataFrame(data)
# df.to_csv('speech_word_dataset.csv', index=False)
# print(df.head())


In [32]:
# -----------------------
# 1. Preprocessing
# -----------------------

def extract_mfcc(file_path, n_mfcc=40, max_len=100):
    """Load an audio file and return a fixed-length, standardized MFCC matrix.

    Each frame holds the MFCCs followed by their first- and second-order
    deltas (``3 * n_mfcc`` values). The time axis is zero-padded or truncated
    to exactly ``max_len`` frames, and the resulting matrix is standardized
    with its own global mean and standard deviation (the padding is included
    in those statistics).

    Args:
        file_path: Path of the audio file, passed to ``librosa.load`` with
            ``sr=None``.
        n_mfcc: Number of MFCC coefficients computed per frame.
        max_len: Number of frames in the returned matrix.

    Returns:
        Array of shape ``(max_len, 3 * n_mfcc)``. When the matrix has zero
        variance (e.g. a silent clip) the centered, all-zero matrix is
        returned instead of dividing by zero.
    """
    y, sr = librosa.load(file_path, sr=None)
    mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=n_mfcc)
    delta = librosa.feature.delta(mfcc)
    delta2 = librosa.feature.delta(mfcc, order=2)
    combined = np.vstack([mfcc, delta, delta2]).T  # (time, feature_dim)

    n_frames = combined.shape[0]
    if n_frames < max_len:
        combined = np.pad(combined, ((0, max_len - n_frames), (0, 0)), mode='constant')
    else:
        combined = combined[:max_len, :]

    centered = combined - np.mean(combined)
    std = np.std(combined)
    # A constant matrix has std == 0; dividing would fill the features with
    # NaN and poison every loss computed from them.
    if not std > 1e-8:
        return centered
    return centered / std

In [33]:
# Load the dataset table from the CSV file
df = pd.read_csv('speech_word_dataset.csv')

# Fit an encoder on the emotion column and store the integer ids in 'label'
label_encoder = LabelEncoder()
label_encoder.fit(df['emotion'])
df['label'] = label_encoder.transform(df['emotion'])

# Vocabulary for the word embedding: each distinct word, in sorted order,
# is mapped to its position in that ordering
word_list = sorted(df['word'].unique())
word2idx = dict(zip(word_list, range(len(word_list))))


In [34]:
# -----------------------
# 2. Dataset
# -----------------------

class MultimodalDataset(Dataset):
    """Dataset yielding a word index, MFCC features and an emotion label per row.

    Each row of ``df`` must provide the columns ``'word'``, ``'speech_path'``
    and ``'emotion'``. The emotion may be stored either as an integer class id
    or as a raw value known to ``label_encoder``.

    Args:
        df: DataFrame with one sample per row.
        word2idx: Mapping from each word in ``df['word']`` to its index.
        label_encoder: Fitted encoder exposing ``transform``; used for rows
            whose emotion is not already an integer.
        max_len: Stored on the instance; not used by this class.
    """

    def __init__(self, df, word2idx, label_encoder, max_len=1):
        self.df = df
        self.word2idx = word2idx
        self.label_encoder = label_encoder
        self.max_len = max_len

    def __len__(self):
        """Return the number of rows in the underlying DataFrame."""
        return len(self.df)

    def _encode_label(self, emotion):
        """Return the integer class id for one 'emotion' cell."""
        try:
            # Already-encoded ids (the common case) pass straight through.
            return int(emotion)
        except (TypeError, ValueError):
            # Raw emotion value: map it with the fitted encoder.
            return int(self.label_encoder.transform([emotion])[0])

    def __getitem__(self, idx):
        """Build the sample at positional index ``idx``.

        Returns:
            Dict with ``'word'`` (long tensor holding one index), ``'mfcc'``
            (float32 tensor from ``extract_mfcc``) and ``'label'`` (scalar
            long tensor).
        """
        row = self.df.iloc[idx]
        # Text (word index)
        word_idx = self.word2idx[row['word']]
        word_tensor = torch.tensor([word_idx], dtype=torch.long)
        # Audio (MFCC)
        mfcc = extract_mfcc(row['speech_path'])
        # Label
        label = self._encode_label(row['emotion'])
        return {
            'word': word_tensor,
            'mfcc': torch.tensor(mfcc, dtype=torch.float32),
            'label': torch.tensor(label, dtype=torch.long)
        }

In [35]:
# -----------------------
# 3. Model
# -----------------------

class MultimodalEmotionModel(nn.Module):
    """Two-branch emotion classifier that fuses word and MFCC features.

    The text branch embeds word indices, runs them through an LSTM and
    projects the final hidden state to 128 features. The audio branch applies
    a 1-D convolution over time, averages over the time axis and projects to
    128 features. Both vectors are concatenated, passed through a ReLU fusion
    layer and mapped to class logits.

    Args:
        vocab_size: Number of rows in the word embedding table.
        embed_dim: Size of each word embedding.
        lstm_hidden: Hidden size of the LSTM.
        audio_feature_dim: Number of features per audio frame (input channels
            of the convolution).
        num_classes: Number of output logits.
    """

    def __init__(self, vocab_size, embed_dim=128, lstm_hidden=128, audio_feature_dim=120, num_classes=5):
        super().__init__()

        # Text branch.
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.lstm = nn.LSTM(embed_dim, lstm_hidden, batch_first=True)
        self.text_fc = nn.Linear(lstm_hidden, 128)

        # Audio branch: convolve over time, then collapse the time axis.
        conv = nn.Conv1d(audio_feature_dim, 128, kernel_size=3, padding=1)
        self.audio_cnn = nn.Sequential(conv, nn.ReLU(), nn.AdaptiveAvgPool1d(1), nn.Flatten())
        self.audio_fc = nn.Linear(128, 128)

        # Fusion head over the concatenated 128 + 128 features.
        self.fusion = nn.Linear(256, 64)
        self.classifier = nn.Linear(64, num_classes)

    def forward(self, word, mfcc):
        """Return class logits for a batch.

        Args:
            word: Long tensor of word indices, shape ``(batch, seq_len)``.
            mfcc: Float tensor of shape ``(batch, time, audio_feature_dim)``.

        Returns:
            Tensor of shape ``(batch, num_classes)``.
        """
        # Text: keep only the last layer's final hidden state.
        embedded = self.embedding(word)
        _, (hidden, _) = self.lstm(embedded)
        text_embed = self.text_fc(hidden[-1])

        # Audio: Conv1d expects channels before time.
        channels_first = mfcc.permute(0, 2, 1)
        pooled = self.audio_cnn(channels_first)
        audio_embed = self.audio_fc(pooled)

        fused = torch.cat((text_embed, audio_embed), dim=1)
        hidden_fused = F.relu(self.fusion(fused))
        return self.classifier(hidden_fused)

In [36]:
# -----------------------
# 4. Training & Evaluation
# -----------------------

def train(model, dataloader, optimizer, criterion, device):
    """Run one optimization pass over ``dataloader`` and return the mean batch loss.

    Args:
        model: Module called as ``model(word, mfcc)``; put into training mode.
        dataloader: Iterable of dict batches with keys ``'word'``, ``'mfcc'``
            and ``'label'``. It does not need to define ``__len__``.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss called as ``criterion(outputs, labels)``.
        device: Device every batch tensor is moved to.

    Returns:
        The criterion value averaged over the batches seen, or ``0.0`` when
        the dataloader yields no batches.
    """
    model.train()
    total_loss = 0.0
    num_batches = 0
    for batch in dataloader:
        word = batch['word'].to(device)
        mfcc = batch['mfcc'].to(device)
        labels = batch['label'].to(device)

        optimizer.zero_grad()
        outputs = model(word, mfcc)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        # Count batches as they are consumed rather than calling len(), so
        # length-less iterables work and an empty loader cannot divide by zero.
        num_batches += 1

    if num_batches == 0:
        return 0.0
    return total_loss / num_batches

def evaluate(model, dataloader, criterion, device):
    """Compute accuracy and mean batch loss over ``dataloader`` without gradients.

    Args:
        model: Module called as ``model(word, mfcc)``; put into eval mode.
        dataloader: Iterable of dict batches with keys ``'word'``, ``'mfcc'``
            and ``'label'``. It does not need to define ``__len__``.
        criterion: Loss called as ``criterion(outputs, labels)``.
        device: Device every batch tensor is moved to.

    Returns:
        Tuple ``(acc, avg_loss)``: the fraction of samples whose argmax
        prediction equals the label, and the criterion value averaged over
        the batches seen. Both are ``0.0`` when the dataloader yields nothing.
    """
    model.eval()
    total_correct = 0
    total = 0
    total_loss = 0.0
    num_batches = 0
    with torch.no_grad():
        for batch in dataloader:
            word = batch['word'].to(device)
            mfcc = batch['mfcc'].to(device)
            labels = batch['label'].to(device)

            outputs = model(word, mfcc)
            loss = criterion(outputs, labels)
            preds = torch.argmax(outputs, dim=1)

            total_correct += (preds == labels).sum().item()
            total += labels.size(0)
            total_loss += loss.item()
            num_batches += 1

    # Guard both ratios so an empty loader cannot raise ZeroDivisionError.
    acc = total_correct / total if total else 0.0
    avg_loss = total_loss / num_batches if num_batches else 0.0
    return acc, avg_loss


In [37]:
# -----------------------
# 5. Main Execution
# -----------------------

# Run configuration, gathered in one place so it can be tuned without
# hunting through the cell.
SEED = 42
BATCH_SIZE = 16
LEARNING_RATE = 2e-5
NUM_EPOCHS = 50

# Seed PyTorch's global RNG so weight initialization and the shuffled batch
# order repeat from run to run.
torch.manual_seed(SEED)

df = pd.read_csv("speech_word_dataset.csv")
label_encoder = LabelEncoder()
# The 'emotion' column is replaced by integer class ids; MultimodalDataset
# reads its labels from this column.
df['emotion'] = label_encoder.fit_transform(df['emotion'])

train_df, val_df = train_test_split(df, test_size=0.2, random_state=SEED)
# The vocabulary is built from the full frame so validation words have an index.
word_list = sorted(df['word'].unique())
word2idx = {word: idx for idx, word in enumerate(word_list)}

train_dataset = MultimodalDataset(train_df, word2idx, label_encoder)
val_dataset = MultimodalDataset(val_df, word2idx, label_encoder)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = MultimodalEmotionModel(
    vocab_size=len(word2idx),
    num_classes=len(label_encoder.classes_)
).to(device)
# NOTE(review): the logged loss falls slowly at this learning rate; a larger
# value may converge faster for a model trained from scratch - confirm.
optimizer = optim.AdamW(model.parameters(), lr=LEARNING_RATE)
criterion = nn.CrossEntropyLoss()

for epoch in range(NUM_EPOCHS):
    train_loss = train(model, train_loader, optimizer, criterion, device)
    # Training accuracy needs a second, gradient-free pass over the training set.
    train_acc, _ = evaluate(model, train_loader, criterion, device)
    val_acc, val_loss = evaluate(model, val_loader, criterion, device)
    print(f"Epoch {epoch+1}, "
          f"Train Loss: {train_loss:.4f}, "
          f"Train Acc: {train_acc*100:.2f}%, "
          f"Val Loss: {val_loss:.4f}, "
          f"Val Acc: {val_acc*100:.2f}%")


Epoch 1, Train Loss: 1.7883, Train Acc: 16.88%, Val Loss: 1.7915, Val Acc: 15.83%
Epoch 2, Train Loss: 1.7827, Train Acc: 16.88%, Val Loss: 1.7875, Val Acc: 15.83%
Epoch 3, Train Loss: 1.7766, Train Acc: 19.17%, Val Loss: 1.7822, Val Acc: 16.67%
Epoch 4, Train Loss: 1.7686, Train Acc: 31.04%, Val Loss: 1.7740, Val Acc: 25.00%
Epoch 5, Train Loss: 1.7574, Train Acc: 33.96%, Val Loss: 1.7626, Val Acc: 28.75%
Epoch 6, Train Loss: 1.7429, Train Acc: 34.27%, Val Loss: 1.7479, Val Acc: 30.00%
Epoch 7, Train Loss: 1.7236, Train Acc: 39.17%, Val Loss: 1.7278, Val Acc: 35.42%
Epoch 8, Train Loss: 1.6999, Train Acc: 43.65%, Val Loss: 1.7055, Val Acc: 41.25%
Epoch 9, Train Loss: 1.6705, Train Acc: 50.42%, Val Loss: 1.6755, Val Acc: 46.67%
Epoch 10, Train Loss: 1.6349, Train Acc: 52.81%, Val Loss: 1.6413, Val Acc: 50.83%
Epoch 11, Train Loss: 1.5935, Train Acc: 53.65%, Val Loss: 1.6000, Val Acc: 49.17%
Epoch 12, Train Loss: 1.5460, Train Acc: 54.69%, Val Loss: 1.5538, Val Acc: 52.50%
Epoch 13, Tra