In [None]:
# Mount Google Drive inside the Colab VM at /content/gdrive/. The dataset,
# HDF5 and output paths configured later in this notebook all live under
# this mount point.
from google.colab import drive
drive.mount('/content/gdrive/')

Mounted at /content/gdrive/


# Configurations

In [None]:
# import the necessary packages
import os
from os import path

# root folder (on the mounted drive) holding the emotion dataset and
# every artefact derived from it
BASE_PATH = '/content/gdrive/My Drive/EmotionRecognition'

# location of the raw FER2013 CSV, relative to the base path
INPUT_PATH = path.sep.join([BASE_PATH, "fer2013/fer2013.csv"])

# number of emotion classes; with 6 the dataset build step merges the
# "anger" and "disgust" labels into one class
NUM_CLASSES = 6

# output locations of the training, validation, and testing HDF5 files
TRAIN_HDF5 = path.sep.join([BASE_PATH, "hdf5/train.hdf5"])
VAL_HDF5 = path.sep.join([BASE_PATH, "hdf5/val.hdf5"])
TEST_HDF5 = path.sep.join([BASE_PATH, "hdf5/test.hdf5"])

# drop HDF5 files left over from an earlier run; the dataset writer
# refuses to overwrite an existing file
for _stalePath in filter(os.path.exists, (TRAIN_HDF5, VAL_HDF5, TEST_HDF5)):
    os.remove(_stalePath)

# number of samples per batch
BATCH_SIZE = 128

# folder where output logs (training plot and history) are stored
OUTPUT_PATH = path.sep.join([BASE_PATH, "output"])
print(BASE_PATH)

/content/gdrive/My Drive/EmotionRecognition


## HDF5 Dataset Writer

In [None]:
# import the necessary packages
import h5py
import os

class HDF5DatasetWriter:
	"""Buffered writer that stores rows and integer labels in an HDF5 file.

	Rows are collected in an in-memory buffer and written to the
	datasets in chunks of (at least) `bufSize` entries. The writer can
	be used as a context manager; leaving the `with` block closes the
	underlying file.
	"""

	def __init__(self, dims, outputPath, dataKey="images",
		bufSize=1000):
		"""Create the HDF5 file and its datasets.

		Args:
			dims: shape of the data dataset; `dims[0]` is also the
				length of the "labels" dataset.
			outputPath: path of the HDF5 file to create.
			dataKey: name of the dataset holding the rows.
			bufSize: number of buffered rows that triggers a flush.

		Raises:
			ValueError: if `outputPath` already exists.
		"""
		# refuse to overwrite an existing file
		if os.path.exists(outputPath):
			raise ValueError("The supplied `outputPath` already "
				"exists and cannot be overwritten. Manually delete "
				"the file before continuing.", outputPath)

		# open the HDF5 database for writing and create two datasets:
		# one to store the images/features and another to store the
		# class labels
		self.db = h5py.File(outputPath, "w")
		self.data = self.db.create_dataset(dataKey, dims,
			dtype="float")
		self.labels = self.db.create_dataset("labels", (dims[0],),
			dtype="int")

		# store the buffer size, then initialize the buffer itself
		# along with the index into the datasets
		self.bufSize = bufSize
		self.buffer = {"data": [], "labels": []}
		self.idx = 0

	def __enter__(self):
		# allow `with HDF5DatasetWriter(...) as writer:`
		return self

	def __exit__(self, excType, excValue, traceback):
		# on a clean exit flush and close; after an error only release
		# the file handle so a failing flush cannot mask the original
		# exception
		if excType is None:
			self.close()
		else:
			self.db.close()
		return False

	def add(self, rows, labels):
		"""Append `rows` and their `labels` to the buffer, flushing
		to disk once the buffer holds at least `bufSize` rows."""
		self.buffer["data"].extend(rows)
		self.buffer["labels"].extend(labels)

		# check to see if the buffer needs to be flushed to disk
		if len(self.buffer["data"]) >= self.bufSize:
			self.flush()

	def flush(self):
		"""Write the buffered rows/labels to disk and reset the buffer."""
		# nothing buffered: skip the (empty) slice assignment entirely
		if not self.buffer["data"]:
			return

		i = self.idx + len(self.buffer["data"])
		self.data[self.idx:i] = self.buffer["data"]
		self.labels[self.idx:i] = self.buffer["labels"]
		self.idx = i
		self.buffer = {"data": [], "labels": []}

	def storeClassLabels(self, classLabels):
		"""Store the class label names in a "label_names" dataset."""
		dt = h5py.special_dtype(vlen=str) # `vlen=unicode` for Py2.7
		labelSet = self.db.create_dataset("label_names",
			(len(classLabels),), dtype=dt)
		labelSet[:] = classLabels

	def close(self):
		"""Flush any remaining buffered entries and close the file."""
		try:
			if len(self.buffer["data"]) > 0:
				self.flush()
		finally:
			# always release the file handle, even if the flush failed
			self.db.close()

