# Imports

In [2]:
import numpy as np
import pandas as pd
import os
import argparse
import errno
import scipy.misc
import dlib
import cv2

from skimage.feature import hog

import tensorflow as tf 
from tflearn.layers.core import input_data, dropout, fully_connected
from tflearn.layers.conv import conv_2d, max_pool_2d
from tflearn.layers.merge_ops import merge_outputs, merge
from tflearn.layers.normalization import local_response_normalization, batch_normalization
from tflearn.layers.estimator import regression 
from tflearn.optimizers import Momentum, Adam

from tflearn import DNN
import time

import imutils

# Transform data

In [4]:
# Local working directory: holds fer2013.csv and receives the converted model3/ arrays.
local_path = '/Users/maelfabien/Desktop/LocalDB/Videos/'
# Shared project directory.
path = '/Users/maelfabien/filrouge_pole_emploi/Common/'

In [5]:
# Read the FER2013 CSV stored directly under local_path.
# The conversion step below relies on its 'Usage', 'pixels' and 'emotion' columns.
data = pd.read_csv(filepath_or_buffer=local_path + 'fer2013.csv')

In [9]:
# Load the dlib 68-point landmark predictor and prepare the label bookkeeping.
predictor = dlib.shape_predictor('/Users/maelfabien/Desktop/LocalDB/Videos/models/shape_predictor_68_face_landmarks.dat')
# Labels kept from the dataset; a label's position in this list is its class index.
new_labels = [0, 1, 2, 3, 4, 5, 6]
# One counter per label. Plain Python ints are used on purpose: np.uint8
# counters would wrap around after 255 increments under NumPy 2 promotion rules.
nb_images_per_label = [0] * len(new_labels)

# Side lengths (pixels) used to reshape each sample's flat pixel string.
image_height = 48
image_width = 48
# Size and stride (pixels) of the square sliding window used for HOG extraction.
window_size = 24
window_step = 6

def get_landmarks(image, rects):
    """Return the landmark coordinates of the single face described by *rects*.

    Uses the module-level dlib ``predictor``.

    Args:
        image: Image handed to the predictor.
        rects: Sequence of face rectangles; it must contain exactly one.

    Returns:
        An ``np.matrix`` with one ``[x, y]`` row per landmark part.

    Raises:
        ValueError: With message ``"TooManyFaces"`` when more than one
            rectangle is given, or ``"NoFaces"`` when none is given.
    """
    # ValueError (rather than a bare BaseException) so that ordinary
    # `except Exception` handlers see the failure; handlers written for
    # BaseException keep working because ValueError derives from it.
    if len(rects) > 1:
        raise ValueError("TooManyFaces")
    if len(rects) == 0:
        raise ValueError("NoFaces")
    return np.matrix([[p.x, p.y] for p in predictor(image, rects[0]).parts()])

def get_new_label(label, one_hot_encoding=False):
    """Map a dataset label to its position in the module-level ``new_labels``.

    Args:
        label: Original label value; must be present in ``new_labels``.
        one_hot_encoding: When true, return a one-hot list instead of the index.

    Returns:
        The integer index of *label*, or a list of ``len(new_labels)``
        entries that is all zeros except for a 1 at that index.

    Raises:
        ValueError: If *label* is not in ``new_labels`` (from ``list.index``).
    """
    position = new_labels.index(label)
    if not one_hot_encoding:
        return position
    encoded = list(np.zeros(len(new_labels), 'uint8'))
    encoded[position] = 1
    return encoded

def sliding_hog_windows(image):
    """Concatenate HOG descriptors computed over a sliding window of *image*.

    The window is ``window_size`` pixels square and moves by ``window_step``
    pixels over an ``image_height`` x ``image_width`` grid (module-level
    settings). Windows that start near the bottom/right border are truncated
    by the slice and therefore contribute fewer values.

    Args:
        image: 2-D array indexed as ``image[row, column]``.

    Returns:
        A flat list with the HOG values of every window, row-major order.
    """
    hog_windows = []
    for y in range(0, image_height, window_step):
        for x in range(0, image_width, window_step):
            window = image[y:y + window_size, x:x + window_size]
            # The deprecated `visualise=False` keyword is dropped (False is
            # the default anyway) and the block normalisation is pinned to
            # 'L1', the value in effect when the features were generated, so
            # a scikit-image upgrade cannot silently change the descriptors.
            hog_windows.extend(hog(window, orientations=8, pixels_per_cell=(8, 8),
                                   cells_per_block=(1, 1), block_norm='L1'))
    return hog_windows

