In [1]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
import os
import matplotlib.pyplot as plt
import cv2
import random
from sklearn import preprocessing
from sklearn.model_selection import train_test_split

from tensorflow.keras.models import Model
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import AveragePooling2D
from tensorflow.keras.layers import MaxPooling2D
from tensorflow.keras.layers import GlobalMaxPooling2D
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import Activation
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import concatenate
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.callbacks import ModelCheckpoint

### Image Data Generator & Preprocessing

In [2]:
#### Use Image Data Generator to call data to prevent OOM

IMG_SIZE = 48
batch_size = 100

train_datagen = ImageDataGenerator(
        shear_range=0.2, ### Augmentation: the IDG will randomly apply augmentation on every image, and use that augmented 
        zoom_range=0.2,   ### data to train instead of using the original data.
        horizontal_flip=True,
        validation_split=0.1, ### split for validation. Train : validation = 9:1
        preprocessing_function=tf.keras.applications.imagenet_utils.preprocess_input)

train_generator = train_datagen.flow_from_directory(
        "../input/fer2013/train",
        target_size=(IMG_SIZE, IMG_SIZE),
        batch_size=batch_size,
        class_mode='binary',
        subset='training')

validation_generator = train_datagen.flow_from_directory(
        "../input/fer2013/train",
        target_size=(IMG_SIZE, IMG_SIZE), 
        batch_size=batch_size,
        class_mode='binary',
        subset='validation')

Found 25841 images belonging to 7 classes.
Found 2868 images belonging to 7 classes.


In [3]:
### Create test set.

test_datagen = ImageDataGenerator(
        shear_range=0.2,
        zoom_range=0.2,
        horizontal_flip=True,
        preprocessing_function=tf.keras.applications.imagenet_utils.preprocess_input)

test_generator = test_datagen.flow_from_directory(
        "../input/fer2013/test",
        target_size=(IMG_SIZE, IMG_SIZE),
        batch_size=batch_size,
        class_mode='binary')

Found 7178 images belonging to 7 classes.


In [16]:
### check distribution:
for sentiment in list(train_generator.class_indices.keys()):
    print(sentiment, sum(sentiment in s for s in train_generator.filenames))
    
### We can see that the data is skewed, since 'happy' sentiment occurs many time and 'disgust' occurs very few.
### To prevent this, we can use resampling. However, I will not use resampling for this project

angry 3596
disgust 393
fear 3688
happy 6494
neutral 4469
sad 4347
surprise 2854


In [21]:
#### Load pretrained model (if have):
#### This is my pretrained model with accuracy over test set is 0.6095
model = tf.keras.models.load_model("../input/pretrained-model/cnn_model.h5")

### Simple CNN

In [18]:
model = Sequential()

model.add(Conv2D(32,(3,3), padding="same", input_shape=(224, 224, 3)))
#model.add(BatchNormalization())
model.add(Activation("relu"))
model.add(Conv2D(32,(3,3), padding="same"))
#model.add(BatchNormalization())
model.add(Activation("relu"))
model.add(MaxPooling2D(pool_size=(2,2)))

model.add(Conv2D(64,(3,3), padding="same"))
model.add(Activation("relu"))
model.add(Conv2D(64,(3,3), padding="same"))
model.add(Activation("relu"))
model.add(MaxPooling2D(pool_size=(2,2)))

model.add(Conv2D(128,(3,3), padding="same"))
model.add(Activation("relu"))
model.add(Conv2D(128,(3,3), padding="same"))
model.add(Activation("relu"))
model.add(MaxPooling2D(pool_size=(2,2)))

#model.add(Flatten())
model.add(Dense(128))
model.add(Activation("relu"))
model.add(Dropout(0.2))
#model.add(BatchNormalization())
model.add(Dense(64))
model.add(Activation("relu"))
model.add(Dropout(0.2))
#model.add(BatchNormalization())
model.add(GlobalMaxPooling2D())
model.add(Dense(7))
#model.add(MaxPooling2D(pool_size=(2,2)))
model.add(Activation("softmax"))

model.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d (Conv2D)              (None, 224, 224, 32)      896       
_________________________________________________________________
activation (Activation)      (None, 224, 224, 32)      0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 224, 224, 32)      9248      
_________________________________________________________________
activation_1 (Activation)    (None, 224, 224, 32)      0         
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 112, 112, 32)      0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 112, 112, 64)      18496     
_________________________________________________________________
activation_2 (Activation)    (None, 112, 112, 64)      0

In [19]:
#aug=ImageDataGenerator(rotation_range=0.18, zoom_range=0.15, width_shift_range=0.2,height_shift_range=0.2, horizontal_flip=True)
#opt= SGD(learning_rate=0.01,momentum=0.9)

checkpoint = ModelCheckpoint('best-weights.h5', monitor='val_loss', save_best_only=True)

