    # Download the FER2013 archive, unpack it without overwriting files that
    # already exist (`unzip -n`), then delete the archive.
    !wget https://www.dropbox.com/s/23tutsg2m9o78bp/fer2013.zip
    !unzip -n fer2013.zip
    !rm fer2013.zip

In [None]:
import os
# Ask TensorFlow to use its `cuda_malloc_async` GPU allocator. The variable is
# set here, before `tensorflow` is imported further down -- presumably so it is
# already in place when TensorFlow initialises the GPU (confirm against the
# TensorFlow version in use).
os.environ['TF_GPU_ALLOCATOR'] = 'cuda_malloc_async'

In [None]:
# Name of the JSON file holding the default training configuration.
input_default_json_conf_file = 'cnn_face_emotion_training_default.json'

# External libraries

In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
import datetime
import json

import platform

# Local library

In [None]:
import sys

# Extend the module search path with the sibling `library` directory.
local_library_dir = '../library'
sys.path.append(local_library_dir)

# GPU usage

In [5]:
# Report how many GPU devices TensorFlow lists.
gpu_devices = tf.config.list_physical_devices('GPU')
print("Num GPUs Available: ", len(gpu_devices))


Num GPUs Available:  0


# Load default DATA

In [None]:
## Load the default configuration from the JSON file into DATA.
# The context manager closes the file even when json.load() raises; JSON text
# is read as UTF-8 instead of relying on the platform's default encoding.
with open(os.path.join('./', input_default_json_conf_file), encoding='utf-8') as fd:
    DATA = json.load(fd)

# Variables

In [2]:
# ---------------------------------------------------------------
# Run settings: a fixed seed plus the values read from DATA.
# ---------------------------------------------------------------

# Seed handed to the random number generators
seed_number = 0

# Training/validation data: base directory and labels file
dataset_base_dir = DATA['dataset_train_base_dir']
dataset_labels_file = DATA['dataset_train_labels_file']

# Test data: base directory and labels file
dataset_base_test_dir = DATA['dataset_test_base_dir']
dataset_labels_test_file = DATA['dataset_test_labels_file']

# Dataset identifier (later used to pick the number of classes)
dataset_name = DATA['dataset_name']

# Training hyperparameters
EPOCAS = DATA["epochs"]
BATCH_SIZE = DATA["batch_size"]

# Network architecture, one of:
# 'mobilenet_v3', 'efficientnet_b3', 'inception_v3', 'inception_resnet_v2', 'resnet_v2_50'
model_type = DATA["model_type"]

# Root directory for everything this run writes
output_base_dir = DATA["output_base_dir"]

# Whether a fine-tuning phase follows the first training phase
fine_tuning = DATA["finetuning"]

# Input parameters

In [None]:


# Command-line overrides: every recognised flag replaces the corresponding
# default with the token that follows it in sys.argv. Unrecognised tokens are
# ignored, and when a flag is repeated the last occurrence wins.
_value_flags = (
    '--dataset-train-dir',
    '--dataset-train-file',
    '--dataset-test-dir',
    '--dataset-test-file',
    '--dataset-name',
    '--model',
    '--epochs',
    '--batch-size',
    '--fine-tuning',
    '--output-dir',
)

for n, flag in enumerate(sys.argv):
    if flag not in _value_flags:
        continue
    # A flag given as the very last token has no value; fail with a clear
    # message instead of a bare IndexError.
    if n + 1 >= len(sys.argv):
        raise ValueError("Missing value after command-line option '%s'." % flag)
    value = sys.argv[n + 1]

    if flag == '--dataset-train-dir':
        dataset_base_dir = value
    elif flag == '--dataset-train-file':
        dataset_labels_file = value
    elif flag == '--dataset-test-dir':
        dataset_base_test_dir = value
    elif flag == '--dataset-test-file':
        dataset_labels_test_file = value
    elif flag == '--dataset-name':
        dataset_name = value
    elif flag == '--model':
        model_type = value
    elif flag == '--epochs':
        EPOCAS = int(value)
    elif flag == '--batch-size':
        BATCH_SIZE = int(value)
    elif flag == '--fine-tuning':
        # Only the text "true" (any letter case) enables fine-tuning.
        fine_tuning = value.lower() == 'true'
    elif flag == '--output-dir':
        output_base_dir = value

