In [1]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [None]:
import numpy as np
x = np.load("/content/drive/MyDrive/CSCI535 Project/Dataset/Processed/concatenated_data/dia1001utt10_concatenated.npy")
x.shape

(1, 2560)

In [None]:
import os
import numpy as np
import json

# Step 1: Load Embeddings

def load_embeddings_from_directory(embeddings_dir):
    embeddings = {}
    for filename in os.listdir(embeddings_dir):
        if filename.endswith(".npy"):
            utterance_id = os.path.splitext(filename)[0]
            utterance_id = utterance_id.split('_')[0]
            # print(utterance_id)
            # break
            embedding_path = os.path.join(embeddings_dir, filename)
            embedding = np.load(embedding_path)
            embeddings[utterance_id] = embedding
            break
    return embeddings

# Example usage
embeddings_dir = '/content/drive/MyDrive/CSCI535 Project/Dataset/Processed/concatenated_data'
embeddings = load_embeddings_from_directory(embeddings_dir)
print(embeddings)

{'dia530utt12': array([[-0.02330273, -0.1057745 , -0.02929482, ..., -0.13061267,
        -0.17919499,  0.12389535]], dtype=float32)}


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torch.nn import TransformerEncoder, TransformerEncoderLayer
import torch.nn.functional as F
from sklearn.preprocessing import OneHotEncoder, LabelEncoder

class ConversationDataset(Dataset):
    def __init__(self, data_dir, embeddings_dir):
        self.data_dir = data_dir
        self.embeddings_dir = embeddings_dir
        self.conversations = self.load_conversations()
        self.emotion_list = sorted(['anger', 'disgust', 'fear', 'joy', 'neutral', 'sadness','surprise'])
        self.encoder = LabelEncoder()
        self.encoder.fit(self.emotion_list)

    def encode_emotion(self, emotion):
        return self.encoder.transform([emotion])[0]

    def load_conversations(self):
        with open(self.data_dir, 'r') as file:
            conversations = json.load(file)
        return conversations

    def load_embeddings(self, video_name):
        video_name = video_name.split('.')[0] + '_concatenated'

        embedding_file = os.path.join(self.embeddings_dir, f'{video_name}.npy')
        embedding = np.load(embedding_file)
        return embedding

    def positional_encoding(self, embeddings):
        seq_length = embeddings.shape[0]
        embedding_dim = embeddings.shape[1]
        position_enc = torch.zeros(seq_length, embedding_dim)
        position = torch.arange(0, seq_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, embedding_dim, 2).float() * (-np.log(10000.0) / embedding_dim))
        position_enc[:, 0::2] = torch.sin(position * div_term)
        position_enc[:, 1::2] = torch.cos(position * div_term)
        # print(position_enc.shape)
        return position_enc

    def __len__(self):
        return len(self.conversations)

    def __getitem__(self, idx):
        conversation = self.conversations[idx]
        context_embeddings = []
        emotions = []

        for utterance in conversation['conversation']:
            video_name = utterance['video_name']
            embedding = self.load_embeddings(video_name)
            context_embeddings.append(torch.from_numpy(embedding))
            emotions.append(self.encode_emotion(utterance['emotion']))

        max_seq_length = 33
        padded_embeddings = []
        if len(context_embeddings) < max_seq_length:
            num_to_add = max_seq_length - len(context_embeddings)
            zero_tensor = torch.zeros((1,2560), dtype=torch.float32)
            context_embeddings += [zero_tensor] * num_to_add

        context_embeddings_padded = torch.cat(context_embeddings, dim=0)

        positional_encodings = self.positional_encoding(context_embeddings_padded)
        context_embeddings_with_pos = context_embeddings_padded + positional_encodings

        emotions += [self.encode_emotion('neutral')] * num_to_add
        encoded_emotions_tensor = torch.tensor(emotions, dtype=torch.long)

        return context_embeddings_with_pos, encoded_emotions_tensor

class EmotionDetector(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_emotions, n_layers=2, dropout=0.2):
        super(EmotionDetector, self).__init__()
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.num_emotions = num_emotions
        self.n_layers = n_layers

        encoder_layers = TransformerEncoderLayer(d_model=input_dim, nhead=4, dim_feedforward=hidden_dim, dropout=dropout, batch_first=True)
        self.transformer_encoder = TransformerEncoder(encoder_layers, num_layers=n_layers)

        self.decoder_emotion = nn.Linear(input_dim, num_emotions)

    def forward(self, context_embeddings):
        encoded_context = self.transformer_encoder(context_embeddings)
        prediction_emotion = self.decoder_emotion(encoded_context)  # (batch_size, seq_length, num_emotions)
        return prediction_emotion

