In [None]:
pip install -i https://test.pypi.org/simple/ monitor-training==0.1.3

In [3]:
import numpy as np
import pandas as pd
import cv2
import os
import time
import random
import datetime
import json
from tensorboard import notebook
from sklearn.model_selection import KFold, StratifiedKFold
from PIL import Image
import matplotlib.pyplot as plt
import dlib
import glob
import tensorflow as tf
from tensorflow.keras import Model, Sequential
from tensorflow.keras.preprocessing.image import load_img, img_to_array, ImageDataGenerator
from tensorflow.keras.utils import to_categorical, plot_model
from tensorflow.keras.layers import Concatenate, Dropout, Activation, Add, Input, Dense, Conv2D, Flatten, GlobalAvgPool2D, GlobalMaxPool2D, MaxPooling2D, AvgPool2D, Flatten
from tensorflow.keras.optimizers import Adam, Nadam, RMSprop, SGD
from sklearn.model_selection import train_test_split
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.initializers import glorot_uniform
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing import image
from monitor_training import MonitorSingleTraining

# Global variables
- Use models/checkpoint/ folder for storing checkpoints
- Use models/recovery/ folder for storing recovery json files

In [7]:
# Class label dictionaries: index -> name, and the inverse mapping derived
# from it so the two can never drift apart.
CLASS_LABEL_MAP = {0:"Angry", 1:"Disgust", 2:"Fear", 3:"Happy", 4:"Neutral", 5:"Sad", 6:"Surprise"}
LABEL_CLASS_MAP = {label: index for index, label in CLASS_LABEL_MAP.items()}

# Network input width and height (pixels).
IMG_WIDTH = 160
IMG_HEIGHT = 160

# Seed every random number source used by the notebook, not only `random`:
# NumPy and TensorFlow keep their own generators.
SEED_VALUE = 41
random.seed(SEED_VALUE)
np.random.seed(SEED_VALUE)
tf.random.set_seed(SEED_VALUE)

# Placeholder locations: replace the "your dataset ..." parts before running.
CHECKPOINT_PATH = "../models/checkpoint/your dataset name"
RECOVERY_DIR = "../models/recovery/your dataset name"
drive_data_path = "../data/your dataset folder"

# Create Facial Landmark Annotations

## OPENCV FACE DETECTORS
- Use models/utils/ folder to store face and landmark detection models

In [8]:
## opencv DNN caffe model
# Both paths are placeholders ("path to ...") and must be replaced with the
# real .caffemodel weights file and .prototxt network definition.
# NOTE(review): caffe_net is not referenced again in the cells visible here.
opencv_dnn_caffe_model = "../models/utils/path to .caffemodel"
opencv_dnn_caffe_prototxt = "../models/utils/path to .prototxt"
caffe_net = cv2.dnn.readNetFromCaffe(opencv_dnn_caffe_prototxt, opencv_dnn_caffe_model)

## DLIB LANDMARKS PREDICTOR

In [9]:
### DLIB LANDMARKS PREDICTOR : ERT ALGORITHM
# The path is a placeholder ("path to .dat") for the dlib shape-predictor file.
# The predictor is later called as predictor(image, dlib.rectangle) to produce
# the landmark targets for training and validation.
ert_landmarks_predictor = dlib.shape_predictor("../models/utils/path to .dat")

## Helper Functions

In [10]:
def rect_to_bb_hog(face):
	"""Convert a rectangle-like object into an OpenCV-style ``(x, y, w, h)`` tuple.

	``face`` must expose ``left()``, ``top()``, ``right()`` and ``bottom()``
	(as a dlib rectangle does). Width and height are measured from the
	top-left corner.
	"""
	left, top = face.left(), face.top()
	width = face.right() - left
	height = face.bottom() - top
	return (left, top, width, height)

def rect_to_bb_cnn(face):
	"""Convert a detection carrying a ``.rect`` into an ``(x, y, w, h)`` tuple.

	``face.rect`` must expose ``left()``, ``top()``, ``right()`` and
	``bottom()``. Unlike ``rect_to_bb_hog`` the height is stretched by a
	factor of 1.5 and truncated to an integer.
	"""
	x = face.rect.left()
	y = face.rect.top()
	width = face.rect.right() - x
	# enlarge the box vertically: 1.5x the detected height, truncated to int
	height = int((face.rect.bottom() - y) * 1.5)
	return (x, y, width, height)

