In [None]:
import cv2
import os
import numpy as np
from tqdm import tqdm
import random
from time import time
import sklearn

import matplotlib.pyplot as plt
from sklearn.metrics import *
from sklearn.model_selection import KFold, train_test_split

import tensorflow as tf
from tensorflow.keras.applications.densenet import DenseNet121
from tensorflow.keras.applications.densenet import preprocess_input
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential
from tensorflow.keras.utils import to_categorical

In [None]:
seed_constant = 27
np.random.seed(seed_constant)
random.seed(seed_constant)
tf.random.set_seed(seed_constant)

In [None]:
model = DenseNet121()
feature_extractor_model = tf.keras.Model(inputs=model.inputs, outputs=model.layers[-2].output)

video_length = 70
# read directory of classes. in each class read video frames. extract features from each frame using DenseNet121
def read_video_frames(video_dir, classes_names):
    video_frames = []
    classes = []
    for idx, class_name in enumerate(classes_names):
        print('\n', class_name)
        class_dir = os.path.join(video_dir, class_name)

        for video_name in tqdm(os.listdir(class_dir)):
            cap = cv2.VideoCapture(os.path.join(class_dir, video_name))
            frames = []

            for i in range(video_length):
                ret, frame = cap.read()
                if ret == False:
                  break
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frame = cv2.resize(frame, (224, 224))
                frames.append(frame)
            cap.release()
            
            if len(frames) == video_length:
              try:
                frames = np.array(frames)
                frames = preprocess_input(frames)
                frames = feature_extractor_model.predict(frames)
                video_frames.append(frames)
                classes.append(idx)
              except:
                pass 

    return np.array(video_frames), np.array(classes)

In [None]:
classes = ["Anger", "Disgust", 'Fear', 'Happiness', 'Neutral', 'Sadness', 'Surprise']

video_frames, classesf = read_video_frames('path/to/dataset/train', classes)
X_test_v, y_test_v = read_video_frames('path/to/dataset/validation', classes)

In [None]:
X_train, X_test, y_train, y_test = train_test_split(video_frames, classesf, test_size=0.2, random_state=42, shuffle=True)

In [None]:
def plot_metric(model_training_history, metric_name_1, metric_name_2, plot_name):

    metric_value_1 = model_training_history.history[metric_name_1]
    metric_value_2 = model_training_history.history[metric_name_2]
    epochs = range(len(metric_value_1))
    plt.plot(epochs, metric_value_1, 'blue', label = 'train')
    plt.plot(epochs, metric_value_2, 'red', label = 'validation')
    plt.title(str(plot_name))
    plt.legend()

In [None]:
def get_lstm_model():
  model = Sequential()
  model.add(layers.LSTM(64, input_shape=(70, 1024), return_sequences=True))
  model.add(layers.LSTM(32))
  model.add(layers.BatchNormalization())
  model.add(layers.Dense(len(classes), activation='softmax'))
  return model

In [None]:
model = get_lstm_model()

model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath='modelbest.h5',
    monitor='val_loss',
    verbose = 1,
    save_best_only=True)

opt = tf.keras.optimizers.Adam(learning_rate=0.001)
model.compile(loss='categorical_crossentropy', optimizer=opt, metrics=['accuracy', tf.keras.metrics.Precision(), tf.keras.metrics.Recall()])

history = model.fit(video_frames2, to_categorical(classesf), epochs=100, batch_size=64, verbose=1, shuffle = True, validation_data=[X_test_v, to_categorical(y_test_v)], callbacks = [model_checkpoint_callback])

In [None]:
model.load_weights('modelbest.h5')
test_results = model.evaluate(X_test_v, to_categorical(y_test_v), verbose=1)
print(f'Test results - Loss: {test_results[0]} - Accuracy: {test_results[1]}%')
print(f'Precision: {test_results[2]} - Recall: {test_results[3]}')
print(f'F1-Score: {2 * (test_results[2] * test_results[3]) / (test_results[2] + test_results[3])}')

In [None]:
plot_metric(history, 'loss', 'val_loss', 'Total Loss vs Total Validation Loss')