## Build dataset

In [None]:
# import the necessary packages
# from config import emotion_config as config
# from ImageUtilities.io import HDF5DatasetWriter
import numpy as np

# initialize the list of data and labels for the training,
# validation, and testing sets
print("[INFO] loading input data...")
(trainImages, trainLabels) = ([], [])
(valImages, valLabels) = ([], [])
(testImages, testLabels) = ([], [])

# open the input file for reading; the `with` block guarantees the
# handle is released as soon as parsing is done (or fails)
with open(INPUT_PATH) as f:
    # skip the header row
    next(f)

    # loop over the rows in the input file
    for row in f:
        # ignore blank lines (e.g. a trailing newline at end of file)
        if not row.strip():
            continue

        # extract the label, image, and usage from the row
        (label, image, usage) = row.strip().split(",")
        label = int(label)

        # if we are ignoring the "disgust" class there will be 6 total
        # class labels instead of 7
        if NUM_CLASSES == 6:
            # merge together the "anger" and "disgust" classes
            if label == 1:
                label = 0
            # if label has a value greater than zero, subtract one from
            # it to make all labels sequential (not required, but helps
            # when interpreting results)
            if label > 0:
                label -= 1

        # reshape the flattened pixel list into a 48x48 (grayscale)
        # image
        image = np.array(image.split(" "), dtype="uint8")
        image = image.reshape((48, 48))

        # check if we are examining a training image
        if usage == "Training":
            trainImages.append(image)
            trainLabels.append(label)
        # check if this is a validation image
        elif usage == "PrivateTest":
            valImages.append(image)
            valLabels.append(label)
        # otherwise, this must be a testing image
        else:
            testImages.append(image)
            testLabels.append(label)

# construct a list pairing the training, validation, and testing
# images along with their corresponding labels and output HDF5
# files
datasets = [
    (trainImages, trainLabels, TRAIN_HDF5),
    (valImages, valLabels, VAL_HDF5),
    (testImages, testLabels, TEST_HDF5)]

# loop over the dataset tuples
for (images, labels, outputPath) in datasets:
    # make sure the target directory exists before creating the file
    os.makedirs(os.path.dirname(outputPath), exist_ok=True)

    # create HDF5 writer
    print("[INFO] building {}...".format(outputPath))
    writer = HDF5DatasetWriter((len(images), 48, 48), outputPath)

    try:
        # loop over the images and add them to the dataset
        for (image, label) in zip(images, labels):
            writer.add([image], [label])
    finally:
        # close the HDF5 writer even if adding an image failed
        writer.close()

[INFO] loading input data...
[INFO] building /content/gdrive/My Drive/EmotionRecognition/hdf5/train.hdf5...
[INFO] building /content/gdrive/My Drive/EmotionRecognition/hdf5/val.hdf5...
[INFO] building /content/gdrive/My Drive/EmotionRecognition/hdf5/test.hdf5...


## VGGNet for Emotion Recognition

In [None]:
# import the necessary packages
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import MaxPooling2D
from tensorflow.keras.layers import ELU
from tensorflow.keras.layers import Activation
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import Dense
from tensorflow.keras import backend as K


