In [None]:
%%capture
!pip install pydub

In [None]:
import os
import json
import numpy as np
import joblib
import cv2
from tqdm import tqdm
import matplotlib.pyplot as plt
import seaborn as sns
import librosa
import pandas as pd

import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input, GRU, LSTM, Dropout, Dense, Conv1D, MaxPooling1D, BatchNormalization, Flatten
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from tensorflow.keras.metrics import SparseCategoricalAccuracy
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.models import load_model
from tensorflow.keras.models import Model

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score
from sklearn.preprocessing import MinMaxScaler

import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn

from pydub.utils import which

from pydub import AudioSegment

# Install ffmpeg through apt at import time. The exit status returned by
# os.system is not checked, so a failed install goes unnoticed here.
os.system('apt-get install -y ffmpeg')

# Tell pydub which converter binary to use: whatever `which` resolves for "ffmpeg".
# NOTE(review): presumably this is None when ffmpeg is not on PATH — confirm.
AudioSegment.converter = which("ffmpeg")


# Load Dataset

In [None]:
import os

def load_dataset(directory):
    """Collect the video paths of a RAVDESS-style dataset together with their labels.

    The dataset is expected to be laid out on three levels,
    ``directory/<subject>/<element>/<file>``. A file is kept when its name starts
    with "01" and does not end with ".db". ".DS_Store" entries are ignored at the
    subject and element levels.

    Args:
        directory: The root directory of the dataset.

    Returns:
        A tuple containing:
        - A list of full paths to the selected files.
        - A list of zero-based integer labels, one per path, parsed from
          characters 6-7 of the file name and decremented by one.
    """
    paths = []
    labels = []

    for subject in os.listdir(directory):
        if subject == ".DS_Store":
            continue
        subject_dir = os.path.join(directory, subject)

        for elem in os.listdir(subject_dir):
            if elem == ".DS_Store":
                continue
            elem_dir = os.path.join(subject_dir, elem)

            for file in os.listdir(elem_dir):
                # Keep only files named "01..." that are not ".db" artefacts
                if file.endswith(".db") or not file.startswith("01"):
                    continue

                paths.append(os.path.join(elem_dir, file))

                # Slice the bare file name rather than a '/'-split of the joined
                # path, so parsing does not depend on the OS path separator.
                labels.append(int(file[6:8]) - 1)

    return paths, labels


# Save video features

In [None]:
def save_dataset(features, labels, filepath):
    """Persist an extracted-feature dataset to disk with joblib.

    Args:
        features: features of every sample
        labels: label of every sample
        filepath: destination file; both inputs are stored together as one
            ``(features, labels)`` tuple
    """
    bundle = (features, labels)
    joblib.dump(bundle, filepath)
    print(f"Dataset saved in  {filepath}")

def load_dataset_jlb(filepath):
    """Read back a dataset of extracted features stored with joblib.

    Args:
        filepath: file the dataset is loaded from; it must contain a two-item
            object that unpacks into features and labels

    Return:
        features: features of every sample
        labels: label of every sample
    """
    bundle = joblib.load(filepath)
    features, labels = bundle
    print(f"Dataset loaded from {filepath}")
    return features, labels

# Video Utils