if torch.cuda.is_available():
    torch.cuda.empty_cache()

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

data_dir = "/content/drive/MyDrive/CSCI535 Project/Dataset/text/dev.json"
embeddings_dir = "/content/drive/MyDrive/CSCI535 Project/Dataset/Processed/concatenated_data"
num_emotions = 7  # Number of emotions for classification
input_dim = 2560  # Dimensionality of your embeddings
hidden_dim = 512  # Hidden dimension for the Transformer
n_layers = 3  # Number of layers in the Transformer
dropout = 0.25  # Dropout probability
batch_size = 32
num_epochs = 30
learning_rate = 0.0001
print_interval = 5

dataset = ConversationDataset(data_dir, embeddings_dir)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

model = EmotionDetector(input_dim, hidden_dim, num_emotions, n_layers=n_layers, dropout=dropout).to(device)

criterion_emotion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

for epoch in range(num_epochs):
    model.train()
    epoch_loss_emotion = 0.0
    for batch_idx, (context_embeddings, emotions) in enumerate(dataloader):

        optimizer.zero_grad()
        context_embeddings, emotions = context_embeddings.to(device), emotions.to(device)
        prediction_emotion_logits = model(context_embeddings)
        loss_emotion = criterion_emotion(prediction_emotion_logits.view(-1, num_emotions), emotions.view(-1))

        loss_emotion.backward()
        optimizer.step()

        epoch_loss_emotion += loss_emotion.item()
        print(loss_emotion.item())

        if (batch_idx + 1) % print_interval == 0:
            print(f'Epoch [{epoch + 1}/{num_epochs}], Batch [{batch_idx + 1}/{len(dataloader)}], '
                  f'Emotion Loss: {loss_emotion.item():.4f}')

    print(f'Epoch [{epoch + 1}/{num_epochs}], Average Emotion Loss: {epoch_loss_emotion / len(dataloader):.4f}')

torch.save(model.state_dict(), 'emotion_detection_model.pth')

1.8218151330947876
2.305945634841919
1.8957395553588867
1.494415521621704
Epoch [1/30], Average Emotion Loss: 1.8795
0.704918622970581
1.7075051069259644
0.7416472434997559
0.9643366932868958
Epoch [2/30], Average Emotion Loss: 1.0296
0.839963436126709
0.9937276244163513
1.0661314725875854
1.0811163187026978
Epoch [3/30], Average Emotion Loss: 0.9952
1.0471463203430176
0.8017507195472717
0.7013596296310425
0.5909005999565125
Epoch [4/30], Average Emotion Loss: 0.7853
0.6465285420417786
0.737175703048706
0.7775102257728577
0.7051157355308533
Epoch [5/30], Average Emotion Loss: 0.7166
0.712006688117981
0.7361847162246704
0.6434614062309265
0.7474231719970703
Epoch [6/30], Average Emotion Loss: 0.7098
0.6166574358940125
0.6047108173370361
0.6500586271286011
0.5543012619018555
Epoch [7/30], Average Emotion Loss: 0.6064
0.5282276272773743
0.597628653049469
0.5601637959480286
0.4945029616355896
Epoch [8/30], Average Emotion Loss: 0.5451
0.5569114089012146
0.614231526851654
0.4125037491321563

In [None]:
test = torch.randn(3, 5).softmax(dim=1)
test

tensor([[0.0719, 0.1026, 0.5198, 0.2240, 0.0817],
        [0.1155, 0.3262, 0.1952, 0.1305, 0.2327],
        [0.4642, 0.1308, 0.2186, 0.0577, 0.1287]])

In [None]:
def evaluate_model(model, dataloader, criterion):
    model.eval()
    total_loss = 0.0
    total_correct = 0
    total_samples = 0

    with torch.no_grad():
        for batch in dataloader:
            context_embeddings, target_emotions = batch
            output = model(context_embeddings)
            loss = criterion(output, target_emotions)
            total_loss += loss.item()

            _, predicted_emotions = torch.max(output, 1)
            total_correct += (predicted_emotions == target_emotions).sum().item()
            total_samples += target_emotions.size(0)

    average_loss = total_loss / len(dataloader)
    accuracy = total_correct / total_samples

    return average_loss, accuracy


In [None]:
# Example usage
validation_dataloader = DataLoader(validation_dataset, batch_size=64, shuffle=False)

val_loss, val_accuracy = evaluate_model(model, validation_dataloader, criterion)
print(f'Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')