<a href="https://colab.research.google.com/github/kamblesarvesh178/Automatic-Flush-System-using-arduino-nano/blob/main/emotion_detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ***Advanced Emotion Recognition from Facial Images and Speech Signals using Complex-Value Spatio-Temporal Graph Convolutional Neural Networks***

# Mount Drive

In [None]:
# prompt: mount drive code

# from google.colab import drive
# drive.mount('/content/drive')

# Install libraries

In [None]:
!pip install spektral

# Import libraries

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import os

import warnings
warnings.filterwarnings('ignore')

# Set path directories


In [None]:
# Root of the Kaggle input dataset; the trailing slash is relied on below.
ParentDir = "/kaggle/input/ryerson-audio-visual-database-modified/"
dataset_dir = "".join((ParentDir, 'Ryerson_Audio_Visual_Database_modified'))

# Output files for the extracted features and their synchronized labels.
multimodel_feature_file, labels_file = (
    '/kaggle/working/new_multimodal_features.npy',
    '/kaggle/working/new_labels.npy',
)

# Read data

In [None]:
import os
import shutil
import random
from collections import defaultdict


# Emotion code (third dash-separated field of each file name) -> emotion name.
emotion_mapping = {
    '01': 'neutral',
    '02': 'calm',
    '03': 'happy',
    '04': 'sad',
    '05': 'angry',
    '06': 'fearful',
    '07': 'disgust',
    '08': 'surprised'
}

# emotion name -> list of full paths of the .mp4 files carrying that emotion
file_dict = defaultdict(list)

# Files whose name does not follow the expected pattern are collected here
# instead of aborting the whole scan with an IndexError / KeyError.
skipped_files = []

# os.listdir returns entries in arbitrary order; sorting keeps the resulting
# row order (and therefore every later split) reproducible across machines.
for actor_folder in sorted(os.listdir(dataset_dir)):
    actor_path = os.path.join(dataset_dir, actor_folder)

    # Only actor sub-directories are scanned
    if not os.path.isdir(actor_path):
        continue

    for file in sorted(os.listdir(actor_path)):
        if not file.endswith('.mp4'):
            continue

        full_path = os.path.join(actor_path, file)
        parts = file.split('-')
        # Only the emotion field of the file name is used
        emotion = emotion_mapping.get(parts[2]) if len(parts) > 2 else None
        if emotion is None:
            skipped_files.append(full_path)
            continue

        file_dict[emotion].append(full_path)

for key, value in file_dict.items():
    print(f"{key}: {len(value)}")

if skipped_files:
    print(f"Skipped {len(skipped_files)} file(s) with an unrecognised name")

In [None]:
# Build a wide table with one column per emotion. Wrapping every list in a
# Series lets pandas pad the shorter columns with missing values, so file_dict
# itself is no longer mutated (the original appended None to it in place) and
# an empty file_dict no longer makes max() raise.
max_len = max((len(lst) for lst in file_dict.values()), default=0)

df = pd.DataFrame({key: pd.Series(value, dtype=object) for key, value in file_dict.items()})

# Persist the transposed view (one row per emotion) for inspection.
df.T.to_csv('file_dict.csv')

In [None]:
# Long-format table with one row per (emotion label, video path).
# stack() drops missing cells by default, which removes the padding that only
# existed to make the wide table rectangular.
flattened_df = (
    df.T.stack()
    .reset_index()
    .set_axis(['label', 'index', 'filename'], axis=1)
    .drop(columns=['index'])  # position inside the wide table is not needed
)

# Show the resulting table
flattened_df

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# One colour per emotion class (8 entries)
custom_colors = ['royalblue', 'lime', 'darkviolet', 'teal', 'red', 'gold', 'peru', 'crimson']

# Shared text styling for the title and both axis labels
heading_style = dict(fontsize=12, fontweight="bold")

plt.title('Count of Emotions', **heading_style)
sns.countplot(x='label', data=flattened_df, palette=custom_colors)
plt.ylabel('Count', **heading_style)
plt.xlabel('Emotions', **heading_style)
for set_ticks in (plt.xticks, plt.yticks):
    set_ticks(fontsize=10, fontweight="normal")
# Keep only the left and bottom spines
sns.despine(top=True, right=True, left=False, bottom=False)
plt.show()

# Preprocessing

In [None]:
import moviepy.editor as mp
import librosa
import numpy as np
import matplotlib.pyplot as plt
import librosa.display
import soundfile as sf

# @title Denoising - Strong Tracking Variational Bayesian Adaptive Kalman Filter

