# Progetto Emotion Recognition


In [None]:
import math
import cv2
import numpy as np
import dlib
import pandas as pd
import os

## Funzione "indici"
Questa funzione è stata creata per migliorare, a livello di efficienza, il calcolo delle distanze tra tutti i landmark presenti all'interno di ogni frame. In particolare, attraverso la funzione **indici**, sono stati creati tutte le combinazioni di punti dei landmark e messi all'interno di un array. 
Gli elementi dell'array saranno utili per il calcolo delle distanze dei punti dei landmark, evitando di calcolare distanze superflue. 


In [2]:
# Calcolo di tutte le combinazioni tra i landmark
def indici():
    print("Calcolo gli indici...")
    arrayIndici = []
    for i in range(1, 69):
        for j in range(i + 1, 69):
            arrayIndici.append((i, j))
    return arrayIndici

## Funzione "distanza_punti"
Questa funzione va ad effettuare il calcolo della distanza tra 2 punti. In particolare, ricevento le coordinate dei punti, viene applicata la formula e ritornato il risultato.


In [14]:
# Calcolo della distanza tra 2 punti
def distanza_punti(x1, x2, y1, y2):
    print("x1:" + str(x1) + " x2: " + str(x2) + " y1:" + str(y1) + " y2: " + str(y2))
    return math.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)

## Funzione "crea_Distanze"
Questa funzione crea una matrice (dataset) contenente tutte le distanze tra tutti i punti dei landmarks di ogni frame. In particolare, su ogni riga, saranno memorizzate tutte le distanze appartenenti ad un determinato frame. 
Tutto questo calcolo, viene fatto prendendo i dati (relativi ad ogni punto) dal dataset che riceve in input e analizzando l'array "indici" (sempre ricevuto in input) prende le coppie da analizzare e effettua il calcolo delle distenze attraverso le funzione "distanza_punti", memorizzando ogni distanza in questa matrice.

In [15]:
# Creazione del dataset contenente le distanze tra tutte le coppie di landmark ricavati da ogni immagine
def crea_Distanze(indici, dataset):
    print("Procedo al calcolo delle distanze tra i landmark...")
    distanze = np.empty((0, 2280))
    counter = 0
    for list in dataset:
        array = []
        array = np.append(array, list[0])
        array = np.append(array, list[1])
        for indice in indici:
            print("(" + str(indice[0]) + "," + str(indice[1]) + ")")
            punto1_x = indice[0]*2
            punto2_x = indice[1]*2
            punto1_y = indice[0]*2+1
            punto2_y = indice[1]*2+1
            print("x1:" + str(punto1_x) + " y1: " + str(punto1_y) + " x2:" + str(punto2_x) + " y2: " + str(punto2_y))
            array = np.append(array, distanza_punti(float(dataset[counter, punto1_x]),
                                                    float(dataset[counter, punto2_x]), float(dataset[counter, punto1_y]),
                                                    float(dataset[counter, punto2_y])))
        counter += 1
        distanze = np.vstack([distanze, array])

    return distanze

## Funzione "create_column"
Questa funzione è stata creata semplicemente per creare le label per le colonne del dataset. In input riceve il parametro "type" e ciò è stato fatto per poter creare 2 tipi diversi di labels. In particolar modo con "type" = 1 crea le label che si riferiscono ai landmark trovati, con "type" = 0 invece, crea le label che si riferiscono al dataset delle distanze.

In [16]:
# Creazione delle labels per le colonne del dataset
def create_column(type):
    labels = []
    labels.append("Subject_ID")
    labels.append("Emotion_ID")
    if(type):
        for i in range(0, 68):
            label1 = "landmark_" + str(i) + "_x"
            label2 = "landmark_" + str(i) + "_y"
            labels.append(label1)
            labels.append(label2)

    else:
        arrayIndici = indici()
        print(len(arrayIndici))
        for i in range(0, 2278):
            label1 = "distance" + str(arrayIndici[i])
            labels.append(label1)
    return labels