In [20]:
'''
BUFFER_SIZE = 2000

def make_ds(features, labels):
  ds = tf.data.Dataset.from_tensor_slices((features, labels))#.cache()
  ds = ds.shuffle(BUFFER_SIZE).repeat()
  return ds

pos_ds = make_ds(train_generator, train_generator.classes)

resampled_ds = pos_ds.batch(batch_size).prefetch(2)

'''

'\nBUFFER_SIZE = 2000\n\ndef make_ds(features, labels):\n  ds = tf.data.Dataset.from_tensor_slices((features, labels))#.cache()\n  ds = ds.shuffle(BUFFER_SIZE).repeat()\n  return ds\n\npos_ds = make_ds(train_generator, train_generator.classes)\n\nresampled_ds = pos_ds.batch(batch_size).prefetch(2)\n\n'

In [22]:
### Training using pretrained model, or we can also use the new model.
### I have trained before with 100 epochs, now I run 15 epochs just for showing the result

batch_size = 100
opt= SGD(learning_rate=0.00005, momentum =0.9)
model.compile(loss = "sparse_categorical_crossentropy", optimizer = opt, metrics = ['accuracy'])
model.fit(train_generator, epochs = 15, batch_size = batch_size,validation_data = validation_generator, 
          steps_per_epoch =train_generator.samples//batch_size,callbacks=[checkpoint])

2022-05-02 12:49:29.958411: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)


Epoch 1/15


2022-05-02 12:49:32.661329: I tensorflow/stream_executor/cuda/cuda_dnn.cc:369] Loaded cuDNN version 8005


Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15


<keras.callbacks.History at 0x7f6fb7d1b0d0>

In [20]:
model.evaluate(test_generator)



[1.0593725442886353, 0.6095012426376343]

In [9]:
### model predict image:
img_array = cv2.imread("../input/fer2013/train/disgust/Training_10598340.jpg")
img_size = 224
resized_array = cv2.resize(img_array, (img_size, img_size))
resized_array = np.expand_dims(resized_array, axis = 0)

np.argmax(model.predict(resized_array))

5

In [24]:
model.save("cnn_model.h5")

### Deep Learning & Transfer Learning

In [25]:
### change last layers so that the final output will be 7 corresponds to 7 sentiments.

### take input
base_input = tf.keras.Input(shape=(IMG_SIZE, IMG_SIZE, 3))
### use pretrained ResNet
model = tf.keras.applications.ResNet50(include_top=False, pooling="max", weights="imagenet")(base_input)

### Add new layers

final_output = layers.Dense(128)(model)
final_output = layers.Activation('relu')(final_output)
final_output = layers.BatchNormalization()(final_output)
final_output = layers.Dense(64)(final_output)
final_output = layers.Activation('relu')(final_output)
final_output = layers.BatchNormalization()(final_output)
final_output = layers.Dense(7, activation = 'softmax')(final_output)
final_output

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5


<KerasTensor: shape=(None, 7) dtype=float32 (created by layer 'dense_5')>

In [26]:
#### model summary
new_model = keras.Model(inputs = base_input, outputs = final_output)
new_model.summary()

Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 48, 48, 3)]       0         
_________________________________________________________________
resnet50 (Functional)        (None, 2048)              23587712  
_________________________________________________________________
dense_3 (Dense)              (None, 128)               262272    
_________________________________________________________________
activation_9 (Activation)    (None, 128)               0         
_________________________________________________________________
batch_normalization (BatchNo (None, 128)               512       
_________________________________________________________________
dense_4 (Dense)              (None, 64)                8256      
_________________________________________________________________
activation_10 (Activation)   (None, 64)                0     

In [27]:
opt= SGD(learning_rate=0.0003, momentum =0.9)

early_stopping_monitor = EarlyStopping(
    monitor='val_loss',
    min_delta=0,
    patience=0,
    verbose=0,
    mode='auto',
    baseline=None,
    restore_best_weights=True
)

checkpoint = ModelCheckpoint('best-weights.h5', monitor='val_loss', save_best_only=True)

new_model.compile(loss = "sparse_categorical_crossentropy", optimizer = opt, metrics = ['accuracy'])

In [28]:
#callbacks=[early_stopping_monitor]
### I have trained before with 100 epochs and got about 0.5700 accuracy, but I lost that h5 file. Now I rerun with 20 epochs just for showing.
new_model.fit(train_generator, validation_data=validation_generator, callbacks=[checkpoint], epochs=20,verbose=1)

Epoch 1/20




Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


<keras.callbacks.History at 0x7f6f5300e990>

In [30]:
new_model.save("transfer_model.h5")

In [None]:
new_model = tf.keras.models.load_model("transfer_model.h5")

In [29]:
### 20 epochs got 0.5678 accuracy on test set.
new_model.evaluate(test_generator)



[1.1946264505386353, 0.5678461790084839]