# **1. SEED**

In [0]:
# Seed value
seed_value= 0
 
# 1. Set `PYTHONHASHSEED` environment variable at a fixed value
import os
os.environ['PYTHONHASHSEED']=str(seed_value)
 
# 2. Set `python` built-in pseudo-random generator at a fixed value
import random
random.seed(seed_value)
 
# 3. Set `numpy` pseudo-random generator at a fixed value
import numpy as np
np.random.seed(seed_value)
 
# 4. Set `tensorflow` pseudo-random generator at a fixed value
import tensorflow as tf
tf.random.set_seed(seed_value)
 
# 5. Configure a new global `tensorflow` session
session_conf = tf.compat.v1.ConfigProto(intra_op_parallelism_threads=1, inter_op_parallelism_threads=1)
sess = tf.compat.v1.Session(graph=tf.compat.v1.get_default_graph(), config=session_conf)
tf.compat.v1.keras.backend.set_session(sess)

# **2. Load packages**

In [2]:
# import the necessary
from tensorflow.keras.callbacks import LearningRateScheduler, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import tensorflow.keras.backend as K

import matplotlib.pyplot as plt
from sklearn import metrics
import seaborn as sns
import pandas as pd
import datetime
import math
import time
import os
import csv

  import pandas.util.testing as tm


# **3. Load Data**

In [0]:
# Load the data from the csv file.
# path = path to your csv file with FER2013 dataset
path = '/content/drive/My Drive/datasets/facial-expression/fer2013/new-fer2013-dataset-balanced.csv'
fer_dataset = pd.read_csv(path)

# **4. Data preprocessing**

In [0]:
# Shuffle the dataset 
fer_dataset = fer_dataset.sample(frac=1, random_state=seed_value).reset_index(drop=True)

In [0]:
# Function for normalizing the dataset
# input_set: dataset 
# scaler: the scaler for normalize
# @return: dataset with normalized data
def normalizeSet(input_set, scaler):
  x = scaler.fit_transform(input_set)
  return x

# The function for getting the pixels as a list 
# input_set: dataset 
# @return: an array of flattened images
def getPixels(input_set):
  pixels = input_set['pixels'].tolist()
  new_pixels = []
  for pixel_sequence in pixels:
    image = [int(pixel) for pixel in pixel_sequence.split(' ')]
    new_pixels.append(image)
  return np.asarray(new_pixels, dtype='float32')  

# The function for reshpping the dataset
# cutsize: if True the image will be cutted
# input_set: dataset with only pixels (# the return of the getPixels(input_set))
def reshapeDataset(input_set, cutsize=False):
  new_input_set = []
  for image in input_set:
    # reshape the image in (48, 48) shape
    image = image.reshape((48, 48))

    # add new axis for being in (48, 48, 1) shape
    image = image[:, :, np.newaxis]

    # reshape in (48, 48, 3) shape by concatenating 3 times the image
    # image = np.concatenate((image, image, image), axis=2)
    new_input_set.append(image)
  return np.asarray(new_input_set, dtype='float32')

# This function is going to help with normalizing and reshapping a set
def normalize_reshape_set(input_set, scaler, cutsize):
  images = getPixels(input_set)
  print('Done with pixels of images')

  images = normalizeSet(images,scaler)
  print('Done normalizing the images')

  reshapped_images = reshapeDataset(images,cutsize)
  print('Done reshaping the images')

  return reshapped_images

In [0]:
# We want to preparing the data set for normalizing part
data_for_training = fer_dataset[fer_dataset['Usage'] == 'Training']

data_for_validation = fer_dataset[fer_dataset['Usage'] == 'PublicTest']
data_for_testing = fer_dataset[fer_dataset['Usage'] == 'PrivateTest']

# Print the shape of the sets
print('Train shape = ',data_for_training.shape)
print('Validation shape = ',data_for_validation.shape)
print('Test shape = ',data_for_testing.shape)

In [0]:
# Drop the column Usage from train, validation and test sets
data_for_training = data_for_training.drop(['Usage'],axis=1)
data_for_validation = data_for_validation.drop(['Usage'],axis=1)
data_for_testing = data_for_testing.drop(['Usage'],axis=1)

In [0]:
# Split the data in X and y format

from sklearn.preprocessing import MinMaxScaler
scaler = MinMaxScaler()

X_train = normalize_reshape_set(data_for_training, scaler, cutsize = False)
X_val = normalize_reshape_set(data_for_validation, scaler, cutsize = True)
X_test = normalize_reshape_set(data_for_testing, scaler, cutsize = True)

y_train = np.asarray(data_for_training['emotion'], dtype='int32')
y_val = np.asarray(data_for_validation['emotion'], dtype='int32')
y_test = np.asarray(data_for_testing['emotion'], dtype='int32')

In [0]:
# Let's check the shape of every set
print('X_train = ',X_train.shape)
print('X_val = ', X_val.shape)
print('X_test = ', X_test.shape)

# **5. Parameters**