In [None]:
def load_data(paths, sequence_length=50):
    """Load per-video feature files and derive the label of each one from its name.

    Every CSV is reduced to exactly ``sequence_length`` rows: longer recordings are
    sub-sampled with a constant stride, shorter ones are zero-padded at the end so
    that all samples stack into a single rectangular array.

    Args:
        paths: list of .csv files; each must contain the columns 'timestamp',
            'frame' and 'confidence', which are dropped
        sequence_length: number of frames kept per video (default 50)

    Return:
        features: float32 array with one ``(sequence_length, n_columns)`` entry per file
        labels: int8 array of zero-based labels, parsed from characters 6-7 of
            each file name and decremented by one
    """
    features = []
    labels = []

    for video_path in tqdm(paths, desc = "loading video features"):
        data = pd.read_csv(video_path)
        data = data.drop(columns=['timestamp', 'frame', 'confidence'])

        # Constant stride that spreads `sequence_length` samples over the clip
        n_frames = data.shape[0]
        skip_interval = max(n_frames // sequence_length, 1)
        frames = data.values[:skip_interval * sequence_length:skip_interval]

        # Clips shorter than `sequence_length` would make the final array ragged;
        # pad them with zero rows, as the MFCC pipeline does for short audio.
        missing = sequence_length - frames.shape[0]
        if missing > 0:
            frames = np.pad(frames, ((0, missing), (0, 0)), mode='constant')

        features.append(frames)

        # Parse the label from the file name, independent of the path separator
        label = os.path.basename(video_path)[6:8]
        labels.append(int(label) - 1)

    features = np.array(features, dtype='float32')
    labels = np.array(labels, dtype='int8')

    return features, labels

In [None]:
def from_audiopath_to_videopath(paths_audio):
    """Map video/audio paths to the paths of their facial-tracking feature files.

    Only the file name (text after the last '/', up to its first '.') is kept; it
    is re-rooted under the features directory with a '.csv' extension.

    Args:
        paths_audio: list of video/audio paths

    Return:
        paths_video: list of feature-file paths, in the same order as the input
    """
    # Example:
    #   /kaggle/input/ravdess-emotional-speech-video/RAVDESS dataset/Video_Speech_Actor_01/Actor_01/01-01-01-01-01-01-01.mp4
    #   -> /kaggle/input/ravdess-facial-tracking/01-01-01-01-01-01-01.csv
    features_dir = '/kaggle/input/ravdess-facial-tracking/'
    return [
        features_dir + source.split('/')[-1].split('.')[0] + '.csv'
        for source in paths_audio
    ]

# Video Model

### GRU Model

In [None]:
class GRUEmotionRecognitionModel:
    """Keras Sequential classifier: Conv1D -> MaxPooling1D -> GRU -> Dense -> softmax.

    The model is built and compiled on construction (Adam, sparse categorical
    cross-entropy, sparse categorical accuracy) and exposed as ``self.model``.

    Args:
        input_shape: shape of one sample, passed to the first Conv1D layer
        num_classes: number of output classes
        gru_units: number of units of the GRU layer
        dense_units: half the width of the hidden Dense layer (it uses dense_units * 2)
        dropout_rate: rate of both Dropout layers
        learning_rate: learning rate of the Adam optimizer
    """

    def __init__(self, input_shape, num_classes, gru_units=64, dense_units=64, dropout_rate=0.5, learning_rate=0.001):
        self.input_shape = input_shape
        self.num_classes = num_classes
        self.gru_units = gru_units
        self.dense_units = dense_units
        self.dropout_rate = dropout_rate
        self.learning_rate = learning_rate
        self.model = self._build_model()

    def _build_model(self):
        """Assemble and compile the network."""
        model = Sequential()

        # Convolutional Layer
        model.add(Conv1D(filters=32, kernel_size=3, activation='relu', input_shape=self.input_shape))
        model.add(MaxPooling1D(pool_size=2))

        # GRU Layer (the full output sequence is kept and flattened below)
        model.add(GRU(self.gru_units, return_sequences=True))
        model.add(Dropout(self.dropout_rate))
        model.add(BatchNormalization())

        # Dense Layers
        model.add(Flatten())
        model.add(Dense(self.dense_units * 2, activation='relu'))
        model.add(Dropout(self.dropout_rate))
        model.add(BatchNormalization())
        model.add(Dense(self.num_classes, activation='softmax'))

        model.compile(optimizer=Adam(learning_rate=self.learning_rate),
                      loss=SparseCategoricalCrossentropy(),
                      metrics=[SparseCategoricalAccuracy()])
        return model

    def summary(self):
        """Print the Keras summary of the underlying model."""
        self.model.summary()

    def train(self, X_train, y_train,  X_val, y_val, epochs=100, batch_size=64):
        """Fit the model, checkpointing the epoch with the best validation accuracy.

        The checkpoint is written to '/kaggle/working/models/video_best_model_GRU.keras'.

        Returns:
            The object returned by ``model.fit``.
        """
        checkpoint_callback = ModelCheckpoint(
            filepath='/kaggle/working/models/video_best_model_GRU.keras',
            monitor='val_sparse_categorical_accuracy',
            save_best_only=True,
            mode='max',
            verbose=1
        )

        history = self.model.fit(
            X_train, y_train,
            epochs=epochs,
            batch_size=batch_size,
            validation_data=(X_val, y_val),
            callbacks=[checkpoint_callback]
        )

        return history

    def evaluate(self, X_test, y_test):
        """Return ``(loss, accuracy)`` of the model on the given data."""
        test_loss, test_accuracy = self.model.evaluate(X_test, y_test)
        return test_loss, test_accuracy

    def predict(self, X):
        """Return the predicted class index of every sample in ``X``."""
        predictions = self.model.predict(X)
        predicted_classes = np.argmax(predictions, axis=1)
        return predicted_classes

    def predict_prob(self, X):
        """Return ``(softmax outputs, predicted class indices)`` for ``X``."""
        predictions = self.model.predict(X)
        predicted_classes = np.argmax(predictions, axis=1)
        return predictions, predicted_classes

    def plot_confusion_matrix(self, y_true, y_pred, class_names, title):
        """Plot a row-normalized confusion matrix as a heatmap.

        Args:
            y_true: true class indices
            y_pred: predicted class indices
            class_names: tick labels; entry ``i`` names class index ``i``
            title: title of the figure
        """
        # Pin the label set so the matrix always has one row/column per class
        # name, even when a class is absent from y_true and y_pred.
        class_indices = np.arange(len(class_names))
        cm = confusion_matrix(y_true, y_pred, labels=class_indices).astype(float)

        # Row-normalize; classes without any true sample stay at 0 instead of NaN
        row_totals = cm.sum(axis=1, keepdims=True)
        cm = np.divide(cm, row_totals, out=np.zeros_like(cm), where=row_totals != 0)

        plt.figure(figsize=(10, 7))
        sns.heatmap(cm, annot=True, fmt='.2%', xticklabels=class_names, yticklabels=class_names)
        plt.xlabel('Predicted')
        plt.ylabel('True')
        plt.xticks(rotation=90)
        plt.yticks(rotation=0)
        plt.title(title)
        plt.show()

### LSTM Model

In [None]:
class LSTMEmotionRecognitionModel:
    """Keras Sequential classifier: Conv1D -> MaxPooling1D -> LSTM -> Dense -> softmax.

    The model is built and compiled on construction (Adam, sparse categorical
    cross-entropy, sparse categorical accuracy) and exposed as ``self.model``.

    Args:
        input_shape: shape of one sample, passed to the first Conv1D layer
        num_classes: number of output classes
        lstm_units: number of units of the LSTM layer
        dense_units: half the width of the hidden Dense layer (it uses dense_units * 2)
        dropout_rate: rate of both Dropout layers
        learning_rate: learning rate of the Adam optimizer
    """

    def __init__(self, input_shape, num_classes, lstm_units=64, dense_units=64, dropout_rate=0.5, learning_rate=0.001):
        self.input_shape = input_shape
        self.num_classes = num_classes
        self.lstm_units = lstm_units
        self.dense_units = dense_units
        self.dropout_rate = dropout_rate
        self.learning_rate = learning_rate
        self.model = self._build_model()

    def _build_model(self):
        """Assemble and compile the network."""
        model = Sequential()

        # Convolutional Layer
        model.add(Conv1D(filters=32, kernel_size=3, activation='relu', input_shape=self.input_shape))
        model.add(MaxPooling1D(pool_size=2))

        # LSTM Layer (the full output sequence is kept and flattened below)
        model.add(LSTM(self.lstm_units, return_sequences=True))
        model.add(Dropout(self.dropout_rate))
        model.add(BatchNormalization())

        # Dense Layers
        model.add(Flatten())
        model.add(Dense(self.dense_units * 2, activation='relu'))
        model.add(Dropout(self.dropout_rate))
        model.add(BatchNormalization())
        model.add(Dense(self.num_classes, activation='softmax'))

        model.compile(optimizer=Adam(learning_rate=self.learning_rate),
                      loss=SparseCategoricalCrossentropy(),
                      metrics=[SparseCategoricalAccuracy()])
        return model

    def summary(self):
        """Print the Keras summary of the underlying model."""
        self.model.summary()

    def train(self, X_train, y_train, X_val, y_val, epochs=100, batch_size=64):
        """Fit the model, checkpointing the epoch with the best validation accuracy.

        The checkpoint is written to '/kaggle/working/models/video_best_model_LSTM.keras'.

        Returns:
            The object returned by ``model.fit``.
        """
        checkpoint_callback = ModelCheckpoint(
            filepath='/kaggle/working/models/video_best_model_LSTM.keras',
            monitor='val_sparse_categorical_accuracy',
            save_best_only=True,
            mode='max',
            verbose=1
        )

        history = self.model.fit(
            X_train, y_train,
            epochs=epochs,
            batch_size=batch_size,
            validation_data=(X_val, y_val),
            callbacks=[checkpoint_callback]
        )

        return history

    def evaluate(self, X_test, y_test):
        """Return ``(loss, accuracy)`` of the model on the given data."""
        test_loss, test_accuracy = self.model.evaluate(X_test, y_test)
        return test_loss, test_accuracy

    def predict(self, X):
        """Return the predicted class index of every sample in ``X``."""
        predictions = self.model.predict(X)
        predicted_classes = np.argmax(predictions, axis=1)
        return predicted_classes

    def predict_prob(self, X):
        """Return ``(softmax outputs, predicted class indices)`` for ``X``."""
        predictions = self.model.predict(X)
        predicted_classes = np.argmax(predictions, axis=1)
        return predictions, predicted_classes

    def plot_confusion_matrix(self, y_true, y_pred, class_names, title):
        """Plot a row-normalized confusion matrix as a heatmap.

        Args:
            y_true: true class indices
            y_pred: predicted class indices
            class_names: tick labels; entry ``i`` names class index ``i``
            title: title of the figure
        """
        # Pin the label set so the matrix always has one row/column per class
        # name, even when a class is absent from y_true and y_pred.
        class_indices = np.arange(len(class_names))
        cm = confusion_matrix(y_true, y_pred, labels=class_indices).astype(float)

        # Row-normalize; classes without any true sample stay at 0 instead of NaN
        row_totals = cm.sum(axis=1, keepdims=True)
        cm = np.divide(cm, row_totals, out=np.zeros_like(cm), where=row_totals != 0)

        plt.figure(figsize=(10, 7))
        sns.heatmap(cm, annot=True, fmt='.2%', xticklabels=class_names, yticklabels=class_names)
        plt.xlabel('Predicted')
        plt.ylabel('True')
        plt.xticks(rotation=90)
        plt.yticks(rotation=0)
        plt.title(title)
        plt.show()


# Train and Test Video Model

In [None]:
def _load_or_extract_video_features(csv_paths, cache_filepath):
    """Return (features, labels) from the joblib cache if it exists, else extract and cache them."""
    if os.path.exists(cache_filepath):
        return load_dataset_jlb(cache_filepath)
    features, labels = load_data(csv_paths)
    save_dataset(features, labels, cache_filepath)
    return features, labels


def train_video_model(train_path, test_path, emotions_to_idx):
    """Load and normalize the train and test datasets, then train the GRU and LSTM video models.

    Features are cached under '/kaggle/working/Video_track/'. Each model is trained
    only when its checkpoint file under '/kaggle/working/models/' does not exist
    yet; the test set is used as validation data during training.

    Args:
        train_path: paths of the .csv files used to train the video model
        test_path: paths of the .csv files used to test the video model
        emotions_to_idx: dictionary mapping each emotion name to its index;
            only its size is used here, as the number of classes

    Return:
        scaler: the MinMaxScaler fitted on the training data
    """

    train_dataset_filepath = '/kaggle/working/Video_track/subtrain_features_rav.pkl'
    test_dataset_filepath = '/kaggle/working/Video_track/subtest_features_rav.pkl'
    os.makedirs('/kaggle/working/Video_track/', exist_ok = True)

    # Each cache file is checked on its own: the presence of the train cache
    # says nothing about the test cache (e.g. after an interrupted run).
    train_point_list, train_labels = _load_or_extract_video_features(train_path, train_dataset_filepath)
    test_point_list, test_labels = _load_or_extract_video_features(test_path, test_dataset_filepath)

    # Normalize dataset: the scaler works on 2-D data, so frames are flattened
    # to (n_samples * n_frames, n_features) and reshaped back afterwards.
    scaler = MinMaxScaler()
    train_point_list = scaler.fit_transform(train_point_list.reshape(-1, train_point_list.shape[-1])).reshape(train_point_list.shape)
    test_point_list = scaler.transform(test_point_list.reshape(-1, test_point_list.shape[-1])).reshape(test_point_list.shape)

    NUM_CLASSES = len(emotions_to_idx.keys())
    input_shape = (50, 709)  # shape both models are built with

    os.makedirs('/kaggle/working/models/', exist_ok = True)

    # If the model file doesn't exist, train GRU video model and save on the file
    if not os.path.exists('/kaggle/working/models/video_best_model_GRU.keras'):
        gru_emotion_model = GRUEmotionRecognitionModel(input_shape=input_shape, num_classes=NUM_CLASSES)
        gru_emotion_model.summary()
        gru_emotion_model.train(train_point_list, train_labels, test_point_list, test_labels, epochs=100, batch_size=64)

    # If the model file doesn't exist, train LSTM video model and save on the file
    if not os.path.exists('/kaggle/working/models/video_best_model_LSTM.keras'):
        lstm_emotion_model = LSTMEmotionRecognitionModel(input_shape=input_shape, num_classes=NUM_CLASSES)
        lstm_emotion_model.summary()
        lstm_emotion_model.train(train_point_list, train_labels, test_point_list, test_labels, epochs=100, batch_size=64)

    return scaler

In [None]:
def video_predict(scaler, test_path, features_file_name, emotions_to_idx, cm_title):
    """Run the saved GRU and LSTM video models on a test set and report their results.

    Test features are read from ``features_file_name`` when that file exists;
    otherwise they are extracted from ``test_path`` and cached there. For each
    model a confusion matrix is plotted and the accuracy is printed.

    Args:
        scaler: the fitted scaler used to normalize the test data
        test_path: paths of the .csv files used to test the video model
        features_file_name: path of the cache file of extracted features
        emotions_to_idx: dictionary mapping each emotion name to its index;
            only its size is used here, as the number of classes
        cm_title: the title for the confusion matrices

    Return:
        gru_test_predict_probs: the softmax outputs of the GRU model
        gru_test_predict: the classes predicted by the GRU model
        lstm_test_predict_probs: the softmax outputs of the LSTM model
        lstm_test_predict: the classes predicted by the LSTM model
    """

    os.makedirs('/kaggle/working/Video_track/', exist_ok = True)
    labels_name = ['Neutral', 'Calm', 'Happy', 'Sad', 'Angry', 'Fearful', 'Disgusted', 'Surprised']

    lstm_path = '/kaggle/working/models/video_best_model_LSTM.keras'
    gru_path = '/kaggle/working/models/video_best_model_GRU.keras'

    # Load test dataset
    if os.path.exists(features_file_name):
        test_point_list, test_labels = load_dataset_jlb(features_file_name)
    else:
        test_point_list, test_labels = load_data(test_path)
        # The cache file may live outside Video_track/: make sure its folder exists
        cache_dir = os.path.dirname(features_file_name)
        if cache_dir:
            os.makedirs(cache_dir, exist_ok = True)
        save_dataset(test_point_list, test_labels, features_file_name)

    NUM_CLASSES = len(emotions_to_idx.keys())

    # Normalize test data (flatten frames to 2-D for the scaler, then restore the shape)
    test_point_list = scaler.transform(test_point_list.reshape(-1, test_point_list.shape[-1])).reshape(test_point_list.shape)

    # Load parameters of GRU video model, make the predictions and plot the confusion matrix
    gru_model = GRUEmotionRecognitionModel(input_shape=(50, 709), num_classes=NUM_CLASSES)
    gru_model.model.load_weights(gru_path)
    gru_model.summary()
    gru_test_predict_probs, gru_test_predict = gru_model.predict_prob(test_point_list)
    gru_model.plot_confusion_matrix(test_labels,gru_test_predict, labels_name, cm_title)
    print("GRU Video model accuracy: ", accuracy_score(test_labels, gru_test_predict), "\n\n")

    # Load parameters of LSTM video model, make the predictions and plot the confusion matrix
    lstm_model = LSTMEmotionRecognitionModel(input_shape=(50, 709), num_classes=NUM_CLASSES)
    lstm_model.model.load_weights(lstm_path)
    lstm_test_predict_probs, lstm_test_predict = lstm_model.predict_prob(test_point_list)
    lstm_model.plot_confusion_matrix(test_labels,lstm_test_predict, labels_name, cm_title)
    print("LSTM Video model accuracy: ",accuracy_score(test_labels, lstm_test_predict), "\n\n")

    return gru_test_predict_probs, gru_test_predict, lstm_test_predict_probs, lstm_test_predict

# Audio extraction from video

In [None]:
def extract_audio_from_videos(files, output_directory):
    """Export the audio track of each video as a .wav file.

    Audio files are written to ``<output_directory>/ravdess`` and named
    ``<video name without extension>_<counter>.wav``. An audio file that is
    already on disk is not extracted again. Failures are printed, not raised.

    Args:
        files: list of video paths
        output_directory: directory under which the audio files are saved

    Return:
        out_paths (list): target audio path of every video, in input order
            (a path is recorded before its extraction is attempted)
    """
    ravdess_dir = "/".join((output_directory, "ravdess"))
    os.makedirs(output_directory, exist_ok=True)
    os.makedirs(ravdess_dir, exist_ok=True)

    out_paths = []
    counter = 0  # advances only after a video has been handled without error
    for video_path in tqdm(files, desc = "Extracting audios"):
        filename = video_path.split("/")[-1]
        try:
            stem = filename.split(".")[0]
            audio_path = os.path.join(ravdess_dir, f"{stem}_{counter}.wav")
            out_paths.append(audio_path)

            # Skip the conversion when the audio has already been extracted
            already_extracted = os.path.exists(audio_path)
            if not already_extracted:
                track = AudioSegment.from_file(video_path, format="mp4")
                track.export(audio_path, format="wav")
            counter += 1
        except Exception as e:
            print(f"Failed to process {video_path}: {e}")
    return out_paths



In [None]:
def load_audio(file_path):
    """Load an audio file as a mono time series resampled to 22050 Hz.

    Files whose parent folder is named "ravdess" are cropped to a 2-second
    window starting at 1.8 s. Files from any other folder are loaded in full.

    Args:
        file_path: audio file path

    Return:
        tuple: A tuple containing:
            - y: The audio time series as a numpy array.
            - sr: The sampling rate of the audio file.
        ``(None, None)`` is returned when the file cannot be loaded.

    """
    # Set the sampling rate to 22050 Hz
    sr = 22050

    # The dataset name is the name of the folder containing the file
    dataset = os.path.basename(os.path.dirname(file_path))

    # Defaults load the whole file; without them `offset`/`duration` were
    # unbound for non-"ravdess" folders and every such load silently failed.
    offset = 0.0
    duration = None
    if dataset == "ravdess":
        offset = 1.8
        duration = 2

    try:
        # Load the audio file with the specified parameters
        y, sr = librosa.load(file_path, sr=sr, duration=duration, offset=offset)
        return y, sr
    except Exception as e:
        # Best effort: report the failure (with its cause) and let the caller decide
        print("Error encountered while parsing file: ", file_path, "-", e)
        return None, None

def extract_MFCC(file_path, n_mfcc=64, n_frames=87):
    """Extracts Mel-Frequency Cepstral Coefficients (MFCC) from an audio file.

    The result always has exactly ``n_frames`` columns: longer spectrograms are
    truncated, shorter ones are zero-padded on the right.

    Args:
        file_path: Path to the audio file from which to extract MFCCs.
        n_mfcc: Number of MFCCs to return. Defaults to 64.
        n_frames: Number of time frames of the output. Defaults to 87.

    Returns:
        A 2D numpy array of shape ``(n_mfcc, n_frames)`` containing the MFCC
        spectrogram, or None if the audio file could not be loaded.
    """
    # Load the audio file
    y, sr = load_audio(file_path)

    # Check if the audio file was loaded successfully
    if y is None:
        return None

    # Extract MFCC features from the audio signal
    mfcc_spectrogram = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=n_mfcc)

    # Trim or pad the time axis to the requested number of frames
    current_frames = mfcc_spectrogram.shape[1]
    if current_frames > n_frames:
        mfcc_spectrogram = mfcc_spectrogram[:, :n_frames]
    elif current_frames < n_frames:
        mfcc_spectrogram = np.pad(mfcc_spectrogram,
                                  ((0, 0), (0, n_frames - current_frames)),
                                  mode='constant')
    return mfcc_spectrogram

