### Feature extraction data

In [None]:
!pip install torch torchaudio pandas scikit-learn
import os
import torch
import torchaudio
import pandas as pd
import numpy as np
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from sklearn.preprocessing import LabelEncoder
from torch.nn import functional as F
from transformers import Wav2Vec2Model, Wav2Vec2Processor

In [None]:
def custom_collate_fn(batch):
    """Collate samples without padding the inputs.

    Args:
        batch: sequence of samples; element 0 of each sample is the input
            and element 1 is a label tensor.

    Returns:
        A tuple ``(inputs, labels)`` where ``inputs`` is a plain list of the
        per-sample inputs (left untouched) and ``labels`` is the label
        tensors stacked along a new leading dimension.
    """
    inputs = []
    targets = []
    for sample in batch:
        inputs.append(sample[0])
        targets.append(sample[1])
    return inputs, torch.stack(targets)

In [None]:
# Read the feature/label table into `df` and preview its first rows.
# NOTE(review): the path is relative to the working directory — presumably the
# CSV has to be uploaded to the Colab session first; confirm.
df = pd.read_csv('featuresAndLabels.csv')
df.head()

In [None]:
# Names of the six per-emotion score columns in `df`
# (presumably Anger, Disgust, Fear, Sad, Happy, Neutral — confirm against the CSV).
emotion_labels = ['A', 'D', 'F', 'S', 'H', 'N']

# Gather those columns into one list-valued column, one list of scores per row.
df['soft_label'] = df[emotion_labels].to_numpy().tolist()

In [None]:
# Handling class imbalance: every row is weighted by the inverse frequency of
# its dominant (highest-scoring) emotion.
df['dominant_label'] = df[emotion_labels].idxmax(axis=1)
class_counts = df['dominant_label'].value_counts().sort_index()

# Defensive floor so that no count is below zero
class_counts = class_counts.clip(lower=0)

# Inverse-frequency weight per class; the epsilon keeps the division finite
class_weights = (1. / (class_counts + 1e-6)).astype(np.float32)

# Look up each row's class weight and turn the column into a float32 tensor
weights = torch.tensor(
    df['dominant_label'].map(class_weights).values,
    dtype=torch.float32,
)

# NaN and infinite entries are replaced by a small positive value
# (boolean-mask assignment also works on older PyTorch versions)
weights[torch.isnan(weights) | torch.isinf(weights)] = 1e-6

# Final floor: no weight may be smaller than 1e-6
weights = torch.clamp(weights, min=1e-6)

# NOTE(review): `sampler` is built here but the DataLoader created later in this
# notebook does not receive it — confirm whether that is intended.
sampler = WeightedRandomSampler(weights, num_samples=len(weights), replacement=True)

In [None]:
from sklearn.preprocessing import LabelEncoder

# Integer-encode the dominant emotion of every row
label_encoder = LabelEncoder()
df['dominant_label_encoded'] = label_encoder.fit_transform(df['dominant_label'])

# Space-separated string form of each soft-label vector
df['soft_label_tokenized'] = df['soft_label'].apply(
    lambda scores: ' '.join(str(score) for score in scores)
)

# Preview the original columns next to their encoded / tokenized counterparts
print(
    df[
        [
            'dominant_label',
            'dominant_label_encoded',
            'soft_label',
            'soft_label_tokenized',
        ]
    ].head()
)

### Audio data

In [None]:
!pip install datasets
from google.colab import drive
from datasets import load_dataset

In [None]:
# Make Google Drive visible inside the Colab runtime
drive.mount('/content/drive')

# Drive folder that holds the CREMA-D data; the name `cache_dir` is kept
# because later cells read it as the audio directory.
cache_dir = '/content/drive/MyDrive/CREMA-D'

# Load the dataset straight from that folder
ds = load_dataset(cache_dir)

# Keep a handle on the training split
data = ds["train"]

In [None]:
# Sanity check that the CREMA-D folder is readable: list its .wav files and decode them.
# NOTE(review): every clip is decoded into memory here although only the first
# one is printed — confirm `audio` is needed before trimming this.
file = [wav_name for wav_name in os.listdir(cache_dir) if wav_name.endswith('.wav')]
audio = [
    torchaudio.load(os.path.join(cache_dir, wav_name))[0]  # [0] is the waveform
    for wav_name in file
]
print(f"Total files: {len(file)}")
print("Sample file:", file[0])
print("Sample audio:", audio[0])

### Model

