# Emotion Recognition Using Gaussian Kernel & Triplet Loss  

This is an experiment I wanted to try out, but it didn't work out! :(

## Description

### Dataset
I used the FERplus dataset, restricted to 6 classes: neutral, happy, sad, angry, fear and surprise.
I computed 5 average images for each class (30 in total) by averaging the pixel values of that class's images.

### Architecture
I used MobileNetV2 pretrained on ImageNet, truncated after block 12 (`block_12_add`), followed by a kernel layer that maps the embedding space into a new space whose axes correspond to the Euclidean distances from the embeddings of the average images.

### Results
Training accuracy was always below 80% and validation accuracy was never above 20%.

### Conclusion
The model has both a high-bias and a high-variance problem.  
The high bias could suggest that the mapping to the new space computed by the kernel layer doesn't provide enough information for classification.  

High variance problem indicates that the model is overfitting on training data which suggests that the model is fitting on features irrelevant to the given classification task.

In [0]:
!pip install tensorflow-gpu==2.1

In [0]:
# Mount Google Drive at /content/drive (Colab only; the mount prompts for an
# authorization code). Later cells read the dataset archive and the average
# images through the relative path "./drive/My Drive/...".
# NOTE(review): that relative path presumably relies on the working directory
# being /content — confirm.
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


In [0]:
!tar -xf "./drive/My Drive/Datasets/fer_6_classes_balanced.tar.xz"

In [0]:
from __future__ import absolute_import, division, print_function, unicode_literals

import os
import csv
import copy
import sys

import tensorflow as tf
from tensorflow import keras
import IPython.display as display
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.preprocessing.image import ImageDataGenerator

print(tf.version.VERSION)

2.1.0


In [0]:
# Mini-batch size used by all three data generators.
BATCH_SIZE = 64
# Target image size. IMG_SIZE repeats the same value and is used to build
# IMG_SHAPE for the backbone; keep all three in sync.
IMG_HEIGHT = 96
IMG_WIDTH = 96
IMG_SIZE = 96
# Directory roots of the three dataset splits.
train_dir = "./fer_6_classes_balanced/train"
test_dir = "./fer_6_classes_balanced/test"
valid_dir = "./fer_6_classes_balanced/valid"
# The six emotion classes; only the length of this array is read by the model code.
# NOTE(review): presumably these match the class sub-directory names (and thus
# the label order produced by flow_from_directory) — confirm.
CLASS_NAMES = np.array(['anger', 'fear', 'happiness', 'neutral', 'sadness', 'surprise'])

In [0]:
# Training images are rescaled to [0, 1] and randomly augmented; validation
# and test images are only rescaled.
data_generator = ImageDataGenerator(
    rescale=1./255.,
    rotation_range=40,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
)
test_data_generator = ImageDataGenerator(rescale=1./255.)

# Batching, shuffling and target size are identical for every split.
_flow_options = dict(
    batch_size=BATCH_SIZE,
    shuffle=True,
    target_size=(IMG_HEIGHT, IMG_WIDTH),
)
train_data_gen = data_generator.flow_from_directory(directory=train_dir, **_flow_options)
valid_data_gen = test_data_generator.flow_from_directory(directory=valid_dir, **_flow_options)
test_data_gen = test_data_generator.flow_from_directory(directory=test_dir, **_flow_options)

Found 19315 images belonging to 6 classes.
Found 12208 images belonging to 6 classes.
Found 3497 images belonging to 6 classes.


In [0]:
def decode_img(img):
  """Decode an encoded image and resize it to the model's input size.

  Args:
    img: scalar string tensor with the encoded image bytes, e.g. the result
      of `tf.io.read_file`.

  Returns:
    A float32 tensor with 3 channels, values in [0, 1], resized to
    IMG_HEIGHT x IMG_WIDTH.
  """
  # Decode the compressed bytes into a 3-channel uint8 tensor.
  img = tf.image.decode_image(img, channels=3)
  # `convert_image_dtype` rescales integer pixels to floats in the [0, 1] range.
  img = tf.image.convert_image_dtype(img, tf.float32, saturate=True)
  # `tf.image.resize` expects the size as (new_height, new_width).
  return tf.image.resize(img, [IMG_HEIGHT, IMG_WIDTH])