def dump_spectrogram(mel_spectrogram, output_file):
    """Saves a spectrogram to a .npy file, creating parent folders as needed.

    Args:
        mel_spectrogram: The spectrogram to save.
        output_file: The path to the output file where the spectrogram will be saved.
    """
    # Create the output directory if there is one; a bare file name has an
    # empty dirname, and os.makedirs('') would raise.
    parent_dir = os.path.dirname(output_file)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # Save the spectrogram to the specified file
    np.save(output_file, mel_spectrogram)



In [None]:
def preprocess_dataset(files, output_folder):
    """Preprocesses a dataset of audio files and saves the extracted features.

    Only files whose parent folder is named "ravdess" are processed; any other
    file is skipped. Features are written to
    ``<output_folder>/<emotion code>/<file name>.npy`` and are not recomputed
    when that file already exists. A file whose MFCC extraction fails is
    reported and left out of the result.

    Args:
        files: List of file paths to the audio files to be processed.
        output_folder: The folder where the extracted features will be saved.

    Returns:
        features_path: A list of paths to the saved feature files, one per
            successfully processed input, in input order.
    """
    ravdess_dict = {0: 'ne', 1: 'ca', 2: 'ha', 3: 'sa', 4: 'an', 5: 'fe', 6: 'di', 7: 'su'}
    features_path = []  # List to store the paths of the saved feature files

    for f in files:
        directory = os.path.basename(os.path.dirname(f))  # Name of the containing folder
        filename = os.path.basename(f).split(".")[0]  # File name without extension

        # Files outside a "ravdess" folder have no feature file: skip them
        # instead of recording a stale (or undefined) output path.
        if directory != "ravdess":
            continue

        # The third dash-separated field of the name is the 1-based emotion code
        part = filename.split('-')
        emotion = ravdess_dict[int(part[2]) - 1]

        # Define the output file path for the extracted features
        output_file = os.path.join(output_folder, emotion, filename + ".npy")

        # Extract and save the features unless they are already cached
        if not os.path.exists(output_file):
            mfcc_spectrogram = extract_MFCC(f)
            if mfcc_spectrogram is None:
                # Saving None would create a feature file that cannot be loaded back
                print(f"Skipping {f}: MFCC extraction failed")
                continue
            dump_spectrogram(mfcc_spectrogram, output_file)

        features_path.append(output_file)

    return features_path