In [None]:
plot_metric(history, 'accuracy', 'val_accuracy', 'Total Accuracy vs Total Validation Accuracy')

In [None]:
def model_build():
  model = get_lstm_model()
  opt = tf.keras.optimizers.Adam(learning_rate=0.001)
  model.compile(loss='categorical_crossentropy', optimizer=opt, metrics=['accuracy', tf.keras.metrics.Recall(), tf.keras.metrics.Precision()])
  return model

In [None]:
def KFold_cross_val_test(X, Y, K=5, Report=True, shuffle=False, print_report=True, random_state=False):
    if random_state:
        cv = KFold(n_splits=K, shuffle=shuffle, random_state=random_state)
    else:
        cv = KFold(n_splits=K, shuffle=shuffle)    

    avgacc = 0
    avgf1 = 0
    avgprecision = 0
    avgrecall = 0
    f = 0
    report = []
    ticf=time()
    
    for train_ix, test_ix in cv.split(X):
        result={}
        train_X, test_X = X[train_ix], X[test_ix]
        train_y, Y_test = Y[train_ix], Y[test_ix]

        tic = time()
        model = model_build()
        model_filename = f'classi{f}_model.h5'
        callback_checkpoint = tf.keras.callbacks.ModelCheckpoint(model_filename, verbose=0, monitor='val_loss', save_best_only=True)
        history = model.fit(train_X, train_y, validation_data=(test_X,Y_test), epochs=100 ,verbose=0, callbacks=[callback_checkpoint])
        toc = time()

        model.load_weights(model_filename)
        Y_pred = model.predict(X_test_v)
        y_test_v_cat = to_categorical(y_test_v)

        eval = model.evaluate(X_test_v, to_categorical(y_test_v), verbose=0)
        acc = eval[1]
        recall = eval[2]
        precision = eval[3]
        f1_s = 2 * (precision * recall) / (precision + recall)

        result["Accuracy score"] = acc
        avgacc += result["Accuracy score"]
        result["confusion matrix"] = sklearn.metrics.confusion_matrix(y_test_v_cat.argmax(axis=1), Y_pred.argmax(axis=1))
        result["f1 score"] = f1_s
        avgf1 += result["f1 score"]
        result["precision score"] = precision
        avgprecision += result["precision score"]
        result["recall"] = recall
        avgrecall += result["recall"]
        result["Training time"] = toc-tic
        f = f+1
        if print_report:
            print(f"\n ======== fold {f} ========")
            print(f"\nAccuracy score : ", result["Accuracy score"])
            print(f"\nconfusion matrix : \n", result["confusion matrix"])
            print(f"\nf1 score : ", result["f1 score"])
            print(f"\nrecall : ", result["recall"])
            print(f"\nprecision score : ", result["precision score"])
            print(f"\nTraining Time: {result['Training time']:.3f} s")
            
            plt.plot(history.history['accuracy'])
            plt.plot(history.history['val_accuracy'])
            plt.title('model accuracy')
            plt.ylabel('accuracy')
            plt.xlabel('epoch')
            plt.legend(['Train', 'Validation'], loc='upper left')
            plt.show()
            plt.plot(history.history['loss'])
            plt.plot(history.history['val_loss'])
            plt.title('model loss')
            plt.ylabel('loss')
            plt.xlabel('epoch')
            plt.legend(['Train', 'Validation'], loc='upper left')
            plt.show()
            print(f"\n---------------------------\n")

        report.append(result)
    tocf=time()    
    total_acc = avgacc/K
    total_f1 = avgf1/K
    total_precision = avgprecision/K
    total_recall = avgrecall/K

    print(f'5-fold Accuracy: ', total_acc)
    print(f'5-fold F1: ', total_f1)
    print(f'5-fold Precision: ', total_precision)
    print(f'5-fold Recall: ', total_recall)

    if Report:
        return total_acc, report
    else:
        return total_acc

In [None]:
ac, re = KFold_cross_val_test(video_frames, to_categorical(classesf), shuffle=True, random_state=0)