In [None]:
from my_utils import emotion_dict, sentiment_dict, film_frames_dict

import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.pipeline import make_pipeline, Pipeline

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# project root on the mounted Drive; every relative path below resolves against it
folder = '/content/drive/My Drive/Colab Notebooks/Dissertation'

os.chdir(folder)

output_dir = './Datasets'
results_dir = './Results'

film_keys = list(emotion_dict.keys())
datasets = ['frame_level', 'video_level']
results = ['eda', 'stats_analysis','modelling']
modelling_folders = ['classification_results','plots']
feature_group = ['rgb_hsv','audio', 'optical_flow']

# create ./Results/<dataset>/<results_type>/ and, for 'modelling' only, its sub-folders
for dataset in datasets:
    os.makedirs(os.path.join(results_dir, dataset), exist_ok=True)
    for results_type in results:
        os.makedirs(os.path.join(results_dir, dataset, results_type), exist_ok=True)
        if results_type == 'modelling':
            # dedicated loop name: reusing `folder` here would overwrite the project root path above
            for modelling_folder in modelling_folders:
                os.makedirs(os.path.join(results_dir, dataset, results_type, modelling_folder), exist_ok=True)

#### Functions to load feature sets

In [None]:
# obtain the frame-level and video-level feature sets
def load_datasets(frame_level_path, video_level_path):
    """Read the frame-level and the video-level feature CSV files.

    Returns a ``(frame_df, video_df)`` tuple. A member is ``None`` when its
    path does not exist, in which case a "not found" message is printed.
    """
    loaded = []

    # frame-level first, then video-level
    for csv_path in (frame_level_path, video_level_path):
        if not os.path.exists(csv_path):
            print (f"File {csv_path} not found")
            loaded.append(None)
            continue
        loaded.append(pd.read_csv(csv_path))

    frame_df, video_df = loaded
    return frame_df, video_df

# obtain the frame-level and video-level feature sets based on the feature group (rgb_hsv, audio, optical_flow)

def get_feature_group_datasets(feature_group):
    """Load both feature sets of one feature group and list its feature columns.

    Returns ``(frame_df, video_df, feature_cols)``. ``feature_cols`` holds the
    non-identifier columns of the frame-level set when it was loaded, otherwise
    those of the video-level set, otherwise an empty list.
    """
    csv_name = f"features_{feature_group}_df.csv"
    frame_df, video_df = load_datasets(
        os.path.join(output_dir, f"{datasets[0]}/{csv_name}"),
        os.path.join(output_dir, f"{datasets[1]}/{csv_name}"),
    )

    # identifier / label columns that are not model features
    video_id_cols = ("video_id", "emotion", "sentiment")
    frame_id_cols = ("video_id", "frame_id", "emotion", "sentiment")

    has_frame = frame_df is not None
    has_video = video_df is not None

    f_shape = frame_df.shape if has_frame else ()
    v_shape = video_df.shape if has_video else ()

    # the frame-level columns take precedence when both sets are available
    if has_frame:
        feature_cols = [name for name in frame_df.columns if name not in frame_id_cols]
    elif has_video:
        feature_cols = [name for name in video_df.columns if name not in video_id_cols]
    else:
        feature_cols = []

    print(f"Dataframes shape for {feature_group}: frame-level -> {f_shape}, video-level -> {v_shape}")

    return frame_df, video_df, feature_cols


#### Datasets concatenation

In [None]:
# combine different subsets of features by joining different data frames
def _merge_on_keys(merged_df, new_df, keys):
    """Return `new_df` when nothing was merged yet, else the inner join of both frames on `keys`."""
    if merged_df is None:
        return new_df
    return pd.merge(merged_df, new_df, on=keys, how="inner")


