In [0]:
import os
import pickle
import pandas as pd
import numpy as np
from PIL import Image
from keras.applications.inception_resnet_v2 import InceptionResNetV2, preprocess_input
from keras.utils import to_categorical
from keras.preprocessing import image


# Class names; a label is the position of its emotion in this list
# (process_frames uses EMOTIONS.index), so the order must not change once
# feature files have been written.
EMOTIONS = [
    "angry",
    "calm",
    "disgust",
    "fear",
    "sad",
    "happy",
    "neutral",
    "surprise"
]
# Root folder scanned by extract_feature_sequence(): one sub-folder per emotion.
DATA_PATH = 'data/'

# Size every frame is resized to before it is fed to the CNN.
IMG_WIDTH, IMG_HEIGHT = 100, 100
# Number of consecutive frames per training sample.
SEQ_LENGTH = 2
# Number of leading frames dropped from a completed window before the next
# window is started; the remaining frames are shared between the two windows.
OVERLAP_IDX = int(0.9 * SEQ_LENGTH)
# Output folder for multi-frame feature sequences (SEQ_LENGTH > 1).
SEQUENCE_PATH = 'sequence/'
# Output folder used by extract_feature_sequence() when SEQ_LENGTH == 1.
# NOTE(review): the folder name is an assumption mirroring SEQUENCE_PATH —
# point it at wherever single-frame features should live.
SINGLE_PATH = 'single/'

In [0]:
# ImageNet-pretrained InceptionResNetV2 without its classification head
# (include_top=False). process_frames() passes this global to
# extract_features(), which calls model.predict() on one frame at a time.
model = InceptionResNetV2(include_top=False, weights='imagenet')

In [0]:
def extract_feature_sequence():
    """Extract CNN feature sequences for every video and save them as .npy files.

    Walks ``DATA_PATH/<emotion>/<video>/<video>_aligned`` for each entry of
    ``EMOTIONS``, hands the frame files of each video to ``process_frames`` and
    finally writes ``X_InceptionResNetV2.npy`` / ``y_InceptionResNetV2.npy``.

    Labels are one-hot encoded when ``SEQ_LENGTH > 1`` and left as integer
    class indices otherwise. Output goes to ``SEQUENCE_PATH`` unless
    ``SEQ_LENGTH == 1``, in which case ``SINGLE_PATH`` is used (it must be
    defined before running in that mode).
    """
    X, y = [], []
    for emotion in EMOTIONS:
        # os.listdir() returns entries in arbitrary order; sort so the saved
        # arrays are reproducible from run to run.
        for video in sorted(os.listdir(DATA_PATH + emotion)):
            video_path = DATA_PATH + emotion + '/' + video + '/' + video + '_aligned'
            # Skip stray files / folders that do not follow the expected
            # layout instead of crashing in os.listdir() below.
            if not os.path.isdir(video_path):
                continue
            # Frames must be fed in a stable order for the sequences to make
            # sense. Lexicographic order is temporal order only if frame
            # numbers are zero-padded — TODO confirm for this dataset.
            frames = sorted(f for f in os.listdir(video_path)
                            if os.path.isfile(os.path.join(video_path, f)))
            if len(frames) >= SEQ_LENGTH:
                X, y = process_frames(frames, video_path, emotion, X, y)
        print('{} sequences extracted'.format(emotion))
    # use onehot encoding for LSTM
    if SEQ_LENGTH > 1:
        y = to_categorical(y, num_classes=len(EMOTIONS))
    # save to binary files
    print('Saving sequence')
    out_dir = SINGLE_PATH if SEQ_LENGTH == 1 else SEQUENCE_PATH
    if out_dir:
        # np.save does not create missing parent directories.
        os.makedirs(out_dir, exist_ok=True)
    np.save(out_dir + 'X_InceptionResNetV2', X)
    np.save(out_dir + 'y_InceptionResNetV2', y)


def process_frames(frames, video_path, emotion, X, y):
    """Turn the frames of one video into fixed-length feature windows.

    Each frame file is run through ``extract_features`` with the global
    ``model``. Whenever ``SEQ_LENGTH`` feature arrays have been collected the
    window is appended to ``X`` and the index of ``emotion`` in ``EMOTIONS``
    is appended to ``y``.

    Args:
        frames: frame file names, consumed in the order given.
        video_path: folder containing the frame files.
        emotion: class name; must be an entry of ``EMOTIONS``.
        X: list of windows collected so far (appended to in place).
        y: list of class indices collected so far (appended to in place).

    Returns:
        The same ``X`` and ``y`` lists.
    """
    window = []
    for frame_name in frames:
        frame_file = video_path + '/' + frame_name
        window.append(extract_features(model, frame_file))
        if len(window) != SEQ_LENGTH:
            continue
        X.append(window)
        y.append(EMOTIONS.index(emotion))
        # Start the next window: keep the tail of this one when windows span
        # several frames, otherwise start from scratch. Slicing builds a new
        # list, so the window just stored in X is not modified.
        window = window[OVERLAP_IDX:] if SEQ_LENGTH > 1 else []
    return X, y


