In [None]:
import os
import numpy as np
import cv2
import tensorflow as tf
import random
import numpy as np
import cv2
import face_recognition
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Multiply
from tensorflow.keras.models import Model
from tensorflow.keras.utils import to_categorical

from sklearn.metrics import confusion_matrix, f1_score, accuracy_score
from scipy.stats import mode
import gc  # Import the garbage collection module
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
import sklearn.metrics as metrics
# Image height and width constants used by the crop-window arithmetic below.
IMG_HEIGHT = 480
IMG_WIDTH = 640
from scipy.stats import mode

# Bounds of a centered window that is half the image size along each axis:
# rows [start_row, end_row) and columns [start_col, end_col).
NEW_HEIGHT = IMG_HEIGHT // 2
NEW_WIDTH = IMG_WIDTH // 2
start_row = (IMG_HEIGHT - NEW_HEIGHT) // 2
start_col = (IMG_WIDTH - NEW_WIDTH) // 2
end_row = start_row + NEW_HEIGHT
end_col = start_col + NEW_WIDTH

class ChannelAttention(tf.keras.layers.Layer):
    """Channel-attention layer built from a shared two-layer MLP.

    The input feature map is reduced twice, once with global average pooling
    and once with global max pooling. Both pooled vectors are passed through
    the same bottleneck MLP (``channels // ratio`` ReLU units followed by
    ``channels`` linear units) and the two results are summed.

    NOTE(review): the sum is returned as-is; no sigmoid (or other squashing)
    is applied, so the output is an unbounded per-channel weight. Confirm
    this is intended before using the output as a gate.

    Args:
        ratio: Reduction factor of the bottleneck layer.
        **kwargs: Standard Keras ``Layer`` arguments (``name``, ``dtype``, ...).
    """

    def __init__(self, ratio=8, **kwargs):
        # Forward kwargs so callers can set name/dtype/trainable like on any
        # other Keras layer, and so from_config() can rebuild the layer.
        super().__init__(**kwargs)
        self.ratio = ratio
        # Pooling layers are created once here instead of on every call().
        self.global_avg_pool = tf.keras.layers.GlobalAveragePooling2D()
        self.global_max_pool = tf.keras.layers.GlobalMaxPooling2D()

    def build(self, input_shape):
        """Create the shared MLP once the channel count is known."""
        channels = input_shape[-1]
        # Keep at least one hidden unit when channels < ratio.
        hidden_units = max(1, channels // self.ratio)
        self.shared_layer_one = Dense(hidden_units,
                                      activation='relu',
                                      kernel_initializer='he_normal',
                                      use_bias=True,
                                      bias_initializer='zeros')
        self.shared_layer_two = Dense(channels,
                                      kernel_initializer='he_normal',
                                      use_bias=True,
                                      bias_initializer='zeros')

    def call(self, inputs):
        """Return the summed MLP outputs of the avg- and max-pooled inputs."""
        avg_pool = self.global_avg_pool(inputs)
        max_pool = self.global_max_pool(inputs)

        # Both descriptors go through the same (shared) weights.
        avg_pool = self.shared_layer_two(self.shared_layer_one(avg_pool))
        max_pool = self.shared_layer_two(self.shared_layer_one(max_pool))

        return avg_pool + max_pool

    def get_config(self):
        """Return the layer config, including ``ratio``, for serialization."""
        config = super().get_config()
        config.update({"ratio": self.ratio})
        return config

def sort_cmp(key):
    """Return the integer formed by all decimal digits in ``key``, in order.

    Used as a ``sorted`` key so that names are ordered by the number they
    contain rather than lexicographically. Non-digit characters are ignored;
    a name without any digit maps to 0.

    Args:
        key: The string to extract digits from.

    Returns:
        The digits of ``key`` concatenated and read as a base-10 integer.
    """
    ans = 0
    for c in key:
        # isdecimal(), not isdigit(): isdigit() is also true for characters
        # such as superscript digits, which int() rejects with ValueError.
        if c.isdecimal():
            ans = ans * 10 + int(c)
    return ans

# Per-subject video emotion classification.
#
# For every subject folder under ``root_folder`` that has a ``Video``
# directory: sample frames from the "Speaking" clips of each emotion, train a
# frozen ResNet50 + ChannelAttention classifier on the first half of each
# emotion's clips, evaluate on the second half, and aggregate frame
# predictions into trial-level predictions by majority vote.
#
# Results are appended (one entry per processed subject) to ``acc``,
# ``fscore`` and ``confusion``.
acc = []
fscore = []
confusion = []

root_folder = "../Dropbox/EAV"
emotions = ["Anger", "Neutral", "Sadness", "Calmness", "Happiness"]
confusion_emotions = list(emotions)
IMG_HEIGHT, IMG_WIDTH = 480, 640

FRAME_SIZE = 224            # frames are resized to FRAME_SIZE x FRAME_SIZE
FRAME_STEP = 3              # every 3rd frame of a clip is kept ...
LAST_FRAME_INDEX = 600      # ... up to and including frame 600 (1-based)
FRAMES_PER_VIDEO = LAST_FRAME_INDEX // FRAME_STEP  # at most 200 per clip
FRAMES_PER_TRIAL = 50       # consecutive frames voted together as one trial


def _group_videos_by_emotion(video_dir):
    """Map each emotion to the sorted "Speaking" .mp4 paths found in video_dir."""
    grouped = {emotion: [] for emotion in emotions}
    for file in sorted(os.listdir(video_dir)):
        if "Speaking" in file and file.endswith(".mp4"):
            for emotion in emotions:
                if emotion in file:
                    grouped[emotion].append(os.path.join(video_dir, file))
    return grouped


def _sample_frames(video_path, out, offset):
    """Write the sampled frames of one clip into out[offset:]; return the count.

    Frames are resized to FRAME_SIZE x FRAME_SIZE and scaled by 1/255.
    NOTE(review): frames stay in the channel order returned by OpenCV and
    ResNet50's own ``preprocess_input`` is not applied - confirm intended.
    """
    cap = cv2.VideoCapture(video_path)
    written = 0
    try:
        if not cap.isOpened():
            print(f"Error opening video file: {video_path}")
            return 0
        frame_index = 1
        # Nothing past LAST_FRAME_INDEX is sampled, so stop decoding there.
        while frame_index <= LAST_FRAME_INDEX:
            ret, frame = cap.read()
            if not ret:
                break
            if frame_index % FRAME_STEP == 0:
                out[offset + written] = cv2.resize(frame, (FRAME_SIZE, FRAME_SIZE)) / 255.0
                written += 1
            frame_index += 1
    finally:
        cap.release()
    return written


for subfolder in sorted(os.listdir(root_folder), key=sort_cmp):
    subfolder_path = os.path.join(root_folder, subfolder)
    video_dir = os.path.join(subfolder_path, 'Video')
    if not os.path.isdir(video_dir):
        continue

    categorized_files_v = _group_videos_by_emotion(video_dir)
    print(categorized_files_v["Anger"])

    # The first half of each emotion's clips is used for training and the
    # rest for testing. Buffers are sized from the actual clip counts and use
    # float32 (the precision Keras trains in) to keep memory bounded.
    n_train_videos = sum(len(paths) // 2 for paths in categorized_files_v.values())
    n_test_videos = sum(len(paths) - len(paths) // 2 for paths in categorized_files_v.values())
    frame_shape = (FRAME_SIZE, FRAME_SIZE, 3)
    x_trainv = np.zeros((n_train_videos * FRAMES_PER_VIDEO,) + frame_shape, dtype=np.float32)
    x_testv = np.zeros((n_test_videos * FRAMES_PER_VIDEO,) + frame_shape, dtype=np.float32)

    # The label lists double as write cursors: len(y_*) == frames stored.
    y_trainv, y_testv = [], []
    for class_index, emotion in enumerate(emotions):
        class_videos = categorized_files_v[emotion]
        split_at = len(class_videos) // 2
        class_test_start = len(y_testv)
        for file_index, video_path in enumerate(class_videos):
            if file_index < split_at:
                n_frames = _sample_frames(video_path, x_trainv, len(y_trainv))
                y_trainv.extend([class_index] * n_frames)
            else:
                n_frames = _sample_frames(video_path, x_testv, len(y_testv))
                y_testv.extend([class_index] * n_frames)
        # Keep a whole number of trials per class so that every group of
        # FRAMES_PER_TRIAL consecutive test frames has a single true label.
        # Dropping the labels rewinds the cursor, so the next class simply
        # overwrites the trimmed frames.
        leftover = (len(y_testv) - class_test_start) % FRAMES_PER_TRIAL
        if leftover:
            del y_testv[-leftover:]

    if not y_trainv or not y_testv:
        print(f"Skipping {subfolder}: not enough video frames for a train/test split")
        continue

    # Drop the unused tail of each buffer (clips shorter than expected).
    x_trainv = x_trainv[:len(y_trainv)]
    x_testv = x_testv[:len(y_testv)]
    y_trainv = np.array(y_trainv)
    y_testv = np.array(y_testv)

    identity_matrix = np.eye(len(emotions))
    train_labels_one_hot = identity_matrix[y_trainv]
    test_labels_one_hot = identity_matrix[y_testv]

    # A fresh model is built per subject; reset Keras' global state first so
    # graphs/layers from previous subjects do not accumulate.
    tf.keras.backend.clear_session()
    gc.collect()

    base_model = ResNet50(weights='imagenet', include_top=False)
    # Add custom layers for 5-class classification
    x = base_model.output
    x = GlobalAveragePooling2D()(x)
    attention = ChannelAttention()(base_model.output)
    x = Multiply()([x, attention])
    x = Dense(1024, activation='relu')(x)
    predictions = Dense(len(emotions), activation='softmax')(x)
    model = Model(inputs=base_model.input, outputs=predictions)

    # Freeze the layers of the base model (so they don't get trained)
    for layer in base_model.layers:
        layer.trainable = False

    # Compile the model
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

    # Train the model
    model.fit(x_trainv, train_labels_one_hot, epochs=100, batch_size=64)

    # Frame-level evaluation on the held-out clips (evaluated once).
    test_loss, test_acc = model.evaluate(x_testv, test_labels_one_hot, verbose=2)
    loss, accuracy = test_loss, test_acc
    print('\nTest accuracy:', test_acc)
    print(f"Video Accuracy: {accuracy * 100:.2f}%")

    # Trial-level evaluation: majority vote over each run of
    # FRAMES_PER_TRIAL consecutive frames.
    y_pred = model.predict(x_testv)
    predicted_classes = np.argmax(y_pred, axis=1)
    true_classes = np.argmax(test_labels_one_hot, axis=1)
    trials_predicted = predicted_classes.reshape(-1, FRAMES_PER_TRIAL)
    predicted_most_frequent_classes = mode(trials_predicted, axis=1)[0].flatten()
    print(predicted_most_frequent_classes)
    trials_true = true_classes.reshape(-1, FRAMES_PER_TRIAL)
    true_most_frequent_classes = mode(trials_true, axis=1)[0].flatten()
    print(true_most_frequent_classes)

    accur = accuracy_score(true_most_frequent_classes, predicted_most_frequent_classes)
    f1 = f1_score(true_most_frequent_classes, predicted_most_frequent_classes, average='weighted')
    # Confusion Matrix; explicit labels keep it len(emotions) x len(emotions)
    # for every subject, even if a class is never seen or predicted.
    cm = confusion_matrix(true_most_frequent_classes, predicted_most_frequent_classes,
                          labels=list(range(len(emotions))))
    acc.append(accur)
    fscore.append(f1)
    confusion.append(cm)