In [0]:
# -------- FOR MODEL ARCHITECTURE --------
number_classes = 7
number_features = 32
growthRate = 8
# -------- FOR MODEL ARCHITECTURE --------

# -------- FOR LEARNING RATE SCHEDULE --------
learning_rate_decay_start = 80
learning_rate_decay_every = 5
learning_rate_decay_rate = 0.9
# -------- FOR LEARNING RATE SCHEDULE --------

# -------- FOR FITTING THE MODEL --------
# NUMBER_OF_EPOCHS = 600
NUMBER_OF_EPOCHS = 1
batch_size = 64
num_samples = X_train.shape[0]
num_samples_val = X_val.shape[0]
steps = math.ceil(num_samples / batch_size)
steps_validation = math.ceil(num_samples_val / batch_size)
initial_learning_rate = 0.01
lambd = 0.0005
# -------- FOR FITTING THE MODEL --------

# -------- OTHERS --------
filepath = "/content/weights-improvement-{epoch:02d}-{val_accuracy:.2f}.hdf5"
loss_function = 'sparse_categorical_crossentropy'
regularizer = tf.keras.regularizers.l1(lambd)
verbosity = 1
# -------- OTHERS --------

# **6. Learning rate schedule**

In [0]:
# This function only get the parameters for learning rate scheduler
def get_setup_learning_rate_schedule():
  # for production code you should read them from the config file
  return (learning_rate_decay_start, learning_rate_decay_every, learning_rate_decay_rate, initial_learning_rate)

In [0]:
def step_decay(epoch):
  learning_rate_decay_start, learning_rate_decay_every, learning_rate_decay_rate, initial_learning_rate = get_setup_learning_rate_schedule()
  if (epoch > learning_rate_decay_start):
    frac = (epoch - learning_rate_decay_start) // learning_rate_decay_every
    decay_factor = learning_rate_decay_rate ** frac
    initial_learning_rate = initial_learning_rate * decay_factor
    return initial_learning_rate
  else: return initial_learning_rate

# **7. EdgeCNN implementation**

In [0]:
# Let's make the EdgeCNN arhictecture based on DenseNet, but with some additional changes in order to work well on Raspberry Pi 4
from tensorflow.keras import layers, models, Model

# Create the function for Edge convolution block
# x: input layer 
# growthRate: how many filter we want fot output
# @return: the layer after applying convolution block 1 and 2
def EdgeBlock(x, growthRate, name):
  # Convolution block 1
  x = layers.Conv2D(4 * growthRate, kernel_size = 3, kernel_regularizer = regularizer,padding = 'same', name='conv1_dense' + name)(x)
  x = layers.BatchNormalization(name='batchnorm1_dense' + name)(x)
  x = layers.ReLU(name='relu_dense'+name)(x)

  # Convolution block 2
  x = layers.Conv2D(1 * growthRate, kernel_size = 3, kernel_regularizer = regularizer, padding = 'same', name='conv2_dense' + name)(x)
  x = layers.BatchNormalization(name='batchnorm2_dense' + name)(x)
  return x

# Create the function for Dense Block
# input_layer: the layer on which we apply the EdgeBlock function of 'repetition' times 
# growthRate: how many filter we want for output
# name: name of the denseblock densenumber_
# @return: concatenated layers after applying the EdgeBlock of 'repetition' times
def denseBlock(input_layer, repetition, growthRate, name):
  for i in range(repetition):
    # apply the convolution block 1 and 2
    x = EdgeBlock(input_layer, growthRate,str(name)+'_'+str(i))
    # concatenate with the input layer
    input_layer = layers.Concatenate()([input_layer,x])
  return input_layer

# Create the transition layer
# input_layer: the layer on which we apply average pooling
# name: name of the layer
# @return: the layer with average pooling applied
def transitionLayer(input_layer, name):
  input_layer = layers.AveragePooling2D((2,2), strides = 2, name = 'transition_'+str(name))(input_layer)
  return input_layer

In [0]:
input_shape = (48,48,1)
input_net = layers.Input(input_shape)
# First layer of convolution
x = layers.Conv2D(number_features,(3,3), kernel_regularizer = regularizer,padding = 'same', use_bias = True, strides = 1, name='conv0')(input_net)
x = layers.MaxPool2D((3,3),padding = 'same',strides = 2, name='pool0')(x)

# Add the Dense layers
repetitions = 4, 4, 7

layer_index = 1
for repetition in repetitions:
  dense_block = denseBlock(x, repetition, growthRate,name=layer_index)
  x = transitionLayer(dense_block, name=layer_index)
  layer_index+=1

x = layers.GlobalAveragePooling2D(data_format='channels_last')(dense_block)

output_layer = layers.Dense(number_classes, activation='softmax', kernel_regularizer = regularizer)(x)

In [0]:
EdgeCNN = Model(input_net, output_layer)

In [0]:
EdgeCNN.summary()

# **8. Callbacks & Online data augmentation**

