<a href="https://colab.research.google.com/github/sneakatyou/OFER/blob/main/OFER.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive at /content/drive (requires the Google Colab runtime);
# the dataset and occlusion paths used below live under this mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import tensorflow as tf
import numpy as np
import os
from PIL import Image
import cv2
from matplotlib import pyplot as plt
import pathlib
from sklearn.model_selection import train_test_split
import time
# Record the TensorFlow version this notebook was run with.
print(tf.__version__)

2.2.0-rc3


In [None]:
# Parallelism for `Dataset.map`; AUTOTUNE lets tf.data pick the value at runtime.
WORKERS = tf.data.experimental.AUTOTUNE
# NOTE(review): VAL_SPLIT is not referenced anywhere in the visible notebook — confirm it is still needed.
VAL_SPLIT = 0.2

In [None]:
class Dataset():
  """Indexes an expression-image folder and serves it as `tf.data` pipelines.

  The folder is walked recursively. A file belongs to the class named by its
  parent directory (one of the keys of `label_dict`) and to the training split
  when its file name starts with 'train'; every other file goes to the test
  split. Files whose parent directory is not a known label are ignored.

  Args:
    path: Root directory of the dataset.
    batch_size: Number of images per batch produced by the pipelines.
    image_shape: (height, width, channels) the images are resized to.
    create_occlusion: Accepted for backward compatibility; currently unused.
  """

  def __init__(self, path, batch_size = 32, image_shape = (88, 88, 3), create_occlusion = False):
    self.path = path
    self.batch_size = batch_size
    self.image_height = image_shape[0]
    self.image_width = image_shape[1]
    self.label_dict = {'surprise': 0, 'fear': 1, 'disgust': 2, 'happiness': 3, 'sadness': 4, 'anger': 5, 'neutral': 6}
    self.num_classes = len(self.label_dict)
    # Kept (empty) because they are public attributes of the original class.
    self.all_paths, self.all_labels = [], []
    self.X_train, self.X_test, self.Y_train, self.Y_test = [], [], [], []
    for subdir, dirs, files in os.walk(self.path):
      # Sorting makes the file index reproducible across runs and platforms.
      dirs.sort()
      label = os.path.basename(os.path.normpath(subdir))
      if label not in self.label_dict:
        # Stray files (e.g. in the dataset root) have no label; skip them
        # instead of failing with a KeyError.
        continue
      class_id = self.label_dict[label]
      for f in sorted(files):
        image_path = os.path.join(subdir, f)
        if f.startswith('train'):
          self.X_train.append(image_path)
          self.Y_train.append(class_id)
        else:
          self.X_test.append(image_path)
          self.Y_test.append(class_id)

  def parse_function(self, image_path, label):
    """Loads one image and returns `(float image, one-hot label)`.

    The file is decoded as a 3-channel JPEG and resized to
    `(image_height, image_width)`.
    """
    image = tf.io.read_file(image_path)
    image = tf.image.decode_jpeg(image, channels = 3)
    # tf.image.resize expects [new_height, new_width].
    image = tf.image.resize(image, [self.image_height, self.image_width])
    label = tf.one_hot(label, self.num_classes)
    return image, label

  def _make_ds(self, paths, labels, repeat_count):
    """Builds the shuffle -> repeat -> map -> batch -> prefetch pipeline.

    `repeat_count = None` repeats forever; an integer repeats that many times.

    Raises:
      ValueError: if `paths` is empty.
    """
    if not paths:
      raise ValueError('No images found for this split under ' + repr(self.path))
    ds = tf.data.Dataset.from_tensor_slices((paths, labels))
    ds = ds.shuffle(len(paths))
    ds = ds.repeat(repeat_count)
    ds = ds.map(self.parse_function, num_parallel_calls = WORKERS)
    # NOTE(review): drop_remainder also discards the last partial batch of the
    # test split, so a few test images are never evaluated — confirm intended.
    ds = ds.batch(self.batch_size, drop_remainder = True)
    ds = ds.prefetch(1)
    return ds

  def get_train_ds(self):
    """Returns the endlessly repeating, shuffled, batched training pipeline."""
    return self._make_ds(self.X_train, self.Y_train, None)

  def get_test_ds(self):
    """Returns the single-pass, shuffled, batched test pipeline."""
    return self._make_ds(self.X_test, self.Y_test, 1)