def landmarks_to_np(shape, dtype="int"):
	"""Convert a landmark detection into an ``(N, 2)`` array of (x, y) points.

	Args:
		shape: object exposing ``part(i)`` with ``.x`` / ``.y`` attributes.
			When it also exposes ``num_parts`` that many points are read;
			otherwise 68 points are assumed.
		dtype: dtype of the returned array.

	Returns:
		``np.ndarray`` of shape ``(N, 2)`` with one (x, y) row per landmark.
	"""
	# use the detection's own point count instead of hard-coding 68
	num_points = getattr(shape, "num_parts", 68)
	coords = np.zeros((num_points, 2), dtype=dtype)
	for i in range(num_points):
		point = shape.part(i)
		coords[i] = (point.x, point.y)
	return coords

def visualize_face(im, x, y, w, h, landmarks):
	"""Draw a face box and its landmarks on ``im`` and display the result.

	``im`` is modified in place by the OpenCV drawing calls, so pass a copy
	if the original pixels are still needed.

	Args:
		im: image array to draw on.
		x, y, w, h: top-left corner, width and height of the face box.
		landmarks: landmark coordinates, either as an ``(N, 2)`` array or as a
			flat ``[x0, y0, x1, y1, ...]`` sequence; integer or float.
	"""
	# draw box over face
	cv2.rectangle(im, (x, y), (x + w, y + h), (0, 255, 0), 2)

	# Accept both layouts: reshape(-1, 2) yields one (x, y) row per landmark
	# whether the input was flat or already two-dimensional.
	points = np.asarray(landmarks).reshape((-1, 2))
	for (xl, yl) in points:
		# cv2.circle needs integer pixel coordinates
		cv2.circle(im, (int(xl), int(yl)), 1, (0, 0, 255), -1)

	plt.imshow(im)
	plt.show()


# Load img/label data to drive

In [68]:
# Placeholders to be filled with the dataset before training.
# The model is fed face crops of size 160x160; use the pre-processing pipeline
# described in the paper to produce the crops for the train and validation
# (or cross-validation) portions of the dataset.
train_face_crop = []
valid_face_crop = []
y_train = []
y_valid = []

# Model

## Custom Data Generator for model 

In [16]:
class ImageDataGeneratorLandmarksOnly(object):
  """Endless batch generator yielding augmented images with landmark targets.

  Images come from a wrapped augmenting generator (``datagen.flow``). The
  targets are the landmarks predicted on each *augmented* image by
  ``landmarks_predictor``, with x divided by ``IMG_WIDTH`` and y divided by
  ``IMG_HEIGHT``.

  Args:
    datagen: object whose ``flow(X, y, batch_size=...)`` returns an iterator
      of ``(images, labels)`` batches.
    landmarks_predictor: callable ``(image, dlib.rectangle)`` whose result is
      accepted by ``landmarks_to_np``.
    target_shape: stored on the instance; not used by this class.
    gen_batch_size: batch size requested from ``datagen.flow``.
    preprocess_input: optional callable applied to the stacked float32 image
      batch; when ``None`` the batch is yielded unprocessed.
    start_idx: index of the first landmark kept in the target.
    end_idx: index one past the last landmark kept in the target.
    verbose: when 1, every image is displayed with its landmarks.
  """

  def __init__(self,
               datagen,
               landmarks_predictor,
               target_shape=None,
               gen_batch_size = 1,
               preprocess_input = None,
               start_idx = 0,
               end_idx = 68,
               verbose = 0):

    self.datagen = datagen
    self.target_shape = target_shape
    self.landmarks_predictor = landmarks_predictor

    self.preprocess_input = preprocess_input
    self.gen_batch_size = gen_batch_size
    self.start_idx = start_idx
    self.end_idx = end_idx

    self.verbose = verbose

  def flow(self, X, y, batch_size = 4):
    """Yield ``(X_batch, ylr_batch)`` tuples forever.

    Args:
      X: images passed to ``datagen.flow``.
      y: labels passed to ``datagen.flow``; each row must contain a 1 at the
        class index (it is read with ``np.where(row == 1)``).
      batch_size: number of samples in every yielded batch.

    Yields:
      ``X_batch``: float32 image batch (after ``preprocess_input`` if given).
      ``ylr_batch``: flattened normalised landmarks
      ``[x, y, x, y, ...]`` for landmarks ``start_idx`` .. ``end_idx - 1``.
    """
    generator = self.datagen.flow(X, y, batch_size = self.gen_batch_size)

    while True:

      X_batch, ylr_batch = [], []
      dict_labels_collected = {0:0, 1:0, 2:0, 3:0, 4:0, 5:0, 6:0, 7:0}
      # Class balancing: once one class has been collected exactly
      # (batch_size/7)+1 times, samples of classes that already have at least
      # batch_size/7 entries are skipped.
      # NOTE: the trigger is an equality against a float, so it can only fire
      # when batch_size is a multiple of 7; otherwise no sample is skipped.
      got2 = False

      # Count the samples actually collected so that a short batch from the
      # wrapped generator (end of an epoch) cannot produce a short output batch.
      while len(X_batch) < batch_size:
        # next() works on every Keras iterator version; .next() does not
        X_gen, yemotion_gen = next(generator)

        for i in range(X_gen.shape[0]):
          label = np.where(yemotion_gen[i]==1)[0][0]

          if got2==True and dict_labels_collected[label]>=(batch_size/7):
            continue

          Xaug = X_gen[i].astype(dtype='uint8')

          # the crop is assumed to contain only the face, so the whole image
          # is handed to the predictor as the face rectangle
          img_h, img_w = Xaug.shape[:2]
          face_rect = dlib.rectangle(0, 0, img_w, img_h)
          landmarks = self.landmarks_predictor(Xaug, face_rect)
          landmarks = landmarks_to_np(landmarks)

          if self.verbose == 1:
            # draw on a copy; pass the points flattened as [x0, y0, x1, y1, ...]
            visualize_face(np.array(Xaug), 0, 0, img_w, img_h, landmarks.reshape(-1))

          # normalise x by the network width and y by the network height
          landmarks = landmarks.astype(np.float32)
          landmarks[:, 0] /= IMG_WIDTH
          landmarks[:, 1] /= IMG_HEIGHT
          landmarks = landmarks.reshape((-1,))

          X_batch.append(Xaug.astype(dtype='float32'))
          ylr_batch.append(landmarks[2*self.start_idx:2*self.end_idx])
          dict_labels_collected[label]+=1
          if dict_labels_collected[label]==((batch_size/7)+1):
            got2=True

          if len(X_batch)>=batch_size:
            break

      X_batch, ylr_batch = np.array(X_batch), np.array(ylr_batch)
      if self.preprocess_input is not None:
        X_batch = self.preprocess_input(X_batch)

      yield (X_batch,ylr_batch)

