In [1]:
import os

import cntk as ct
import numpy as np

from src.ferplus import FERPlusReader, FERPlusParameters
from src.models import *

In [2]:
def cost_func(training_mode, prediction, target):
    '''
    Build the CNTK loss expression for the given training mode.

    Cross entropy is used for every mode except 'multi_target'. In that mode
    only the largest element of target * prediction contributes to the loss,
    so every label marked in the target is treated exactly the same.

    Args:
        training_mode: 'majority', 'probability', 'crossentropy' or 'multi_target'.
        prediction: model output whose logarithm is taken; presumably class
            probabilities (e.g. a softmax output) -- confirm against the caller.
        target: label vector multiplied element-wise with the prediction.

    Returns:
        The loss expression, reduced over the last axis.

    Raises:
        ValueError: if training_mode is not one of the supported modes.
            (Previously None was returned silently, which only failed later
            when the loss was handed to the trainer.)
    '''
    if training_mode in ('majority', 'probability', 'crossentropy'):
        # Cross entropy: -sum(target * log(prediction)).
        return ct.negate(ct.reduce_sum(ct.element_times(target, ct.log(prediction)), axis=-1))

    if training_mode == 'multi_target':
        # -log(max(target * prediction)).
        return ct.negate(ct.log(ct.reduce_max(ct.element_times(target, prediction), axis=-1)))

    raise ValueError(
        "Unknown training_mode {!r}; expected 'majority', 'probability', "
        "'crossentropy' or 'multi_target'.".format(training_mode))

In [3]:
# Emotion name -> class index. The position of each name in the tuple is its index.
emotion_table = {
    emotion: index
    for index, emotion in enumerate((
        'neutral',
        'happiness',
        'surprise',
        'sadness',
        'anger',
        'disgust',
        'fear',
        'contempt',
    ))
}

# Emotion names ordered by class index, so emotion_labels[i] is the name of class i.
emotion_labels = sorted(emotion_table, key=lambda emotion: emotion_table[emotion])

# Number of emotion classes.
num_classes = len(emotion_table)

In [4]:
## parameters
# Name of the network architecture; also the prefix of the per-mode folder names.
model_name = "VGG13"

# Root data folder and the sub-folders that hold the test set.
base_folder = "data"
test_folders = ["FER2013Test"]

# (training_mode, best_epoch) pairs: each training mode together with the
# epoch number of the checkpoint that is loaded for it.
tmbe = [
    ("majority", 80),
    ("probability", 78),
    ("crossentropy", 98),
    ("multi_target", 89),
]

In [15]:
# Values that are identical for every training mode are computed once, before the loop.
output_model_path = os.path.join(base_folder, R'models')
output_test_path  = os.path.join(base_folder, R'tests')
minibatch_size    = 32
mm_time_constant  = -minibatch_size/np.log(0.9)
np.set_printoptions(precision=2)