In [None]:
class OcclusionCreator():
  """Pastes occluder images (read with OpenCV) onto batches of face images.

  Occluder files are collected recursively from `path` and used round-robin.

  Args:
    path: Folder containing the images to be used as occlusions. Defaults to
      `DEFAULT_PATH`.
  """

  #path to folder which contains images to be used as occlusions
  DEFAULT_PATH = '/content/drive/My Drive/Occluded Facial Expression Recognition/Datasets/occlusion/www.pngplay.com'

  def __init__(self, path = None):
    if path is None:
      path = self.DEFAULT_PATH
    self.occlusion_paths = []
    for subdir, dirs, files in os.walk(path):
      for f in files:
        self.occlusion_paths.append(os.path.join(subdir, f))
    self.index = 0

  def resize(self, image_1, image_2):
    """Shrinks occluder `image_2` when it covers too much of `image_1`.

    An occluder no larger than half the image in both dimensions is returned
    unchanged. Otherwise its longer side is drawn from [30, 55) pixels and the
    shorter side is derived from the integer aspect ratio scaled by a random
    factor in {1.0, ..., 1.4}; both are clamped to the image size.
    """
    x1, y1 = image_1.shape[1], image_1.shape[0]
    x2, y2 = image_2.shape[1], image_2.shape[0]

    if x2 <= x1 // 2 and y2 <= y1 // 2:
      return image_2

    factor = np.random.randint(10, 15) / 10
    # NOTE(review): the ratio is deliberately kept as integer division, as in
    # the original code, so e.g. a 1.9:1 occluder is treated as 1:1.
    if x2 < y2:
      r = y2 // x2
      seedY = np.random.randint(30, 55)
      seedX = int((seedY / r)*factor)
    else:
      r = x2 // y2
      seedX = np.random.randint(30, 55)
      seedY = int((seedX / r)*factor)

    # cv2.resize rejects zero-sized targets, and an occluder larger than the
    # image cannot be pasted.
    seedX = min(max(seedX, 1), x1)
    seedY = min(max(seedY, 1), y1)
    return cv2.resize(image_2, (seedX, seedY), interpolation = cv2.INTER_AREA)

  def generate_coordinates(self, image_1, image_2):
    """Returns a random top-left `(x, y)` so `image_2` lies inside `image_1`.

    When the occluder is as large as (or larger than) the image along an axis,
    the offset on that axis is 0.
    """
    x1, y1 = image_1.shape[1], image_1.shape[0]
    x2, y2 = image_2.shape[1], image_2.shape[0]
    seedX = np.random.randint(0, max(x1 - x2, 0) + 1)
    seedY = np.random.randint(0, max(y1 - y2, 0) + 1)
    return seedX, seedY

  def overlay(self, image, occlusion, coordinates):
    """Alpha-blends `occlusion` into `image` in place and returns `image`.

    `coordinates` is the `(x, y)` of the occluder's top-left corner. The last
    channel of a 2- or 4-channel occluder is used as an 8-bit alpha mask;
    occluders without alpha are pasted opaquely. Parts of the occluder that
    fall outside the image are cropped.
    """
    x_offset, y_offset = coordinates[0], coordinates[1]
    x1, y1 = x_offset, y_offset
    x2 = min(x_offset + occlusion.shape[1], image.shape[1])
    y2 = min(y_offset + occlusion.shape[0], image.shape[0])
    if x2 <= x1 or y2 <= y1:
      return image

    occlusion = occlusion[:y2 - y1, :x2 - x1]
    if occlusion.ndim == 2:
      occlusion = occlusion[:, :, np.newaxis]
    channels = occlusion.shape[2]
    if channels in (2, 4):
      occ_alpha = occlusion[:, :, -1:] / 255.0
      colour = occlusion[:, :, :channels - 1]
    else:
      occ_alpha = 1.0
      colour = occlusion
    region = image[y1:y2, x1:x2, :3]
    image[y1:y2, x1:x2, :3] = occ_alpha*colour + (1.0 - occ_alpha)*region
    return image

  @staticmethod
  def _to_rgb(occlusion):
    """Reorders OpenCV's BGR(A) channels to RGB(A) to match the face images."""
    if occlusion.ndim == 3 and occlusion.shape[2] == 4:
      return occlusion[:, :, [2, 1, 0, 3]]
    if occlusion.ndim == 3 and occlusion.shape[2] == 3:
      return np.ascontiguousarray(occlusion[:, :, ::-1])
    return occlusion

  def _next_occlusion(self):
    """Returns `(path, image)` for the next readable occluder, or `(None, None)`."""
    for _ in range(len(self.occlusion_paths)):
      occlusion_path = self.occlusion_paths[self.index]
      self.index = (self.index + 1) % len(self.occlusion_paths)
      # cv2.imread returns None (it does not raise) for unreadable files.
      occlusion = cv2.imread(occlusion_path, cv2.IMREAD_UNCHANGED)
      if occlusion is not None and occlusion.size > 0:
        return occlusion_path, self._to_rgb(occlusion)
      print(occlusion_path)
    return None, None

  def impose(self, x_batch):
    """Returns a tensor with one occluded copy of every image in `x_batch`.

    `x_batch` is an array of images indexed along axis 0; it is not modified.
    The output always has as many images as the input: when no occluder can be
    read or applied, the image is passed through unchanged and the offending
    path is printed.
    """
    occluded_images = []
    for i in range(x_batch.shape[0]):
      image = np.array(x_batch[i])  # copy: overlay() writes in place
      occlusion_path, occlusion = self._next_occlusion()
      if occlusion is not None:
        try:
          occlusion = self.resize(image, occlusion)
          coordinates = self.generate_coordinates(image, occlusion)
          image = self.overlay(image, occlusion, coordinates)
        except Exception as err:  # best effort: keep the batch size intact
          print(occlusion_path, err)
      occluded_images.append(image)
    return tf.convert_to_tensor(occluded_images)