In [17]:
class ImageDataGeneratorEmotionOnly(object):
  """Endless batch generator yielding augmented images with emotion labels.

  Images and labels come from a wrapped augmenting generator
  (``datagen.flow``). The landmark predictor is only used to visualise the
  samples when ``verbose == 1``; it does not influence the yielded data.

  Args:
    datagen: object whose ``flow(X, y, batch_size=...)`` returns an iterator
      of ``(images, labels)`` batches.
    landmarks_predictor: callable ``(image, dlib.rectangle)`` whose result is
      accepted by ``landmarks_to_np``.
    target_shape: stored on the instance; not used by this class.
    gen_batch_size: stored on the instance; ``flow`` requests batches of its
      own ``batch_size`` from ``datagen`` instead.
    preprocess_input: optional callable applied to the stacked float32 image
      batch; when ``None`` the batch is yielded unprocessed.
    verbose: when 1, every image is displayed with its predicted landmarks.
  """

  def __init__(self,
               datagen,
               landmarks_predictor,
               target_shape=None,
               gen_batch_size = 1,
               preprocess_input = None,
               verbose = 0):

    self.datagen = datagen
    self.target_shape = target_shape
    self.landmarks_predictor = landmarks_predictor
    self.gen_batch_size = gen_batch_size
    self.preprocess_input = preprocess_input
    self.verbose = verbose

  def flow(self, X, y, batch_size = 4):
    """Yield ``(X_batch, yemotion_batch)`` tuples forever.

    Args:
      X: images passed to ``datagen.flow``.
      y: labels passed to ``datagen.flow``; each row must contain a 1 at the
        class index (it is read with ``np.where(row == 1)``).
      batch_size: number of samples in every yielded batch.

    Yields:
      ``X_batch``: float32 image batch (after ``preprocess_input`` if given).
      ``yemotion_batch``: the label rows of the selected samples.
    """
    generator = self.datagen.flow(X, y, batch_size = batch_size)

    while True:

      X_batch, yemotion_batch = [], []
      dict_labels_collected = {0:0, 1:0, 2:0, 3:0, 4:0, 5:0, 6:0, 7:0}
      # Class balancing: once one class has been collected exactly
      # (batch_size/7)+1 times, samples of classes that already have at least
      # batch_size/7 entries are skipped.
      # NOTE: the trigger is an equality against a float, so it can only fire
      # when batch_size is a multiple of 7; otherwise no sample is skipped.
      got2 = False

      # Count the samples actually collected so that a short batch from the
      # wrapped generator (end of an epoch) cannot produce a short output batch.
      while len(X_batch) < batch_size:
        # next() works on every Keras iterator version; .next() does not
        X_gen, yemotion_gen = next(generator)

        for i in range(X_gen.shape[0]):
          label = np.where(yemotion_gen[i]==1)[0][0]

          if got2==True and dict_labels_collected[label]>=(batch_size/7):
            continue

          Xaug = X_gen[i].astype(dtype='uint8')

          if self.verbose == 1:
            # Landmarks are needed for display only, so the predictor is not
            # run at all in the normal (non-verbose) training path.
            img_h, img_w = Xaug.shape[:2]
            face_rect = dlib.rectangle(0, 0, img_w, img_h)
            landmarks = landmarks_to_np(self.landmarks_predictor(Xaug, face_rect))
            # draw on a copy; pass the points flattened as [x0, y0, x1, y1, ...]
            visualize_face(np.array(Xaug), 0, 0, img_w, img_h, landmarks.reshape(-1))

          X_batch.append(Xaug.astype(dtype='float32'))
          yemotion_batch.append(yemotion_gen[i])
          dict_labels_collected[label]+=1
          if dict_labels_collected[label]==((batch_size/7)+1):
            got2=True

          if len(X_batch)>=batch_size:
            break

      X_batch, yemotion_batch= np.array(X_batch), np.array(yemotion_batch)
      if self.preprocess_input is not None:
        X_batch = self.preprocess_input(X_batch)

      yield (X_batch,yemotion_batch)