class STVBAKF:
    """Extract a video's audio track, filter it, save it and plot it.

    NOTE(review): despite the class name, the only filtering applied is
    librosa's pre-emphasis (see ``remove_noise``); no Kalman filter is
    implemented in this class.

    Args:
        video_path: Path of the source video file.
        output_audio_path: File the extracted audio is written to.
        output_denoised_path: File the filtered audio is written to.
    """

    def __init__(self, video_path, output_audio_path, output_denoised_path):
        self.video_path = video_path
        self.output_audio_path = output_audio_path
        self.output_denoised_path = output_denoised_path

    # Step 1: Extract audio from the video
    def extract_audio_from_video(self):
        """Write the video's audio track to ``output_audio_path``.

        Raises:
            ValueError: If the video has no audio track.
        """
        video = mp.VideoFileClip(self.video_path)
        try:
            audio = video.audio
            if audio is None:
                raise ValueError(f"No audio track found in {self.video_path}")
            audio.write_audiofile(self.output_audio_path)
        finally:
            # Always release the clip so its reader is not left open.
            video.close()
        print(f"Audio extracted and saved at {self.output_audio_path}")

    # Step 2: Filter the extracted audio
    def remove_noise(self):
        """Load the extracted audio and apply pre-emphasis.

        Returns:
            tuple: ``(original_signal, filtered_signal, sample_rate)``. The
            file is loaded with ``sr=None``, i.e. without resampling.
        """
        audio_signal, sr = librosa.load(self.output_audio_path, sr=None)
        noise_removed_signal = librosa.effects.preemphasis(audio_signal)
        return audio_signal, noise_removed_signal, sr

    # Step 3: Save the filtered audio
    def save_denoised_audio(self, denoised_signal, sr):
        """Write ``denoised_signal`` to ``output_denoised_path`` at rate ``sr``."""
        sf.write(self.output_denoised_path, denoised_signal, sr)
        print(f"Denoised audio saved at {self.output_denoised_path}")

    # Step 4: Visualize both original and filtered audio signals
    def visualize_audio_signals(self, original_signal, denoised_signal, sr):
        """Plot the two waveforms stacked vertically in one figure."""
        plt.figure(figsize=(12, 8))

        panels = (
            (original_signal, "Original Audio Signal"),
            (denoised_signal, "Denoised Audio Signal"),
        )
        for row, (signal, title) in enumerate(panels, start=1):
            plt.subplot(2, 1, row)
            librosa.display.waveshow(signal, sr=sr)
            plt.title(title, fontsize=12, fontweight="bold")
            plt.xlabel("Time (s)", fontsize=12, fontweight="bold")
            plt.ylabel("Amplitude", fontsize=12, fontweight="bold")
            plt.xticks(fontsize=12, fontweight="normal")
            plt.yticks(fontsize=12, fontweight="normal")

        plt.tight_layout()
        plt.show()

    # Run the whole workflow: extract -> filter -> save -> plot
    def process_audio(self):
        """Run extraction, filtering, saving and plotting in sequence."""
        self.extract_audio_from_video()
        original_signal, denoised_signal, sr = self.remove_noise()
        self.save_denoised_audio(denoised_signal, sr)
        self.visualize_audio_signals(original_signal, denoised_signal, sr)

# Demo: run the audio workflow on a single sample clip
video_path = '/kaggle/input/ryerson-audio-visual-database-modified/Ryerson_Audio_Visual_Database_modified/Actor_01/01-01-01-01-01-01-01.mp4'
output_audio_path = '/kaggle/working/extracted_audio.wav'
output_denoised_path = '/kaggle/working/denoised_audio.wav'

# Build the helper with explicit keyword arguments, then run every step
stvbakf = STVBAKF(
    video_path=video_path,
    output_audio_path=output_audio_path,
    output_denoised_path=output_denoised_path,
)
stvbakf.process_audio()

In [None]:
import cv2
import numpy as np
import matplotlib.pyplot as plt

# @title Normalizing - Kernel-based Ensemble Gaussian Mixture Filtering

class KEGMF:
    """Rescale every frame of a video to [0, 1], write it back out and preview frames.

    NOTE(review): despite the class name, the only operation applied is a
    division by 255 followed by the inverse scaling; no Gaussian-mixture
    filtering is implemented in this class.

    Args:
        video_path: Path of the input video.
        output_path: Path of the video that is written.
        num_frames: Number of leading frames shown as original/normalized pairs.
    """

    def __init__(self, video_path, output_path, num_frames=5):
        self.video_path = video_path
        self.output_path = output_path
        self.num_frames = num_frames

    @staticmethod
    def _to_uint8(frame_normalized):
        """Map a [0, 1] float frame back to uint8.

        Rounds to the nearest integer: a bare astype(uint8) truncates, so any
        value that lands just below an integer after the float32 round trip
        would come out one too low.
        """
        return np.clip(np.rint(frame_normalized * 255.0), 0, 255).astype(np.uint8)

    @staticmethod
    def _show_frame_pair(original_bgr, restored_bgr, frame_number):
        """Plot the original and the normalized frame side by side."""
        # OpenCV frames are BGR; Matplotlib expects RGB
        frame_rgb = cv2.cvtColor(original_bgr, cv2.COLOR_BGR2RGB)
        frame_normalized_rgb = cv2.cvtColor(restored_bgr, cv2.COLOR_BGR2RGB)

        plt.figure(figsize=(10, 5))
        plt.subplot(1, 2, 1)
        plt.imshow(frame_rgb)
        plt.title(f"Original Frame {frame_number}")
        plt.axis('off')

        plt.subplot(1, 2, 2)
        plt.imshow(frame_normalized_rgb)
        plt.title(f"Normalized Frame {frame_number}")
        plt.axis('off')

        plt.show()

    # Method to normalize and visualize video frames
    def visualize_and_save_normalized_video(self):
        """Normalize all frames, save them to ``output_path`` and preview the first ones.

        Prints an error and returns early when the input cannot be opened. The
        capture (and the writer, once created) is released on every exit path.
        """
        cap = cv2.VideoCapture(self.video_path)
        out = None
        try:
            if not cap.isOpened():
                print("Error opening video file.")
                return

            # The output keeps the geometry and frame rate of the input
            frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fps = cap.get(cv2.CAP_PROP_FPS)

            fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Codec for .mp4
            out = cv2.VideoWriter(self.output_path, fourcc, fps, (frame_width, frame_height))

            frame_count = 0
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                # Scale pixel values to [0, 1], then back to uint8 for the writer
                frame_normalized = frame.astype(np.float32) / 255.0
                frame_restored = self._to_uint8(frame_normalized)
                out.write(frame_restored)

                # Only the first few frames are visualized
                if frame_count < self.num_frames:
                    self._show_frame_pair(frame, frame_restored, frame_count + 1)

                frame_count += 1
        finally:
            cap.release()
            if out is not None:
                out.release()

        print(f"Normalized video saved at {self.output_path}")