In [12]:
# Convert every split of the dataset ('Usage' column) into .npy arrays:
# raw images, one-hot labels, dlib landmarks, sliding-window HOG features
# and HOG visualisation images.
for category in data['Usage'].unique():
    print( "converting set: " + category + "...")

    # Create the output folder for this split. The check has to target the
    # folder that is actually written to (not a same-named folder in the
    # working directory); exist_ok makes re-runs harmless while real failures
    # such as permission errors still surface instead of being swallowed.
    output_dir = local_path + "/model3/" + category
    os.makedirs(output_dir, exist_ok=True)

    # get samples and labels of the actual category
    category_data = data[data['Usage'] == category]
    samples = category_data['pixels'].values
    labels = category_data['emotion'].values

    # get images and extract features
    images = []
    labels_list = []
    landmarks = []
    hog_features = []
    hog_images = []

    for sample, label in zip(samples, labels):
        # Each sample is a space-separated string of pixel values.
        image = np.fromstring(sample, dtype=int, sep=" ").reshape((image_height, image_width))
        images.append(image)

        hog_features.append(sliding_hog_windows(image))
        # `visualize` replaces the deprecated `visualise` spelling; block_norm
        # is pinned to 'L1', the default in effect when this was first run.
        _, hog_image = hog(image, orientations=8, pixels_per_cell=(16, 16),
                           cells_per_block=(1, 1), block_norm='L1', visualize=True)
        hog_images.append(hog_image)

        # Round-trip through a JPEG file so dlib receives the image as OpenCV
        # decodes it.
        # NOTE(review): scipy.misc.imsave is deprecated since SciPy 1.0 and
        # removed in 1.2; it is kept here because a replacement may encode
        # pixel intensities differently and thus shift the landmarks.
        scipy.misc.imsave('temp.jpg', image)
        image2 = cv2.imread('temp.jpg')

        # The sample is treated as one face filling (almost) the whole frame.
        face_rects = [dlib.rectangle(left=1, top=1, right=47, bottom=47)]
        landmarks.append(get_landmarks(image2, face_rects))

        labels_list.append(get_new_label(label, one_hot_encoding=True))
        nb_images_per_label[get_new_label(label)] += 1

    np.save(output_dir + '/images.npy', images)
    np.save(output_dir + '/labels.npy', labels_list)
    np.save(output_dir + '/landmarks.npy', landmarks)
    np.save(output_dir + '/hog_features.npy', hog_features)
    np.save(output_dir + '/hog_images.npy', hog_images)

converting set: Training...


/anaconda3/lib/python3.6/site-packages/skimage/feature/_hog.py:150: skimage_deprecation: Default value of `block_norm`==`L1` is deprecated and will be changed to `L2-Hys` in v0.15. To supress this message specify explicitly the normalization method.
  skimage_deprecation)
/anaconda3/lib/python3.6/site-packages/skimage/feature/_hog.py:248: skimage_deprecation: Argument `visualise` is deprecated and will be changed to `visualize` in v0.16
  'be changed to `visualize` in v0.16', skimage_deprecation)
`imsave` is deprecated in SciPy 1.0.0, and will be removed in 1.2.0.
Use ``imageio.imwrite`` instead.


converting set: PublicTest...
converting set: PrivateTest...


# Load data

In [19]:
# Side length (pixels) of the square network input, and number of output classes.
input_size, output_size = 48, 7

In [18]:
def _load_split(split_name):
    """Load one converted split from the model3 folder.

    Args:
        split_name: Sub-folder name ('Training', 'PublicTest' or 'PrivateTest').

    Returns:
        A dict with
        'X'  - images reshaped to (-1, input_size, input_size, 1),
        'X2' - per-sample flattened landmarks followed by the HOG features,
        'Y'  - the saved labels.
    """
    split_dir = '/Users/maelfabien/Desktop/LocalDB/Videos/model3/' + split_name
    split = dict()
    split['X'] = np.load(split_dir + '/images.npy')
    split['X'] = split['X'].reshape([-1, input_size, input_size, 1])
    split['X2'] = np.load(split_dir + '/landmarks.npy')
    # Flatten each sample's landmark matrix so it can sit next to the HOG vector.
    split['X2'] = np.array([x.flatten() for x in split['X2']])
    split['X2'] = np.concatenate((split['X2'], np.load(split_dir + '/hog_features.npy')), axis=1)
    split['Y'] = np.load(split_dir + '/labels.npy')
    return split


# load train set
data_dict = _load_split('Training')
# PublicTest is used as the test set, PrivateTest as the validation set.
test_dict = _load_split('PublicTest')
validation_dict = _load_split('PrivateTest')

# Models