### Preprocessing functions

In [18]:
def preprocess_input_v1(x):
  """Rescale pixel values from ``[0, 255]`` to ``[-1, 1]``.

  The input is never modified: the scaling is done on a copy. Floating point
  inputs keep their dtype; any other input (integer arrays, lists) is
  converted to float32 first.

  Args:
    x: array-like of pixel values.

  Returns:
    New ``np.ndarray`` holding ``(x / 255 - 0.5) * 2``.
  """
  # np.array copies, so the in-place operations below cannot touch the
  # caller's data
  x = np.array(x)
  if not np.issubdtype(x.dtype, np.floating):
    # in-place true division is not possible on integer arrays
    x = x.astype(np.float32)
  x /= 255.0
  x -= 0.5
  x *= 2.0
  return x

### Create data augmentors

In [19]:
# Shared augmenter: random rotation, width/height shift, shear and horizontal
# flip with the ranges given below. It is wrapped by both custom generators
# (landmarks and emotion) further down in the notebook.
aug = ImageDataGenerator(
		rotation_range=15,
		width_shift_range=0.05,
		height_shift_range=0.05,
		shear_range=0.05, 
		horizontal_flip=True)

## Create Test dataset for landmarks

In [20]:
def create_test_dataset(X, landmarks_predictor, preprocess_input = None, start_idx = 0, end_idx = 68, verbose=0):
  """Build a fixed (non-augmented) landmark dataset from face crops.

  Landmarks are predicted on every image with the whole image used as the
  face rectangle, then x is divided by ``IMG_WIDTH`` and y by ``IMG_HEIGHT``
  (the same normalisation as the training generator).

  Args:
    X: array of images, indexed along the first axis.
    landmarks_predictor: callable ``(image, dlib.rectangle)`` whose result is
      accepted by ``landmarks_to_np``.
    preprocess_input: optional callable applied to the float32 copy of ``X``;
      when ``None`` the float32 copy is returned unprocessed.
    start_idx: index of the first landmark kept in the target.
    end_idx: index one past the last landmark kept in the target.
    verbose: when 1, every image is displayed with the selected landmarks.

  Returns:
    ``(Xlr, ylr)``: the float32 images and an array with one row of
    flattened normalised landmarks ``[x, y, x, y, ...]`` per image.
  """
  ylr = []
  for i in range(X.shape[0]):
    img_h, img_w = X[i].shape[:2]
    face_rect = dlib.rectangle(0, 0, img_w, img_h)
    landmarks = landmarks_to_np(landmarks_predictor(X[i], face_rect))

    if verbose==1:
      # visualise with the integer pixel coordinates (cv2 drawing rejects
      # floats), flattened as [x0, y0, x1, y1, ...]; draw on a copy
      visualize_face(np.array(X[i]), 0, 0, int(img_w), int(img_h), landmarks[start_idx:end_idx].reshape(-1))

    landmarks = landmarks.astype(dtype='float32')
    landmarks[:, 0] /= IMG_WIDTH
    landmarks[:, 1] /= IMG_HEIGHT
    landmarks = landmarks.reshape((-1,))
    ylr.append(landmarks[2*start_idx:2*end_idx])

  ylr = np.array(ylr)
  Xlr = X.astype(np.float32)
  if preprocess_input is not None:
    Xlr = preprocess_input(Xlr)

  return (Xlr, ylr)