MODEL DESCRIPTION

In [None]:
class BaseModel(tf.keras.Model):
  """ResNet50-based 7-way expression classifier that also exposes an
  intermediate feature map.

  `call` runs the full ImageNet-pretrained ResNet50 (no top) followed by
  dropout, global average pooling, a 7-unit dense layer, batch normalisation
  and softmax. `feature_map` returns the output of the ResNet50 layer
  'conv3_block4_3_conv'.
  """

  def __init__(self):
    super(BaseModel, self).__init__()
    self.dense = tf.keras.layers.Dense(units = 7)
    self.input_layer = tf.keras.Input(shape = (88, 88, 3))
    self.dropout = tf.keras.layers.Dropout(rate=0.6,seed=7)
    self.batchNorm=tf.keras.layers.BatchNormalization()
    self.GlobalAvg=tf.keras.layers.GlobalAveragePooling2D()
    self.base = tf.keras.applications.resnet50.ResNet50(weights = 'imagenet', input_tensor = self.input_layer, include_top = False)
    # Freeze every BatchNormalization layer of the pretrained backbone.
    for layer in self.base.layers:
      if layer.__class__.__name__ == 'BatchNormalization':
        layer.trainable = False
    # Sub-model that shares the backbone's layers and stops at an intermediate
    # convolution; presumably (11, 11, 512) for 88x88 inputs — TODO confirm.
    output = self.base.get_layer('conv3_block4_3_conv').output
    self.model = tf.keras.Model(inputs = [self.base.input], outputs = [output])

  def call(self, x):
    """Returns softmax class probabilities for a batch of images `x`."""
    resnet = self.base
    
    # ResNet50-specific input preprocessing, then the full backbone.
    x=tf.keras.applications.resnet50.preprocess_input(x)
    z = resnet(x)
    # NOTE(review): dropout is applied to the spatial feature map before
    # pooling, and batch norm to the logits before softmax — confirm intended.
    z=self.dropout(z)
    z=self.GlobalAvg(z)
    z=self.dense(z)
    z=self.batchNorm(z)   
    z = tf.keras.activations.softmax(z)
    return z

  def feature_map(self, x):
    """Returns the intermediate backbone features for `x` via `predict`.

    NOTE(review): unlike `call`, no `preprocess_input` is applied here, and
    `predict` returns NumPy arrays, so no gradient can flow through this
    method — confirm both are intended.
    """
    return self.model.predict(x)