# Demo: normalize one sample clip and preview its first two frames
video_path = '/kaggle/input/ryerson-audio-visual-database-modified/Ryerson_Audio_Visual_Database_modified/Actor_01/01-01-03-01-02-02-01.mp4'
output_path = '/kaggle/working/normalized_output.mp4'

# Build the helper with explicit keyword arguments and run it
kegmf = KEGMF(video_path=video_path, output_path=output_path, num_frames=2)
kegmf.visualize_and_save_normalized_video()

# Feature extraction

In [None]:
from sklearn.ensemble import RandomForestClassifier
import moviepy.editor as mp
import torch
import torchvision.models as models
from torchvision import transforms
from tqdm import tqdm
from torchvision.models import vit_b_16
import numpy as np
import librosa

# Prefer the GPU when CUDA is usable; otherwise everything runs on the CPU
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')


# @title Feature selection - Bayesian Weighted Random Forest (BWRF)

# Step 1: Extract the audio track of the MP4 file with MoviePy (via a temporary WAV file) and load it with Librosa
def extract_audio_from_video_in_memory(mp4_path):
    """Open a video, decode its audio track and return both.

    The audio is written to a uniquely named temporary WAV file, loaded with
    librosa at its original sampling rate (``sr=None``) and the temporary file
    is removed again. A unique name replaces the fixed ``/tmp/temp_audio.wav``
    that was shared by every call and never cleaned up.

    Args:
        mp4_path: Path of the video file.

    Returns:
        tuple: ``(audio_samples, sample_rate, video_clip)``. The clip is
        returned open; the caller is responsible for calling ``close()`` on it.

    Raises:
        ValueError: If the video has no audio track.
    """
    import os
    import tempfile

    video = mp.VideoFileClip(mp4_path)
    if video.audio is None:
        video.close()
        raise ValueError(f"No audio track found in {mp4_path}")

    handle, audio_path = tempfile.mkstemp(suffix=".wav")
    os.close(handle)  # only the name is needed; MoviePy reopens the file itself
    try:
        video.audio.write_audiofile(audio_path, logger=None)
        y, sr = librosa.load(audio_path, sr=None)  # Load at original sampling rate
    except Exception:
        # The clip is not handed to the caller on failure, so release it here.
        video.close()
        raise
    finally:
        if os.path.exists(audio_path):
            os.remove(audio_path)

    return y, sr, video  # Return audio and video together

# Step 3: BWRF for Feature Selection
def bwrf_feature_selection(mfcc_features, labels, n_trees=100):
    """Rank features by accuracy-weighted random-forest importances.

    A random forest is fitted, each tree is scored by its accuracy on the
    same data it was fitted on, the scores are turned into weights with a
    softmax, and the per-tree feature importances are averaged with those
    weights.

    Args:
        mfcc_features: 2-D array-like of samples x features.
        labels: 1-D array-like of class labels (any sortable dtype).
        n_trees: Number of trees in the forest.

    Returns:
        tuple: ``(sorted_indices, sorted_feature_importances)``, both ordered
        from the most to the least important feature.
    """
    # The trees inside a fitted forest predict class *indices*, not the
    # original label values, so comparing their output with raw (e.g. string)
    # labels scores every tree 0 and makes all weights equal. Encoding the
    # labels as 0..k-1 (sorted order, as np.unique returns them) makes the
    # comparison meaningful.
    _, encoded_labels = np.unique(np.asarray(labels), return_inverse=True)
    encoded_labels = encoded_labels.ravel()

    # Step 1: Train a Random Forest model
    rf = RandomForestClassifier(n_estimators=n_trees, random_state=42)
    rf.fit(mfcc_features, encoded_labels)

    # Step 2: Collect feature importances from each tree
    tree_importances = np.array([tree.feature_importances_ for tree in rf.estimators_])

    # Step 3: Performance-based score of each tree (training accuracy)
    tree_scores = np.array(
        [calculate_tree_weight(tree, mfcc_features, encoded_labels) for tree in rf.estimators_]
    )

    # Step 4: Softmax over the scores; subtracting the maximum leaves the
    # result unchanged and keeps the exponentials bounded.
    shifted_scores = np.exp(tree_scores - np.max(tree_scores))
    tree_weights = shifted_scores / np.sum(shifted_scores)

    # Step 5: Weighted feature importances
    weighted_feature_importances = np.average(tree_importances, axis=0, weights=tree_weights)

    # Step 6: Sort features by importance, most important first
    sorted_indices = np.argsort(weighted_feature_importances)[::-1]
    sorted_feature_importances = weighted_feature_importances[sorted_indices]

    return sorted_indices, sorted_feature_importances

def calculate_tree_weight(tree, X, y):
    """Return the fraction of rows in ``X`` for which ``tree.predict`` equals ``y``."""
    return np.mean(tree.predict(X) == y)

# Step 3: MFCC extraction using torch (GPU enabled)
def extract_mfcc_features_torch(audio_array, sr, frame_length=2048, hop_length=512, n_mfcc=13, n_mels=40, n_fft=2048):
    """Compute MFCCs for a waveform, doing the STFT and Mel projection in torch.

    Args:
        audio_array: Audio samples.
        sr: Sampling rate, used for the Mel filter bank and the MFCC call.
        frame_length: ``win_length`` handed to ``torch.stft``.
        hop_length: Hop between successive STFT frames, in samples.
        n_mfcc: Number of coefficients returned per frame.
        n_mels: Number of Mel bands.
        n_fft: FFT size.

    Returns:
        torch.Tensor on ``device`` holding the transposed MFCC matrix, i.e.
        one row per time frame.
    """
    waveform = torch.tensor(audio_array, device=device)

    # Power spectrogram = squared magnitude of the complex STFT.
    # NOTE(review): no `window` is passed to torch.stft - confirm that the
    # resulting default windowing is intended.
    power_spectrogram = torch.abs(
        torch.stft(waveform, n_fft=n_fft, hop_length=hop_length, win_length=frame_length, return_complex=True)
    ) ** 2

    # Mel filter bank is built by librosa, then moved to the torch device
    mel_basis = torch.tensor(librosa.filters.mel(sr=sr, n_fft=n_fft, n_mels=n_mels), device=device)

    # Mel projection followed by a natural log; 1e-10 guards against log(0)
    log_mel = torch.log(torch.matmul(mel_basis, power_spectrogram) + 1e-10)

    # librosa turns the log-Mel spectrogram into MFCCs on the CPU
    coefficients = librosa.feature.mfcc(sr=sr, S=log_mel.cpu().numpy(), n_mfcc=n_mfcc)

    return torch.tensor(coefficients, device=device).T