In [0]:
# Load every class-average image into one float32 tensor with a leading
# "average index" axis.
# `os.listdir` returns entries in arbitrary order, so the file names are sorted
# to keep the position of each average stable across runs and machines.
# NOTE(review): code that compares an average's index with a class label
# presumably relies on the files being named so that sorted order follows the
# label order — confirm.
averages_dir = "./drive/My Drive/Datasets/averages/"
averages = tf.convert_to_tensor([
    decode_img(tf.io.read_file(averages_dir + avg))
    for avg in sorted(os.listdir(averages_dir))
])

In [0]:
# Shape of one input image: square, three colour channels.
IMG_SHAPE = (IMG_SIZE, IMG_SIZE, 3)

# MobileNetV2 backbone with ImageNet weights and without its classification head.
mobilenet = tf.keras.applications.MobileNetV2(
    weights='imagenet',
    include_top=False,
    input_shape=IMG_SHAPE,
)

Downloading data from https://github.com/JonathanCMitchell/mobilenet_v2_keras/releases/download/v1.1/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_96_no_top.h5


In [0]:
mobilenet.layers[-39].name

'block_12_add'

In [0]:
# Truncate the backbone at the output of block 12. Looking the layer up by
# name does not depend on how many layers follow it, unlike a negative index.
small_mobilenet = tf.keras.models.Model(inputs=mobilenet.input,
                                        outputs=mobilenet.get_layer('block_12_add').output)

In [0]:
small_mobilenet.summary()

Model: "model_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 96, 96, 3)]  0                                            
__________________________________________________________________________________________________
Conv1_pad (ZeroPadding2D)       (None, 97, 97, 3)    0           input_1[0][0]                    
__________________________________________________________________________________________________
Conv1 (Conv2D)                  (None, 48, 48, 32)   864         Conv1_pad[0][0]                  
__________________________________________________________________________________________________
bn_Conv1 (BatchNormalization)   (None, 48, 48, 32)   128         Conv1[0][0]                      
____________________________________________________________________________________________

In [0]:
class Kernel(tf.keras.layers.Layer):
  """Softmax classifier over inverse distances to reference embeddings.

  The input is expected to be a 2-D batch of embeddings whose last
  `num_averages` rows are the embeddings of the reference ("average") images;
  all preceding rows are the samples to classify. Each sample is described by
  its L2-normalised Euclidean distances to the references, and the reciprocal
  of those distances is fed to a dense softmax over `len(CLASS_NAMES)` classes.

  Args:
    name: layer name.
    variance: stored on the layer but not read by `call`; kept so existing
      callers keep working.
    num_averages: number of trailing input rows that hold reference embeddings.
    epsilon: lower bound applied to squared distances before the square root.
    **kwargs: forwarded to `tf.keras.layers.Layer`.

  Raises:
    ValueError: if `num_averages` is smaller than 1.
  """

  def __init__(self, name='Kernel', variance=1, num_averages=30, epsilon=1e-12, **kwargs):
    super(Kernel, self).__init__(name=name, **kwargs)
    if num_averages < 1:
      raise ValueError('num_averages must be at least 1, got %r' % (num_averages,))
    self.softmax = tf.keras.layers.Dense(len(CLASS_NAMES), activation='softmax', name='prediction_softmax')
    self.variance = variance
    self.num_averages = num_averages
    self.epsilon = epsilon

  def call(self, input_tensor):
    """Return class probabilities for every non-reference row of `input_tensor`."""
    averages_len = self.num_averages
    anchor = input_tensor[:-averages_len]  # samples: every row except the trailing references
    avg_features = input_tensor[-averages_len:]  # references: the trailing rows

    # Broadcast to (samples, references, embedding) for pairwise differences.
    anchor = tf.expand_dims(anchor, 1)
    avg_features = tf.expand_dims(avg_features, 0)

    squared = tf.reduce_sum(tf.square(tf.subtract(anchor, avg_features)), axis=-1)
    # Flooring the squared distance keeps the gradient of sqrt and the
    # reciprocal below finite when a sample coincides with a reference.
    distances = tf.sqrt(tf.maximum(squared, self.epsilon))
    distances.set_shape((None, averages_len))
    distances = tf.math.l2_normalize(distances, axis=-1)

    # NOTE: a Gaussian kernel exp(-d / (2 * variance)) and a -log(d) mapping
    # were tried here before; the plain reciprocal is what is currently used.
    output = self.softmax(1 / distances)

    return output

  def get_config(self):
    """Return the constructor arguments so the layer can be re-created from config."""
    config = super(Kernel, self).get_config()
    config.update({
        'variance': self.variance,
        'num_averages': self.num_averages,
        'epsilon': self.epsilon,
    })
    return config