In [None]:
class Discriminator(tf.keras.Model):
  """Convolutional discriminator over backbone feature maps.

  Four stride-2 3x3 convolutions (batch norm after the 2nd-4th) with leaky
  ReLU, followed by a 1-filter 3x3 convolution. The output is raw logits (no
  sigmoid), suitable for `tf.nn.sigmoid_cross_entropy_with_logits`.
  """

  def __init__(self):
    super(Discriminator, self).__init__()
    self.conv1 = tf.keras.layers.Conv2D(filters = 512, kernel_size = (3, 3), strides = (2, 2), padding = 'same')
    self.conv2 = tf.keras.layers.Conv2D(filters = 512, kernel_size = (3, 3), strides = (2, 2), padding = 'same')
    self.conv3 = tf.keras.layers.Conv2D(filters = 512, kernel_size = (3, 3), strides = (2, 2), padding = 'same')
    self.conv4 = tf.keras.layers.Conv2D(filters = 512, kernel_size = (3, 3), strides = (2, 2), padding = 'same')
    self.conv5 = tf.keras.layers.Conv2D(filters = 1, kernel_size = (3, 3), strides = (1, 1), padding = 'same') #-- padding changed to same
    # Created once here so their weights are tracked and trained; layers
    # instantiated inside call() would be re-created on every forward pass.
    self.bn2 = tf.keras.layers.BatchNormalization()
    self.bn3 = tf.keras.layers.BatchNormalization()
    self.bn4 = tf.keras.layers.BatchNormalization()

  def call(self, h, training = None):
    """Returns discriminator logits for feature maps `h`.

    Args:
      h: Batch of feature maps, channels last.
      training: Forwarded to the batch-norm layers (batch statistics when
        True, moving averages otherwise).

    Returns:
      Logits with the channel axis, and any spatial axis statically known to
      be 1, removed; the batch axis is always kept. For 11x11 inputs the
      spatial size shrinks 11 -> 6 -> 3 -> 2 -> 1, giving shape `(batch,)`.
    """
    z = self.conv1(h)
    z = tf.nn.leaky_relu(z)

    z = self.conv2(z)
    z = self.bn2(z, training = training)
    z = tf.nn.leaky_relu(z)

    z = self.conv3(z)
    z = self.bn3(z, training = training)
    z = tf.nn.leaky_relu(z)

    z = self.conv4(z)
    z = self.bn4(z, training = training)
    z = tf.nn.leaky_relu(z)

    z = self.conv5(z)

    # Never squeeze axis 0: an unrestricted tf.squeeze would also drop the
    # batch axis for a batch of one.
    squeeze_axes = [3] + [axis for axis in (1, 2) if z.shape[axis] == 1]
    logits = tf.squeeze(z, axis = squeeze_axes)
    return logits