In [None]:
# @title Feature extraction - Dual Vision Transformer (DVT)

# Step 3: Preprocess video frames for CNN input (adapted for Vision Transformer)
def preprocess_frame(frame):
    """Convert one video frame into a normalized 224x224 tensor for the ViT.

    The frame goes through, in order: conversion to a PIL image, resize to
    224x224, conversion to a tensor, and per-channel normalization with the
    mean/std values listed below.
    """
    steps = [
        transforms.ToPILImage(),
        transforms.Resize((224, 224)),  # Resize frame for ViT
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
    pipeline = transforms.Compose(steps)
    return pipeline(frame)

# Step 4: Extract video frames and ViT-based features using GPU
# Function to extract video frames and ViT-based features using GPU
# Loaded ViT pairs, keyed by the device they live on. Filled lazily by
# _get_vit_models so the pretrained weights are loaded once per session
# instead of twice for every processed video.
_VIT_MODEL_CACHE = {}


def _get_vit_models():
    """Return the two eval-mode ViT-B/16 models for ``device``, loading them at most once."""
    cache_key = str(device)
    if cache_key not in _VIT_MODEL_CACHE:
        vit_pair = tuple(vit_b_16(pretrained=True).to(device) for _ in range(2))
        for vit in vit_pair:
            vit.eval()
        _VIT_MODEL_CACHE[cache_key] = vit_pair
    return _VIT_MODEL_CACHE[cache_key]


def extract_video_frames_and_features(video, skip_frames=1):
    """Run sampled video frames through two ViT-B/16 models and concatenate the outputs.

    Args:
        video: Clip object exposing ``fps``, ``duration`` and ``get_frame(t)``.
        skip_frames: Sampling step; frames 0, skip_frames, 2*skip_frames, ...
            are processed.

    Returns:
        np.ndarray with one row per sampled frame; each row is the output of
        the first model followed by the output of the second model.

    NOTE(review): both models are built by the same ``vit_b_16(pretrained=True)``
    call and the full model output is used, not an intermediate embedding -
    confirm this is the intended "dual" feature extractor.
    """
    vit_model_1, vit_model_2 = _get_vit_models()

    video_fps = video.fps
    total_frames = int(video.duration * video_fps)  # Total number of frames in the video

    cnn_features = []

    for i in range(0, total_frames, skip_frames):
        frame = video.get_frame(i / video_fps)  # Frame at this timestamp (seconds)
        frame_tensor = preprocess_frame(frame).unsqueeze(0).to(device)  # Add batch dimension

        # Inference only: gradients are not needed
        with torch.no_grad():
            features_vit_1 = vit_model_1(frame_tensor).squeeze().cpu().numpy()
            features_vit_2 = vit_model_2(frame_tensor).squeeze().cpu().numpy()

        cnn_features.append(np.concatenate((features_vit_1, features_vit_2), axis=0))

    return np.array(cnn_features)

In [None]:
# @title Feature fusion and synchronization - Multi-Tensor Fusion Network

# Step 5 : Feature fusion and Synchronize MFCC and video features
def multi_tensor_fusion(mfcc_features, sr, hop_length, video_fps, video_cnn_features, skip_frames=1):
    """Pair every sampled video frame with the MFCC frame at the same timestamp.

    Args:
        mfcc_features: 2-D MFCC matrix with one row per audio frame; a torch
            tensor or anything ``np.asarray`` accepts.
        sr: Audio sampling rate.
        hop_length: Hop, in samples, between successive MFCC frames.
        video_fps: Frame rate of the video.
        video_cnn_features: Sequence with one feature vector per sampled frame.
        skip_frames: Sampling step that was used when extracting video frames.

    Returns:
        np.ndarray with one row per video frame: the MFCC vector followed by
        the video feature vector. Frames whose timestamp lies beyond the last
        MFCC frame get an all-zero MFCC part.
    """
    # Convert once instead of moving a single row off the device per frame.
    if hasattr(mfcc_features, 'detach'):
        mfcc_matrix = mfcc_features.detach().cpu().numpy()
    else:
        mfcc_matrix = np.asarray(mfcc_features)

    mfcc_frame_duration = hop_length / sr  # Seconds covered by one MFCC hop
    video_frame_duration = 1 / video_fps  # Seconds covered by one video frame

    # Padding shares the MFCC dtype so padded and unpadded clips produce
    # arrays of the same dtype.
    zero_mfcc = np.zeros(mfcc_matrix.shape[1], dtype=mfcc_matrix.dtype)

    multimodal_features = []
    for i, frame_features in enumerate(video_cnn_features):
        # Timestamp of the i-th *sampled* frame
        video_timestamp = i * skip_frames * video_frame_duration

        # MFCC frame that covers this timestamp
        mfcc_index = int(video_timestamp / mfcc_frame_duration)
        mfcc_for_frame = mfcc_matrix[mfcc_index] if mfcc_index < mfcc_matrix.shape[0] else zero_mfcc

        multimodal_features.append(np.concatenate((mfcc_for_frame, frame_features), axis=0))

    return np.array(multimodal_features)


# Step 6: Full pipeline for extracting multimodal features
def audio_video_emotion_recognition(mp4_path, label=None, skip_frames=5):
    """Extract the fused audio/video feature matrix for one video file.

    Args:
        mp4_path: Path of the video file.
        label: Optional label, passed through unchanged.
        skip_frames: Sampling step for the video frames.

    Returns:
        tuple: ``(multimodal_features, label)`` where ``multimodal_features``
        is the array returned by ``multi_tensor_fusion``.
    """
    # One hop length for both the MFCC extraction and the synchronization,
    # so the two steps cannot drift apart.
    hop_length = 512

    # Extract audio and open the video clip
    audio_array, sr, video = extract_audio_from_video_in_memory(mp4_path)
    try:
        # MFCC features from the audio track
        mfcc_features = extract_mfcc_features_torch(audio_array, sr, hop_length=hop_length)

        # ViT-based features from the sampled video frames
        video_cnn_features = extract_video_frames_and_features(video, skip_frames=skip_frames)

        # Synchronize and concatenate both modalities
        multimodal_features = multi_tensor_fusion(
            mfcc_features,
            sr,
            hop_length=hop_length,
            video_fps=video.fps,
            video_cnn_features=video_cnn_features,
            skip_frames=skip_frames,
        )
    finally:
        # Release the clip; otherwise one reader stays open per processed video.
        video.close()

    return multimodal_features, label

In [None]:
# Step 7: Prepare dataset
mp4_paths = flattened_df['filename']  # MP4 file paths
labels = flattened_df['label']  # Corresponding labels for the videos

# @title Extracting multimodal features (optional - if run once and save the features to drive)
# Per-video feature matrices are collected in a list and stacked once at the
# end; stacking inside the loop re-copies everything gathered so far on every
# iteration.
feature_chunks = []
y = []
for mp4_path, label in tqdm(zip(mp4_paths, labels), total=len(mp4_paths), desc="Processing Videos"):
    # skip_frames=5 keeps every 5th frame of the clip
    features, emotion_label = audio_video_emotion_recognition(mp4_path, label, skip_frames=5)

    # A clip that yields no frames contributes nothing (and would break stacking)
    if len(features) == 0:
        continue

    feature_chunks.append(features)

    # Every frame row inherits the label of its clip
    y.extend([emotion_label] * len(features))

X = np.vstack(feature_chunks) if feature_chunks else np.array([])

# Save multimodal features and labels as numpy arrays
np.save(multimodel_feature_file, X)
np.save(labels_file , y)

print(f"Multimodal features saved to '{multimodel_feature_file}'")
print(f"Labels saved to '{labels_file}'")

In [None]:
# @title load features (optional - if you load directly from the drive and when you neglect the previous code)

# Restore the feature matrix and the label vector written by the extraction cell
X, y = (np.load(saved_file) for saved_file in (multimodel_feature_file, labels_file))

# Quick sanity check of what was loaded
print("Features shape:", X.shape)
print("Labels shape:", y.shape)


# Build Model

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import layers, activations, Model
from tensorflow.keras.layers import Dense, Dropout, Conv1D, MaxPooling1D, Flatten
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras import mixed_precision
import tensorflow as tf

mixed_precision.set_global_policy('mixed_float16')

# @title Complex-Value Spatio-Temporal Graph Convolutional Neural Network

class ComplexReLU(layers.Layer):
    """ReLU applied independently to the real and the imaginary part of a complex tensor."""

    def call(self, inputs):
        # Rectify each component on its own, then rebuild the complex tensor
        rectified_real, rectified_imag = (
            tf.nn.relu(component(inputs)) for component in (tf.math.real, tf.math.imag)
        )
        return tf.complex(rectified_real, rectified_imag)

class ComplexGraphConv(layers.Layer):
    """Complex-valued graph convolution built from two real-valued graph convolutions.

    The original class used ``self.real_conv`` / ``self.imag_conv`` without
    ever defining them and ignored ``channels``, so ``call`` always raised
    AttributeError. Both are now defined as a first-order propagation
    ``adjacency @ (inputs @ W)`` with one bias-free Dense kernel each.

    NOTE(review): the propagation rule is an assumption (the original never
    specified one) - confirm it matches the intended model. Inputs are
    presumably (..., nodes, features) with a matching (nodes, nodes)
    adjacency - TODO confirm.

    Args:
        channels: Number of output features per node.
        activation: Callable applied to the complex output; identity if omitted.
    """

    def __init__(self, channels, activation=None):
        # float32 throughout: tf.complex does not accept the float16 tensors
        # that the global mixed_float16 policy would otherwise produce.
        super(ComplexGraphConv, self).__init__(dtype='float32')
        self.channels = channels
        self.real_kernel = layers.Dense(channels, use_bias=False, dtype='float32')
        self.imag_kernel = layers.Dense(channels, use_bias=False, dtype='float32')
        self.activation = activation if activation else tf.identity

    @staticmethod
    def _propagate(kernel, inputs, adjacency):
        """Project node features with ``kernel`` and aggregate them over ``adjacency``."""
        projected = kernel(inputs)
        return tf.matmul(tf.cast(adjacency, projected.dtype), projected)

    def real_conv(self, inputs, adjacency):
        """Graph convolution with the real-valued kernel."""
        return self._propagate(self.real_kernel, inputs, adjacency)

    def imag_conv(self, inputs, adjacency):
        """Graph convolution with the imaginary-valued kernel."""
        return self._propagate(self.imag_kernel, inputs, adjacency)

    def call(self, real_inputs, imag_inputs, adjacency):
        # Complex product (W_r + i W_i)(x_r + i x_i), expanded into real operations
        real_output = self.real_conv(real_inputs, adjacency) - self.imag_conv(imag_inputs, adjacency)
        imag_output = self.imag_conv(real_inputs, adjacency) + self.real_conv(imag_inputs, adjacency)
        output = tf.complex(real_output, imag_output)
        return self.activation(output)

class ComplexTemporalConv(layers.Layer):
    """Complex-valued 1-D convolution built from two real-valued Conv1D layers.

    Args:
        channels: Number of filters of each Conv1D.
        kernel_size: Kernel size of each Conv1D.
        activation: Callable applied to the complex output; identity if omitted.
    """

    def __init__(self, channels, kernel_size, activation=None):
        super(ComplexTemporalConv, self).__init__()
        self.real_conv = layers.Conv1D(channels, kernel_size, padding='same')
        self.imag_conv = layers.Conv1D(channels, kernel_size, padding='same')
        self.activation = activation if activation else tf.identity

    @staticmethod
    def _complex_compatible(tensor):
        """Cast to float32 unless the tensor is already float32/float64.

        tf.complex only accepts float32 and float64 parts, while the global
        mixed_float16 policy set in this notebook makes the Conv1D layers
        emit float16.
        """
        if tensor.dtype in (tf.float32, tf.float64):
            return tensor
        return tf.cast(tensor, tf.float32)

    def call(self, real_inputs, imag_inputs):
        # Complex product (W_r + i W_i)(x_r + i x_i), expanded into real operations
        real_output = self.real_conv(real_inputs) - self.imag_conv(imag_inputs)
        imag_output = self.imag_conv(real_inputs) + self.real_conv(imag_inputs)
        output = tf.complex(
            self._complex_compatible(real_output),
            self._complex_compatible(imag_output),
        )
        return self.activation(output)

class CVSTGCN_model:
    """Builder for the compiled Keras classifier, exposed as ``self.model``.

    NOTE(review): the network that is built is a plain real-valued 1-D CNN
    (Conv1D -> MaxPooling1D, twice, then Dense); none of the complex-valued or
    graph layers defined in this notebook are used.

    The class no longer derives from ``ComplexTemporalConv``: it is a builder,
    not a layer, it never called ``super().__init__()``, and it assigns
    ``input_shape`` on the instance, which is fragile on a Keras layer.

    Args:
        input_shape: Shape of one sample, e.g. ``(n_features, 1)``.
        num_classes: Size of the softmax output.
        dropout_rate: Dropout rate applied before the output layer.
        optimizer: Optional optimizer for ``compile``: a Keras optimizer name
            or an object exposing ``apply_gradients``. Any other value,
            including ``None``, falls back to ``Adam(learning_rate=0.001)``,
            which is what was always used before.
    """

    def __init__(self, input_shape, num_classes=8, dropout_rate=0.3,optimizer = None):
        self.input_shape = input_shape
        self.num_classes = num_classes
        self.dropout_rate = dropout_rate
        self.optimizer = optimizer
        self.model = self.build_model()

    def _resolve_optimizer(self):
        """Return the optimizer to compile with (see the class docstring)."""
        candidate = self.optimizer
        if isinstance(candidate, str) or hasattr(candidate, 'apply_gradients'):
            return candidate
        return Adam(learning_rate=0.001)

    def build_model(self):
        """Assemble and compile the Sequential network."""
        model = Sequential()
        model.add(Conv1D(filters=128, kernel_size=3, activation='relu', input_shape=self.input_shape))
        model.add(MaxPooling1D(pool_size=2))
        model.add(Conv1D(filters=256, kernel_size=3, activation='relu'))
        model.add(MaxPooling1D(pool_size=2))
        model.add(Flatten())
        model.add(Dense(256, activation='relu'))
        model.add(Dropout(self.dropout_rate))
        # The output layer is pinned to float32 explicitly, independent of the
        # global mixed-precision policy.
        model.add(Dense(self.num_classes, activation='softmax', dtype='float32'))
        model.compile(optimizer=self._resolve_optimizer(),
                      loss='categorical_crossentropy',
                      metrics=['accuracy'])
        return model

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Red-billed Blue Magpie Optimization Algorithm class
class RedBilledBlueMagpieOptimizer:
    """Population-based minimizer with personal-best / global-best velocity updates.

    NOTE(review): the update rule implemented here is a particle-swarm style
    step (velocity pulled towards the personal and the global best); confirm
    it is the intended Red-billed Blue Magpie formulation.

    Args:
        num_particles: Population size.
        num_iterations: Number of iterations performed by ``optimize``.
        dim: Dimensionality of the search space.
        lb: Lower bound used for initialisation and clipping.
        ub: Upper bound used for initialisation and clipping.
        fitness_func: Callable mapping a position vector to a value to minimize.
        seed: Optional seed for a private random generator. ``None`` (default)
            keeps using the global ``np.random`` state, as before.
    """

    def __init__(self, num_particles, num_iterations, dim, lb, ub, fitness_func, seed=None):
        self.num_particles = num_particles
        self.num_iterations = num_iterations
        self.dim = dim
        self.lb = lb
        self.ub = ub
        self.fitness_func = fitness_func

        # Both np.random and RandomState provide uniform() and rand()
        self._rng = np.random if seed is None else np.random.RandomState(seed)

        # Initialize particles and velocities
        self.particles = self._rng.uniform(lb, ub, (num_particles, dim))
        self.velocities = self._rng.uniform(-1, 1, (num_particles, dim))

        # Personal best positions and fitnesses
        self.personal_best_positions = np.copy(self.particles)
        self.personal_best_fitness = np.full(num_particles, float('inf'))

        # Global best position and fitness
        self.global_best_position = np.zeros(dim)
        self.global_best_fitness = float('inf')

        # Global best fitness after every iteration, for plotting
        self.fitness_history = []

    def optimize(self):
        """Run ``num_iterations`` iterations and return the best fitness found.

        Progress is printed once per iteration. Calling the method again
        continues from the current state and keeps appending to
        ``fitness_history``.
        """
        for iteration in range(self.num_iterations):
            for i in range(self.num_particles):
                fitness = self.fitness_func(self.particles[i])

                # Update the personal best (row assignment copies the values)
                if fitness < self.personal_best_fitness[i]:
                    self.personal_best_fitness[i] = fitness
                    self.personal_best_positions[i] = self.particles[i]

                # Update the global best. The copy matters: particles[i] is a
                # view into the particle array, so without it the stored best
                # position would move along with the particle afterwards.
                if fitness < self.global_best_fitness:
                    self.global_best_fitness = fitness
                    self.global_best_position = self.particles[i].copy()

            # Update particles' velocities and positions
            for i in range(self.num_particles):
                r1 = self._rng.rand(self.dim)
                r2 = self._rng.rand(self.dim)

                self.velocities[i] = (
                    self.velocities[i]
                    + r1 * (self.personal_best_positions[i] - self.particles[i])
                    + r2 * (self.global_best_position - self.particles[i])
                )

                # Move the particle and keep it within bounds
                self.particles[i] = np.clip(self.particles[i] + self.velocities[i], self.lb, self.ub)

            # Record the global best fitness of this iteration
            self.fitness_history.append(self.global_best_fitness)

            print(f"Iteration {iteration + 1}/{self.num_iterations}, Best Fitness: {self.global_best_fitness}")

        return self.global_best_fitness


# Define a dummy fitness function (sphere function)
def fitness_function(x):
    """Sphere test function: the sum of the squared components of ``x``."""
    return np.sum(x * x)

# Search configuration for the demo run on the sphere function
num_particles, num_iterations, dim = 30, 50, 5
lb, ub = -10, 10

# Build the optimizer with explicit keyword arguments
rbmo_optimizer = RedBilledBlueMagpieOptimizer(
    num_particles=num_particles,
    num_iterations=num_iterations,
    dim=dim,
    lb=lb,
    ub=ub,
    fitness_func=fitness_function,
)

# Run the search; optimize() returns the best fitness that was found
best_fitness_score = rbmo_optimizer.optimize()

# Report the final result
print(f"\nBest fitness : {best_fitness_score}")


In [None]:
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

X, y = np.array(X), np.array(y)

# Step 1: label names -> integer ids
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y)