## Create model functions

In [21]:
def feature_extractor_base():
    """Build the convolutional feature extractor shared by both models.

    The network has four stages with 16, 32, 64 and 128 filters. Every stage
    is two ``Conv2D(3x3, same) -> BatchNormalization -> ReLU`` units followed
    by a 3x3 max pooling with stride 2. A global average pooling turns the
    last feature map into the output vector.

    Returns:
        Uncompiled ``Model`` mapping an ``(IMG_WIDTH, IMG_HEIGHT, 3)`` input
        to the pooled feature vector.
    """
    input_img = Input(shape=(IMG_WIDTH,IMG_HEIGHT,3))

    x = input_img
    # layers are created in exactly the same order as a hand-unrolled
    # definition, so positional layer indices stay the same
    for n_filters in (16, 32, 64, 128):
        for _ in range(2):
            x = Conv2D(n_filters,(3,3),padding="same", kernel_initializer='he_uniform')(x)
            x = BatchNormalization()(x)
            x = Activation('relu')(x)
        x = MaxPooling2D((3,3),strides=(2,2),padding="same")(x)

    output_vector = GlobalAvgPool2D()(x)

    return Model(inputs = input_img, outputs=output_vector)


### Landmarks Model

In [22]:
def create_landmarks_model(output_size=136):
  """Build the landmark regression model on top of a fresh feature extractor.

  The trainable feature extractor is followed by a 128-unit dense layer and
  a dense output layer named ``"lr_output"``; neither dense layer is given
  an activation.

  Args:
    output_size: number of regression outputs (two per landmark).

  Returns:
    Uncompiled ``Model`` from the input image to the landmark vector.
  """
  backbone = feature_extractor_base()
  backbone.trainable = True

  input_img = Input(shape=(IMG_WIDTH,IMG_HEIGHT,3))
  features = backbone(inputs=input_img)
  hidden = Dense(units=128,kernel_initializer='glorot_uniform')(features)
  lr_output = Dense(units=output_size,kernel_initializer='glorot_uniform',name="lr_output")(hidden)

  return Model(inputs=input_img, outputs=lr_output)

### Emotion model

In [23]:
def emotion_classifier_model(feature_extractor_base_model, num_classes = 7):
  """Stack an emotion classification head on a given feature extractor.

  The extractor is called with ``training = False``. Its output passes
  through two 128-unit dense layers (no activation given) and a softmax
  layer named ``"emotion_output"``.

  Args:
    feature_extractor_base_model: model producing the feature vector.
    num_classes: number of softmax outputs.

  Returns:
    Uncompiled ``Model`` from the input image to the class probabilities.
  """
  input_img = Input(shape=(IMG_WIDTH,IMG_HEIGHT,3))

  # run the extractor in inference mode
  features = feature_extractor_base_model(input_img, training = False)

  hidden = Dense(units=128,kernel_initializer='glorot_uniform')(features)
  hidden = Dense(units=128,kernel_initializer='glorot_uniform')(hidden)
  emotion_output = Dense(units=num_classes, activation='softmax',name="emotion_output",kernel_initializer='glorot_uniform')(hidden)

  return Model(inputs=input_img, outputs=emotion_output)