def join_feature_datasets(feature_group_list):
    """Inner-join the feature sets of several feature groups and save the result.

    For every group in `feature_group_list` the frame-level and video-level
    CSVs are loaded and joined on their identifier columns. A level is saved
    only when every group provided a dataset for it. The merged files are
    written next to their inputs as ``features_<merged name>_df.csv``.

    Returns the merged group name (the group names joined with "_").
    """
    merged_feature_group = "_".join(feature_group_list)

    # identifier / label columns shared by all feature sets of a level
    video_keys = ["video_id", "emotion", "sentiment"]
    frame_keys = ["video_id", "frame_id", "emotion", "sentiment"]

    v_merged_df = None
    f_merged_df = None

    # a level can only be saved if at least one group is given and none is missing
    v_flag = len(feature_group_list) > 0
    f_flag = len(feature_group_list) > 0

    for feature_group in feature_group_list:
        frame_df, video_df = load_datasets(os.path.join(output_dir, f"{datasets[0]}/features_{feature_group}_df.csv"),
                                           os.path.join(output_dir, f"{datasets[1]}/features_{feature_group}_df.csv"))

        if video_df is not None:  # joining video_level sets
            v_merged_df = _merge_on_keys(v_merged_df, video_df, video_keys)
        else:
            v_flag = False
            print(f"Dataframe for {feature_group} is empty.")

        if frame_df is not None:  # joining frame_level sets
            f_merged_df = _merge_on_keys(f_merged_df, frame_df, frame_keys)
        else:
            f_flag = False
            print(f"Dataframe for {feature_group} is empty.")

    # datasets[0] is the frame-level folder and datasets[1] the video-level one
    # (the same order used for loading above), so each merge is written back
    # to the folder of its own level
    if v_flag:
        video_path = os.path.join(output_dir, f"{datasets[1]}/features_{merged_feature_group}_df.csv")
        v_merged_df.to_csv(video_path, index=False)
        print(f"Merged dataset saved at: {video_path}")

    if f_flag:
        frame_path = os.path.join(output_dir, f"{datasets[0]}/features_{merged_feature_group}_df.csv")
        f_merged_df.to_csv(frame_path, index=False)
        print(f"Merged dataset saved at: {frame_path}")

    return merged_feature_group

# 3. Modelling

#### Prepare dataset

In [None]:
def get_training_data(df, feature_cols, target_col, test_size=0.20, random_state=24):
    """Build a stratified train/test split with label-encoded targets.

    Parameters
    ----------
    df : DataFrame holding the feature columns and the target column.
    feature_cols : names of the columns used as model inputs.
    target_col : name of the label column.
    test_size : fraction of rows held out for testing (default 0.20, i.e. an 80-20 split).
    random_state : seed of the split (default 24).

    Returns
    -------
    X_train, X_test, y_train, y_test, le
        The split features, the split encoded labels and the fitted
        LabelEncoder (its ``classes_`` map the codes back to the labels).
    """
    X = df[feature_cols]
    y = df[target_col]

    # encode the labels to numeric values
    le = LabelEncoder()
    y_encoded = le.fit_transform(y)

    # NOTE(review): rows are split individually. If `df` holds several rows of
    # the same video (frame-level data), rows of one video can end up in both
    # train and test - confirm this is intended; a group-aware split on
    # video_id would avoid it.
    # split the data (stratified by the label to keep the class proportions)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y_encoded, test_size=test_size, random_state=random_state, stratify=y_encoded)

    return X_train, X_test, y_train, y_test, le

#### Classification pipelines and hyperparameters

In [None]:
# defining the classification pipelines: each classifier is preceded by a StandardScaler step

def _make_scaled_pipeline(classifier):
    """Wrap `classifier` in a Pipeline with the steps 'scaler' (StandardScaler) and 'clf'."""
    return Pipeline([('scaler', StandardScaler()), ('clf', classifier)])


pipelines = {
    'Logistic Regression': _make_scaled_pipeline(LogisticRegression(max_iter=3000, random_state=0)),
    'Random Forest': _make_scaled_pipeline(RandomForestClassifier(random_state=0)),
    'K-Nearest Neighbors': _make_scaled_pipeline(KNeighborsClassifier()),
    'Decision Tree': _make_scaled_pipeline(DecisionTreeClassifier(random_state=0)),
    'SVM': _make_scaled_pipeline(SVC(random_state=0)),
}

# hyperparameter search spaces, one per classifier name; the 'clf__' prefix
# addresses the parameters of the pipeline step named 'clf'
param_grids = dict([
    ('Logistic Regression', dict(
        clf__C=[0.01, 0.1, 1, 10],
        clf__penalty=['l2'],
        clf__solver=['lbfgs', 'saga'],
    )),
    ('Random Forest', dict(
        clf__n_estimators=[50, 100, 200],
        clf__max_depth=[None, 5, 10, 20],
    )),
    ('K-Nearest Neighbors', dict(
        clf__n_neighbors=[3, 5, 7, 9, 11],
    )),
    ('Decision Tree', dict(
        clf__max_depth=[None, 5, 10, 20],
        clf__criterion=['gini', 'entropy'],
    )),
    ('SVM', dict(
        clf__C=[0.1, 1, 10],
        clf__gamma=['scale', 'auto'],
        clf__kernel=['rbf', 'linear'],
    )),
])

