# Single Modality AutoEncoder ( T + A )

## Imports 

In [85]:
import torch
from torch.utils.data import Dataset

import os
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import numpy as np
from collections import defaultdict
import json
import torch.optim as optim
from torch.nn import TransformerEncoder, TransformerEncoderLayer

# Dataset

In [86]:
class MultimodalDataset(Dataset):
    def __init__(self, base_path, split, bert_feature_size='bert_text_features_128'):
        """
        Initialize the dataset by loading the tensor files.

        :param base_path: The path where the .pt files are stored
        :param split: The data split to load ('train', 'validate', or 'test')
        :param bert_feature_size: The size of the BERT features to load
        """
        self.audio_features = torch.load(f'{base_path}/{split}_audio_features.pt')
        self.facial_features = torch.load(f'{base_path}/{split}_facial_features.pt')
        self.bert_features = torch.load(f'{base_path}/{split}_{bert_feature_size}.pt')
        self.labels = torch.load(f'{base_path}/{split}_labels.pt')

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return {
            'audio_features': self.audio_features[idx],
            'facial_features': self.facial_features[idx],
            'bert_features': self.bert_features[idx],
            'label': self.labels[idx]
        }

In [107]:
# Assuming the data is saved in './tensor_data' directory
base_path = './tensor_data'
bert_feature_size = 'bert_text_features_512'  # or 256, 128 based on what is needed

### Common Trainer Class 

In [126]:
class ModelTrainer:
    def __init__(self, model, train_dataset, val_dataset, model_name, epochs, save_interval, lr=1e-3, device='cuda'):
        self.model = model.to(device)
        self.train_dataset = train_dataset
        self.val_dataset = val_dataset
        self.model_name = model_name
        self.start_epoch = 0
        self.epochs = epochs
        self.save_interval = save_interval
        self.lr = lr
        self.device = device
        self.history = defaultdict(list)
        self.checkpoint_dir = f'modelCheckPoints/{self.model_name}'
        os.makedirs(self.checkpoint_dir, exist_ok=True)
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=self.lr)

    def save_checkpoint(self, epoch):
        state = {'epoch': epoch, 'state_dict': self.model.state_dict()}
        torch.save(state, f'{self.checkpoint_dir}/{epoch}.pt')

    def load_checkpoint(self):
        checkpoints = [ckpt for ckpt in os.listdir(self.checkpoint_dir) if ckpt.endswith('.pt')]
        if checkpoints:
            latest_checkpoint = max(checkpoints, key=lambda x: int(x.split('.')[0]))
            checkpoint = torch.load(f'{self.checkpoint_dir}/{latest_checkpoint}', map_location=self.device)
            self.model.load_state_dict(checkpoint['state_dict'])
            self.start_epoch = checkpoint['epoch'] + 1
            print(f"Loaded checkpoint: {latest_checkpoint} at epoch {checkpoint['epoch']}")
        else:
            print("No checkpoints found, starting from scratch.")

    def save_history(self):
        with open(f'{self.checkpoint_dir}/history.json', 'w') as f:
            json.dump(self.history, f)

    def train_one_epoch(self, dataloader, criterion, max_grad_norm=1.0):
        self.model.train()
        total_loss = 0
        correct_predictions = 0
    
        for batch in dataloader:
            audio_features = batch['audio_features'].to(self.device)
            bert_features = batch['bert_features'].to(self.device)
            labels = batch['label'].to(self.device)
    
            self.optimizer.zero_grad()
            outputs = self.model(audio_features, bert_features)
    
            loss = criterion(outputs, labels)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_grad_norm)
            self.optimizer.step()
    
            total_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct_predictions += (predicted == labels).sum().item()
    
        avg_loss = total_loss / len(dataloader.dataset)
        accuracy = correct_predictions / len(dataloader.dataset)
        return avg_loss, accuracy

    def validate(self, dataloader, criterion):
        self.model.eval()
        total_loss = 0
        correct_predictions = 0
        
        with torch.no_grad():
            for batch in dataloader:
                audio = batch['audio_features'].to(self.device)
                # vision = batch['vision'].to(self.device)
                text_bert = batch['bert_features'].to(self.device)
                labels = batch['label'].to(self.device)
        
                outputs = self.model(audio, text_bert)
                loss = criterion(outputs, labels)
                total_loss += loss.item()
                _, predicted = torch.max(outputs, 1)
                correct_predictions += (predicted == labels).sum().item()
        
        avg_loss = total_loss / len(dataloader.dataset)
        accuracy = correct_predictions / len(dataloader.dataset)
        return avg_loss, accuracy


    def train(self, criterion):
        self.load_checkpoint()
        train_dataloader = DataLoader(train_dataset, batch_size=512, shuffle=True, drop_last=True)
        val_dataloader = DataLoader(val_dataset, batch_size=512, shuffle=False, drop_last=True)

        for epoch in range(self.start_epoch, self.epochs):
            train_loss, train_acc = self.train_one_epoch(train_dataloader, criterion)
            val_loss, val_acc = self.validate(val_dataloader, criterion)
    
            self.history['train_loss'].append(train_loss)
            self.history['train_acc'].append(train_acc)
            self.history['val_loss'].append(val_loss)
            self.history['val_acc'].append(val_acc)
    
            print(f"Epoch {epoch+1}/{self.epochs}, Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.4f}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_acc:.4f}")
    
            if (epoch + 1) % self.save_interval == 0:
                self.save_checkpoint(epoch + 1)
    
            self.save_history()