# Distinct labels found in the data
print(f"Classes : {np.unique(y)}")

# The class count is taken from the labels that are actually present
num_classes = len(np.unique(y_encoded))

print(f"Number of classes: {num_classes}")

# Step 2: integer ids -> one-hot rows
y_categorical = to_categorical(y_encoded, num_classes=num_classes)

# Append a trailing axis of size 1, the layout the Conv1D input layer is built for
n_samples, n_features = X.shape[0], X.shape[1]
X = X.reshape(n_samples, n_features, 1)

# 80/20 split with a fixed seed.
# NOTE(review): when X comes from the extraction cell, every row is a single
# video frame, so frames of one clip can land on both sides of this split -
# consider splitting by clip to avoid leakage.
X_train, X_test, y_train, y_test = train_test_split(
    X, y_categorical, test_size=0.2, random_state=42
)

# Shapes of the four resulting arrays
print("X_train shape:", X_train.shape)
print("y_train shape:", y_train.shape)
print("X_test shape:", X_test.shape)
print("y_test shape:", y_test.shape)

# Shape of one sample as the Conv1D input layer expects it
input_shape = (X_train.shape[1], 1)


In [None]:
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping
from tensorflow.keras.optimizers import Adam
# @title Train model
# The network is sized for the classes actually present in the labels rather
# than the default of 8. The previous `optimizer=rbmo_optimizer.optimize()`
# argument re-ran the whole demo search only to pass a float that the model
# builder never used, so it is no longer passed.
cvstgcn_model = CVSTGCN_model(input_shape, num_classes=num_classes)
model = cvstgcn_model.model


