In [None]:
# Colab-only setup: mount Google Drive into the runtime's filesystem at /content/drive.
from google.colab import drive
drive.mount('/content/drive')
# Root of the project's folder on the shared drive. Later cells append to this
# string directly (e.g. base_path+"data/train.csv"), so the trailing slash is required.
base_path = "/content/drive/Shareddrives/ECE 4580 Final Project/Colab/"

Mounted at /content/drive


In [None]:
import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout, Activation, Flatten
from keras.layers import Conv2D, MaxPooling2D, BatchNormalization
from keras.regularizers import l2
import pandas as pd
import tensorflow as tf
import numpy as np
import os
import cv2
from keras.preprocessing.image import ImageDataGenerator
from keras.callbacks import ReduceLROnPlateau, TensorBoard, ModelCheckpoint, EarlyStopping
from keras.losses import categorical_crossentropy
from keras.optimizers import Adam
from sklearn.model_selection import train_test_split

In [None]:
def create_model(width, height, num_features, num_labels):
    '''
    Instantiate a Keras Sequential model and incrementally add the layers of the emotion-classification CNN.
    The returned model is not compiled; the caller chooses the loss and optimizer.

    This architecture was designed by Furkan Kinli, Ph.D. His methodology is described here: https://medium.com/@birdortyedi_23820/deep-learning-lab-episode-3-fer2013-c38f2e052280

    :param width: the width (number of columns) of the single-channel input images
    :param height: the height (number of rows) of the single-channel input images
    :param num_features: the number of convolutional filters in the first stage; later stages use 2x, 4x and 8x this value
    :param num_labels: the size of the output. In this case, it is the number of emotions that can be classified
    :return: the assembled (uncompiled) Sequential model
    '''
    # instantiate a Keras Sequential object to construct the model
    model = Sequential()

    # Each convolutional stage is:
    #   convolutions with relu (Rectified Linear Unit) for non-linearity,
    #   batchnorm for normalization,
    #   max pooling to halve the spatial dimensions,
    #   dropout to randomly zero activations during training to reduce overfitting.

    # Stage 1 differs from the later stages: its first convolution is unpadded
    # (so each spatial dimension shrinks by 2), carries the only L2 kernel
    # regularizer, and is not followed by its own batchnorm.
    # With data_format='channels_last' Keras expects (rows, cols, channels),
    # i.e. (height, width, 1) -- not (width, height, 1).
    model.add(Conv2D(num_features, kernel_size=(3, 3), activation='relu', input_shape=(height, width, 1), data_format='channels_last', kernel_regularizer=l2(0.01)))
    model.add(Conv2D(num_features, kernel_size=(3, 3), activation='relu', padding='same'))
    model.add(BatchNormalization())
    model.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
    model.add(Dropout(0.5))

    # Stages 2-4 share one layout and only differ in filter count.
    for multiplier in (2, 4, 8):
        for _ in range(2):
            model.add(Conv2D(multiplier*num_features, kernel_size=(3, 3), activation='relu', padding='same'))
            model.add(BatchNormalization())
        model.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
        model.add(Dropout(0.5))

    # flatten will reshape the 2d output of the convolutional layers to a 1d vector
    model.add(Flatten())

    # follow convolutional layers with a few simple fully connected (dense) layers
    # that narrow from 8x to 2x num_features; still use dropout to reduce overfitting
    for multiplier, drop_rate in ((8, 0.4), (4, 0.4), (2, 0.5)):
        model.add(Dense(multiplier*num_features, activation='relu'))
        model.add(Dropout(drop_rate))

    # the final softmax layer will specify the output size
    # softmax activation is used for categorical classification
    model.add(Dense(num_labels, activation='softmax'))

    return model

In [None]:
'''
Read in the csv data from the fer2013 dataset, load it into a pandas dataframe,
and build the train / validation / test arrays used by the training cell
'''
# integer label in the csv's 'emotion' column -> human readable name
EMOTIONS = {
    0:"angry",
    1:"disgust",
    2:"fear",
    3:"happy",
    4:"sad",
    5:"surprise",
    6:"neutral"
}