In [0]:
# NOTE(review): the recorded model summary shows 96-wide pooled features, not
# 16; EMBEDDING_DIM is only read by AverageTripletLoss — confirm the intended value.
EMBEDDING_DIM = 16

# NOTE(review): `regularizer` is not referenced by any of the cells shown here.
regularizer = tf.keras.regularizers.l2()
images = keras.Input(IMG_SHAPE)
# Append the average images to every batch so that one backbone pass embeds
# both the samples and the references; Kernel splits the trailing rows off again.
inputs = tf.concat([images, averages], 0)

images_features = small_mobilenet(inputs)
# Collapse the spatial dimensions into one feature vector per image.
images_features = tf.keras.layers.GlobalAveragePooling2D()(images_features)

final_output = Kernel()(images_features)

model = tf.keras.Model(inputs=images, outputs=final_output, name="kernel_model")

In [0]:
model.summary()

Model: "kernel_model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_7 (InputLayer)         [(None, 96, 96, 3)]       0         
_________________________________________________________________
tf_op_layer_concat_5 (Tensor [(None, 96, 96, 3)]       0         
_________________________________________________________________
model_1 (Model)              (None, 6, 6, 96)          558656    
_________________________________________________________________
global_average_pooling2d_5 ( (None, 96)                0         
_________________________________________________________________
Kernel (Kernel)              (None, 6)                 186       
Total params: 558,842
Trainable params: 542,714
Non-trainable params: 16,128
_________________________________________________________________


In [0]:
# Dataset sizes are read from the generators so they cannot drift from the
# files actually found on disk.
num_train = train_data_gen.samples
num_test = test_data_gen.samples
num_valid = valid_data_gen.samples

# Floor division: only full batches are counted per epoch / evaluation pass.
steps_per_epoch = num_train // BATCH_SIZE
validation_steps = num_valid // BATCH_SIZE
test_steps = num_test // BATCH_SIZE

# NOTE: this schedule is only referenced by the disabled triplet-loss setup
# below; the active compile call uses a constant learning rate.
lr_schedule = tf.keras.optimizers.schedules.InverseTimeDecay(
  0.0003,
  decay_steps=steps_per_epoch*1000,
  decay_rate=1,
  staircase=False)

# Alternative (disabled) triplet-loss setup:
# model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=lr_schedule),
#               loss=AverageTripletLoss(call_model, averages),
#               metrics=[TripletMetrics(averages, call_model)])
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
              loss='categorical_crossentropy',
              metrics=['accuracy'])

In [0]:
model.evaluate(test_data_gen, steps=test_steps)

  ...
    to  
  ['...']


[12.715241167280409, 0.2650463]

In [0]:
# Stop once validation accuracy has not improved for 10 consecutive epochs and
# roll the model back to the best weights seen.
callback = tf.keras.callbacks.EarlyStopping(monitor='val_accuracy', patience=10, restore_best_weights=True)


# Train for at most 100 epochs, validating on the validation generator after each epoch.
history = model.fit(train_data_gen,
                    callbacks=[callback],
                    steps_per_epoch=steps_per_epoch,
                    epochs=100,
                    validation_data=valid_data_gen,
                    validation_steps=validation_steps)
# Persist the trained model to Google Drive.
model.save("./drive/My Drive/Models/mobilenetv2_softmax_kernel_30avgs")
# NOTE(review): unlike the pre-training evaluate call, no `steps` is passed
# here, so the two results may not cover the same number of batches — confirm.
model.evaluate(test_data_gen)

  ...
    to  
  ['...']
  ...
    to  
  ['...']
Train for 301 steps, validate for 190 steps
Epoch 1/100
Epoch 2/100
Epoch 3/100


KeyboardInterrupt: ignored

In [0]:
def call_model(inp):
  """Run the module-level `model` on `inp` and return its output.

  `model` is looked up when the function is called, so this always uses
  whatever model is currently bound to that name. It is the callable passed to
  AverageTripletLoss / TripletMetrics in the disabled triplet-loss setup.
  """
  return model(inp)

In [0]:
class TripletMetrics(keras.metrics.Metric):
    """Nearest-target accuracy for embedding models.

    A sample counts as correct when the target whose embedding is closest
    (Euclidean distance) to the sample's embedding has the same index as the
    sample's label. Target embeddings are recomputed with `call_model` on
    every update.

    NOTE(review): comparing the argmin index with the label index assumes
    exactly one target per class, in label order — confirm against the caller.

    Args:
      targets: tensor of target images; iterated over its first axis.
      call_model: callable mapping a batch of images to embeddings.
      name: metric name.
      **kwargs: forwarded to `keras.metrics.Metric`.
    """

    def __init__(self, targets, call_model, name='triplet_accuracy', **kwargs):
      super(TripletMetrics, self).__init__(name=name, **kwargs)
      # Running totals over all batches since the last reset.
      self.size = self.add_weight(name='size', initializer='zeros', dtype=tf.int32)
      self.true_positives = self.add_weight(name='tp', initializer='zeros', dtype=tf.int32)
      self.accuracy = self.add_weight(name='acc', initializer='zeros', dtype=tf.float32)
      self.targets = targets
      self.call_model = call_model

    def get_avg_features(self, call_model, avgs):
      """Embed each element of `avgs` on its own (batch of one) and stack the results."""
      features = tf.map_fn(lambda img: tf.squeeze(call_model(tf.expand_dims(img, 0))), avgs)
      return features

    def update_state(self, y_true, y_pred, sample_weight=None):
      """Accumulate correct and total counts for one batch.

      Args:
        y_true: one-hot labels, one row per sample.
        y_pred: embeddings produced by the model, one row per sample.
        sample_weight: accepted for API compatibility; ignored.
      """
      embeddings = tf.expand_dims(y_pred, 1)  # shape: (batch_size, 1, embedding_size)
      avg_features = self.get_avg_features(self.call_model, self.targets)
      avg_features = tf.expand_dims(avg_features, 0)  # shape: (1, num_targets, embedding_size)

      size = tf.cast(tf.shape(embeddings)[0], tf.int32)

      # Squared Euclidean distance to every target. The square root is
      # omitted because it does not change which target is the nearest one.
      squared_distances = tf.reduce_sum(
          tf.square(tf.subtract(embeddings, avg_features)), axis=2)  # shape: (batch_size, num_targets)

      predictions = tf.argmin(squared_distances, axis=1)  # shape: (batch_size,)
      true_labels = tf.argmax(tf.cast(y_true, tf.int32), axis=1)   # shape: (batch_size,)
      true_positives = tf.reduce_sum(tf.cast(tf.equal(predictions, true_labels), tf.int32))

      self.true_positives.assign_add(true_positives)
      self.size.assign_add(size)
      # divide_no_nan yields 0 instead of NaN while no samples have been seen.
      self.accuracy.assign(tf.math.divide_no_nan(
          tf.cast(self.true_positives, tf.float32),
          tf.cast(self.size, tf.float32)))

    def result(self):
      """Return the accuracy accumulated since the last reset."""
      return self.accuracy

    def reset_states(self):
      """Zero all accumulators."""
      self.accuracy.assign(0)
      self.true_positives.assign(0)
      self.size.assign(0)


In [0]:
class AverageTripletLoss(tf.keras.losses.Loss):
  """Triplet-style hinge loss against the embeddings of per-class target images.

  For every sample (the anchor), the target selected by its one-hot label is
  the positive and the remaining targets are negatives. Two hinge terms are
  summed per negative:
    max(0, margin + d(anchor, positive) - d(anchor, negative))
    max(0, margin + d(anchor, positive) - d(positive, negative))
  where d is the Euclidean distance. The per-sample sums are averaged over
  the batch.

  Args:
    model_fn: callable mapping a batch of images to embeddings.
    targets: tensor of target images; iterated over its first axis.
    margin: hinge margin.
    name: loss name.
  """

  def __init__(self, model_fn, targets, margin=1, name='AverageTripletLoss'):
    super(AverageTripletLoss, self).__init__(name=name)
    self.targets = targets
    self.model_fn = model_fn
    self.margin = margin

  @staticmethod
  def _euclidean(a, b):
    """Euclidean distance over the last axis, floored so its gradient stays finite at 0."""
    squared = tf.reduce_sum(tf.square(tf.subtract(a, b)), axis=-1)
    return tf.sqrt(tf.maximum(squared, 1e-12))

  def custom_triplet_loss(self, avg_features, anchor, margin, labels):
    """Compute the batch-mean hinge loss.

    Args:
      avg_features: target embeddings tiled per sample,
        shape (batch, num_targets, EMBEDDING_DIM).
      anchor: sample embeddings, shape (batch, EMBEDDING_DIM).
      margin: hinge margin.
      labels: one-hot labels with exactly one 1 per row; the reshapes below
        rely on len(CLASS_NAMES) - 1 negatives per sample.

    Returns:
      Scalar loss tensor.
    """
    anchor = tf.expand_dims(anchor, 1)

    # Split the targets of every sample into its positive and its negatives.
    where_pos = tf.where(tf.equal(labels, 1))
    where_neg = tf.where(tf.equal(labels, 0))
    negatives = tf.gather_nd(avg_features, where_neg)
    negatives = tf.reshape(negatives, [-1, len(CLASS_NAMES) - 1, EMBEDDING_DIM])
    positives = tf.gather_nd(avg_features, where_pos)
    positives = tf.reshape(positives, [-1, 1, EMBEDDING_DIM])

    # Euclidean distances: anchor-positive, anchor-negative, positive-negative.
    d_pos = self._euclidean(anchor, positives)
    d_neg = self._euclidean(anchor, negatives)
    d_neu = self._euclidean(positives, negatives)

    # Hinge terms.
    d1 = tf.maximum(tf.cast(0, tf.float32), margin + d_pos - d_neg)
    d2 = tf.maximum(tf.cast(0, tf.float32), margin + d_pos - d_neu)

    # NOTE: earlier (disabled) experiments passed the distances through a
    # Gaussian kernel exp(-d / (2 * sigma**2)) and/or a -log cross-entropy
    # term instead of summing the hinge terms directly.
    loss = tf.reduce_sum(d1 + d2, -1)
    loss = tf.reduce_mean(loss)
    return loss

  def get_avg_features(self, model_fn, avgs):
    """Embed each element of `avgs` on its own (batch of one) and stack the results."""
    features = tf.map_fn(lambda img: tf.squeeze(model_fn(tf.expand_dims(img, 0))), avgs)
    return features

  def call(self, y_true, y_pred):
    """Return the loss for one batch of one-hot labels and predicted embeddings."""
    labels = tf.cast(y_true, tf.float32)
    anchor = y_pred
    avg_features = self.get_avg_features(self.model_fn, self.targets)
    avg_features = tf.expand_dims(tf.convert_to_tensor(avg_features), 0)
    # Tile by the actual batch size so that batches smaller than BATCH_SIZE
    # (e.g. the last batch of a pass over the data) still work.
    batch_size = tf.shape(y_pred)[0]
    avg_features = tf.tile(avg_features, tf.stack([batch_size, 1, 1]))
    return self.custom_triplet_loss(avg_features, anchor, self.margin, labels)