# Halve the learning rate when the validation loss stops improving
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-5)

# Stop when the validation accuracy stops improving and restore the best weights
early_stopping = EarlyStopping(monitor='val_accuracy', patience=10, restore_best_weights=True)

# Validation data is held out from the training set. Using (X_test, y_test)
# here would let early stopping and weight restoration select the model on
# the test set, which makes the test metrics reported later optimistic.
history = model.fit(X_train, y_train, epochs=20, batch_size=64, validation_split=0.1,
                    callbacks=[reduce_lr, early_stopping])


In [None]:
import matplotlib.pyplot as plt

# Training curves: accuracy on the left, loss on the right
plt.figure(figsize=(12, 6))

# (subplot position, train key, validation key, train label, validation label, title, y label, legend location)
panels = [
    (1, 'accuracy', 'val_accuracy', 'Train Accuracy', 'Validation Accuracy', 'Model Accuracy', 'Accuracy', 'lower right'),
    (2, 'loss', 'val_loss', 'Train Loss', 'Validation Loss', 'Model Loss', 'Loss', 'upper right'),
]

for position, train_key, val_key, train_label, val_label, title, y_label, legend_loc in panels:
    plt.subplot(1, 2, position)
    plt.plot(history.history[train_key], label=train_label, marker='o')
    plt.plot(history.history[val_key], label=val_label, marker='o')
    plt.title(title, fontsize=18, fontweight='bold')
    plt.xlabel('Epochs', fontsize=16, fontweight='bold')
    plt.ylabel(y_label, fontsize=16, fontweight='bold')
    plt.xticks(fontsize=14)
    plt.yticks(fontsize=14)
    plt.legend(loc=legend_loc, fontsize=14)