num_features = 64
class EmotionVGGNet:
	"""VGG-style convolutional network used for emotion recognition."""

	@staticmethod
	def build(width, height, depth, classes):
		"""Assemble the (uncompiled) Sequential model.

		Args:
			width: input image width.
			height: input image height.
			depth: number of input channels.
			classes: number of units in the softmax output layer.

		Returns:
			The constructed Sequential model.
		"""
		model = Sequential()

		# pick the input shape and the batch-normalization axis from
		# the backend's image data format
		if K.image_data_format() == "channels_first":
			inputShape = (depth, height, width)
			chanDim = 1
		else:
			inputShape = (height, width, depth)
			chanDim = -1

		########################## Adrian's Model ######################
		# Blocks #1-#3: (CONV => ELU => BN) * 2 => POOL => DROPOUT,
		# described as (number of filters, dropout rate)
		convStages = [(32, 0.55), (64, 0.25), (128, 0.45)]
		isFirstLayer = True
		for (numFilters, dropRate) in convStages:
			for _ in range(2):
				convArgs = {"padding": "same",
					"kernel_initializer": "he_normal"}
				# only the very first layer declares the input shape
				if isFirstLayer:
					convArgs["input_shape"] = inputShape
					isFirstLayer = False
				model.add(Conv2D(numFilters, (3, 3), **convArgs))
				model.add(ELU())
				model.add(BatchNormalization(axis=chanDim))
			model.add(MaxPooling2D(pool_size=(2, 2)))
			model.add(Dropout(dropRate))

		# Blocks #4-#5: two sets of FC => ELU => BN => DROPOUT layers
		model.add(Flatten())
		for fcDropRate in (0.65, 0.7):
			model.add(Dense(128, kernel_initializer="he_normal"))
			model.add(ELU())
			model.add(BatchNormalization())
			model.add(Dropout(fcDropRate))

		# Block #6: softmax classifier
		model.add(Dense(classes, kernel_initializer="he_normal"))
		model.add(Activation("softmax"))

		########################## Kaggle Model ######################
		# (disabled alternative architecture, kept for reference)
		# #module 1
		# model.add(Conv2D(2*2*num_features, kernel_size=(3, 3), kernel_initializer="he_normal", input_shape=inputShape))
		# # model.add(Activation('relu'))
		# model.add(ELU())
		# model.add(BatchNormalization(axis=chanDim))
		# model.add(Conv2D(2*2*num_features, kernel_size=(3, 3), kernel_initializer="he_normal", padding='same'))
		# # model.add(Activation('relu'))
		# model.add(ELU())
		# model.add(BatchNormalization(axis=chanDim))
		# model.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
		# model.add(Dropout(0.15))

		# #module 2
		# model.add(Conv2D(2*num_features, kernel_size=(3, 3), kernel_initializer="he_normal", padding='same'))
		# # model.add(Activation('relu'))
		# model.add(ELU())
		# model.add(BatchNormalization(axis=chanDim))
		# model.add(Conv2D(2*num_features, kernel_size=(3, 3), kernel_initializer="he_normal", padding='same'))
		# # model.add(Activation('relu'))
		# model.add(ELU())
		# model.add(BatchNormalization(axis=chanDim))
		# model.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
		# model.add(Dropout(0.15))

		# #module 3
		# model.add(Conv2D(num_features, kernel_size=(3, 3), kernel_initializer="he_normal", padding='same'))
		# # model.add(Activation('relu'))
		# model.add(ELU())
		# model.add(BatchNormalization(axis=chanDim))
		# model.add(Conv2D(num_features, kernel_size=(3, 3), kernel_initializer="he_normal", padding='same'))
		# # model.add(Activation('relu'))
		# model.add(ELU())
		# model.add(BatchNormalization(axis=chanDim))
		# model.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
		# model.add(Dropout(0.15))

		# #flatten
		# model.add(Flatten())

		# #dense 1
		# model.add(Dense(2*2*num_features, kernel_initializer="he_normal"))
		# # model.add(Activation('relu'))
		# model.add(ELU())
		# model.add(BatchNormalization())
		# model.add(Dropout(0.2))

		# #dense 2
		# model.add(Dense(2*2*num_features, kernel_initializer="he_normal"))
		# # model.add(Activation('relu'))
		# model.add(ELU())
		# model.add(BatchNormalization())
		# model.add(Dropout(0.4))

		# #dense 3
		# model.add(Dense(2*num_features, kernel_initializer="he_normal"))
		# # model.add(Activation('relu'))
		# model.add(ELU())
		# model.add(BatchNormalization())
		# model.add(Dropout(0.4))

		# #output layer
		# model.add(Dense(NUM_CLASSES, kernel_initializer="he_normal", activation='softmax'))

		# hand the assembled architecture back to the caller
		return model