# For each (training_mode, best_epoch) pair: restore the checkpoint saved at that
# epoch, run the model over the test set, and save the labels and the model
# outputs to a compressed .npz file.
for training_mode, best_epoch in tmbe:
    ## model folder and checkpoint file
    output_model_folder = os.path.join(output_model_path, model_name + '_' + training_mode)
    os.makedirs(output_model_folder, exist_ok=True)
    checkpoint_path = os.path.join(output_model_folder, "model_{}".format(best_epoch))

    ## test folder and results file
    output_test_folder = os.path.join(output_test_path, model_name + '_' + training_mode)
    os.makedirs(output_test_folder, exist_ok=True)
    results_path = os.path.join(output_test_folder, "test_{}.npz".format(best_epoch))

    ## build the model
    model = build_model(num_classes, model_name)

    ## set the input variables.
    input_var = ct.input((1, model.input_height, model.input_width), np.float32)
    label_var = ct.input((num_classes), np.float32)

    # Parameters and reader for FERPlus. In the original work the test/validation
    # parameters always use "majority", whatever the training mode; that is kept
    # here until the reason is understood. To read mode-specific targets instead,
    # pass `training_mode` in place of "majority".
    # ('determinisitc' is the keyword name exactly as this call has always spelled it.)
    test_and_val_params = FERPlusParameters(num_classes, model.input_height, model.input_width, "majority", determinisitc=True, shuffle=False)
    test_data_reader = FERPlusReader.create(base_folder, test_folders, "label.csv", test_and_val_params)

    epoch_size = test_data_reader.size()

    # get the probabilistic output of the model.
    z    = model.model(input_var)
    pred = ct.softmax(z)

    # Training config. No training happens in this cell; the learner and trainer
    # are built so that the checkpoint can be restored through the trainer.
    lr_per_minibatch = [model.learning_rate]*20 + [model.learning_rate / 2.0]*20 + [model.learning_rate / 10.0]
    lr_schedule      = ct.learning_rate_schedule(lr_per_minibatch, unit=ct.UnitType.minibatch, epoch_size=epoch_size)
    mm_schedule      = ct.momentum_as_time_constant_schedule(mm_time_constant)

    # loss and error cost
    train_loss = cost_func(training_mode, pred, label_var)
    pe         = ct.classification_error(z, label_var)

    # construct the trainer
    learner = ct.momentum_sgd(z.parameters, lr_schedule, mm_schedule)
    trainer = ct.Trainer(z, (train_loss, pe), learner)

    # restore trained model
    print(checkpoint_path)
    trainer.restore_from_checkpoint(checkpoint_path)
    print('trainer.total_number_of_samples_seen: ', trainer.total_number_of_samples_seen)

    print('test_data_reader.size: ',test_data_reader.size())
    # init predictions
    y_true_batch = []
    y_pred_batch = []

    # NOTE(review): trainer.model is `z`, so the values collected in y_pred are the
    # pre-softmax outputs, not probabilities. argmax is unaffected; evaluate `pred`
    # instead if real probabilities are wanted in the saved file.
    while test_data_reader.has_more():
        images, labels, current_batch_size = test_data_reader.next_minibatch(minibatch_size)
        y_true_batch.append(labels)
        y_pred_batch.append(trainer.model.eval({input_var: images}))

    # concatenate all minibatches
    y_true = np.concatenate(y_true_batch, axis=0)
    y_pred = np.concatenate(y_pred_batch, axis=0)

    # Show one sample: the full vector, then its argmax.
    print('some example of data; probabilistic; deterministic')
    print('y_true[2]: ',y_true[2],';',y_true[2].argmax())
    print('y_pred[2]: ',y_pred[2],';',y_pred[2].argmax())

    np.savez_compressed(results_path, y_true=y_true, y_pred=y_pred)
    print('save y_true and y_pred in ', results_path)
    print('\n\n')

data/models/VGG13_majority/model_80
trainer.total_number_of_samples_seen:  2028645
test_data_reader.size:  3137
some example of data; probabilistic; deterministic
y_true[2]:  [ 0.  0.  0.  0.  1.  0.  0.  0.] ; 4
y_pred[2]:  [ -6.02  -2.97   0.87  -4.37  12.58   4.58  -0.14  -2.42] ; 4
save y_true and y_pred in  data/tests/VGG13_majority/test_80.npz



data/models/VGG13_probability/model_78
trainer.total_number_of_samples_seen:  2207813
test_data_reader.size:  3137
some example of data; probabilistic; deterministic
y_true[2]:  [ 0.  0.  0.  0.  1.  0.  0.  0.] ; 4
y_pred[2]:  [-2.66 -0.12  2.11 -2.92  7.01 -0.15  0.6  -3.82] ; 4
save y_true and y_pred in  data/tests/VGG13_probability/test_78.npz



data/models/VGG13_crossentropy/model_98
trainer.total_number_of_samples_seen:  2766753
test_data_reader.size:  3137
some example of data; probabilistic; deterministic
y_true[2]:  [ 0.  0.  0.  0.  1.  0.  0.  0.] ; 4
y_pred[2]:  [-2.72 -1.01  1.98 -3.4   8.08  0.79  0.35 -3.75] ; 4
save y_tr