def extract_features(model, image_path):
    """Return the feature map ``model`` produces for a single image file.

    Args:
        model: object with a Keras-style ``predict`` method.
        image_path: path of the frame to load.

    Returns:
        ``model.predict`` output for the frame with the batch axis removed.
    """
    # load and preprocess the frame
    print(image_path)
    # Keras expects target_size as (height, width), not (width, height).
    img = image.load_img(image_path, target_size=(IMG_HEIGHT, IMG_WIDTH))
    x = image.img_to_array(img)
    # The network works on batches, so wrap the frame in a batch of one.
    x = np.expand_dims(x, axis=0)
    x = preprocess_input(x)
    # Get the prediction and drop the batch axis again.
    features = model.predict(x)
    return features[0]

# Uncomment to (re)generate the .npy feature files that load_sequence() reads.
# extract_feature_sequence()

In [0]:
from sklearn.model_selection import train_test_split

def split_dataset(X, y, test_size=0.2, val_split=True, random_state=42):
    """Stratified split into train / (validation) / test sets.

    Args:
        X: samples.
        y: labels; also used as the stratification target.
        test_size: fraction of the data held out from training.
        val_split: when True the held-out part is split again, half and half,
            into a validation and a test set (each ``test_size / 2`` of the
            data); when False it is returned whole as the test set.
        random_state: seed forwarded to both ``train_test_split`` calls so the
            split is reproducible.

    Returns:
        ``(X_train, y_train, X_val, y_val, X_test, y_test)`` when ``val_split``
        is True, otherwise ``(X_train, y_train, X_test, y_test)``.
    """
    X_train, X_holdout, y_train, y_holdout = train_test_split(
        X, y, test_size=test_size, random_state=random_state, stratify=y)
    if not val_split:
        return X_train, y_train, X_holdout, y_holdout
    X_val, X_test, y_val, y_test = train_test_split(
        X_holdout, y_holdout, test_size=0.5, random_state=random_state, stratify=y_holdout)
    return X_train, y_train, X_val, y_val, X_test, y_test

def load_sequence():
    """Load the saved feature sequences and return train / val / test splits.

    Reads ``X_InceptionResNetV2.npy`` and ``y_InceptionResNetV2.npy`` from
    ``SEQUENCE_PATH``, merges axes 2-4 of X into a single axis so that X
    becomes 3-D, and splits with ``split_dataset(test_size=0.2)``.

    Returns:
        ``(X_train, y_train, X_val, y_val, X_test, y_test)``.
    """
    features = np.load(SEQUENCE_PATH + 'X_InceptionResNetV2.npy')
    # presumably (samples, frames, rows, cols, channels) — TODO confirm;
    # the last three axes are collapsed into one flat vector per frame.
    dims = features.shape
    flat_size = dims[2] * dims[3] * dims[4]
    features = features.reshape(dims[0], dims[1], flat_size)
    labels = np.load(SEQUENCE_PATH + 'y_InceptionResNetV2.npy')
    X_train, y_train, X_val, y_val, X_test, y_test = split_dataset(features, labels, test_size=0.2)
    return X_train, y_train, X_val, y_val, X_test, y_test

In [0]:
# Load the saved feature sequences and split them into train / validation / test sets.
X_train, y_train, X_val, y_val, X_test, y_test = load_sequence()

In [0]:
import matplotlib.pyplot as plt
import time
from keras.models import Sequential, load_model
from keras.layers import LSTM, Dense
from keras.callbacks import TensorBoard, ModelCheckpoint
from sklearn.metrics import confusion_matrix
import itertools

def plot_confusion_matrix(cm, title='Confusion matrix', float_display='.4f', cmap=plt.cm.Greens, class_names=None):
    """Draw a confusion matrix as an annotated heat map on the current axes.

    Args:
        cm: 2-D array; rows are actual classes, columns predicted classes.
        title: plot title.
        float_display: format spec used for the number written in each cell.
        cmap: matplotlib colour map.
        class_names: optional tick labels; when None the numeric tick
            positions are shown instead.
    """
    # create confusion matrix plot
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    ax = plt.gca()
    plt.xticks(np.arange(cm.shape[1]))
    plt.yticks(np.arange(cm.shape[0]))
    if class_names is not None:
        ax.set_xticklabels(class_names)
        ax.set_yticklabels(class_names)

    # Cells darker than half the maximum get white text so they stay readable.
    # nanmax keeps the threshold usable when some cells are NaN.
    thresh = np.nanmax(cm) / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], float_display),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('Actual')
    plt.xlabel('Predicted')