In [None]:
from torch.nn.utils.rnn import pad_sequence
def collate_fn(batch):
    """Collate ``(input_values, label)`` samples into two batch tensors.

    Args:
        batch: sequence of ``(input_values, label)`` pairs. Either element may
            already be a tensor or anything ``torch.tensor`` accepts.

    Returns:
        ``(input_values_padded, labels)``: the inputs zero-padded to the
        longest sequence with the batch dimension first, and the labels
        stacked along a new leading dimension.
    """
    def _as_tensor(value):
        # Existing tensors pass through unchanged; everything else is converted.
        return value if isinstance(value, torch.Tensor) else torch.tensor(value)

    raw_inputs, raw_labels = zip(*batch)

    # Pad the variable-length sequences to a common length
    sequences = list(map(_as_tensor, raw_inputs))
    input_values_padded = pad_sequence(sequences, batch_first=True)

    # Stack the per-sample label vectors
    labels = torch.stack([_as_tensor(raw_label) for raw_label in raw_labels])

    return input_values_padded, labels

In [None]:
class CremaAudioDataset(Dataset):
    """Dataset pairing the ``.wav`` files of a folder with soft emotion labels.

    Item ``idx`` is built from the ``idx``-th ``.wav`` file (in sorted filename
    order) and the ``idx``-th row of the label table.

    Args:
        dataframe: table describing the clips; stored as ``self.df``.
        audio_dir: folder that contains the ``.wav`` files.
        processor: callable applied to each waveform; its result must expose
            ``input_values``.
        labels_df: table with a ``'soft_label'`` column. When omitted, the
            notebook-level ``df`` is used, as before.
    """

    def __init__(self, dataframe, audio_dir, processor, labels_df=None):
        self.df = dataframe
        # Use the folder passed by the caller rather than a notebook global.
        self.audio_dir = audio_dir
        self.cache_dir = audio_dir  # legacy attribute name, kept for existing readers
        self.processor = processor
        self.labels_df = df if labels_df is None else labels_df
        # CREMA-D on drive does not come with labels, only file names, so the
        # files are matched to label rows by position. os.listdir returns
        # entries in arbitrary order, hence the sort to make the pairing
        # reproducible.
        # NOTE(review): assumes the label table is in sorted-filename order — confirm.
        self.files = sorted(
            name for name in os.listdir(self.audio_dir) if name.endswith('.wav')
        )

    def __len__(self):
        # Only indices valid for the table, the files and the labels are exposed,
        # so __getitem__ cannot run past the shortest of them.
        return min(len(self.df), len(self.files), len(self.labels_df))

    def __getitem__(self, idx):
        """Return ``(input_values, label)`` for the clip at position ``idx``."""
        label_row = self.labels_df.iloc[idx]
        audio_path = os.path.join(self.audio_dir, self.files[idx])
        waveform, sr = torchaudio.load(audio_path)

        inputs = self.processor(waveform.squeeze(), sampling_rate=sr, return_tensors="pt", padding=True)
        # Drop the leading dimension added by return_tensors="pt".
        input_values = inputs.input_values.squeeze(0)
        label = torch.tensor(label_row['soft_label'], dtype=torch.float)

        return input_values, label

In [None]:
class EmotionClassifier(torch.nn.Module):
    """Pretrained wav2vec2 encoder followed by a linear emotion head.

    Args:
        num_emotions: number of output classes (default 6).
    """

    def __init__(self, num_emotions=6):
        super().__init__()
        encoder = Wav2Vec2Model.from_pretrained("facebook/wav2vec2-base")
        self.wav2vec = encoder
        self.classifier = torch.nn.Linear(encoder.config.hidden_size, num_emotions)

    def forward(self, input_values):
        """Return per-class log-probabilities for a batch of inputs."""
        encoder_output = self.wav2vec(input_values)
        # Average the encoder states over dimension 1 to get one vector per clip
        utterance_embedding = encoder_output.last_hidden_state.mean(dim=1)
        # Log-probabilities, because KLDivLoss expects its input in log space
        return F.log_softmax(self.classifier(utterance_embedding), dim=1)

In [None]:
# Build the processor, the dataset and the batch loader
from transformers import Wav2Vec2Processor

# Processor loaded from the same checkpoint name as the encoder
processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base")

# Wrap the training split and the audio folder, then batch with the padding collate function
crema_dataset = CremaAudioDataset(
    dataframe=data.to_pandas(),
    audio_dir=cache_dir,
    processor=processor,
)
dataloader = DataLoader(dataset=crema_dataset, batch_size=4, collate_fn=collate_fn)

### Training

In [None]:
!pip install flash-attn --upgrade
from transformers import Wav2Vec2Model

# Prefer the GPU when one is present, otherwise stay on the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Build the classifier and move it to the chosen device (Module.to returns the module itself)
model = EmotionClassifier(num_emotions=6).to(device)

import torch.optim as optim
# NOTE(review): lr=1e-3 updates every parameter, including the pretrained
# encoder — confirm this rate is intended.
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)

In [None]:
num_epochs = 10

# KLDivLoss compares the model's log-probabilities with the soft-label targets.
# It is built once here instead of on every batch.
loss_fn = torch.nn.KLDivLoss(reduction='batchmean')