# Display the plots
plt.tight_layout()
plt.show()


In [None]:
# @title Evaluate  model
# The model is compiled with one metric (accuracy), so score[1] is the accuracy
score = model.evaluate(x=X_test, y=y_test, verbose=0)
print(f"Test Accuracy: {score[1] * 100:.2f}%")

In [None]:
# @title  save model
# Persist the trained model; the prediction cells below reload it from this path.
model.save('/kaggle/working/new_model.keras')

# Results

In [None]:
import numpy as np
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
import matplotlib.pyplot as plt

# Step 8: Class probabilities for the test set, reduced to the most likely class
y_pred_probs = model.predict(X_test)
y_pred = y_pred_probs.argmax(axis=1)

# Step 9: One-hot test labels back to class ids
y_true = y_test.argmax(axis=1)

# Step 10: Confusion matrix
conf_matrix = confusion_matrix(y_true, y_pred)

# Step 11: Per-class report, labelled with the original class names
class_report = classification_report(
    y_true,
    y_pred,
    target_names=label_encoder.classes_,
)
print("Classification Report:\n", class_report)

import matplotlib.pyplot as plt
import seaborn as sns

# Confusion matrix as an annotated heatmap
plt.figure(figsize=(10, 7))
class_names = label_encoder.classes_
sns.heatmap(
    conf_matrix,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=class_names,
    yticklabels=class_names,
)