In [None]:
class Decoder(tf.keras.Model):
  """Reconstructs an image from a backbone feature map.

  Three stride-2 3x3 transposed convolutions (256, 64, 3 filters), each
  followed by batch normalisation and ReLU; the result is multiplied by 255.
  Each stage doubles the spatial size, so an 11x11 feature map yields an
  88x88x3 output.
  """

  def __init__(self):
    super(Decoder, self).__init__()
    self.deconv1 = tf.keras.layers.Conv2DTranspose(filters = 256, kernel_size = (3, 3), strides = (2, 2), padding = 'same')
    self.deconv2 = tf.keras.layers.Conv2DTranspose(filters = 64, kernel_size = (3, 3), strides = (2, 2), padding = 'same')
    self.deconv3 = tf.keras.layers.Conv2DTranspose(filters = 3, kernel_size = (3, 3), strides = (2, 2), padding = 'same')
    # Created once here so their weights are tracked and trained; layers
    # instantiated inside call() would be re-created on every forward pass.
    self.bn1 = tf.keras.layers.BatchNormalization()
    self.bn2 = tf.keras.layers.BatchNormalization()
    self.bn3 = tf.keras.layers.BatchNormalization()

  def call(self, h, training = None):
    """Returns the reconstructed image batch for feature maps `h`.

    Args:
      h: Batch of feature maps, channels last.
      training: Forwarded to the batch-norm layers (batch statistics when
        True, moving averages otherwise).
    """
    z = self.deconv1(h)
    z = self.bn1(z, training = training)
    z = tf.keras.activations.relu(z)

    z = self.deconv2(z)
    z = self.bn2(z, training = training)
    z = tf.keras.activations.relu(z)

    z = self.deconv3(z)
    z = self.bn3(z, training = training)
    z = tf.keras.activations.relu(z)

    reconstructed_image = tf.math.scalar_mul(255, z)
    return reconstructed_image