In [20]:
def build_modelB():
    """Assemble the two-input TFLearn network and return its training layer.

    Input 'input1' takes image tensors of shape
    (None, input_size, input_size, 1); input 'input2' takes feature vectors
    of width 2728. Both branches are concatenated and fed to a softmax layer
    with ``output_size`` units.

    Layers are created in a fixed order on purpose: keep that order when
    editing, since default layer names presumably depend on it (confirm
    before loading older checkpoints).

    Returns:
        The ``regression`` layer named 'output' (categorical cross-entropy,
        Momentum optimizer).
    """
    # Image branch: three conv -> batch-norm -> max-pool stages.
    conv_branch = input_data(shape=[None, input_size, input_size, 1], name='input1')
    for nb_filters in (64, 128, 256):
        conv_branch = conv_2d(conv_branch, nb_filters, 3, activation='relu')
        conv_branch = batch_normalization(conv_branch)
        conv_branch = max_pool_2d(conv_branch, 3, strides=2)
    # NOTE(review): keep_prob=True is numerically 1, so this layer presumably
    # drops nothing - confirm whether a real probability was intended.
    conv_branch = dropout(conv_branch, keep_prob=True)

    conv_branch = fully_connected(conv_branch, 4096, activation='relu')
    conv_branch = dropout(conv_branch, keep_prob=0.956)
    conv_branch = fully_connected(conv_branch, 1024, activation='relu')

    # Feature branch.
    # NOTE(review): 2728 is presumably the landmark coordinates plus the
    # sliding-window HOG vector - verify against the saved arrays.
    feature_branch = input_data(shape=[None, 2728], name='input2')
    feature_branch = fully_connected(feature_branch, 1024, activation='relu')
    feature_branch = batch_normalization(feature_branch)

    # Narrow the image branch, then join both branches along the feature axis.
    conv_branch = fully_connected(conv_branch, 128, activation='relu')
    merged = merge([conv_branch, feature_branch], 'concat', axis=1)
    merged = fully_connected(merged, output_size, activation='softmax')

    sgd = Momentum(learning_rate=0.016, momentum=0.95, lr_decay=0.864, decay_step=50)
    # NOTE(review): learning_rate is repeated here next to an optimizer
    # instance; presumably the instance's own rate wins - confirm in TFLearn.
    return regression(merged, optimizer=sgd, loss='categorical_crossentropy', learning_rate=0.016, name='output')

# Train

In [None]:
def evaluate(model, X, X2, Y):
    """Return the first score reported by ``model.evaluate`` on the two-input batch.

    Args:
        model: Trained model exposing ``evaluate(inputs, targets)``.
        X: Image tensors for the 'input1' branch.
        X2: Feature vectors for the 'input2' branch.
        Y: Target labels.
    """
    accuracy = model.evaluate([X, X2], Y)
    return accuracy[0]


# `evaluate` is defined above because the training block below calls it.
with tf.Graph().as_default():
    # NOTE: `data` is rebound here; it no longer refers to the CSV DataFrame
    # read at the top of the notebook.
    data = data_dict
    validation = validation_dict
    test = test_dict
    print( "building model...")
    network = build_modelB()
    model = DNN(network, tensorboard_dir="/Users/maelfabien/Desktop/LocalDB/Videos/model3/logs", tensorboard_verbose=0,
                checkpoint_path="/Users/maelfabien/Desktop/LocalDB/Videos/model3/checkpoint/chk", max_checkpoints=1)

    start_time = time.time()
    model.fit([data['X'], data['X2']], data['Y'],
              validation_set=([validation['X'], validation['X2']], validation['Y']),
              snapshot_step=500, show_metric=True, batch_size=128, n_epoch=13)
    # The validation features are left untouched: they are needed for the
    # evaluation below, and `validation` is the same object as validation_dict.
    training_time = time.time() - start_time
    print( "training time = {0:.1f} sec".format(training_time))

    print( "saving model...")
    # Make sure the target folder exists so a long training run is not lost
    # to a missing directory at save time.
    os.makedirs("/Users/maelfabien/Desktop/LocalDB/Videos/model3/models", exist_ok=True)
    model.save("/Users/maelfabien/Desktop/LocalDB/Videos/model3/models/saved_model.bin")

    print( "evaluating...")
    validation_accuracy = evaluate(model, validation['X'], validation['X2'], validation['Y'])
    print( "  - validation accuracy = {0:.1f}".format(validation_accuracy*100))
    print(validation_accuracy)

Training Step: 517  | total loss: 1.22977 | time: 306.509s
| Momentum | epoch: 003 | loss: 1.22977 - acc: 0.5357 -- iter: 08576/28709


# Predict