# Echo the effective configuration.
print('        dataset_base_dir:', dataset_base_dir)
print('     dataset_labels_file:', dataset_labels_file)
print('   dataset_base_test_dir:', dataset_base_test_dir)
print('dataset_labels_test_file:', dataset_labels_test_file)
print('            dataset_name:', dataset_name)
print('              model_type:', model_type)
print('                  EPOCAS:', EPOCAS)
print('              BATCH_SIZE:', BATCH_SIZE)
print('             fine_tuning:', fine_tuning)
print('         output_base_dir:', output_base_dir)


# Set seed of random variables


In [8]:
# Seed NumPy's global generator ...
np.random.seed(seed_number)
# ... and then seed through the Keras helper with the same value.
tf.keras.utils.set_random_seed(seed_number)

# Loading data of dataset

In [None]:
# Load filenames and labels
train_val_data = pd.read_csv(os.path.join(dataset_base_dir,dataset_labels_file));
print(train_val_data)

# Setting labels
Y = train_val_data[['label']];
L=np.shape(Y)[0];

# Load test filenames and labels
test_data = pd.read_csv(os.path.join(dataset_base_test_dir,dataset_labels_test_file));
print(test_data)

# Setting the hold-out train/validation split


In [None]:
from sklearn.model_selection import train_test_split

# Hold-out split: 80% training / 20% validation, stratified on the labels.
# random_state pins the shuffle to the run's seed, so the split no longer
# depends on how much of the global NumPy random state was consumed before
# this cell ran (e.g. when the cell is re-executed).
training_data, validation_data = train_test_split(train_val_data,
                                                   test_size=0.2,
                                                   shuffle=True,
                                                   stratify=Y,
                                                   random_state=seed_number)



# Data augmentation configuration

In [None]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Random transformations applied to the training images only.
augmentation_options = {
    'rotation_range': 10,
    'width_shift_range': 0.07,
    'height_shift_range': 0.07,
    'horizontal_flip': True,
    'shear_range': 1.25,
    'zoom_range': [0.75, 1.25],
}

# Training generator: pixel rescaling plus augmentation.
idg = ImageDataGenerator(rescale=1./255, **augmentation_options)

# Validation and test generators: pixel rescaling only.
idg_val = ImageDataGenerator(rescale=1./255)
idg_test = ImageDataGenerator(rescale=1./255)


# Creating output directory

In [None]:
# Runs with fine-tuning enabled write to their own sub-directory.
holdout_dir_name = ('training_validation_holdout_fine_tuning'
                    if fine_tuning
                    else 'training_validation_holdout')
output_dir = os.path.join(output_base_dir, dataset_name, holdout_dir_name, model_type)

# Create the directory tree; an already existing one is fine.
os.makedirs(output_dir, exist_ok=True)


# Create new model

In [None]:
import lib_model as mpp

# Number of output classes for each supported dataset.
nout_by_dataset = {
    'fer2013': 7,
    'affectnet': 8,
    'mcfer_v1.0': 7,
    'ber2024-face': 4,
}

# CREATE NEW MODEL
if dataset_name not in nout_by_dataset:
    # Stop with the message on stderr and a non-zero exit status. The bare
    # exit() helper comes from the `site` module, is meant for the interactive
    # shell, and would end the process with status 0.
    raise SystemExit('Error in the dataset name.')

# First phase: no weights file is given and the feature extractor is not tuned.
model, target_size = mpp.create_model(file_of_weight='',
                                      model_type=model_type,
                                      nout=nout_by_dataset[dataset_name],
                                      tuning_feature_extractor=False)

model.summary()

# Store the model's parameter statistics next to the other outputs.
mpp.save_model_parameters(model, os.path.join(output_dir, 'parameters_stats.m'))