In [None]:
class Model(tf.keras.Model):
  """Occluded facial-expression recogniser trained with a clean "teacher".

  Holds two `BaseModel` classifiers (one for occluded and one for
  non-occluded faces), a `Discriminator` over their intermediate feature
  maps, a `Decoder` that reconstructs the clean image from occluded features,
  and two `OcclusionCreator`s that synthesise occlusions during `train`.
  """

  def __init__(self):
    super(Model, self).__init__()
    self.occluded_net = BaseModel()
    self.non_occluded_net = BaseModel()
    self.discriminator = Discriminator()
    self.decoder = Decoder()
    self.occ_model = None
    self.occlusion_creator_1 = OcclusionCreator()
    self.occlusion_creator_2 = OcclusionCreator()

  def _fit_classifier(self, net, train_ds, test_ds, num_epochs, learning_rate):
    """Compiles and fits one classifier; returns the Keras `History`.

    Uses RMSprop, categorical cross-entropy, early stopping (patience 10) and
    an exponentially decaying learning rate, with 200 steps per epoch.
    """
    optimizer = tf.keras.optimizers.RMSprop(learning_rate = learning_rate)
    early_stopping = tf.keras.callbacks.EarlyStopping(min_delta = 0.00001, mode = 'min', patience = 10)
    # Return a plain float: every Keras version accepts it from a schedule.
    lr_scheduler = tf.keras.callbacks.LearningRateScheduler(lambda epoch: float(learning_rate*np.exp(-0.01*epoch)))

    net.compile(loss = 'categorical_crossentropy', optimizer = optimizer, metrics = ['accuracy'])
    return net.fit(train_ds, epochs = num_epochs, steps_per_epoch = 200,
                   validation_data = test_ds,
                   callbacks = [early_stopping, lr_scheduler])

  def train_occluded_net(self, train_ds, test_ds, num_epochs = 100, learning_rate = 0.0001, batch_size = 32):
    """Pre-trains the occluded-face classifier.

    `batch_size` is accepted for backward compatibility and is unused; the
    batch size is defined by the datasets. Returns the Keras `History`.
    """
    return self._fit_classifier(self.occluded_net, train_ds, test_ds, num_epochs, learning_rate)

  def train_non_occluded_net(self, train_ds, test_ds, num_epochs = 100, learning_rate = 0.0001, batch_size = 32):
    """Pre-trains the non-occluded-face classifier.

    `batch_size` is accepted for backward compatibility and is unused; the
    batch size is defined by the datasets. Returns the Keras `History`.
    """
    return self._fit_classifier(self.non_occluded_net, train_ds, test_ds, num_epochs, learning_rate)

  def build_architecture(self):
    """Wraps the occluded classifier and the decoder in one functional model.

    The wrapper is used by `train` to collect the trainable variables that
    the occluded-network optimiser updates.
    """
    x = tf.keras.Input(shape = (88, 88, 3))
    h = tf.keras.Input(shape = (11, 11, 512))
    y = self.occluded_net(x)
    x_rec = self.decoder(h)
    model = tf.keras.Model(inputs = [x, h], outputs = [y, x_rec])
    return model

  def train(self, train_ds, test_ds, lambdas, num_epochs = 100, k1 = 50, k2 = 50, disc_lr = 0.0001, occ_lr = 0.00002, batch_size = 32):
    """Adversarially fine-tunes the occluded network against the clean one.

    Every epoch runs `k1` discriminator updates followed by `k2` updates of
    the occluded classifier and decoder.

    Args:
      train_ds: Dataset yielding `(clean image batch, one-hot label batch)`.
      test_ds: Dataset passed to `occluded_net.evaluate` after every epoch;
        `occluded_net` must already be compiled (see `train_occluded_net`).
      lambdas: Four weights for the similarity, inequality, adversarial and
        reconstruction losses, in that order.
      num_epochs: Number of epochs.
      k1: Discriminator steps per epoch.
      k2: Occluded-network steps per epoch.
      disc_lr: Adam learning rate of the discriminator.
      occ_lr: Adam learning rate of the occluded network and decoder.
      batch_size: Accepted for backward compatibility and unused; tensor
        shapes are taken from the data.

    Returns:
      Dict with per-epoch lists 'disc_loss', 'occ_loss' and 'occ_accuracy'.
    """
    self.occ_model = self.build_architecture()
    disc_optimizer = tf.keras.optimizers.Adam(learning_rate = disc_lr)
    occ_optimizer = tf.keras.optimizers.Adam(learning_rate = occ_lr)
    disc_loss = tf.keras.metrics.Mean(name = 'disc_loss')
    occ_loss = tf.keras.metrics.Mean(name = 'occ_loss')
    occ_accuracy = tf.keras.metrics.CategoricalAccuracy(name = 'occ_accuracy')
    disc_loss_log = []
    occ_loss_log = []
    accuracy_log = []

    for epoch in range(num_epochs):
      # ---- Discriminator: clean features are "real", occluded are "fake".
      disc_loss.reset_states()
      batch_iter = iter(train_ds)
      for k in range(k1):
        x_batch, _ = next(batch_iter)
        occ_x_batch = self.occlusion_creator_1.impose(x_batch.numpy())
        ho_batch = self.occluded_net.feature_map(occ_x_batch)
        hc_batch = self.non_occluded_net.feature_map(x_batch)

        with tf.GradientTape() as tape:
          d_pred_fake = self.discriminator(ho_batch, training = True)
          d_pred_real = self.discriminator(hc_batch, training = True)
          loss_real = tf.nn.sigmoid_cross_entropy_with_logits(labels = tf.ones_like(d_pred_real), logits = d_pred_real)
          loss_fake = tf.nn.sigmoid_cross_entropy_with_logits(labels = tf.zeros_like(d_pred_fake), logits = d_pred_fake)
          loss = tf.reduce_mean(loss_real) + tf.reduce_mean(loss_fake)
        gradients = tape.gradient(loss, self.discriminator.trainable_variables)
        disc_optimizer.apply_gradients(zip(gradients, self.discriminator.trainable_variables))
        disc_loss(loss)
      disc_loss_log.append(float(disc_loss.result()))

      template = 'Epoch {}: discriminator_loss: {}'
      print(template.format(epoch + 1, disc_loss.result()), end = " - ")

      # ---- Occluded classifier + decoder.
      occ_loss.reset_states()
      occ_accuracy.reset_states()
      batch_iter = iter(train_ds)
      for k in range(k2):
        x_batch, y_batch = next(batch_iter)
        occ_x_batch = self.occlusion_creator_2.impose(x_batch.numpy())
        with tf.GradientTape() as tape:
          # Same sub-model that feature_map() runs, but called directly so the
          # result stays a tensor and the adversarial and reconstruction
          # losses can back-propagate into the occluded network.
          ho = self.occluded_net.model(occ_x_batch, training = False)
          yo = self.occluded_net(occ_x_batch)
          yc = self.non_occluded_net(x_batch)
          d_ho = self.discriminator(ho)
          x_rec = self.decoder(ho)

          l_sup = tf.reduce_mean(tf.keras.losses.categorical_crossentropy(y_batch, yo))  #.2
          l_sim = tf.reduce_mean(tf.keras.losses.MSE(yo, yc)) #.125
          l_lir = tf.math.maximum(0.0, tf.reduce_mean(tf.keras.losses.categorical_crossentropy(y_batch, yc)) - l_sup)#.175
          # The discriminator returns logits, so -log(sigmoid(d_ho)) is
          # computed stably as cross-entropy against an all-ones target.
          l_adv = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(labels = tf.ones_like(d_ho), logits = d_ho))#2.5
          l_rec = tf.reduce_mean(tf.square(x_batch - x_rec)) #2.5

          loss = l_sup + lambdas[0]*l_sim + lambdas[1]*l_lir + lambdas[2]*l_adv + lambdas[3]*l_rec
        gradients = tape.gradient(loss, self.occ_model.trainable_variables)
        occ_optimizer.apply_gradients(zip(gradients, self.occ_model.trainable_variables))

        occ_loss(loss)
        occ_accuracy(y_batch, yo)
      occ_loss_log.append(float(occ_loss.result()))
      accuracy_log.append(float(occ_accuracy.result()))

      template = 'model_loss: {} - train_accuracy: {} - test_accuracy: {}'
      print(template.format(occ_loss.result(), 100*occ_accuracy.result(), self.occluded_net.evaluate(test_ds)[1]))

    return {'disc_loss': disc_loss_log, 'occ_loss': occ_loss_log, 'occ_accuracy': accuracy_log}

  def evaluate(self, dataset):
    """Evaluates the occluded network; returns `{metric name: score}`."""
    scores = self.occluded_net.evaluate(dataset)
    metrics_names = self.occluded_net.metrics_names
    return dict(zip(metrics_names, scores))

  def predict(self, dataset):
    """Returns the predicted expression name for every sample in `dataset`."""
    pred = self.occluded_net.predict(dataset)
    label_dict = {0: 'surprise', 1: 'fear', 2: 'disgust', 3: 'happiness', 4: 'sadness', 5: 'anger', 6: 'neutral'}
    return [label_dict[int(np.argmax(row))] for row in pred]