In [24]:
def create_emotion_classifier_model(landmarks_checkpoint = ""):
  """Build the emotion model on a feature extractor pretrained on landmarks.

  A fresh feature extractor receives, layer by layer, the weights of the
  extractor nested inside the saved landmarks model; it is then frozen and
  an emotion classification head is stacked on top.

  Args:
    landmarks_checkpoint: landmarks checkpoint to load. Either a name
      relative to ``CHECKPOINT_PATH`` or a path that already points to the
      checkpoint (used when the joined path does not exist).

  Returns:
    Uncompiled emotion ``Model``; its summary is printed.

  Raises:
    ValueError: if the saved extractor and the fresh one differ in their
      number of layers.
  """
  ## set up feature extractor model
  feature_extractor_base_model = feature_extractor_base()

  # Callers may pass a path that already starts with CHECKPOINT_PATH; joining
  # it again would point to a non-existent location, so fall back to the
  # path as given in that case.
  checkpoint_path = os.path.join(CHECKPOINT_PATH, landmarks_checkpoint)
  if not os.path.exists(checkpoint_path) and os.path.exists(landmarks_checkpoint):
    checkpoint_path = landmarks_checkpoint
  landmarks_model = load_model(checkpoint_path)

  target_layers = feature_extractor_base_model.layers
  # layer 0 of the landmarks model is its input, layer 1 the nested extractor
  source_layers = landmarks_model.layers[1].layers

  if len(source_layers) != len(target_layers):
    raise ValueError(
      f"landmarks checkpoint has {len(source_layers)} feature extractor layers, "
      f"expected {len(target_layers)}")

  for target_layer, source_layer in zip(target_layers, source_layers):
    target_layer.set_weights(source_layer.get_weights())

  # only the classification head is trained
  feature_extractor_base_model.trainable=False
  emotion_model = emotion_classifier_model(feature_extractor_base_model)

  emotion_model.summary()

  return emotion_model

# Training

#### Landmark regression (LR) training

In [25]:
# Landmark subset to train on; used as a key into landmarks_dict and as part
# of the checkpoint / log file names.
lr_model_name = "full"

In [26]:
# Landmark index range [start_idx, end_idx) of every facial region.
# output_size is the number of regression outputs: two values (x, y) per landmark.
landmarks_dict = {
    region: {
        "start_idx": first,
        "end_idx": last,
        "output_size": 2 * (last - first),
    }
    for region, first, last in (
        ("full", 0, 68),
        ("mouth", 48, 68),
        ("eyes", 36, 48),
        ("nose", 27, 36),
        ("eyebrows", 17, 27),
        ("jaw", 0, 17),
    )
}

In [27]:
# Hyper-parameters of the landmarks training run.
learning_rate_base_reg_lr = 0.01
batch_size_reg_lr = 32
epochs_reg_lr = 10
steps_per_epoch_reg_lr = 30
# Checkpoint name (joined with CHECKPOINT_PATH) and recovery log file name
# (joined with RECOVERY_DIR); both embed the selected landmark subset.
checkpoint_name_reg_lr = f'checkpoint_landmarks_model_{lr_model_name}_test1'
filename_reg_lr = f"logs_dict_lr_{lr_model_name}_test1.json"
# Pixel scaling applied to training batches and to the validation images.
preprocess_input_reg_lr = preprocess_input_v1

In [28]:
# Training generator for the landmarks model: augmented images paired with
# the landmarks of the selected subset.
aug_lr_train = ImageDataGeneratorLandmarksOnly(
    aug,
    ert_landmarks_predictor,
    gen_batch_size=1,
    preprocess_input=preprocess_input_reg_lr,
    start_idx=landmarks_dict[lr_model_name]["start_idx"],
    end_idx=landmarks_dict[lr_model_name]["end_idx"],
    verbose=0,
)

<class 'function'>


In [29]:
## params dictionary
params_reg_lr = {
  "output_size":landmarks_dict[lr_model_name]["output_size"],
  "start_idx":landmarks_dict[lr_model_name]["start_idx"],
  "end_idx":landmarks_dict[lr_model_name]["end_idx"],
  "learning_rate_base": learning_rate_base_reg_lr,
  "batch_size":batch_size_reg_lr,
  "epochs":epochs_reg_lr,
  "steps_per_epoch":steps_per_epoch_reg_lr,
  "checkpoint":checkpoint_name_reg_lr,
  "filename":filename_reg_lr,
  "model_name":lr_model_name,
  "preprocess_input":preprocess_input_reg_lr,
  "aug_lr":aug_lr_train,
  "landmarks_predictor":ert_landmarks_predictor
}