### Dynamic Encoder 

In [127]:
class DynamicEncoder(nn.Module):
    def __init__(self, input_size, encoded_size=128, depth=1, dropout_rate=0.5):
        super(DynamicEncoder, self).__init__()
        layers = []

        current_size = input_size
        for i in range(depth - 1):  # Reserve the last layer to ensure it outputs encoded_size
            next_size = max(encoded_size, current_size // 2)
            layers.append(nn.Linear(current_size, next_size))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(dropout_rate))
            current_size = next_size

        # Ensure the last layer has the correct output size
        layers.append(nn.Linear(current_size, encoded_size))
        layers.append(nn.ReLU())
        layers.append(nn.Dropout(dropout_rate))

        self.encoder = nn.Sequential(*layers)

    def forward(self, x):
        return self.encoder(x)


class CombinedClassifier(nn.Module):
    def __init__(self, bert_feature_size, audio_feature_size, num_classes, encoded_audio_size=128, encoded_text_size=128, audio_depth=1, text_depth=1, classifier_size=512, classifier_depth=1, audio_dropout_rate=0.5, text_dropout_rate=0.5, classifier_dropout_rate=0.5):
        super(CombinedClassifier, self).__init__()
        self.audio_encoder = DynamicEncoder(audio_feature_size, encoded_audio_size, audio_depth, audio_dropout_rate)
        self.text_encoder = DynamicEncoder(bert_feature_size, encoded_text_size, text_depth, text_dropout_rate)
        
        combined_feature_size = encoded_audio_size + encoded_text_size 
        classifier_layers = [nn.Linear(combined_feature_size, classifier_size), nn.ReLU(), nn.Dropout(classifier_dropout_rate)]
        
        for _ in range(1, classifier_depth):
            classifier_layers.append(nn.Linear(classifier_size, classifier_size))
            classifier_layers.append(nn.ReLU())
            classifier_layers.append(nn.Dropout(classifier_dropout_rate))

        classifier_layers.append(nn.Linear(classifier_size, num_classes))
        self.classifier = nn.Sequential(*classifier_layers)

    def forward(self, audio_features, bert_features):
        encoded_audio = self.audio_encoder(audio_features)
        encoded_text = self.text_encoder(bert_features)

        if encoded_audio.dim() == 1:
            encoded_audio = encoded_audio.unsqueeze(0) 
        if encoded_text.dim() == 1:
            encoded_text = encoded_text.unsqueeze(0)

        combined_features = torch.cat((encoded_text, encoded_audio), dim=1)
        return self.classifier(combined_features)