for epoch in range(num_epochs):
    model.train()
    total_loss = 0

    for batch in dataloader:
        input_values, labels = batch
        input_values = input_values.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(input_values)
        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

        # Release this batch's tensors before the next one is loaded. This
        # cleanup used to sit, still indented, after the final torch.save,
        # which made the whole cell fail with an IndentationError.
        del input_values, labels, outputs, loss
        torch.cuda.empty_cache()

    # max(..., 1) avoids a ZeroDivisionError when the loader yields no batches
    average_loss = total_loss / max(len(dataloader), 1)
    print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {average_loss}")

# Save the trained model
torch.save(model.state_dict(), '/content/drive/MyDrive/emotion_classifier.pth')

### Evaluation

In [None]:
def evaluate_model(model, dataloader, emotion_labels, labels_df=None):
    """Run the model over a loader and collect class indices for evaluation.

    Args:
        model: module returning one score vector per sample.
        dataloader: iterable of ``(input_values, labels)`` batches. Batches
            must arrive in the same row order as the strength table, because
            strength rows are looked up by running position.
        emotion_labels: emotion names; accepted for interface compatibility
            and not used by the computation.
        labels_df: table with ``'low'``, ``'medium'`` and ``'high'`` columns.
            When omitted, the notebook-level ``df`` is used, as before.

    Returns:
        ``(true_emotions, pred_emotions, true_strengths, pred_strengths)``,
        four lists of class indices with one entry per evaluated sample.
        ``pred_strengths`` is currently a copy of ``true_strengths``
        (placeholder: the model does not predict strength).
    """
    strengths_table = df if labels_df is None else labels_df

    # Run on whatever device the model lives on instead of assuming CUDA.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    true_emotions = []
    pred_emotions = []
    true_strengths = []
    pred_strengths = []

    with torch.no_grad():
        for input_values, labels in dataloader:
            input_values = input_values.to(device)
            labels = labels.to(device)
            outputs = model(input_values)

            # Convert soft labels / model scores to class indices
            true_class = torch.argmax(labels, dim=1).cpu().numpy()
            pred_class = torch.argmax(outputs, dim=1).cpu().numpy()

            # Strength: low/medium/high rows for this batch, located by the
            # number of samples already processed
            start = len(true_emotions)
            batch_indices = list(range(start, start + len(true_class)))
            strength_true = strengths_table.iloc[batch_indices][['low', 'medium', 'high']].values
            strength_pred = strength_true.copy()  # Placeholder: assume perfect strength prediction
            strength_true_class = strength_true.argmax(axis=1)
            strength_pred_class = strength_pred.argmax(axis=1)

            true_emotions.extend(true_class)
            pred_emotions.extend(pred_class)
            true_strengths.extend(strength_true_class)
            pred_strengths.extend(strength_pred_class)

            # Free the batch tensors inside the loop, so an empty loader never
            # reaches a `del` of names that were not bound.
            del input_values, labels, outputs

    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    return true_emotions, pred_emotions, true_strengths, pred_strengths

### Visualization

Strength prediction

In [None]:
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

# Run the evaluation pass and keep all four result lists
true_emotions, pred_emotions, true_strengths, pred_strengths = evaluate_model(
    model, dataloader, emotion_labels=['A', 'D', 'F', 'S', 'H', 'N']
)

# Strength classes are the argmax positions over ['low', 'medium', 'high'].
# Passing labels=[0, 1, 2] keeps the matrix 3x3 even when a class never occurs,
# so it always lines up with the three tick labels below.
# NOTE: evaluate_model currently copies the true strengths as predictions
# (placeholder), so this matrix is diagonal by construction.
cm = confusion_matrix(true_strengths, pred_strengths, labels=[0, 1, 2])

# Create the heatmap visualization
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
            xticklabels=['Low', 'Medium', 'High'],
            yticklabels=['Low', 'Medium', 'High'])
plt.xlabel('Predicted Strength')
plt.ylabel('True Strength')
plt.title('Confusion Matrix for Strength Prediction')
plt.show()

Emotion prediction

In [None]:
# Emotion classes are argmax positions into `emotion_labels`. Fixing the label
# set keeps one row and column per emotion even when an emotion is never seen
# or predicted, so the matrix always matches the tick labels.
cm_emotion = confusion_matrix(
    true_emotions,
    pred_emotions,
    labels=list(range(len(emotion_labels))),
)

plt.figure(figsize=(8, 6))
sns.heatmap(cm_emotion, annot=True, fmt="d", cmap="Blues",
            xticklabels=emotion_labels, yticklabels=emotion_labels)
plt.xlabel('Predicted Emotion')
plt.ylabel('True Emotion')
plt.title('Confusion Matrix for Emotion Prediction')
plt.show()