#### Functions to Plot Confusion Matrix & Feature Importances

In [None]:
# generate confusion matrix for evaluating model performance
def plot_confusion_matrix(name, label, label_encoder, y_test, y_pred, feature_group, results_dir=results_dir):
    """Plot, save and show the confusion matrix of one classifier.

    Parameters
    ----------
    name : classifier name (used in the title and the file name).
    label : target label name (used in the file name).
    label_encoder : fitted encoder whose ``classes_`` provide the tick labels.
    y_test, y_pred : true and predicted labels, expected to be the codes
        produced by `label_encoder`.
    feature_group : feature set name (used in the title and the file name).
    results_dir : base directory; the image is written to ``<results_dir>/plots``.
    """
    filename = f"{name}_{label}_{feature_group}_confusion_matrix.png"
    plots_dir = os.path.join(results_dir, 'plots')
    os.makedirs(plots_dir, exist_ok=True)  # make sure the target folder exists before saving
    plot_path = os.path.join(plots_dir, filename)

    class_names = label_encoder.classes_
    # pin the matrix to every known class: without `labels`, a class absent
    # from both y_test and y_pred is dropped and the tick labels no longer
    # line up with the rows/columns
    class_ids = label_encoder.transform(class_names)

    # plot confusion matrix
    cm = confusion_matrix(y_test, y_pred, labels=class_ids)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names, yticklabels=class_names)
    plt.xlabel("Predicted Label")
    plt.ylabel("True Label")
    plt.title(f"Confusion Matrix: {name} ({feature_group})")
    plt.savefig(plot_path, bbox_inches='tight')
    plt.show()


# generate feature importances to visualise most predictive features
def plot_features_importance(name, clf, label, feature_names, feature_group, results_dir=results_dir, top_n=10):
    """Rank the features of a fitted classifier and plot the most important ones.

    The ranking uses ``clf.feature_importances_`` when available, otherwise the
    mean absolute value of ``clf.coef_`` taken over its rows. Classifiers
    exposing neither are reported and skipped.

    Parameters
    ----------
    name : classifier name (used in the title and the file name).
    clf : fitted classifier.
    label : target label name (used in the file name).
    feature_names : names matching the classifier's input features.
    feature_group : feature set name (used in the title and the file name).
    results_dir : base directory; the image is written to ``<results_dir>/plots``.
    top_n : number of features to print and plot; ``None`` shows all of them.

    Returns
    -------
    DataFrame with the columns 'Feature' and 'Importance' for all features,
    sorted by decreasing importance, or ``None`` when unsupported.
    """
    if hasattr(clf, "feature_importances_"):
        importances = clf.feature_importances_
    elif hasattr(clf, "coef_"):  # e.g. Logistic Regression / linear SVM, which expose coefficients instead
        # mean absolute coefficient across the rows of coef_
        importances = np.mean(np.abs(clf.coef_), axis=0)
    else:
        print(f"{name} does not support feature importance or coefficient extraction.")
        return None

    # create a dataframe to display and sort feature importances
    importance_df = pd.DataFrame({
        'Feature': feature_names,
        'Importance': importances
    }).sort_values(by='Importance', ascending=False)

    # `and`, not `or`: with `or` the comparison ran for top_n=None (TypeError)
    # and the test was always true otherwise
    if top_n is not None and top_n < importance_df.shape[0]:
        plot_data = importance_df.head(top_n)
    else:
        plot_data = importance_df

    # report the number of features actually shown
    print(f"Top {len(plot_data)} Important Features:")
    print(plot_data)

    filename = f"{name}_{label}_{feature_group}_feature_importances.png"
    plots_dir = os.path.join(results_dir, 'plots')
    os.makedirs(plots_dir, exist_ok=True)  # make sure the target folder exists before saving
    plot_path = os.path.join(plots_dir, filename)

    plt.figure(figsize=(8, 6))
    sns.barplot(x='Importance', y='Feature', data=plot_data, hue='Feature', palette='viridis')
    plt.title(f"Feature Importances: {name} ({feature_group})")
    plt.xlabel("Importance Score")
    plt.ylabel("Feature")
    plt.tight_layout()
    plt.savefig(plot_path, bbox_inches='tight')
    plt.show()

    return importance_df