In [None]:
# Build both classifiers, the discriminator, the decoder and the occlusion creators.
model=Model()

In [None]:
# Occluded RAF-DB images: train/test pipelines for the occluded network.
path = '/content/drive/My Drive/Occluded Facial Expression Recognition/Datasets/NEW_OCCLUDED RAF-DB/Images'
dataset = Dataset(path)
train_dso = dataset.get_train_ds()
test_dso = dataset.get_test_ds()

In [None]:
# Pre-train the occluded-face classifier on the occluded dataset.
model.train_occluded_net(train_dso, test_dso)

In [None]:
# Original (non-occluded) RAF-DB images: train/test pipelines for the clean network.
# Note: `path` and `dataset` are rebound here; the occluded pipelines stay in train_dso/test_dso.
path = '/content/drive/My Drive/Occluded Facial Expression Recognition/Datasets/RAF-DB/Images'
dataset = Dataset(path)
train_ds = dataset.get_train_ds()
test_ds = dataset.get_test_ds()

In [None]:
# Pre-train the non-occluded-face classifier on the clean dataset.
model.train_non_occluded_net(train_ds, test_ds)

In [None]:
# Loss weights, in order: similarity, inequality, adversarial, reconstruction
# (the trailing comment records an alternative setting).
lambdas = tf.constant([0.2, 0.175, 0.25, 0.25]) #[0.2, 0.175, 0.75, 0.75]
# Joint training on clean images; occlusions are synthesised on the fly inside train().
model.train(train_ds, test_ds, lambdas)

In [None]:
# Final metrics of the occluded network on the occluded test split.
model.evaluate(test_dso)