## Funzione "max_timestep"
Questa funzione viene utilizzata per trovare il timestep massimo all'interno del dataset che contiene le distanze tra tutti i landmark individuati nei frame. In particolar modo, la funzione, ricevendo in input il dataframe, lo scandisce riga per riga e conta, per ogni video, quanti frame contiene. Il numero massimo di frame individuato, verrà dato in output.

In [17]:
#Ricerca Del massimo numero di Frame/Timestamp
def max_timestep(df):
    max = 0
    count = 0
    before = np.empty(0)
    before = np.append(before, df[0,0])
    before = np.append(before, df[0,1])

    for row in df:
        if(row[0] == before[0] and int(row[1])==int(before[1])):
            count = count + 1
        else:
            if(count > max):
                max = count
            count = 1

        before[0] = row[0]
        before[1] = row[1]

    return max

## Funzione "fillingData" e "zeroFillingData"
La funzioni "fillingData" e "zeroFillingData" si occupano di riempire i dati presenti nel dataset distanze. In particolare dato che la rete neurale riccorente (nel caso in questione l'LSTM) vuole in input video con lo stesso numero di frame, questa funzione andrà a riempire il dataset in modo tale da avere per ogni video lo stesso numero di frame. La funzione "fillingData" quindi, ricevendo in input il dataframe e il timestep aggiungerà per ogni video, delle righe (prima e dopo i frames) contenenti tutti 0, in modo tale da avere video con numero di frame tutti uguali. 

In [None]:
def fillingData(df,timestep):

    count = 0
    before = np.empty(0)
    before = np.append(before, df[0,0])
    before = np.append(before, df[0,1])
    appoggio = []
    dataframe = np.empty((0, 2280))


    for row in df:
        if(row[0] == before[0] and int(row[1])==int(before[1])):
            count = count + 1
            appoggio.append(row)
        else:
            dataframe = zeroFillingData(count,timestep, before, appoggio, dataframe)

            appoggio = []
            count = 1

        before[0] = row[0]
        before[1] = row[1]

    dataframe = zeroFillingData(count, timestep, before, appoggio, dataframe)

    filling = pd.DataFrame(data=dataframe, columns=create_column(0))
    return filling

def zeroFillingData(count, timestep, before, appoggio, dataframe):
    if (count < timestep):
        diff = timestep - count
        div = diff // 2
        labels = []
        labels.append(before[0])
        labels.append(before[1])
        for x in range(2, 2280):
            labels.append(0)
        for x in range(0, div):
            dataframe = np.vstack([dataframe, labels])

        dataframe = np.vstack([dataframe, appoggio])

        for x in range(0, diff - div):
            dataframe = np.vstack([dataframe, labels])
    else:
        if (count == timestep):
            dataframe = np.vstack([dataframe, appoggio])

    return dataframe

## Scansione Cartelle

In questo snippet di codice, vengono scansionate in maniera automatica tutte le cartelle presenti nel dataset, andando a prendere frame per frame ed analizzandono, estraendo attraverso il detector di punti facciali tutti i 68 landmark. Una volta estratti tutti, per ogni punto verranno calcolati x e y e memorizzati sequenzialemente all'interno della matrice. La matrice quindi conterrà, per ogni riga, tutti i 68 landmark relativi ad un determinato frame. Da questa matrice, successivamente, si è creato il dataset.

In [25]:
labels = p_u.create_column(0)
print(len(labels))
labels = p_u.create_column(1)
dataset = np.empty((0, 138))

# Scansione cartelle
for cartella, sottocartelle, files in os.walk("/Users/michelantoniopanichella/PycharmProjects/FVAB-Emotion_Recognition/CK+/cohn-kanade-images/S010/001"):
    count = 0
    for image in files:
        if image.endswith(".png"):

            subjectID = image[0:4]
            emotionID = image[5:8]
            count += 1
            print("Calcolo i landmark per l'emozione " + emotionID + " del soggetto " + subjectID + " Frame: "+ str(count) +"...")

            img = cv2.imread(cartella + '/' + image)

            # Landmark points detector
            p = '/Users/michelantoniopanichella/PycharmProjects/FVAB-Emotion_Recognition/Exercise/shape_predictor_68_face_landmarks.dat'
            detector = dlib.get_frontal_face_detector()
            predictor = dlib.shape_predictor(p)

            faces = detector(img)

            for face in faces:
                landmarks = predictor(img, face)
                landmarks_points = []
                landmarks_points.append(subjectID)
                landmarks_points.append(emotionID)
                for n in range(0, 68):
                    x = landmarks.part(n).x
                    y = landmarks.part(n).y
                    landmarks_points.append(x)
                    landmarks_points.append(y)

                dataset = np.vstack([dataset, landmarks_points])

Calcolo i landmark per l'emozione 001 del soggetto S010 Frame: 1...
Calcolo i landmark per l'emozione 001 del soggetto S010 Frame: 2...
Calcolo i landmark per l'emozione 001 del soggetto S010 Frame: 3...
Calcolo i landmark per l'emozione 001 del soggetto S010 Frame: 4...
Calcolo i landmark per l'emozione 001 del soggetto S010 Frame: 5...
Calcolo i landmark per l'emozione 001 del soggetto S010 Frame: 6...
Calcolo i landmark per l'emozione 001 del soggetto S010 Frame: 7...
Calcolo i landmark per l'emozione 001 del soggetto S010 Frame: 8...
Calcolo i landmark per l'emozione 001 del soggetto S010 Frame: 9...
Calcolo i landmark per l'emozione 001 del soggetto S010 Frame: 10...
Calcolo i landmark per l'emozione 001 del soggetto S010 Frame: 11...
Calcolo i landmark per l'emozione 001 del soggetto S010 Frame: 12...
Calcolo i landmark per l'emozione 001 del soggetto S010 Frame: 13...
Calcolo i landmark per l'emozione 001 del soggetto S010 Frame: 14...


In [26]:
##Creazione del file csv contente i landmark
df = pd.DataFrame(data=dataset, columns=labels)
df.to_csv('/Users/michelantoniopanichella/PycharmProjects/FVAB-Emotion_Recognition/Exercise/dataset.csv', index=False, header=True)

[['S010' '001' '262' ... '351' '369' '350']
 ['S010' '001' '259' ... '360' '365' '358']
 ['S010' '001' '260' ... '360' '365' '358']
 ...
 ['S010' '001' '264' ... '348' '368' '347']
 ['S010' '001' '261' ... '359' '367' '358']
 ['S010' '001' '262' ... '359' '367' '358']]


In [21]:
# Creazione del dataset contenente le distanze a partire dal DataFrame contenente le posizioni dei landmark
arrayIndici = indici()
distanze = pd.DataFrame(data=crea_Distanze(arrayIndici, dataset))
distanze.to_csv('../Exercise/distanze.csv', index=True, header=True)

/Users/michelantoniopanichella


In [None]:
import keras
from keras.layers import LSTM
from keras.layers import Dense
from keras.layers import Dropout
import pandas as pd
import numpy as np
from keras.models import Sequential
from keras.utils import to_categorical
from sklearn.model_selection import train_test_split
import PreprocessingUtilities as p_u
from sklearn.preprocessing import MinMaxScaler
from keras.layers import Masking
from keras.wrappers.scikit_learn import KerasClassifier 
from sklearn.model_selection import cross_val_score
from matplotlib import pyplot as plt
from sklearn.metrics import confusion_matrix
from sklearn.metrics import plot_confusion_matrix
import seaborn as sn

## Normalizzazione

In questo snippet di codice sottostante quello che si è andato a fare è prendere il file contenente i dati processati (ossia i dati dei video con l'aggiunta dei frame composti da soli 0) e normalizzarli portandoli dunque tutti ad una grandezza compresa tra 0 e 1). Per fare ciò però è stata fatta prima una reshape in modo tale da avere tutti i dati in una sola colonna e applicare quindi lo "scaler" attraverso le funzioni fit e trasform. Una volta fatto ciò i dati sono stati "riorganizzati" di nuovo con una reshape ed è stato creato un dataframe.

In [None]:
ds = pd.read_csv('filling_nuovissimo.csv')
columns = ds.columns[2:]
y = ds['Emotion_ID']
ds = ds.drop(['Emotion_ID', 'Subject_ID'], axis=1)
X = ds.to_numpy()
X = np.reshape(X, (X.shape[0]*X.shape[1], 1))
scaler = MinMaxScaler()
scaler.fit(X)
X = scaler.transform(X)
print(X.shape[0]/2278)
X = np.reshape(X, (X.shape[0]//2278, 2278))
ds = pd.DataFrame(X, columns=columns)
print(X)

## Features Selection

Nel seguento passo è stata fatta una feature selection. In particolar modo sono state eliminate tutte le feature meno essenziali che, nel nostro caso, sono quelle riferite alla parte esterna del volto.

In [None]:
indici = p_u.indici()
c = []
for element in indici:
    if(element[0]>=32 and element[1]>=32):  
        c.append('distance'+str(element))
ds = ds[c]
print(ds)

## Estrazione dati test e training

Nel codice qui presentato, è stato diviso l'intero dataframe in 2 sottoinsiemi: training set e test set. Dato che in questo caso abbiamo video contenti diversi frame, per poter usufruire della funzione "train_test_split", abbiamo ristrutturato il dataframe in una array a 3 dimensioni, in cui ad ogni riga corrisponde un video che contiene i diversi frame.

In [None]:
features = ds.to_numpy().shape[1]
labels = []
timestep = 71
for i in range(0, y.shape[0]//timestep):
    labels.append(y[i*timestep])
y = np.asarray(labels, dtype='int64')
# 3D reshape
X = np.reshape(ds.to_numpy(), (ds.to_numpy().shape[0] // timestep, timestep, features))
X = np.asarray(X, dtype='float64')
#train test split
print(X.shape)
train_x, test_x, train_y, test_y = train_test_split(X, y, test_size=0.2, random_state=42)
test_x, val_data_x, test_y, val_data_y = train_test_split(test_x, test_y, test_size=0.5, random_state=42)
print(train_y)
print(test_y)
print(val_data_y)
#test_y = to_categorical(test_y-1)
train_y = to_categorical(train_y-1)
val_data_y = to_categorical(val_data_y-1)
y = to_categorical(y-1)
print(y)

## Costruzione rete neurale LSTM

Nella funzione "modelCostruction" si è costruito il modello che andrà ad esaminare i nostri dati e fare eventualemtne delle previsioni su altri in input.

In [None]:
verbose, epochs, batch_size = 1, 250, 6
def modelConstruction():
  # model construction
    model = Sequential()
    model.add(Masking(mask_value=0., input_shape=(timestep, features)))
    model.add(LSTM(100))
    model.add(Dropout(0.2))
    model.add(Dense(100, activation='relu'))
  # output layer
    model.add(Dense(train_y.shape[1], activation='softmax'))
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    return model

Una volta addestrata la rete neurale è stata testata la validità del modello costruito attraverso la Cross Validation con 5 cicli.

In [None]:
clf = KerasClassifier(build_fn = modelConstruction, batch_size=batch_size, epochs=200) 
accuracies = cross_val_score(estimator=clf, X=X, y=y, cv=5, error_score='raise') # cv = 5
print(accuracies)
print("Mean accuracy: " + str(accuracies.mean()))
print("Variance: "+str(accuracies.std()))

In [None]:
model = modelConstruction()
history = model.fit(train_x, train_y, epochs=epochs, batch_size=batch_size, verbose=verbose, validation_data=(val_data_x, val_data_y))

In [None]:
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'val'], loc='upper left')
plt.show()

In [None]:
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'val'], loc='upper left')
plt.show()

In [None]:
pred = np.argmax(model.predict(test_x), axis=-1)+1
tot = test_y.shape
count = np.sum(pred==test_y)
print("Accuracy: "+str(count/tot))
mat = confusion_matrix(test_y, pred)
cfmt_df = pd.DataFrame(mat, range(mat.shape[0]), range(mat.shape[1]))
sn.heatmap(cfmt_df, annot=True, annot_kws={"size": 16}) 
plt.show()