# Defining the data generators

In [None]:

# Options shared by the three generators: image size expected by the model,
# the dataframe columns to read, the batch size and one-hot encoded labels.
flow_options = dict(target_size=target_size,
                    x_col="filename",
                    y_col="label",
                    batch_size=BATCH_SIZE,
                    class_mode="categorical")

# Training batches (augmented, shuffled)
train_data_generator = idg.flow_from_dataframe(training_data,
                                               directory=dataset_base_dir,
                                               shuffle=True,
                                               **flow_options)

# Validation batches (shuffled, read from the training directory)
valid_data_generator = idg_val.flow_from_dataframe(validation_data,
                                                   directory=dataset_base_dir,
                                                   shuffle=True,
                                                   **flow_options)

# Test batches; kept in dataframe order so predictions line up with `.classes`
test_data_generator = idg_test.flow_from_dataframe(test_data,
                                                   directory=dataset_base_test_dir,
                                                   shuffle=False,
                                                   **flow_options)

# Train and validation


In [None]:
import matplotlib.pyplot as plt

# COMPILE NEW MODEL
model.compile(optimizer='adam',
              loss='categorical_crossentropy',
              metrics=['categorical_accuracy'])

# CREATE CALLBACKS
# Checkpoint: keep only the weights of the epoch with the lowest val_loss.
best_model_file = os.path.join(output_dir, 'model.h5')
checkpoint = tf.keras.callbacks.ModelCheckpoint(filepath=best_model_file,
                                                monitor='val_loss',
                                                save_best_only=True,
                                                save_weights_only=True,
                                                verbose=1)

# TensorBoard: one time-stamped log directory per run.
run_stamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
log_dir = os.path.join(output_dir, "logs", "fit", 'coarse_tunning-' + run_stamp)
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)

# FIT THE MODEL
# steps_per_epoch is not passed, so Keras takes it from the generator.
history = model.fit(train_data_generator,
                    validation_data=valid_data_generator,
                    epochs=EPOCAS,
                    callbacks=[checkpoint, tensorboard_callback],
                    verbose=1)

# Save the training curves of this phase.
mpp.save_model_history(history,
                       os.path.join(output_dir, "historical.csv"),
                       show=False,
                       labels=['categorical_accuracy', 'loss'])


# Optional second phase: rebuild the model from the best checkpoint with the
# feature extractor set to be tuned, then train again.
if fine_tuning:
    # Number of output classes for each supported dataset.
    fine_tuning_nout_by_dataset = {
        'fer2013': 7,
        'affectnet': 8,
        'mcfer_v1.0': 7,
        'ber2024-face': 4,
    }

    # Check the dataset name before the trained model is discarded. SystemExit
    # reports the message on stderr with a non-zero status; the bare exit()
    # helper comes from `site` and would end the process with status 0.
    if dataset_name not in fine_tuning_nout_by_dataset:
        raise SystemExit('Error in the dataset name.')

    # Release the first-phase model before building the new one.
    tf.keras.backend.clear_session()
    del model
    del history

    # CREATE NEW MODEL
    model, target_size = mpp.create_model(file_of_weight=best_model_file,
                                          model_type=model_type,
                                          nout=fine_tuning_nout_by_dataset[dataset_name],
                                          tuning_feature_extractor=True)

    # necessary for these changes to take effect
    model.compile(loss='categorical_crossentropy',
                  optimizer='adam',
                  metrics=['categorical_accuracy'])

    model.summary()

    log_dir = os.path.join(output_dir, "logs", "fit",
                           'fine_tunning-' + datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))
    tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)

    # The checkpoint callback from the first phase is reused here, so it keeps
    # writing to best_model_file.
    history = model.fit(train_data_generator,
                        epochs=EPOCAS,
                        validation_data=valid_data_generator,
                        callbacks=[checkpoint, tensorboard_callback],
                        verbose=1)

    # Save the training curves of the fine-tuning phase.
    mpp.save_model_history(history,
                           os.path.join(output_dir, "historical-fine_tuning.csv"),
                           show=False,
                           labels=['categorical_accuracy', 'loss'])