In [None]:
import soundfile as sf
from librosa.effects import pitch_shift, time_stretch

def augmentation(audio_paths):
    """Create augmented copies of every audio file next to the original.

    For each input, the following variants are written (unless the target file
    already exists): pitch shifts of -4..+4 semitones except 0, time stretches
    at rates 0.9 and 1.1, one copy with additive Gaussian noise, and volume
    scalings by 0.8 and 1.2.

    Args:
        audio_paths: List of paths to the audio files to be augmented.

    Returns:
        new_audios_paths: Paths of all augmented files (grouped per input, in
            the order listed above) followed by the original paths.
    """
    augmented_paths = []

    for audio in tqdm(audio_paths, desc="Audio augmentation.."):
        # Load at a fixed sampling rate of 22050 Hz
        y, sr = librosa.load(audio, sr=22050)
        base_name = os.path.splitext(audio)[0]
        noise_factor = 0.005

        # (target path, lazy producer) pairs; a producer only runs when its
        # target file is missing, so existing variants are never recomputed.
        variants = []

        # 1. Pitch shifting by ±1..4 semitones
        for steps in range(-4, 5):
            if steps != 0:
                variants.append((
                    f"{base_name}_pitch_{steps}.wav",
                    lambda steps=steps: pitch_shift(y, sr=sr, n_steps=steps),
                ))

        # 2. Time stretching: 0.9 slows down, 1.1 speeds up
        for rate in [0.9, 1.1]:
            variants.append((
                f"{base_name}_timestretch_{rate}.wav",
                lambda rate=rate: time_stretch(y, rate=rate),
            ))

        # 3. Noise injection: additive Gaussian noise
        variants.append((
            f"{base_name}_noise.wav",
            lambda: y + noise_factor * np.random.randn(len(y)),
        ))

        # 4. Volume adjustment: 0.8 attenuates, 1.2 amplifies
        for factor in [0.8, 1.2]:
            variants.append((
                f"{base_name}_vol_{factor}.wav",
                lambda factor=factor: y * factor,
            ))

        for output_path, produce in variants:
            if not os.path.exists(output_path):
                sf.write(output_path, produce(), sr)
            augmented_paths.append(output_path)

    # The originals come last in the returned list
    augmented_paths += audio_paths
    return augmented_paths


# Dataset and Dataloader

In [None]:
import torch
from torch.utils.data import Dataset

class MultilabelSpectrogramDataset(Dataset):
    """Dataset that serves spectrograms stored as .npy files with their labels.

    Args:
        file_paths: List of file paths to the spectrogram data.
        labels: List of labels, where ``labels[i]`` belongs to ``file_paths[i]``.
    """

    def __init__(self, file_paths, labels):
        """Store the file paths and labels; nothing is loaded until indexing."""
        self.file_paths = file_paths
        self.labels = labels

    def __len__(self):
        """Number of samples, i.e. the number of file paths."""
        return len(self.file_paths)

    def __getitem__(self, idx):
        """Load the sample at position ``idx`` from disk.

        Args:
            idx: Index of the sample to retrieve.

        Returns:
            A ``(spectrogram, label)`` pair: a float32 tensor holding the array
            stored in the file, and a long tensor holding the label.
        """
        array = np.load(self.file_paths[idx])
        target = self.labels[idx]

        spectrogram_tensor = torch.tensor(array, dtype=torch.float32)
        label_tensor = torch.tensor(target, dtype=torch.long)
        return spectrogram_tensor, label_tensor