In [0]:
data_generator_train = ImageDataGenerator(
    rotation_range=10,
    horizontal_flip=True)

data_generator_val = ImageDataGenerator(
    rotation_range=10,
    horizontal_flip=True)

train_generator = data_generator_train.flow(X_train, y_train, batch_size=batch_size,seed=seed_value)
val_generator = data_generator_val.flow(X_val, y_val, batch_size=batch_size,seed=seed_value)

In [0]:
# Checkpoint callback
checkpoint_callback_sgd = ModelCheckpoint(filepath, monitor='val_accuracy', verbose=2, save_best_only=True, mode='max')

# Learning rate schedule callback
scheduler_lr = LearningRateScheduler(step_decay)

#callbacks list
callbacks_list_sgd = [checkpoint_callback_sgd, scheduler_lr]

# **9. Compile**

In [0]:
# SGD optimizer
sgd_optimizer = tf.optimizers.SGD(learning_rate=initial_learning_rate,  momentum=0.9)

# Compile the model
EdgeCNN.compile(
    optimizer = sgd_optimizer,
    loss = loss_function,
    metrics = ['accuracy']
)


# **10. Fit**

In [0]:
# Get the start time
start = time.time()

# Let's fit the model
history = EdgeCNN.fit(
    train_generator,
    steps_per_epoch=steps,
    verbose = verbosity,
    epochs = NUMBER_OF_EPOCHS,
    validation_data = val_generator,
    validation_steps = steps_validation, 
    callbacks=callbacks_list_sgd
)
# Get the finish time
end = time.time()
hours, rem = divmod(end-start, 3600)
minutes, seconds = divmod(rem, 60)
print("{:0>2}:{:0>2}:{:05.2f}".format(int(hours),int(minutes),seconds))

# **11. Evaluate**

In [0]:
# Get the metrics
accuracy = history.history['accuracy']
val_accuracy = history.history['val_accuracy']
loss = history.history['loss']
val_loss = history.history['val_loss']

In [0]:
# Get the max accuracy on the validation
max_accuracy = max(val_accuracy)
print('Max val accuracy: {:5.2f}%'.format(100*max_accuracy))

# Get the epoch of max accuracy
at_epoch = np.asarray(val_accuracy, dtype='float32').argmax()
print('At epoch: ' + str(at_epoch + 1))

In [0]:
def plot_metric(x,y1,y2,color1,color2,label1,label2, title):
  plt.subplots(figsize=(20,8)) 
  plt.plot(x, y1, 'b', label=label1, color=color1)
  plt.plot(x, y2, 'b', label=label2, color=color2)
  plt.title(title)
  plt.legend()

In [0]:
# Plot the accuracy on the training and validation set
epochs = range(len(accuracy))
sns.set(style="white")
plot_metric(epochs,accuracy,val_accuracy, 'red','blue','training','validation', 'Training and valdiation ACCURACY')

In [0]:
# Plot the loss on the training and validation set
epochs = range(len(accuracy))
plot_metric(epochs,loss,val_loss, 'red','blue','training','validation', 'Training and valdiation LOSS')

In [0]:
# Get the accuracy and loss on test set
loss, acc = EdgeCNN.evaluate(X_test,  y_test,batch_size=batch_size, verbose=2)
print('Model, accuracy: {:5.2f}%'.format(100*acc))

In [0]:
# Get the predictions of 
y_pred = EdgeCNN.predict(X_test,batch_size=batch_size,verbose=verbosity)

In [0]:
# Get the confusion matrix
conf_matrix = metrics.confusion_matrix(y_test, y_pred.argmax(axis = 1))

# Plot the confusion matrix
figure = plt.figure(figsize=(8, 8))
sns.heatmap(conf_matrix, annot=True,cmap=plt.cm.Blues, fmt="d")
plt.tight_layout()
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.show()

In [0]:
# Print the classification Report
class_names = ['Angry', 'Disgust', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral']
print(metrics.classification_report(y_test, y_pred.argmax(axis=1), target_names=class_names))

In [0]:
# Get the ROC_AUC Score
metrics.roc_auc_score(y_test, y_pred,multi_class='ovr')

# **12. Save**

In [0]:
# Load the weights of the best checkpoint
# Set the path of the trained model
EdgeCNN.load_weights('/content/drive/My Drive/FaceX/tests/ReduceLROnPlateau/v0.0.1/weights-improvement-319-0.66.hdf5')

In [0]:
# Save the model
# Set the path of where you want to save the model 
EdgeCNN.save('/content/facex_tflite')

# **13. Convert to TFLite** 

In [0]:
# Get the model from saved model file
# Set the path of where the model was saved
converter = tf.lite.TFLiteConverter.from_saved_model('/content/drive/My Drive/FaceX/tflite/facex_tflite')

# Convert the model
tflite_model = converter.convert()

In [0]:
# Write to file the Lite version of the model
# Set the path of where you want to save the model
open("/content/drive/My Drive/FaceX/tflite/facial_expression.tflite", "wb").write(tflite_model)