# Evaluate best model

In [None]:
# LOAD BEST MODEL: restore the checkpointed weights before measuring anything.
model.load_weights(best_model_file)
data_results = {}

# (printed title, key prefix, generator) for each split, evaluated in this order.
evaluation_runs = (
    ('training', 'train_', train_data_generator),
    ('validation', 'val_', valid_data_generator),
    ('testing', 'test_', test_data_generator),
)

for split_title, key_prefix, split_generator in evaluation_runs:
    results = model.evaluate(split_generator)
    results = dict(zip(model.metrics_names, results))
    print(split_title, results, "\n\n")
    for key, value in results.items():
        data_results[key_prefix + key] = value

data_results['number_of_parameters'] = mpp.get_model_parameters(model)

# All metrics (train, validation and test) as JSON
with open(os.path.join(output_dir, "training_data_results.json"), 'w') as f:
    json.dump(data_results, f, indent=4)

# Test metrics only: `results` still holds the last split evaluated above
with open(os.path.join(output_dir, "results_testing.txt"), 'w') as f:
    f.writelines('%s=%s;\n' % metric for metric in results.items())

tf.keras.backend.clear_session()

In [None]:
from sklearn.metrics import classification_report,confusion_matrix
from sklearn.metrics import ConfusionMatrixDisplay
import matplotlib.pyplot as plt

# Display names of the classes for each supported dataset.
target_names_by_dataset = {
    'fer2013': ['angry', 'disgusted', 'fearful', 'happy', 'neutral', 'sad', 'surprised'],
    'affectnet': ['neutral', 'happy', 'sad', 'surprised', 'fearful', 'disgusted', 'angry', 'unknown'],
    'mcfer_v1.0': ['angry', 'disgusted', 'fearful', 'happy', 'neutral', 'sad', 'surprised'],
    'ber2024-face': ['negative', 'neutral', 'pain', 'positive'],
}

if dataset_name not in target_names_by_dataset:
    # Stop with the message on stderr and a non-zero exit status; the bare
    # exit() helper comes from `site` and would end the process with status 0.
    raise SystemExit('Error in the dataset name.')
target_names = target_names_by_dataset[dataset_name]

# Explicit class ids keep the matrix and the report aligned with target_names
# even when some class never appears among the true or predicted labels.
# NOTE(review): assumes class index i of the test generator corresponds to
# target_names[i] -- confirm against test_data_generator.class_indices.
class_ids = list(range(len(target_names)))

# Predict: the class with the highest score for every test sample
Y_pred = model.predict(test_data_generator, verbose=1)
y_pred = np.argmax(Y_pred, axis=1)

# Confusion matrix
CM = confusion_matrix(test_data_generator.classes, y_pred, labels=class_ids)

fname = os.path.join(output_dir, "confusion_matrix.eps")
fig, ax = plt.subplots(figsize=(8, 6), dpi=100)
disp = ConfusionMatrixDisplay(confusion_matrix=CM, display_labels=target_names)
disp.plot(ax=ax, cmap=plt.cm.Blues)
plt.savefig(fname)

# Confusion matrix and its labels as JSON
cm_dict = {'matrix': CM.tolist(), 'label': target_names}
with open(os.path.join(output_dir, "confusion_matrix.json"), 'w') as f:
    json.dump(cm_dict, f, indent=4)

# Classification report
fname = os.path.join(output_dir, "classification_report.txt")
str_dat = classification_report(test_data_generator.classes, y_pred,
                                labels=class_ids, target_names=target_names)
print(str_dat)
with open(fname, 'w') as f:
    f.write('%s\n' % str_dat)

In [None]:

# Give the best-weights file its final, model-specific name.
tmp_name = 'model_' + model_type + '.h5'

# os.replace overwrites an existing destination on every platform, whereas
# os.rename raises on Windows when the target already exists (for example when
# the notebook is run a second time with the same output directory).
os.replace(best_model_file, os.path.join(output_dir, tmp_name))