In [None]:
def load_audio_dataset(files, label_map=None):
    """Derive the label of each file from the name of its parent folder.

    Args:
        files: List of file paths; the name of the folder containing each file
            is used as the key of its label.
        label_map: Optional dictionary mapping folder names to labels. When
            omitted, the notebook-level ``emotions_to_idx`` dictionary is used,
            which must then have been defined before the call.

    Returns:
        labels: A list of labels corresponding to each file.
    """
    labels = []
    for file in files:
        # Resolved per item so that an empty `files` never touches the global
        mapping = label_map if label_map is not None else emotions_to_idx
        label_name = os.path.basename(os.path.dirname(file))
        labels.append(mapping[label_name])
    return labels

In [None]:
def create_audio_dataloader(data_audio):
    """Placeholder: the body of this function is missing from the notebook.

    TODO(review): restore the intended implementation. Presumably it builds a
    DataLoader from ``data_audio`` — confirm against the original notebook.

    Raises:
        NotImplementedError: always, until the implementation is restored.
    """
    raise NotImplementedError("create_audio_dataloader has no implementation")

# Audio model

In [None]:
import torch
import torch.nn as nn

class AudioModel(nn.Module):
    """CNN classifier for 2-D audio features (four conv blocks + four linear layers).

    Args:
        num_labels: Number of output labels (default is 8).
    """

    def __init__(self, num_labels=8):
        """Create the layers of the network."""
        super().__init__()

        # Four 3x3 "same" convolutions, doubling the channels each time
        self.conv1 = nn.Conv2d(1, 16, kernel_size=3, padding="same")
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding="same")
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding="same")
        self.conv4 = nn.Conv2d(64, 128, kernel_size=3, padding="same")

        # Shared 2x2 max pooling and ReLU, applied after every convolution
        self.pool = nn.MaxPool2d(2, 2)
        self.relu = nn.ReLU()

        # Fully connected stack; fc1 expects 128 channels on a 4x5 map
        self.fc1 = nn.Linear(128 * 4 * 5, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, 128)

        # Light dropout after each hidden linear layer
        self.dropout = nn.Dropout(0.1)

        # Classification head followed by a softmax over the labels
        self.fc4 = nn.Linear(128, num_labels)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Run the network on a batch.

        Args:
            x: batch of inputs without a channel axis; one is inserted at dim 1.

        Returns:
            A tuple ``(features, probabilities)``: the 128-dimensional output of
            the last hidden layer and the softmax output of the head.
        """
        # Insert the single input channel
        out = x.unsqueeze(1)

        # conv -> ReLU -> pool, four times
        for conv in (self.conv1, self.conv2, self.conv3, self.conv4):
            out = self.pool(self.relu(conv(out)))

        # Flatten everything but the batch axis
        hidden = out.flatten(start_dim=1)

        # linear -> ReLU -> dropout, three times
        for fc in (self.fc1, self.fc2, self.fc3):
            hidden = self.dropout(self.relu(fc(hidden)))

        probabilities = self.softmax(self.fc4(hidden))

        # Penultimate-layer features and class probabilities
        return hidden, probabilities

# Training loop for the audio model

In [None]:
import os
import torch
import torch.nn as nn
from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, accuracy_score

def train_audio_model(subtrain_dataloader, subtest_dataloader):
    """Trains the audio classification model and evaluates its performance.

    Nothing is done when '/kaggle/working/models/audio_model.pth' already exists.
    Otherwise an ``AudioModel`` is trained for 150 epochs, validated after every
    epoch, and the weights of the epoch with the highest validation accuracy are
    saved to that file. The training loss, the validation accuracy and a
    confusion matrix (accumulated over the validation passes of all epochs) are
    plotted at the end.

    Args:
        subtrain_dataloader: DataLoader for the training dataset.
        subtest_dataloader: DataLoader for the validation dataset.
    """
    import copy  # local import: needed to snapshot the best weights

    model_path = '/kaggle/working/models/audio_model.pth'

    # Skip training when a trained model is already on disk
    if os.path.exists(model_path):
        return

    # Initialize the model
    model = AudioModel()

    # Label names for plotting
    labels_list = ['Neutral', 'Calm', 'Happy', 'Sad', 'Angry', 'Fearful', 'Disgusted', 'Surprised']
    num_labels = len(labels_list)

    # Confusion matrix accumulated over every validation pass
    confusion_matrix_ = np.zeros((num_labels, num_labels))

    # Create the model directory if it does not exist
    os.makedirs("/kaggle/working/models", exist_ok=True)

    # Set the device for training (GPU if available, otherwise CPU)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)

    # AudioModel.forward already applies a softmax, while CrossEntropyLoss
    # expects raw logits (it would apply a second softmax). The correct
    # cross-entropy on probabilities is the NLL of their logarithm.
    criterion = nn.NLLLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, weight_decay=0.0001)

    # Per-epoch history
    losses = []
    accuracy_scores = []

    # Best validation result seen so far and a frozen copy of its weights
    best_accuracy = -1.0
    best_model = None

    # Number of training epochs
    num_epochs = 150

    for epoch in range(num_epochs):
        model.train()  # Set the model to training mode

        total_loss = 0.0

        # Iterate over the training dataset
        for audios, labels in tqdm(subtrain_dataloader):
            audios = audios.to(device)
            labels = labels.to(device)

            # Forward pass; the clamp keeps log() finite for zero probabilities
            features, outputs = model(audios)
            loss = criterion(torch.log(torch.clamp(outputs, min=1e-8)), labels)

            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {total_loss}')

        model.eval()  # Set the model to evaluation mode

        all_preds = []
        all_labels = []

        # Evaluate the model on the validation dataset (no gradients needed)
        with torch.no_grad():
            for audios, labels in subtest_dataloader:
                audios = audios.to(device)
                labels = labels.to(device)

                # Forward pass
                features, outputs = model(audios)
                _, preds = torch.max(outputs, 1)

                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

                # Update the confusion matrix
                confusion_matrix_ += confusion_matrix(labels.cpu().numpy(), preds.cpu().numpy(), labels=range(num_labels))

        # Calculate validation accuracy
        accuracy = accuracy_score(all_labels, all_preds)
        print(f'Validation accuracy: {accuracy}')
        accuracy_scores.append(accuracy)

        # state_dict() returns references to the live parameters, so the best
        # weights must be deep-copied or they would be overwritten by later epochs.
        if accuracy > best_accuracy:
            best_accuracy = accuracy
            best_model = copy.deepcopy(model.state_dict())

        losses.append(total_loss)

    print("\nBest validation accuracy: ", best_accuracy, "\n")

    # Save the best model
    torch.save(best_model, model_path)

    # Plot training loss
    plt.plot(losses)
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Audio Training Loss')
    plt.show()

    # Plot validation accuracy (one point per epoch)
    plt.plot(accuracy_scores)
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.title('Audio Validation Accuracy')
    plt.show()

    # Row-normalize the confusion matrix; classes never seen stay at 0 instead of NaN
    row_totals = np.sum(confusion_matrix_, axis=1, keepdims=True)
    confusion_matrix_ = np.divide(confusion_matrix_, row_totals,
                                  out=np.zeros_like(confusion_matrix_),
                                  where=row_totals != 0)

    # Plot the confusion matrix
    plt.figure(figsize=(10, 8))
    sns.heatmap(confusion_matrix_, annot=True, fmt=".2%", xticklabels=labels_list, yticklabels=labels_list, cmap='Purples')
    plt.title('Audio Prediction on Subtest Set in Training Mode')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.show()


In [None]:
from torch.utils.data import DataLoader

def create_audio_model(subtrain_data, subtest_data, test_data):
    """Runs the audio pipeline for the three splits and trains the audio model.

    The pipeline is executed stage by stage, each stage covering the subtrain,
    subtest and test splits in that order: audio extraction from the videos,
    feature extraction from the audio files, label loading from the feature
    paths, dataset creation and DataLoader creation. Finally the subtrain and
    subtest loaders are handed to `train_audio_model`.

    Args:
        subtrain_data: List of video paths for the training set.
        subtest_data: List of video paths for the validation set.
        test_data: List of video paths for the test set.
    Returns:
        A tuple (subtrain_dataloader, subtest_dataloader, test_dataloader).
    """
    # Batch size shared by the three DataLoaders
    batch_size = 64

    # Stage 1: audio extraction, one entry per split (message, videos, output dir).
    # Note: the optional `augmentation` step on the subtrain audio is disabled.
    extraction_jobs = (
        ("Audio extraction by video of subtrain set...\n", subtrain_data, '/kaggle/working/Audios/subtrain'),
        ("Audio extraction by video of subtest set...\n", subtest_data, '/kaggle/working/Audios/subtest'),
        ("Audio extraction by video of test set...\n", test_data, '/kaggle/working/Audios/test'),
    )
    audio_paths_per_split = []
    for message, videos, audio_dir in extraction_jobs:
        print(message)
        audio_paths_per_split.append(extract_audio_from_videos(videos, audio_dir))

    # Stage 2: feature extraction from the extracted audio files
    feature_jobs = (
        ("Features extraction by audio of subtrain set...\n", "/kaggle/working/Dataset/subtrain"),
        ("Features extraction by audio of subtest set...\n", "/kaggle/working/Dataset/subtest"),
        ("Features extraction by audio of test set...\n", "/kaggle/working/Dataset/test"),
    )
    feature_paths_per_split = []
    for (message, feature_dir), split_audio_paths in zip(feature_jobs, audio_paths_per_split):
        print(message)
        feature_paths_per_split.append(preprocess_dataset(split_audio_paths, feature_dir))

    # Stage 3: labels matching the feature files of each split
    labels_per_split = [load_audio_dataset(paths) for paths in feature_paths_per_split]

    # Stage 4: one dataset per split
    datasets = [
        MultilabelSpectrogramDataset(paths, split_labels)
        for paths, split_labels in zip(feature_paths_per_split, labels_per_split)
    ]

    # Stage 5: DataLoaders; only the training split is shuffled
    shuffle_flags = (True, False, False)
    loaders = [
        DataLoader(dataset, batch_size=batch_size, shuffle=flag)
        for dataset, flag in zip(datasets, shuffle_flags)
    ]
    subtrain_dataloader, subtest_dataloader, test_dataloader = loaders

    # Train the audio model on subtrain, validating on subtest
    print("Training audio model...\n")
    train_audio_model(subtrain_dataloader, subtest_dataloader)

    return subtrain_dataloader, subtest_dataloader, test_dataloader


In [None]:
def audio_predict(dataloader_audio, name="Test"):
    """Runs the saved audio model on a DataLoader and reports its performance.

    The model weights are read from '/kaggle/working/models/audio_model.pth'.
    The accuracy is printed and a row-normalised confusion matrix is plotted.

    Args:
        dataloader_audio: DataLoader yielding (audios, labels) batches.
        name: Name of the evaluated set, used in the printed message and in
            the plot title.

    Returns:
        A tuple (all_preds, all_outputs):
        - all_preds: list with the predicted class index of every sample.
        - all_outputs: list with the raw output vector of the model's second
          return value for every sample (no post-processing is applied here).
    """
    print("Loading of audio model by file...\n")
    model_path = '/kaggle/working/models/audio_model.pth'

    # Set the device for evaluation (GPU if available, otherwise CPU)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # map_location lets a checkpoint saved on GPU be loaded on a CPU-only run
    model = AudioModel()
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.to(device)
    model.eval()

    # Label names for plotting
    labels_list = ['Neutral', 'Calm', 'Happy', 'Sad', 'Angry', 'Fearful', 'Disgusted', 'Surprised']
    num_labels = len(labels_list)

    confusion_matrix_ = np.zeros((num_labels, num_labels))
    all_labels = []
    all_outputs = []
    all_preds = []

    # Inference only: disable autograd so no graph is built or kept alive
    with torch.no_grad():
        for audios, labels in dataloader_audio:
            audios = audios.to(device)

            # The model returns (features, outputs); only the outputs are used
            _, outputs = model(audios)
            _, preds = torch.max(outputs, 1)

            batch_preds = preds.cpu().numpy()
            batch_labels = labels.cpu().numpy()

            all_outputs.extend(outputs.cpu().numpy())
            all_preds.extend(batch_preds)
            all_labels.extend(batch_labels)

            # Update the confusion matrix
            confusion_matrix_ += confusion_matrix(batch_labels, batch_preds, labels=range(num_labels))

    accuracy = accuracy_score(all_labels, all_preds)
    print(f'\n\nAccuracy of audio model on {name} set: {accuracy}\n')

    # Row-normalise; rows of classes absent from the set stay at 0 instead of
    # becoming NaN through a division by zero
    row_totals = confusion_matrix_.sum(axis=1, keepdims=True)
    confusion_matrix_ = np.divide(
        confusion_matrix_,
        row_totals,
        out=np.zeros_like(confusion_matrix_),
        where=row_totals != 0,
    )

    # Plot the confusion matrix
    plt.figure(figsize=(10, 8))
    sns.heatmap(confusion_matrix_, annot=True, fmt=".2%", xticklabels=labels_list, yticklabels=labels_list, cmap="Purples")
    # plt.title takes a single label string; the extra positional arguments of
    # the previous call were interpreted as fontdict/loc
    plt.title(f'Audio Prediction on {name} Set')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.show()

    return all_preds, all_outputs


# Video Audio Dataset

In [None]:
from torch.utils.data import Dataset
import torch

class VideoAudioDataset(Dataset):
    """Dataset pairing each stored prediction vector with its label.

    Args:
        preds: Indexable collection of predictions.
        labels: Indexable collection of labels, one per prediction.
    """

    def __init__(self, preds, labels):
        """Stores the predictions and labels without copying them."""
        self.preds = preds
        self.labels = labels

    def __len__(self):
        """Number of samples, taken from the number of labels."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Returns the sample stored at position `idx`.

        Args:
            idx: Index of the sample to retrieve.

        Returns:
            A (prediction, label) tuple: the prediction as a float32 tensor
            and the label as a long tensor.
        """
        sample, target = self.preds[idx], self.labels[idx]
        sample_tensor = torch.tensor(sample, dtype=torch.float32)
        target_tensor = torch.tensor(target, dtype=torch.long)
        return sample_tensor, target_tensor


# Video Audio Model

In [None]:
import torch
import torch.nn as nn
from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, accuracy_score

class VideoAudioModel(nn.Module):
    """A small fully connected network for audio-video classification.

    The network maps a 24-dimensional input to `num_labels` scores through one
    hidden layer of 128 units with ReLU and dropout.

    Args:
        num_labels: Number of output labels (default is 8).
    """

    def __init__(self, num_labels=8):
        """Initializes the VideoAudioModel with specified number of output labels."""
        super(VideoAudioModel, self).__init__()

        # Fully connected layers.
        # 24 inputs: presumably the three 8-value score vectors (GRU, LSTM,
        # audio) stacked by the caller; verify against the calling cell.
        self.fc1 = nn.Linear(24, 128)
        self.fc6 = nn.Linear(128, num_labels)

        # Dropout layer for regularization
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Defines the forward pass of the VideoAudioModel.

        Args:
            x: Input tensor whose last dimension is 24.

        Returns:
            Output tensor with one score per label (no softmax applied).
        """
        x = self.dropout(torch.relu(self.fc1(x)))
        x = self.fc6(x)
        return x

    def _snapshot_state(self):
        """Returns a copy of the state dict that later training cannot modify.

        `state_dict()` hands back tensors that share storage with the live
        parameters, so a checkpoint kept across epochs must be cloned.
        """
        return {key: value.detach().clone() for key, value in self.state_dict().items()}

    @staticmethod
    def _normalize_rows(matrix):
        """Divides every row by its sum; all-zero rows are left at zero."""
        matrix = np.asarray(matrix, dtype=float)
        row_totals = matrix.sum(axis=1, keepdims=True)
        return np.divide(matrix, row_totals, out=np.zeros_like(matrix), where=row_totals != 0)

    def train_model(self, train_dataloader, test_dataloader):
        """Trains the model and evaluates it after every epoch.

        The weights of the epoch with the highest validation accuracy (first
        one in case of ties) are saved to
        '/kaggle/working/models/audio_video_model.pth'; the weights after the
        final epoch are saved to
        '/kaggle/working/models/last_epoch_audio_video_model.pth'. Training
        loss, validation accuracy and a confusion matrix are plotted.

        Args:
            train_dataloader: DataLoader for the training dataset.
            test_dataloader: DataLoader for the validation dataset.
        """
        # Label names for plotting
        labels_list = ['Neutral', 'Calm', 'Happy', 'Sad', 'Angry', 'Fearful', 'Disgusted', 'Surprised']
        num_labels = len(labels_list)

        # Confusion matrix; it accumulates the validation predictions of ALL
        # epochs, so the final plot is an aggregate over the whole training run
        confusion_matrix_ = np.zeros((num_labels, num_labels))

        # Set the device for training (GPU if available, otherwise CPU)
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.to(device)

        # Per-epoch history of training loss and validation accuracy
        losses = []
        accuracy_scores = []

        # Number of training epochs
        num_epochs = 100

        # Define the loss function and optimizer
        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(self.parameters(), lr=0.001, weight_decay=0.001)

        # Best checkpoint seen so far (starts from the initial weights)
        best_accuracy = float('-inf')
        best_model = self._snapshot_state()

        # Training loop
        for epoch in range(num_epochs):
            self.train()  # Set the model to training mode

            total_loss = 0.0

            # Iterate over the training dataset
            for inputs, labels in tqdm(train_dataloader):
                pred = inputs.to(device)
                labels = labels.to(device)

                # Forward pass
                outputs = self(pred)
                loss = criterion(outputs, labels)

                # Backward pass and optimization
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                total_loss += loss.item()

            print(f'Epoch {epoch+1}/{num_epochs}, Loss: {total_loss}')

            self.eval()  # Set the model to evaluation mode

            all_preds = []
            all_labels = []

            # Evaluate on the validation dataset; no gradients are needed
            with torch.no_grad():
                for inputs, labels in test_dataloader:
                    outputs = self(inputs.to(device))
                    _, preds = torch.max(outputs, 1)

                    batch_preds = preds.cpu().numpy()
                    batch_labels = labels.cpu().numpy()

                    all_preds.extend(batch_preds)
                    all_labels.extend(batch_labels)

                    # Update the confusion matrix
                    confusion_matrix_ += confusion_matrix(batch_labels, batch_preds, labels=range(num_labels))

            # Calculate validation accuracy
            accuracy = accuracy_score(all_labels, all_preds)
            print(f'Validation accuracy: {accuracy}')

            accuracy_scores.append(accuracy)
            losses.append(total_loss)

            # Keep an independent copy of the weights of the best epoch
            if accuracy > best_accuracy:
                best_accuracy = accuracy
                best_model = self._snapshot_state()

        # Weights after the final epoch
        last_model = self.state_dict()

        # Save the best model
        print("\n\nBest validation Audio-Video model accuracy: ", best_accuracy, "\n")
        torch.save(best_model, '/kaggle/working/models/audio_video_model.pth')

        # Save the last model state
        torch.save(last_model, '/kaggle/working/models/last_epoch_audio_video_model.pth')

        # Plot training loss
        plt.plot(losses)
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.title('Audio-Video Training Loss')
        plt.show()

        # Plot validation accuracy (one point per epoch)
        plt.plot(accuracy_scores)
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy')
        plt.title('Audio-Video Validation Accuracy')
        plt.show()

        # Normalize the confusion matrix (rows without samples stay at 0)
        confusion_matrix_ = self._normalize_rows(confusion_matrix_)

        # Plot the confusion matrix
        plt.figure(figsize=(10, 8))
        sns.heatmap(confusion_matrix_, annot=True, fmt=".2%", xticklabels=labels_list, yticklabels=labels_list, cmap="Blues")
        plt.title('Video-Audio Prediction on Test Set in Training Mode')
        plt.ylabel('True Label')
        plt.xlabel('Predicted Label')
        plt.show()

    def test_model(self, test_dataloader):
        """Evaluates the saved best model on the test dataset.

        The weights are loaded from
        '/kaggle/working/models/audio_video_model.pth', replacing the current
        ones. The accuracy is printed and a row-normalised confusion matrix is
        plotted.

        Args:
            test_dataloader: DataLoader for the test dataset.

        Returns:
            A tuple (all_preds, all_outputs): the predicted class index of
            every sample, and the list of per-batch output tensors.
        """
        # Label names for plotting
        labels_list = ['Neutral', 'Calm', 'Happy', 'Sad', 'Angry', 'Fearful', 'Disgusted', 'Surprised']
        num_labels = len(labels_list)

        # Initialize a confusion matrix
        confusion_matrix_ = np.zeros((num_labels, num_labels))

        # Set the device for evaluation (GPU if available, otherwise CPU)
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.to(device)

        # Load the best model state; map_location keeps this working when the
        # checkpoint was written on a different device
        model_path = '/kaggle/working/models/audio_video_model.pth'
        self.load_state_dict(torch.load(model_path, map_location=device))
        self.eval()

        all_preds = []
        all_labels = []
        all_outputs = []

        # Evaluate on the test dataset; no_grad avoids keeping autograd graphs
        # alive through the stored outputs
        with torch.no_grad():
            for inputs, labels in test_dataloader:
                outputs = self(inputs.to(device))
                all_outputs.append(outputs)
                _, preds = torch.max(outputs, 1)

                batch_preds = preds.cpu().numpy()
                batch_labels = labels.cpu().numpy()

                all_preds.extend(batch_preds)
                all_labels.extend(batch_labels)

                # Update the confusion matrix
                confusion_matrix_ += confusion_matrix(batch_labels, batch_preds, labels=range(num_labels))

        # Calculate test accuracy
        accuracy = accuracy_score(all_labels, all_preds)
        print(f'\n\nAudio-Video model accuracy on test set: {accuracy}')

        # Normalize the confusion matrix (rows without samples stay at 0)
        confusion_matrix_ = self._normalize_rows(confusion_matrix_)

        # Plot the confusion matrix
        plt.figure(figsize=(10, 8))
        sns.heatmap(confusion_matrix_, annot=True, fmt=".2%", xticklabels=labels_list, yticklabels=labels_list, cmap="Blues")
        plt.title('Video-Audio Prediction on Test Set')
        plt.ylabel('True Label')
        plt.xlabel('Predicted Label')
        plt.show()

        # Return all predictions and outputs
        return all_preds, all_outputs