In [49]:
def reg_training_lr(X_train, y_train, X_valid, y_valid, params):
  """Train (or resume training of) the landmarks regression model.

  An existing checkpoint is loaded when possible; otherwise a new model is
  created. The model is then (re)compiled with MAE loss and trained on the
  batches produced by ``params["aug_lr"]``. The best model by validation
  loss is saved to the checkpoint path.

  Args:
    X_train, y_train: training images and labels passed to the generator.
    X_valid, y_valid: validation images and landmark targets.
    params: dict with the keys ``"checkpoint"``, ``"filename"``,
      ``"output_size"``, ``"model_name"``, ``"epochs"``,
      ``"learning_rate_base"``, ``"aug_lr"``, ``"batch_size"`` and
      ``"steps_per_epoch"``.

  Returns:
    The ``History`` object returned by ``fit``.
  """
  print("------- Creating/Loading model-------")
  # checkpoint and recovery files live under the configured base directories
  checkpoint_full_path = os.path.join(CHECKPOINT_PATH,params["checkpoint"])
  recovery_filepath = os.path.join(RECOVERY_DIR, params["filename"])

  ## optional load
  try:
    landmarks_model = load_model(checkpoint_full_path)
    print("loaded model")
  except Exception as load_error:
    # Best effort: fall back to a fresh model, but report why the checkpoint
    # was not used. KeyboardInterrupt / SystemExit are no longer swallowed.
    print(f"could not load checkpoint {checkpoint_full_path}: {load_error}")
    ## create landmarks model
    landmarks_model = create_landmarks_model(params["output_size"])
    print("new model")
  landmarks_model.summary()

  #### Training landmarks model
  monitor_training_callback = MonitorSingleTraining(filepath = recovery_filepath , model_name = params["model_name"], epochs=params["epochs"])
  callbacks = [
    ModelCheckpoint(filepath=checkpoint_full_path,
      save_weights_only=False,
      monitor='val_loss',
      mode='min',
      save_best_only=True,
      verbose=1),
      monitor_training_callback
  ]

  epochs_left = monitor_training_callback.get_epochs_left()

  ## compile landmarks model
  # Done once for both the loaded and the new model. Note that compiling a
  # loaded model gives it a fresh optimizer at the base learning rate.
  # `learning_rate` is the supported keyword; `lr` is a deprecated alias.
  landmarks_model.compile(loss={"lr_output":"mae"},optimizer=Adam(learning_rate=params["learning_rate_base"]))

  ## Start training
  print("------- Training model -------")

  ## fit data
  history = landmarks_model.fit(
                      x = params["aug_lr"].flow(X_train, y_train,
                      batch_size = params["batch_size"]),
                      epochs= epochs_left, # do not change epochs here, you can set the value in params["epochs"]
                      validation_data = (X_valid, y_valid),
                      steps_per_epoch=params["steps_per_epoch"],
                      callbacks = callbacks,
                      verbose = 1)

  return history

In [50]:
# Fixed validation set for the landmarks model: preprocessed images and
# their normalised landmark targets.
Xlr_valid, ylr_valid = create_test_dataset(
    np.array(valid_face_crop),
    params_reg_lr["landmarks_predictor"],
    preprocess_input=params_reg_lr["preprocess_input"],
    start_idx=params_reg_lr["start_idx"],
    end_idx=params_reg_lr["end_idx"],
)

In [None]:
# Run the landmarks training and keep the fit history.
history_lr = reg_training_lr(
    train_face_crop,
    y_train,
    Xlr_valid,
    ylr_valid,
    params=params_reg_lr,
)

#### Emotion training

In [52]:
# Landmark subset whose landmarks checkpoint is reused for the emotion model.
lr_model_name = "full"
# Tag embedded in the emotion checkpoint / log file names and passed to the
# training monitor as the model name.
model_name = "lr"

In [61]:
# Hyper-parameters of the emotion training run.
learning_rate_base_reg_emotion = 0.01
batch_size_reg_emotion = 32
epochs_reg_emotion = 10
steps_per_epoch_reg_emotion = 30
# Checkpoint written by the emotion training.
checkpoint_name_reg_emotion = f'checkpoint_{model_name}_emotion_model_{lr_model_name}_test1'
# Landmarks checkpoint to initialise the feature extractor from; defined
# here as well so this section does not depend on the landmarks cells.
checkpoint_name_reg_lr = f'checkpoint_landmarks_model_{lr_model_name}_test1'
# Recovery log file name (joined with RECOVERY_DIR).
filename_reg_emotion = f"logs_dict_{model_name}_emotion_{lr_model_name}_test1.json"
# Pixel scaling applied to training batches and to the validation images.
preprocess_input_reg_emotion = preprocess_input_v1

In [62]:
# Training generator for the emotion model: augmented images paired with
# their emotion labels.
aug_emotion_train = ImageDataGeneratorEmotionOnly(
    aug,
    ert_landmarks_predictor,
    gen_batch_size=1,
    verbose=0,
    preprocess_input=preprocess_input_reg_emotion,
)