if __name__ == "__main__":
	# build a 48x48 single-channel, 6-class network and render a
	# diagram of its layers (with shapes and names) to model.png
	from tensorflow.keras.regularizers import l2
	from tensorflow.keras.utils import plot_model
	model = EmotionVGGNet.build(width=48, height=48, depth=1, classes=6)
	plot_model(model, show_shapes=True, show_layer_names=True,
		to_file="model.png")

# Utilities

### dataset generator

In [None]:
# import the necessary packages
from tensorflow.keras.utils import to_categorical
import numpy as np
import h5py

class HDF5DatasetGenerator:
	"""Yields batches of (images, labels) read from an HDF5 file.

	The file is expected to contain an "images" and a "labels"
	dataset, which is what the batches are sliced from.
	"""

	def __init__(self, dbPath, batchSize, preprocessors=None,
		aug=None, binarize=True, classes=2):
		"""Open the HDF5 file and record the batching options.

		Args:
			dbPath: path to the HDF5 file to read.
			batchSize: number of samples per yielded batch.
			preprocessors: optional list of objects exposing
				`preprocess(image)`, applied in order to every image.
			aug: optional data augmentor exposing `flow(...)`.
			binarize: whether labels are one-hot encoded.
			classes: number of classes used for one-hot encoding.
		"""
		# store the batch size, preprocessors, and data augmentor,
		# whether or not the labels should be binarized, along with
		# the total number of classes
		self.batchSize = batchSize
		self.preprocessors = preprocessors
		self.aug = aug
		self.binarize = binarize
		self.classes = classes

		# open the HDF5 database explicitly read-only (the default
		# mode differs between h5py versions and this class never
		# writes), then determine the total number of entries
		self.db = h5py.File(dbPath, "r")
		self.numImages = self.db["labels"].shape[0]

	def generator(self, passes=np.inf):
		"""Yield (images, labels) batches for `passes` sweeps over
		the dataset (endlessly by default)."""
		# initialize the epoch count
		epochs = 0

		# keep looping infinitely -- the model will stop once we have
		# reached the desired number of epochs
		while epochs < passes:
			# loop over the HDF5 dataset
			for i in np.arange(0, self.numImages, self.batchSize):
				# extract the images and labels from the HDF dataset
				images = self.db["images"][i: i + self.batchSize]
				labels = self.db["labels"][i: i + self.batchSize]

				# check to see if the labels should be binarized
				if self.binarize:
					labels = to_categorical(labels,
						self.classes)

				# check to see if our preprocessors are not None
				if self.preprocessors is not None:
					# initialize the list of processed images
					procImages = []

					# loop over the images
					for image in images:
						# loop over the preprocessors and apply each
						# to the image
						for p in self.preprocessors:
							image = p.preprocess(image)

						# update the list of processed images
						procImages.append(image)

					# update the images array to be the processed
					# images
					images = np.array(procImages)

				# if the data augmentor exists, apply it
				if self.aug is not None:
					(images, labels) = next(self.aug.flow(images,
						labels, batch_size=self.batchSize))

				# yield a tuple of images and labels
				yield (images, labels)

			# increment the total number of epochs
			epochs += 1

	def close(self):
		"""Close the underlying HDF5 database."""
		self.db.close()

### Image to Array

In [None]:
# import the necessary packages
from tensorflow.keras.preprocessing.image import img_to_array