# Main Function

In [None]:
# Load video/audio paths from dataset
path_audio, labels = load_dataset('/kaggle/input/ravdess-emotional-speech-video/RAVDESS dataset')

# Split the dataset into:
# - subtrain: trains the audio and video submodels
# - subtest: validates the submodels and trains the combined model
# - test: evaluates the combined model
train_data_audio, test_data_audio, train_labels, test_labels = train_test_split(path_audio, labels, test_size=0.2, random_state=15)
subtrain_data_audio, subtest_data_audio, subtrain_labels, subtest_labels = train_test_split(train_data_audio, train_labels, test_size=0.3, random_state=15)

# Map the video/audio paths to the corresponding video feature paths
train_data_video = from_audiopath_to_videopath(train_data_audio)
test_data_video = from_audiopath_to_videopath(test_data_audio)
subtrain_data_video = from_audiopath_to_videopath(subtrain_data_audio)
subtest_data_video = from_audiopath_to_videopath(subtest_data_audio)

print(f"Audio-video model test size: audio {len(test_data_audio)}, video {len(test_data_video)}")
print(f"Submodels train size: audio {len(subtrain_data_audio)}, video {len(subtrain_data_video)}")
print(f"Submodels test size: audio {len(subtest_data_audio)}, video {len(subtest_data_video)}")