# Title and axis labels, larger and bold
plt.title('Confusion Matrix', fontsize=18, fontweight='bold')
for set_label, text in ((plt.ylabel, 'True Labels'), (plt.xlabel, 'Predicted Labels')):
    set_label(text, fontsize=16, fontweight='bold')

# Vertical x tick labels, horizontal y tick labels
plt.xticks(rotation=90, fontsize=14)
plt.yticks(rotation=0,fontsize=14)

# Show the plot
plt.tight_layout()
plt.show()



# Predictions

In [None]:
import numpy as np
from tensorflow import keras
from IPython.display import Video, display

def predict_emotion(video_path, model, label_encoder):
    """Predict the emotion label of one video by majority vote over its frames.

    The video is displayed inline, its fused per-frame features are extracted,
    every frame is classified, and the class predicted for the most frames is
    returned.

    Args:
        video_path (str): Path to the video file to classify.
        model: Trained model exposing ``predict``.
        label_encoder: Encoder used at training time; its ``inverse_transform``
            maps class ids back to label names.

    Returns:
        str: Predicted class label.

    Raises:
        ValueError: If no frame features could be extracted from the video.
    """
    print("\nDisplay video : \n")
    # Display the video inline
    display(Video(video_path, width = 320 ,embed=True))

    # Extract features from the video file: one row per sampled frame
    feature_vect, _ = audio_video_emotion_recognition(video_path)
    feature_vect = np.asarray(feature_vect)
    if feature_vect.ndim != 2 or feature_vect.shape[0] == 0:
        raise ValueError(f"No frame features could be extracted from {video_path}")

    # Same (frames, features, 1) layout the model was trained on. Wrapping the
    # matrix in a list relied on Keras adjusting the input shape implicitly.
    model_input = feature_vect[..., np.newaxis]
    prediction = model.predict(model_input)

    # Most likely class of every frame
    frame_classes = np.argmax(prediction, axis=1)

    # Majority vote over the frames
    predicted_class_index = np.argmax(np.bincount(frame_classes))

    # Decode the index back to the original label
    predicted_class_label = label_encoder.inverse_transform([predicted_class_index])

    return predicted_class_label[0]

# Restore the trained network from disk and classify one unseen clip
load_model = keras.models.load_model(filepath='/kaggle/working/new_model.keras')

video_path = "/kaggle/input/test-data/test/01-01-05-01-01-02-08.mp4"
predicted_emotion = predict_emotion(
    video_path=video_path,
    model=load_model,
    label_encoder=label_encoder,
)
print(f"Predicted emotion: {predicted_emotion}")


In [None]:
# Reload the saved network and classify a second unseen clip
load_model = keras.models.load_model(filepath='/kaggle/working/new_model.keras')

video_path = "/kaggle/input/test-data/test/01-01-06-01-01-02-09.mp4"
predicted_emotion = predict_emotion(
    video_path=video_path,
    model=load_model,
    label_encoder=label_encoder,
)
print(f"Predicted emotion: {predicted_emotion}")

# Performance metrics

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix

# Overall accuracy on the test set
accuracy = accuracy_score(y_true, y_pred)

# Precision, recall and F1, each averaged with average='weighted'
precision, recall, f1 = (
    scorer(y_true, y_pred, average='weighted')
    for scorer in (precision_score, recall_score, f1_score)
)

# Error rate is the complement of the accuracy
error_rate = 1 - accuracy

# Per-class specificity = TN / (TN + FP), read off the confusion matrix:
# FP = column total - diagonal, TN = grand total - row total - column total + diagonal
diagonal = np.diag(conf_matrix)
column_totals = conf_matrix.sum(axis=0)
row_totals = conf_matrix.sum(axis=1)
fp = column_totals - diagonal
tn = conf_matrix.sum() - column_totals - row_totals + diagonal
specificity = tn / (tn + fp)

# Summary table (specificity is reported as the mean over classes)
print("Performance Metrics:")
print("="*40)
print(f"{'Metric':<15} {'Score':<10}")
print("="*40)
print(f"{'Accuracy':<15} : {accuracy:.4f}")
print(f"{'Precision':<15} : {precision:.4f}")
print(f"{'Recall':<15} : {recall:.4f}")
print(f"{'F1 Score':<15} : {f1:.4f}")
print(f"{'Specificity':<15} : {np.mean(specificity):.4f}")
print(f"{'Error Rate':<15} : {error_rate:.4f}")
print("="*40)