class ImageToArrayPreprocessor:
	"""Preprocessor wrapping the Keras `img_to_array` utility."""

	def __init__(self, dataFormat=None):
		# data format forwarded to `img_to_array` on every call
		self.dataFormat = dataFormat

	def preprocess(self, image):
		"""Return `image` converted by `img_to_array` using the
		stored data format."""
		converted = img_to_array(image, data_format=self.dataFormat)
		return converted

### Epoch Checkpointer

In [None]:
from tensorflow.keras.callbacks import Callback
import os

class EpochCheckpoint(Callback):
	"""Callback that saves the model to disk every `every` epochs."""

	def __init__(self, outputPath, every=5, startAt=0):
		"""
		Args:
			outputPath: directory the checkpoint files are written to.
			every: number of epochs between two checkpoints.
			startAt: epoch count to start from (e.g. when resuming).
		"""
		# call the parent constructor; `super(Callback, self)` would
		# skip `Callback.__init__` and leave the base class
		# uninitialized
		super(EpochCheckpoint, self).__init__()

		# store the base output path for the model, the number of
		# epochs that must pass before the model is serialized to
		# disk and the current epoch value
		self.outputPath = outputPath
		self.every = every
		self.intEpoch = startAt

	def on_epoch_end(self, epoch, logs=None):
		"""Save the model as `epoch_<n>.hdf5` when the internal epoch
		counter reaches a multiple of `every`."""
		# check to see if the model should be serialized to disk
		if (self.intEpoch + 1) % self.every == 0:
			p = os.path.sep.join([self.outputPath,
				"epoch_{}.hdf5".format(self.intEpoch + 1)])
			self.model.save(p, overwrite=True)

		# increment the internal epoch counter
		self.intEpoch += 1


### Training Monitor

## Training

In [None]:
# USAGE
# python train_recognizer.py --checkpoints fer2013/checkpoints
# python train_recognizer.py --checkpoints fer2013/checkpoints --model fer2013/checkpoints/epoch_20.hdf5 \
#   --start-epoch 20
 
# set the matplotlib backend so figures can be saved in the background
import matplotlib
matplotlib.use("Agg")
from tensorflow.keras.callbacks import BaseLogger
from tensorflow.keras.callbacks import LearningRateScheduler
import matplotlib.pyplot as plt
import numpy as np
import json
import os
# import the necessary packages
# from config import emotion_config as config
# from ImageUtilities.preprocessing import ImageToArrayPreprocessor
# from ImageUtilities.callbacks import TrainingMonitor
# from ImageUtilities.callbacks import EpochCheckpoint
# from ImageUtilities.io import HDF5DatasetGenerator
# from ImageUtilities.nn.conv import EmotionVGGNet
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import load_model
import tensorflow.keras.backend as K
# import argparse
import os
 
# construct the argument parse and parse the arguments
# ap = argparse.ArgumentParser()
# ap.add_argument("-c", "--checkpoints", required=True,
#   help="path to output checkpoint directory")
# ap.add_argument("-m", "--model", type=str,
#   help="path to *specific* model checkpoint to load")
# ap.add_argument("-s", "--start-epoch", type=int, default=0,
#   help="epoch to restart training at")
# args = vars(ap.parse_args())
 