print()

emotions_to_idx = {'ne':0, 'ca': 1, 'ha': 2, 'sa': 3, 'an': 4, 'fe': 5, 'di': 6, 'su': 7}

# Train GRU and LSTM video model
scaler = train_video_model(subtrain_data_video, subtest_data_video, emotions_to_idx)

# Test GRU and LSTM video model and obtain prediction outputs for the subtest and test datasets used by the combined model
gru_subtest_video_predict_probs, gru_subtest_video_predict, lstm_subtest_video_predict_probs, lstm_subtest_video_predict = video_predict(scaler, subtest_data_video, '/kaggle/working/Video_track/subtest_features_rav.pkl', emotions_to_idx, "Video prediction on subtest set")
# The label of this call previously read "subtest set" although it runs on the test set
gru_test_video_predict_probs, gru_test_video_predict, lstm_test_video_predict_probs, lstm_test_video_predict = video_predict(scaler, test_data_video, '/kaggle/working/Video_track/test_features_rav.pkl', emotions_to_idx, "Video prediction on test set")


# Train audio model
subtrain_dataloader_audio, subtest_dataloader_audio, test_dataloader_audio = create_audio_model(subtrain_data_audio, subtest_data_audio, test_data_audio)
# Test audio model and obtain prediction outputs for the subtest and test datasets used by the combined model
subtest_audio_predict, subtest_audio_predict_probs = audio_predict(subtest_dataloader_audio, name="Subtest")
test_audio_predict, test_audio_predict_probs = audio_predict(test_dataloader_audio, name="Test")