#### Functions to save results

In [None]:
# function to save results from classification pipeline
def save_results_to_csv(results, label, feature_group, results_dir=results_dir):
    """Write one single-row CSV per classifier into ``<results_dir>/classification_results``.

    `results` maps a classifier name to its result dictionary; each file is
    named ``<classifier>_<label>_<feature_group>.csv``.
    """
    for clf_name in results:
        csv_path = os.path.join(results_dir, 'classification_results',
                                f"{clf_name}_{label}_{feature_group}.csv")
        # a one-element list turns the result dictionary into a single row
        single_row = pd.DataFrame([results[clf_name]])
        single_row.to_csv(csv_path, index=False)
        print(f"Saved: {csv_path}")


# function to merge all results in a single csv file
def merge_all_results_to_csv(classifiers, label, feature_groups, results_dir=results_dir):
    """Stack the per-classifier result CSVs into one file.

    Reads ``<results_dir>/classification_results/<classifier>_<label>_<feature_group>.csv``
    for every classifier / feature group pair, skipping (and reporting) the
    files that do not exist, and writes the concatenation to
    ``<results_dir>/classification_results_all_<label>.csv``.

    Parameters
    ----------
    classifiers : iterable of classifier names.
    label : target label name used in the file names.
    feature_groups : iterable of feature group names.
    results_dir : base directory of the modelling results.
    """
    # materialise once: the groups are iterated again for every classifier
    feature_groups = list(feature_groups)

    result_frames = []
    for name in classifiers:
        for feature_group in feature_groups:
            results_path = os.path.join(results_dir, 'classification_results', f"{name}_{label}_{feature_group}.csv")
            if not os.path.exists(results_path):
                print(f"Attempted to merge: {results_path} not found")
                continue

            result_frames.append(pd.read_csv(results_path))

    if not result_frames:
        # no dependence on the loop variable, which is unbound for an empty `classifiers`
        print(f"No results to merge for label '{label}'")
        return

    cr_filename = f"classification_results_all_{label}.csv"
    merged_path = os.path.join(results_dir, cr_filename)

    # a single concat instead of growing the frame inside the loop
    pd.concat(result_frames, axis=0).to_csv(merged_path, index=False)

    print(f"Merged {len(result_frames)} result file(s) into {merged_path}")

#### Hyperparameter Tuning & Evaluation