In [None]:
# Size and stride (pixels) of the square sliding window used for HOG extraction.
window_size, window_step = 24, 6

def load_model():
    """Rebuild the network in a fresh graph and restore the trained weights.

    Returns:
        A ``DNN`` wrapping the ``build_modelB`` network, loaded from the
        file written by the training step.
    """
    with tf.Graph().as_default():
        print( "loading pretrained model...")
        # The only builder defined in this file is build_modelB.
        network = build_modelB()
        model = DNN(network)
        # Must be the exact path passed to model.save() during training,
        # not just its parent directory.
        model.load("/Users/maelfabien/Desktop/LocalDB/Videos/model3/models/saved_model.bin")
    return model

def get_landmarks(image, rects, predictor):
    """Return the landmark coordinates of the single face described by *rects*.

    Adapted from http://bit.ly/2cj7Fpq.

    Args:
        image: Image handed to *predictor*.
        rects: Sequence of face rectangles; it must contain exactly one.
        predictor: Callable ``predictor(image, rect)`` whose result exposes
            ``parts()``, yielding points with ``x`` and ``y`` attributes.

    Returns:
        An ``np.matrix`` with one ``[x, y]`` row per landmark part.

    Raises:
        ValueError: With message ``"TooManyFaces"`` when more than one
            rectangle is given, or ``"NoFaces"`` when none is given.
    """
    # `break` is only legal inside a loop; the invalid-input cases are
    # reported by raising instead.
    if len(rects) > 1:
        raise ValueError("TooManyFaces")
    if len(rects) == 0:
        raise ValueError("NoFaces")
    return np.matrix([[p.x, p.y] for p in predictor(image, rects[0]).parts()])

def sliding_hog_windows(image):
    """Concatenate HOG descriptors computed over a sliding window of *image*.

    The window is ``window_size`` pixels square and moves by ``window_step``
    pixels over an ``input_size`` x ``input_size`` grid (module-level
    settings). Windows that start near the bottom/right border are truncated
    by the slice and therefore contribute fewer values.

    Args:
        image: 2-D array indexed as ``image[row, column]``.

    Returns:
        A flat list with the HOG values of every window, row-major order.
    """
    hog_windows = []
    for y in range(0, input_size, window_step):
        for x in range(0, input_size, window_step):
            window = image[y:y + window_size, x:x + window_size]
            # The deprecated `visualise=False` keyword is dropped (False is
            # the default anyway) and the block normalisation is pinned to
            # 'L1' so the descriptors stay comparable with the features the
            # network was trained on, whatever the scikit-image default is.
            hog_windows.extend(hog(window, orientations=8, pixels_per_cell=(8, 8),
                                   cells_per_block=(1, 1), block_norm='L1'))
    return hog_windows

def predict(image, model, shape_predictor=None):
    """Predict the emotion shown in a single face image.

    Args:
        image: 2-D array that can be reshaped to
            (-1, input_size, input_size, 1).
        model: Model exposing ``predict([images, features])``.
        shape_predictor: Landmark predictor forwarded to ``get_landmarks``.
            The default of None is not usable there, so a predictor must be
            supplied.

    Returns:
        The ``(emotion_name, confidence)`` pair produced by ``get_emotion``.
    """
    # The whole input is treated as the face region.
    # NOTE(review): the conversion step builds its rectangle as
    # (1, 1, 47, 47) while this one is (0, 0, input_size, input_size) -
    # confirm the difference is intended.
    face_rects = [dlib.rectangle(left=0, top=0, right=input_size, bottom=input_size)]
    face_landmarks = np.array([get_landmarks(image, face_rects, shape_predictor)]).flatten()

    hog_features = np.asarray(sliding_hog_windows(image))

    # Second network input: landmark coordinates followed by the HOG values.
    features = np.concatenate((face_landmarks, hog_features))

    tensor_image = image.reshape([-1, input_size, input_size, 1])
    predicted_label = model.predict([tensor_image, features.reshape((1, -1))])

    return get_emotion(predicted_label[0])


def get_emotion(label):
    """Print the per-class scores and return the most likely emotion.

    Args:
        label: Array-like of class scores exposing ``tolist()``, ordered as
            Angry, Disgust, Fear, Happy, Sad, Surprise, Neutral.

    Returns:
        A tuple ``(emotion_name, score)`` for the highest-scoring class.
    """
    emotions = ["Angry", "Disgust", "Fear", "Happy", "Sad", "Surprise", "Neutral"]
    scores = label.tolist()
    # Report every class with the name that matches its index; a hand-written
    # subset of names would mislabel the scores.
    print("\n".join("- {0}: {1:.1f}%".format(name, score * 100)
                    for name, score in zip(emotions, scores)))
    best = max(scores)
    return emotions[scores.index(best)], best