# Stack the video and audio predictions for the subtest dataset to train the combined model
# (column order: GRU, LSTM, audio)
lstm_subtest_video_predict_numpy = np.array(lstm_subtest_video_predict_probs)
gru_subtest_video_predict_numpy = np.array(gru_subtest_video_predict_probs)
subtest_audio_predict_numpy = np.array(subtest_audio_predict_probs)
subtest_audioVideo_predict = np.column_stack((gru_subtest_video_predict_numpy, lstm_subtest_video_predict_numpy, subtest_audio_predict_numpy))

# Stack the video and audio predictions for the test dataset to evaluate the combined model
# (same column order as above)
gru_test_video_predict_numpy = np.array(gru_test_video_predict_probs)
lstm_test_video_predict_numpy = np.array(lstm_test_video_predict_probs)
test_audio_predict_numpy = np.array(test_audio_predict_probs)
test_audioVideo_predict = np.column_stack((gru_test_video_predict_numpy, lstm_test_video_predict_numpy, test_audio_predict_numpy))

# Create VideoAudio dataset and dataloader for the combined model
subtest_dataset = VideoAudioDataset(subtest_audioVideo_predict, subtest_labels)
test_dataset = VideoAudioDataset(test_audioVideo_predict, test_labels)

BATCH_SIZE = 16

subtest_dataloader = DataLoader(subtest_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Train and test the combined audio-video model
# NOTE(review): the test loader is also passed as the validation set that
# train_model uses to pick the saved checkpoint, so the accuracy reported by
# test_model on the same data is optimistically biased.
audioVideo_model = VideoAudioModel()
audioVideo_model.train_model(subtest_dataloader, test_dataloader)
def_preds, def_outputs = audioVideo_model.test_model(test_dataloader)

# Show a chart of per-sample prediction correctness for each model
true_labels = np.array(test_labels)
predictions_model_1 = np.array(test_audio_predict)
predictions_model_2 = np.array(lstm_test_video_predict)
predictions_model_3 = np.array(gru_test_video_predict)
predictions_model_4 = np.array(def_preds)

# One row per test sample, one column per model (1 = correct, 0 = wrong)
correctness = np.zeros((len(test_labels), 4))

correctness[:, 0] = (predictions_model_1 == true_labels)
correctness[:, 1] = (predictions_model_2 == true_labels)
correctness[:, 2] = (predictions_model_3 == true_labels)
correctness[:, 3] = (predictions_model_4 == true_labels)

plt.figure(figsize=(12, 10))
sns.heatmap(correctness, annot=False, cmap='Greens', cbar=False, xticklabels=['Audio', 'Video lstm', 'Video gru', 'Audio-Video'], yticklabels=[])
plt.xlabel('Model')
plt.ylabel('Predictions')
plt.title('Correctness of Predictions by Models')
plt.show()

## Predictions Analysis

In [None]:
true_labels = np.array(test_labels)  # True labels
ensemble_preds = np.array(def_preds)  # Final predictions of the Ensemble

# Predictions of individual models
predictions = {
    "Audio": np.array(test_audio_predict),
    "LSTM": np.array(lstm_test_video_predict),
    "GRU": np.array(gru_test_video_predict),
}

total_samples = len(true_labels)

# Per-sample mask: the ensemble predicted the true label
ensemble_correct = ensemble_preds == true_labels

# Result tables, filled below with percentages of the test samples
positive_contributions = {model: 0 for model in predictions}
negative_influence = {model: 0 for model in predictions}
positive_exclusive = {model: 0 for model in predictions}
negative_exclusive = {model: 0 for model in predictions}
model_correct_ensemble_wrong = {model: 0 for model in predictions}

# Analysis of predictions (vectorised over the samples)
for model, preds in predictions.items():
    # The model and the ensemble gave the same answer
    agrees = preds == ensemble_preds

    # No other individual model gave this model's answer
    other_preds = [predictions[m] for m in predictions if m != model]
    alone = np.all([other != preds for other in other_preds], axis=0)

    # Agreement with the ensemble, split by whether the ensemble was right
    positive_contributions[model] = (np.sum(agrees & ensemble_correct) / total_samples) * 100
    negative_influence[model] = (np.sum(agrees & ~ensemble_correct) / total_samples) * 100

    # Exclusive influence: the model was the only one agreeing with the ensemble
    positive_exclusive[model] = (np.sum(agrees & ensemble_correct & alone) / total_samples) * 100
    negative_exclusive[model] = (np.sum(agrees & ~ensemble_correct & alone) / total_samples) * 100

    # The model was correct, but the ensemble was wrong
    model_correct_ensemble_wrong[model] = (np.sum((preds == true_labels) & ~ensemble_correct) / total_samples) * 100

# The value is scaled to a percentage to match the "%" in the message
# (the previous version printed the raw 0-1 fraction)
print(f"The ensemble model predicted the correct label {(np.sum(ensemble_correct) / total_samples) * 100:.2f}% of the samples")
print("-" * 50)

for model in predictions:
    print(f"{model} contributed positively {positive_contributions[model]:.2f}% of the time")
    print(f"{model} influenced negatively {negative_influence[model]:.2f}% of the time")
    print(f"{model} influenced positively exclusively {positive_exclusive[model]:.2f}% of the time")
    print(f"{model} influenced negatively exclusively {negative_exclusive[model]:.2f}% of the time")
    print(f"{model} was correct while the Ensemble was wrong {model_correct_ensemble_wrong[model]:.2f}% of the time")
    print("-" * 50)