In [63]:
## params dictionary
params_reg_emotion = {
  "learning_rate_base": learning_rate_base_reg_emotion,
  "batch_size":batch_size_reg_emotion,
  "epochs":epochs_reg_emotion,
  "steps_per_epoch":steps_per_epoch_reg_emotion,
  "checkpoint":checkpoint_name_reg_emotion,
  "checkpoint_lr":checkpoint_name_reg_lr,
  "filename":filename_reg_emotion,
  "model_name":model_name,
  "preprocess_input":preprocess_input_reg_emotion,
  "aug_emotion":aug_emotion_train
}

In [64]:
def reg_training_emotion(X_train, y_train, X_valid, y_valid, params):
  """Train (or resume training of) the emotion classifier and report its score.

  An existing emotion checkpoint is loaded when possible. Otherwise a new
  model is built from the landmarks checkpoint and compiled with categorical
  cross-entropy. After training, the best checkpoint is reloaded and
  evaluated on the validation data.

  Args:
    X_train, y_train: training images and labels passed to the generator.
    X_valid, y_valid: validation images and labels.
    params: dict with the keys ``"checkpoint"``, ``"checkpoint_lr"``,
      ``"filename"``, ``"model_name"``, ``"epochs"``,
      ``"learning_rate_base"``, ``"aug_emotion"``, ``"batch_size"`` and
      ``"steps_per_epoch"``.

  Returns:
    The ``History`` object returned by ``fit``.
  """
  print("------- Creating/Loading model-------")

  # checkpoint and recovery files live under the configured base directories
  checkpoint_full_path = os.path.join(CHECKPOINT_PATH,params["checkpoint"])
  recovery_filepath = os.path.join(RECOVERY_DIR, params["filename"])

  ## optional load
  try:
    emotion_model = load_model(checkpoint_full_path)
    print("loaded model")
    emotion_model.summary()

  except Exception as load_error:
    # Best effort: fall back to a fresh model, but report why the checkpoint
    # was not used. KeyboardInterrupt / SystemExit are no longer swallowed.
    print(f"could not load checkpoint {checkpoint_full_path}: {load_error}")
    ## create emotion model
    # Pass the checkpoint *name*: create_emotion_classifier_model joins it
    # with CHECKPOINT_PATH itself, so a pre-joined path would be prefixed
    # twice. It also prints the model summary.
    emotion_model = create_emotion_classifier_model(landmarks_checkpoint=params["checkpoint_lr"])
    print("new model")
    emotion_model.compile(loss='categorical_crossentropy',optimizer=Adam(learning_rate=params["learning_rate_base"]), metrics=['accuracy'])


  monitor_training_callback = MonitorSingleTraining(filepath = recovery_filepath , model_name = params["model_name"], epochs=params["epochs"])
  callbacks = [
    ModelCheckpoint(filepath=checkpoint_full_path,
      save_weights_only=False,
      monitor='val_loss',
      mode='min',
      save_best_only=True,
      verbose=1),
      monitor_training_callback
  ]

  epochs_left = monitor_training_callback.get_epochs_left()

  ## Start training
  print("------- Training model -------")

  ## fit data
  history = emotion_model.fit(
                      x=params["aug_emotion"].flow(X_train, y_train, batch_size=params["batch_size"]),
                      epochs= epochs_left, # do not change epochs here, you can set the value in params["epochs"]
                      validation_data = (X_valid, y_valid),
                      steps_per_epoch=params["steps_per_epoch"],
                      callbacks = callbacks,
                      verbose = 1)

  ## generate generalization metrics
  ## reload the best checkpoint so the score refers to the saved model
  emotion_model=load_model(checkpoint_full_path)

  scores = emotion_model.evaluate(X_valid,  y_valid, verbose=0)
  print(f'Score for model {params["model_name"]}: {emotion_model.metrics_names[0]} of {scores[0]}; {emotion_model.metrics_names[1]} of {scores[1]*100}%')
  return history


In [65]:
# Validation images for the emotion model: float32 copy of the face crops,
# scaled with the configured preprocessing function.
X_valid = params_reg_emotion["preprocess_input"](
    np.array(valid_face_crop).astype(np.float32)
)

In [None]:
# Run the emotion training and keep the fit history.
history_emotion = reg_training_emotion(
    train_face_crop,
    y_train,
    X_valid,
    y_valid,
    params=params_reg_emotion,
)

# Workspace for testing ...