In [None]:
def evaluate_pipeline(pipelines, param_grids, df, cols, label, feature_group, results_dir=results_dir):
    """Tune, evaluate and report every pipeline on one feature set.

    For each pipeline a 5-fold cross-validated grid search (accuracy) is fitted
    on the training split. The refitted best model is then scored on the
    held-out test split, its confusion matrix and feature importances are
    plotted, and a one-row result CSV is saved.

    Parameters
    ----------
    pipelines : mapping of classifier name -> pipeline with a step named 'clf'.
    param_grids : mapping of classifier name -> parameter grid for that pipeline.
    df : DataFrame holding the feature columns `cols` and the column `label`.
    cols : feature column names.
    label : target column name.
    feature_group : feature set name used in the reports and file names.
    results_dir : directory receiving 'classification_results' and 'plots'.

    Returns
    -------
    dict mapping each classifier name to its best fitted estimator.

    Raises
    ------
    ValueError when `df` is None (e.g. the dataset file was not found).
    """
    if df is None:
        raise ValueError(f"No dataset available for feature group '{feature_group}'")

    X_train, X_test, y_train, y_test, le = get_training_data(df, cols, label) # get training and test data

    # encoded id of every known class: keeps the report aligned with
    # le.classes_ even if a class is missing from the test split
    class_ids = le.transform(le.classes_)

    best_models = {}

    for name, pipeline in pipelines.items():

        # GridSearch with 5-fold CV to look for the optimal set of hyperparameters
        grid = GridSearchCV(pipeline, param_grids[name], cv=5, scoring='accuracy', n_jobs=-1)
        grid.fit(X_train, y_train)

        mean_accuracies = grid.cv_results_['mean_test_score']  # mean accuracy over the 5 folds, per candidate
        std_accuracies = grid.cv_results_['std_test_score']  # std of the accuracy over the 5 folds, per candidate

        print("Accuracy statistics across 5-fold cross-validation:")

        param_lines = []
        for params, mean_acc, std_acc in zip(grid.cv_results_['params'], mean_accuracies, std_accuracies):
            line = f"{params}: Mean Accuracy = {mean_acc:.4f}, Std Dev = {std_acc:.4f}"
            print(line)
            param_lines.append(line + "\n")
        all_params = "".join(param_lines)

        # CV statistics of the selected candidate
        best_acc = mean_accuracies[grid.best_index_]
        best_std = std_accuracies[grid.best_index_]

        best_models[name] = grid.best_estimator_

        # evaluate optimised model on the test set
        y_pred = grid.predict(X_test)
        acc = accuracy_score(y_test, y_pred) # obtain accuracy scores on test set

        print(f"Best Parameters for {name}: {grid.best_params_}")
        print("Test Accuracy: {:.2f}%".format(acc * 100))

        print("Classification Report:")
        c_report = classification_report(y_test, y_pred, labels=class_ids, target_names=le.classes_)
        print(c_report)
        print("-" * 50)

        # plot confusion matrix
        plot_confusion_matrix(name, label, le, y_test, y_pred, feature_group, results_dir=results_dir)

        # plot feature importances
        clf = grid.best_estimator_.named_steps['clf'] # get classifier
        feature_names = X_train.columns
        importances = plot_features_importance(name, clf, label, feature_names, feature_group, results_dir=results_dir)
        print("=" * 80)

        if importances is not None:
            feature_importances = importances.head(10)
        else:
            feature_importances = "Not supported for this classifier"

        # key order defines the column order of the saved CSV
        result = {
            "Classifier": name,
            "Feature set": feature_group,
            "Accuracy (5-fold CV) - Mean": f"{best_acc:.4f}",
            "Accuracy (5-fold CV) - Std Dev": f"{best_std:.4f}",
            "Test Accuracy": f"{acc:.4f}",
            # the selected hyperparameters, not the repr of the whole fitted pipeline
            "Best Parameters": grid.best_params_,
            "All Parameters": all_params,
            "Classification Report": c_report,
            "Feature Importances": feature_importances,
        }

        # save right away so finished classifiers survive a later failure;
        # only the current result is written (earlier ones are already on disk)
        save_results_to_csv({name: result}, label, feature_group, results_dir=results_dir)

    return best_models

#### Model Training & Evaluation

In [None]:
# select the dataset level and the target label to use

FRAME_LEVEL = True # True for frame-level, False for video-level
LABEL = 'emotion'
#LABEL = 'sentiment'

# select the subset of features to evaluate (testing the effect of the different modalities on classification performance)
f_group = feature_group # 'rgb_hsv','audio', 'optical_flow'
# alternatives (uncomment one):
# f_group = ['rgb_hsv', 'audio']
# f_group = ['optical_flow']
# merged_feature_group = join_feature_datasets(['rgb_hsv', 'optical_flow'])
# f_group = [merged_feature_group]

# the dataset level does not depend on the feature group: resolve it once,
# which also keeps `dataset` defined when f_group is empty
dataset = "frame_level" if FRAME_LEVEL else "video_level"

for FEATURE_GROUP in f_group:

    frame_df, video_df, feature_cols = get_feature_group_datasets(FEATURE_GROUP)
    df = frame_df if FRAME_LEVEL else video_df

    if df is None:
        print(f"Skipping {FEATURE_GROUP}: no {dataset} dataset available")
        continue

    # take the feature columns from the dataframe that is actually modelled:
    # the returned list is built from the frame-level set whenever it was loaded
    feature_cols = [col for col in df.columns if col not in ["video_id", "frame_id", "emotion", "sentiment"]]

    # Evaluate all pipelines
    print("=== EVALUATING PIPELINES ===\n")
    evaluate_pipeline(pipelines, param_grids, df, feature_cols, LABEL, FEATURE_GROUP, results_dir=os.path.join(results_dir, dataset, "modelling"))

In [None]:
# merge the base feature groups plus any custom / merged group selected in f_group
# (order-preserving union; result files that do not exist are skipped by the merge function)
groups_to_merge = list(dict.fromkeys([*feature_group, *f_group]))
merge_all_results_to_csv(pipelines.keys(), LABEL, groups_to_merge, results_dir=os.path.join(results_dir, dataset, "modelling"))