# Run a one-off prediction on a sample image and time it.
model = load_model()
hanks = '/Users/maelfabien/filrouge_pole_emploi/Common/test_samples/tom-hanks.jpg'

# Flag 0 loads the file as a single-channel grayscale image.
image = cv2.imread(hanks, 0)
if image is None:
    # cv2.imread signals an unreadable/missing file by returning None.
    raise IOError("could not read image: " + hanks)
# predict() reshapes its input to (-1, input_size, input_size, 1), so the
# picture has to be input_size x input_size (a no-op if it already is).
# NOTE(review): assumes the sample is already a face crop - confirm.
image = cv2.resize(image, (input_size, input_size))
start_time = time.time()

emotion, confidence = predict(image, model, predictor)
total_time = time.time() - start_time

print( "Prediction: {0} (confidence: {1:.1f}%)".format(emotion, confidence*100))
print( "time: {0:.1f} sec".format(total_time))

# Predict from video

In [None]:
# Colour used for both the face bounding box and its caption.
BOX_COLOR = TEXT_COLOR = (0, 255, 0)

# initialize the video stream (device 0)
video_stream = cv2.VideoCapture(0)
# Cascade face detector and dlib landmark predictor, loaded from the local model folder.
face_detector = cv2.CascadeClassifier('/Users/maelfabien/Desktop/LocalDB/Videos/models/lbpcascade_frontalface.xml')

shape_predictor = dlib.shape_predictor('/Users/maelfabien/Desktop/LocalDB/Videos/models/shape_predictor_68_face_landmarks.dat')

model = load_model()
# Cache of the most recent prediction, reused between frames by the recognition loop.
last_predicted_time = last_predicted_confidence = 0
last_predicted_emotion = ""

def predict_emotion(image):
    """Rescale a face crop to the network input size and classify it.

    Args:
        image: 2-D grayscale face crop of any size.

    Returns:
        The ``(emotion, confidence)`` pair returned by ``predict``, using the
        module-level ``model`` and ``shape_predictor``.
    """
    # cv2.resize interpolates the picture to the target size; ndarray.resize
    # would only truncate or pad the underlying buffer, scrambling the image.
    resized = cv2.resize(image, (input_size, input_size))
    return predict(resized, model, shape_predictor)

def recognize_emotions():
    """Read frames from the webcam, detect faces and overlay the predicted emotion.

    Runs until 'q' is pressed or more than 10 frames fail to be grabbed.
    A new prediction is made at most every 0.5 s; in between, the cached
    result is displayed. Uses the module-level ``video_stream``,
    ``face_detector`` and ``last_predicted_*`` state, and releases the
    camera and closes the windows on exit.
    """
    # The prediction cache lives at module level and is reassigned below.
    global last_predicted_time, last_predicted_confidence, last_predicted_emotion

    failed_frames_count = 0

    try:
        while True:
            grabbed, frame = video_stream.read()

            if not grabbed:
                failed_frames_count += 1
                if failed_frames_count > 10:
                    print( "can't grab frames")
                    break
                continue

            # detection phase
            frame = imutils.resize(frame, width=600)
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

            # detect faces
            faces = face_detector.detectMultiScale(gray, 1.3, 5)
            for (x, y, w, h) in faces:
                if w < 30 and h < 30:  # skip the small faces (probably false detections)
                    continue

                # bounding box
                cv2.rectangle(frame, (x, y), (x + w, y + h), BOX_COLOR, 2)

                # try to recognize emotion
                face = gray[y:y+h, x:x+w].copy()

                if time.time() - last_predicted_time < 0.5:
                    # Too soon since the last prediction: reuse the cached result.
                    label = last_predicted_emotion
                    confidence = last_predicted_confidence
                else:
                    label, confidence = predict_emotion(face)
                    last_predicted_emotion = label
                    last_predicted_confidence = confidence
                    last_predicted_time = time.time()

                # caption drawn next to the face
                if label is not None:
                    text = "{0} ({1:.1f}%)".format(label, confidence*100)
                    cv2.putText(frame, text, (x - 20, y - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.7, TEXT_COLOR, 2)

            # display images
            cv2.imshow("Facial Expression Recognition", frame)

            key = cv2.waitKey(1) & 0xFF
            if key == ord("q"):
                break
    finally:
        # Always free the camera and the windows, even if a frame raised.
        video_stream.release()
        cv2.destroyAllWindows()

# No EmotionRecognizer class is defined in this file; recognize_emotions is a
# module-level function, so it is called directly.
recognize_emotions()