# NOTE: despite the name, this is the path of the csv file, not a directory
train_dir = base_path+"data/train.csv"

df = pd.read_csv(train_dir)

# each row's 'pixels' entry is a whitespace separated string of pixel intensities;
# parse every string straight into a 48x48 float32 image
pixels = df['pixels'].tolist()
faces = [np.array(sequence.split(), dtype='float32').reshape(48, 48) for sequence in pixels]

#expand the channel dimension of each image -> (num_images, 48, 48, 1)
faces = np.asarray(faces)
faces = np.expand_dims(faces, -1)

#convert labels to categorical (one-hot) matrix
# The width is fixed by EMOTIONS so there is always one column per class, even
# if a class is absent from the csv, and the dtype is float32 regardless of the
# installed pandas version (pd.get_dummies yields bool columns on pandas >= 2.0).
emotions = np.eye(len(EMOTIONS), dtype='float32')[df['emotion'].to_numpy()]

# get train-test split (10% of all samples held out for the final evaluation)
x_train, x_test, y_train, y_test = train_test_split(faces, emotions, test_size=0.1, random_state=42)

# split training once more into validation (10% of the remaining training samples)
x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.1, random_state=69)

In [None]:
'''
Train the model, evaluate it on the held-out test split, and save it to the shared drive
'''
num_features = 64
num_labels = 7
batch_size = 64
epochs = 100
width, height = 48, 48
model = create_model(width=width, height=height, num_features=num_features, num_labels=num_labels)

model.summary()

# `learning_rate` is the supported argument name; `lr` is a deprecated alias
optimizer = Adam(learning_rate=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-7)
model.compile(loss=categorical_crossentropy, optimizer=optimizer, metrics=['accuracy'])

# build the list of callback objects passed to fit()
callbacks = []
# multiply the learning rate by 0.9 whenever val_loss has not improved for 3 epochs
lr_reducer = ReduceLROnPlateau(monitor='val_loss', factor=0.9, patience=3, verbose=True)
callbacks.append(lr_reducer)

# save logs in separate directories: use the first unused integer-named
# sub-directory of ./training_logs so earlier runs are never overwritten
tb_logdir= os.path.join(os.getcwd(), 'training_logs')
i = 0 
while os.path.exists(os.path.join(tb_logdir, str(i))):
    i += 1
tb_logdir = os.path.join(tb_logdir, str(i))
os.makedirs(tb_logdir)

tb = TensorBoard(log_dir=tb_logdir)
callbacks.append(tb)

# write a checkpoint (named by epoch number) each time val_loss reaches a new best
checkpoint_fpath = os.path.join(tb_logdir, "checkpoint_weights.{epoch:02d}")
checkpointer = ModelCheckpoint(checkpoint_fpath, monitor='val_loss', verbose=True, save_best_only=True)
callbacks.append(checkpointer)

# train the model
# Validate on the dedicated validation split. The test split must stay unseen
# during training: the val_loss-driven callbacks above would otherwise tune the
# run against the very data used for the final score below.
model.fit(x_train, y_train,
    batch_size=batch_size,
    epochs=epochs,
    verbose=True,
    validation_data=(x_val, y_val),
    shuffle=True,
    callbacks=callbacks
)

# final evaluation on the held-out test split
scores = model.evaluate(x_test, y_test, batch_size=batch_size)
print(f"Loss: {scores[0]}")
print(f"Accuracy: {scores[1]}")

model.save(base_path+"vt-moji-0")


Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d (Conv2D)              (None, 46, 46, 64)        640       
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 46, 46, 64)        36928     
_________________________________________________________________
batch_normalization (BatchNo (None, 46, 46, 64)        256       
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 23, 23, 64)        0         
_________________________________________________________________
dropout (Dropout)            (None, 23, 23, 64)        0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 23, 23, 128)       73856     
_________________________________________________________________
batch_normalization_1 (Batch (None, 23, 23, 128)       5