##################### Training Monitor #########################
class TrainingMonitor(BaseLogger):
    """Callback that records the per-epoch metrics, optionally
    serializes them to JSON, and plots loss/accuracy curves."""

    def __init__(self, figPath, jsonPath=None, startAt=0):
        """
        Args:
            figPath: path the loss/accuracy figure is saved to.
            jsonPath: optional path of the JSON history file.
            startAt: epoch training (re)starts at; a history loaded
                from JSON is trimmed to this many entries.
        """
        super(TrainingMonitor, self).__init__()
        self.figPath = figPath
        self.jsonPath = jsonPath
        self.startAt = startAt

    def on_train_begin(self, logs=None):
        """Initialize the history, loading it from JSON if present."""
        # initialize the history dictionary
        self.H = {}

        # if the JSON history path exists, load the training history
        if self.jsonPath is not None:
            if os.path.exists(self.jsonPath):
                with open(self.jsonPath) as f:
                    self.H = json.loads(f.read())

                # check to see if a starting epoch was supplied
                if self.startAt > 0:
                    # loop over the entries in the history log and
                    # trim any entries that are past the starting
                    # epoch
                    for k in self.H.keys():
                        self.H[k] = self.H[k][:self.startAt]

    def on_epoch_end(self, epoch, logs=None):
        """Append this epoch's metrics, write the JSON history and
        refresh the figure."""
        logs = logs or {}

        # loop over the logs and update the loss, accuracy, etc.
        # for the entire training process; values are stored as
        # strings so they are always JSON serializable
        for (k, v) in logs.items():
            l = self.H.get(k, [])
            l.append(str(v))
            self.H[k] = l

        # check to see if the training history should be serialized
        # to file
        if self.jsonPath is not None:
            with open(self.jsonPath, "w") as f:
                f.write(json.dumps(self.H))

        # ensure at least two epochs have passed before plotting
        # (epoch starts at zero)
        numEpochs = len(self.H.get("loss", []))
        if numEpochs > 1:
            # plot the training loss and accuracy; `np.float` no
            # longer exists in current NumPy, so use the builtin
            # `float`, and skip metrics that were not logged (e.g.
            # when training without validation data)
            plt.style.use("ggplot")
            plt.figure()
            curves = (
                ("loss", "train_loss"),
                ("val_loss", "val_loss"),
                ("accuracy", "train_acc"),
                ("val_accuracy", "val_acc"))
            for (key, label) in curves:
                if key not in self.H:
                    continue
                values = np.array(self.H[key]).astype(float)
                plt.plot(np.arange(0, len(values)), values, label=label)
            plt.title("Training Loss and Accuracy [Epoch {}]".format(
                numEpochs))
            plt.xlabel("Epoch #")
            plt.ylabel("Loss/Accuracy")
            plt.legend()

            # save the figure
            plt.savefig(self.figPath)
            plt.close()
#################### Training Monitor Ends here ###################
# directory the EpochCheckpoint callback writes its model files to
checkpoints = '/content/gdrive/My Drive/EmotionRecognition/checkpoints/'
# to resume from a saved model, set `checkpoint_model` to its path, e.g.:
# checkpoint_model = '/content/gdrive/My Drive/EmotionRecognition/checkpoints/epoch_40.hdf5'
# None means a new network is built and compiled further down
checkpoint_model = None
 
# learning rate schedule
def my_schedule(epoch):
    """Step learning-rate schedule.

    Returns 1e-3 for epochs 0-39, 1e-4 for epochs 40-59 and 1e-5 from
    epoch 60 onwards. The ranges are contiguous so that every epoch
    index (including 40, 60 and anything past 74) maps to a rate.

    Args:
        epoch: zero-based epoch index.

    Returns:
        The learning rate to use for that epoch.
    """
    if epoch < 40:
        lrate = 1e-3
    elif epoch < 60:
        lrate = 1e-4
    else:
        lrate = 1e-5

    # log the rate that is being scheduled for this epoch
    print("[Info] Learning rate: ", lrate)
    return lrate
 
 
# construct the training image generator for data augmentation
trainAug = ImageDataGenerator(rotation_range=10, zoom_range=0.1,
    horizontal_flip=True, height_shift_range=0.1, rescale=1 / 255.0, fill_mode="nearest")
# validation images are only rescaled: applying random rotations,
# zooms, shifts and flips to them would make the validation metrics
# noisy and not comparable from epoch to epoch
valAug = ImageDataGenerator(rescale=1 / 255.0)
# then initialize the image preprocessor
iap = ImageToArrayPreprocessor()

# initialize the training and validation dataset generators
trainGen = HDF5DatasetGenerator(TRAIN_HDF5, BATCH_SIZE,
    aug=trainAug, preprocessors=[iap], classes=NUM_CLASSES)
valGen = HDF5DatasetGenerator(VAL_HDF5, BATCH_SIZE,
    aug=valAug, preprocessors=[iap], classes=NUM_CLASSES)