def get_predictions_and_labels(model, X, y):
    """Convert one-hot labels and model scores into class-index lists.

    Args:
        model: object whose ``predict(X)`` returns one score row per sample.
        X: input passed to ``model.predict``.
        y: label rows; each row must contain the value 1, whose first
            position is taken as the true class.

    Returns:
        ``(y_true, y_pred)`` — two lists of class indices. The predicted
        class is the first position holding the highest score.
    """
    score_rows = model.predict(X)
    true_classes, predicted_classes = [], []
    for row_idx, target_row in enumerate(y):
        true_class = list(target_row).index(1)
        scores = list(score_rows[row_idx])
        predicted_class = scores.index(max(scores))
        true_classes.append(true_class)
        predicted_classes.append(predicted_class)
    return true_classes, predicted_classes

In [0]:
class LSTMNetwork:
    """Stacked LSTM classifier with helpers to train, evaluate and compare runs.

    Every instance gets its own output folder
    ``LSTM/<data_type>/<feature>/LSTM_<n_layer>_<lstm_unit>_<timestamp>/``
    holding the best checkpoint (``LSTM.h5``), TensorBoard logs and the
    confusion-matrix files written by ``evaluate``.
    """

    def __init__(self, n_layer, lstm_unit, input_shape, feature, data_type):
        """Build the (uncompiled) network and derive the output paths.

        Args:
            n_layer: number of LSTM layers; values below 1 give one layer.
            lstm_unit: units per LSTM layer.
            input_shape: shape of one sample, passed to the first LSTM layer.
            feature: feature-extractor name, used in the output path.
            data_type: dataset name, used in the output path.
        """
        # Use the module-level class list: labels are encoded as indices into
        # that same list, so tick labels and output size stay consistent.
        self.EMOTIONS = list(EMOTIONS)

        self.model = Sequential()
        depth = max(n_layer, 1)
        for layer_idx in range(depth):
            layer_kwargs = {
                # Only the last LSTM collapses the sequence to one vector.
                'return_sequences': layer_idx < depth - 1,
                'dropout': 0.2,
            }
            if layer_idx == 0:
                layer_kwargs['input_shape'] = input_shape
            self.model.add(LSTM(lstm_unit, **layer_kwargs))
        nb_class = len(self.EMOTIONS)
        self.model.add(Dense(nb_class, activation='softmax'))

        current_time = time.strftime("%Y%m%d-%H%M%S")
        self.base_dir = 'LSTM/' + data_type + '/' + feature + '/'
        self.model_dir = 'LSTM_' + str(n_layer) + '_' + str(lstm_unit) + '_' + current_time + '/'
        filename = 'LSTM.h5'
        self.model_file = self.base_dir + self.model_dir + filename

    def train(self, X_train, y_train, X_val, y_val, epochs, batch_size):
        """Compile and fit the model, keeping the checkpoint with the lowest val_loss.

        ``X_val`` / ``y_val`` are passed to Keras as validation data and are
        therefore what ``ModelCheckpoint`` monitors.
        """
        log_dir = self.base_dir + self.model_dir + 'log/'
        # Creates the run folder as well, and tolerates a repeated train() call.
        os.makedirs(log_dir, exist_ok=True)
        self.model.compile(optimizer='adam',
                           loss='categorical_crossentropy',
                           metrics=['accuracy'])
        callbacks = [ModelCheckpoint(self.model_file, monitor='val_loss', save_best_only=True, verbose=0),
                     TensorBoard(log_dir=log_dir, write_graph=True)]
        self.model.fit(X_train, y_train,
                       epochs=epochs,
                       batch_size=batch_size,
                       validation_data=(X_val, y_val),
                       callbacks=callbacks)

    def evaluate(self, X_val, y_val):
        """Score the saved checkpoint and write its confusion matrices.

        Loads ``self.model_file``, prints loss and accuracy, then writes
        ``cm_val.csv`` (row-normalised), ``cm_percent_val.png`` and
        ``cm_val.png`` into the run folder and shows both figures.

        Args:
            X_val: samples to evaluate on.
            y_val: one-hot labels for ``X_val``.
        """
        model = load_model(self.model_file)
        scores = model.evaluate(X_val, y_val)
        print('val_loss: {}, val_acc: {}'.format(scores[0], scores[1]))

        cm, cm_percent = self._confusion_matrices(model, X_val, y_val)
        df = pd.DataFrame(cm_percent, index=self.EMOTIONS, columns=self.EMOTIONS)
        df.index.name = 'Actual'
        df.columns.name = 'Predicted'
        run_dir = self.base_dir + self.model_dir
        df.to_csv(run_dir + 'cm_val.csv', float_format='%.4f')

        self._save_confusion_plots(cm, cm_percent,
                                   run_dir + 'cm_percent_val.png',
                                   run_dir + 'cm_val.png')
        plt.show()

    def compare_model(self, X_val, y_val):
        """Evaluate every saved run under ``self.base_dir`` on the same data.

        For each sub-folder whose name contains ``'LSTM'`` and that holds an
        ``LSTM.h5`` checkpoint, prints loss and accuracy and writes
        ``cm_percent_test.png`` / ``cm_test.png`` into that folder.
        """
        folder_list = sorted(model_dir for model_dir in os.listdir(self.base_dir) if 'LSTM' in model_dir)
        for folder in folder_list:
            path = os.path.join(self.base_dir, folder, 'LSTM.h5')
            # A run that never saved a checkpoint cannot be compared.
            if not os.path.isfile(path):
                continue
            model = load_model(path)
            scores = model.evaluate(X_val, y_val)
            print('model: {}, val_loss: {}, val_acc: {}'.format(folder, scores[0], scores[1]))

            cm, cm_percent = self._confusion_matrices(model, X_val, y_val)
            figures = self._save_confusion_plots(
                cm, cm_percent,
                os.path.join(self.base_dir, folder, 'cm_percent_test.png'),
                os.path.join(self.base_dir, folder, 'cm_test.png'))
            # Display, then release the figures so a long list of runs does
            # not keep every figure alive.
            plt.show()
            for fig in figures:
                plt.close(fig)

    def _confusion_matrices(self, model, X, y):
        """Return (count matrix, row-normalised matrix) covering every class."""
        y_true, y_pred = get_predictions_and_labels(model, X, y)
        # Pin the label set so the matrix always has one row/column per
        # emotion, even when a class is missing from this data set.
        cm = confusion_matrix(y_true, y_pred, labels=list(range(len(self.EMOTIONS))))
        return cm, self._row_normalize(cm)

    @staticmethod
    def _row_normalize(cm):
        """Divide each row by its sum; rows without samples become all zeros."""
        counts = np.asarray(cm, dtype=float)
        row_totals = counts.sum(axis=1, keepdims=True)
        return np.divide(counts, row_totals, out=np.zeros_like(counts), where=row_totals != 0)

    def _save_confusion_plots(self, cm, cm_percent, percent_path, count_path):
        """Plot and save the normalised and the raw matrix; return both figures."""
        # plot percentage confusion matrix
        percent_fig, _ = plt.subplots()
        plot_confusion_matrix(cm_percent, class_names=self.EMOTIONS)
        plt.savefig(percent_path, format='png')
        # plot normal confusion matrix
        count_fig, _ = plt.subplots()
        plot_confusion_matrix(cm, float_display='.0f', class_names=self.EMOTIONS)
        plt.savefig(count_path, format='png')
        return percent_fig, count_fig

In [0]:
# feature and data_type only select the output folder: LSTM/<data_type>/<feature>/
feature = 'InceptionResNetV2'
data_type = 'Basic'
# Network size
n_layer = 1
lstm_unit = 32
# Training schedule
batch_size = 256
epochs = 250
# The input shape is one sample of X_train, i.e. its shape without the sample axis.
lstm_net = LSTMNetwork(n_layer, lstm_unit, X_train.shape[1:], feature, data_type)

In [0]:
# Fit the network. The second data pair is used as Keras validation data, so it
# is what ModelCheckpoint monitors (val_loss) when choosing the saved model.
# NOTE(review): X_test/y_test fill the validation slot here, while the
# evaluate cell uses X_val/y_val — the two held-out splits are used with
# their names swapped. Confirm this is intentional.
lstm_net.train(X_train, y_train, X_test, y_test, epochs, batch_size)

In [0]:
# Score the saved checkpoint and write the confusion-matrix CSV and PNG files.
# NOTE(review): X_val/y_val act as the final held-out set here, because the
# train cell passed X_test/y_test as validation data — confirm the naming.
lstm_net.evaluate(X_val, y_val)