In [130]:
def grid_search(train_dataloader, val_dataloader, bert_feature_size, audio_feature_size, num_classes):
    text_encoder_sizes = [128,256, 512]
    text_encoder_depths = [1, 2,3]
    text_encoder_dropouts = [0.3, 0.5,0.7]

    audio_encoder_sizes = [64,128, 256]
    audio_encoder_depths = [1, 2,5]
    audio_encoder_dropouts = [0.3, 0.5,0.7]

    classifier_sizes = [128,256,512, 1024]
    classifier_depths = [1, 2,3]
    classifier_dropouts = [0.3, 0.5,0.7]

    best_model = None
    best_accuracy = 0
    best_params = {}

    for text_size in text_encoder_sizes:
        for text_depth in text_encoder_depths:
            for text_dropout in text_encoder_dropouts:
                for audio_size in audio_encoder_sizes:
                    for audio_depth in audio_encoder_depths:
                        for audio_dropout in audio_encoder_dropouts:
                            for classifier_size in classifier_sizes:
                                for classifier_depth in classifier_depths:
                                    for classifier_dropout in classifier_dropouts:
                                        model = CombinedClassifier(
                                            bert_feature_size=bert_feature_size,
                                            audio_feature_size=audio_feature_size,
                                            num_classes=num_classes,
                                            encoded_audio_size=audio_size,
                                            encoded_text_size=text_size,
                                            audio_depth=audio_depth,
                                            text_depth=text_depth,
                                            classifier_size=classifier_size,
                                            classifier_depth=classifier_depth,
                                            audio_dropout_rate=audio_dropout,
                                            text_dropout_rate=text_dropout,
                                            classifier_dropout_rate=classifier_dropout
                                        )
                                        trainer = ModelTrainer(model, train_dataloader, val_dataloader, f"model_{text_size}_{text_depth}_{text_dropout}_{audio_size}_{audio_depth}_{audio_dropout}_{classifier_size}_{classifier_depth}_{classifier_dropout}", 30, 5)
                                        criterion = nn.CrossEntropyLoss()

                                        trainer.train(criterion)
                                        val_dataloader = DataLoader(val_dataset, batch_size=512, shuffle=False, drop_last=True)
                                        val_loss, val_accuracy = trainer.validate(val_dataloader, criterion)

                                        if val_accuracy > best_accuracy:
                                            best_model = model
                                            best_accuracy = val_accuracy
                                            best_params = {
                                                'text_encoder_size': text_size,
                                                'text_encoder_depth': text_depth,
                                                'text_encoder_dropout': text_dropout,
                                                'audio_encoder_size': audio_size,
                                                'audio_encoder_depth': audio_depth,
                                                'audio_encoder_dropout': audio_dropout,
                                                'classifier_size': classifier_size,
                                                'classifier_depth': classifier_depth,
                                                'classifier_dropout': classifier_dropout
                                            }
                                        print(f"Model with params {best_params} achieved validation accuracy: {val_accuracy}")

    return best_model, best_params

In [None]:
bert_feature_size, audio_feature_size = 768 , 45 
num_classes = 3 
grid_search(train_dataset, val_dataset, bert_feature_size, audio_feature_size, num_classes)

Loaded checkpoint: 30.pt at epoch 30
Model with params {'text_encoder_size': 128, 'text_encoder_depth': 1, 'text_encoder_dropout': 0.3, 'audio_encoder_size': 64, 'audio_encoder_depth': 1, 'audio_encoder_dropout': 0.3, 'classifier_size': 128, 'classifier_depth': 1, 'classifier_dropout': 0.3} achieved validation accuracy: 0.4775481111903065
No checkpoints found, starting from scratch.
Epoch 1/30, Train Loss: 0.0020, Train Accuracy: 0.4899, Val Loss: 0.0012, Val Accuracy: 0.4462
Epoch 2/30, Train Loss: 0.0017, Train Accuracy: 0.6041, Val Loss: 0.0011, Val Accuracy: 0.4676
Epoch 3/30, Train Loss: 0.0016, Train Accuracy: 0.6308, Val Loss: 0.0011, Val Accuracy: 0.4797
Epoch 4/30, Train Loss: 0.0016, Train Accuracy: 0.6412, Val Loss: 0.0011, Val Accuracy: 0.4840
Epoch 5/30, Train Loss: 0.0015, Train Accuracy: 0.6496, Val Loss: 0.0011, Val Accuracy: 0.4783
Epoch 6/30, Train Loss: 0.0015, Train Accuracy: 0.6580, Val Loss: 0.0011, Val Accuracy: 0.4847
Epoch 7/30, Train Loss: 0.0015, Train Accura