start_epoch = 0
INIT_LR = 1e-3
EPOCHS = 75
BATCHES_PER_EPOCH = trainGen.numImages // BATCH_SIZE
# only used by the (disabled) decay-based optimizer variant below
LR_DECAY = (1./0.01-1)/ BATCHES_PER_EPOCH

# if there is no specific model checkpoint supplied, then initialize
# the network and compile the model
if checkpoint_model is None:
    print("[INFO] compiling model...")
    model = EmotionVGGNet.build(width=48, height=48, depth=1,
        classes=NUM_CLASSES)
    # opt = Adam(lr=INIT_LR, decay=LR_DECAY)
    # `learning_rate` replaces the deprecated `lr` argument
    opt = Adam(learning_rate=INIT_LR, beta_1=0.9, beta_2=0.999,
        epsilon=1e-7)
    model.compile(loss="categorical_crossentropy", optimizer=opt,
        metrics=["accuracy"])

# otherwise, load the checkpoint from disk
else:
    print("[INFO] loading {}...".format(checkpoint_model))
    model = load_model(checkpoint_model)

    # update the learning rate
    print("[INFO] old learning rate: {}".format(
        K.get_value(model.optimizer.learning_rate)))
    K.set_value(model.optimizer.learning_rate, INIT_LR)
    print("[INFO] new learning rate: {}".format(
        K.get_value(model.optimizer.learning_rate)))

# make sure the folders the callbacks write to exist
os.makedirs(OUTPUT_PATH, exist_ok=True)
os.makedirs(checkpoints, exist_ok=True)

# construct the set of callbacks
figPath = os.path.sep.join([OUTPUT_PATH,
    "vggnet_emotion.png"])
jsonPath = os.path.sep.join([OUTPUT_PATH,
    "vggnet_emotion.json"])
lrate = LearningRateScheduler(my_schedule)
callbacks = [
    EpochCheckpoint(checkpoints, every=5,
        startAt=start_epoch),
    TrainingMonitor(figPath, jsonPath=jsonPath,
        startAt=start_epoch),
    lrate]

# train the network; `Model.fit` accepts generators directly and
# replaces the deprecated `fit_generator`
try:
    model.fit(
        trainGen.generator(),
        steps_per_epoch=BATCHES_PER_EPOCH,
        validation_data=valGen.generator(),
        validation_steps=valGen.numImages // BATCH_SIZE,
        epochs=EPOCHS,
        max_queue_size=BATCH_SIZE * 2,
        callbacks=callbacks, verbose=1)
finally:
    # close the databases even if training is interrupted
    trainGen.close()
    valGen.close()



[INFO] compiling model...
Instructions for updating:
Please use Model.fit, which supports generators.
[Info] Learning rate:  0.001
Epoch 1/75
[Info] Learning rate:  0.001
Epoch 2/75
[Info] Learning rate:  0.001
Epoch 3/75
[Info] Learning rate:  0.001
Epoch 4/75
[Info] Learning rate:  0.001
Epoch 5/75
[Info] Learning rate:  0.001
Epoch 6/75
[Info] Learning rate:  0.001
Epoch 7/75
[Info] Learning rate:  0.001
Epoch 8/75
[Info] Learning rate:  0.001
Epoch 9/75
[Info] Learning rate:  0.001
Epoch 10/75
[Info] Learning rate:  0.001
Epoch 11/75
[Info] Learning rate:  0.001
Epoch 12/75
[Info] Learning rate:  0.001
Epoch 13/75
[Info] Learning rate:  0.001
Epoch 14/75
[Info] Learning rate:  0.001
Epoch 15/75
[Info] Learning rate:  0.001
Epoch 16/75
[Info] Learning rate:  0.001
Epoch 17/75
[Info] Learning rate:  0.001
Epoch 18/75
[Info] Learning rate:  0.001
Epoch 19/75
[Info] Learning rate:  0.001
Epoch 20/75
[Info] Learning rate:  0.001
Epoch 21/75
[Info] Learning rate:  0.001
Epoch 22/75
[Info