In [None]:
from tensorflow.keras.utils import to_categorical
from pandas import read_csv
from numpy import array, concatenate, sum, save, asarray, around
from numpy.random import choice
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split

In [None]:
# Size-filter switch:
#    0 -> discard images smaller than MIN_SIZE
#   any other value (e.g. -1) -> keep every image
FILTRAR = 0
# Tag embedded in the names of the generated .npy files.
prop1 = 'filt' if FILTRAR == 0 else 'nofilt'

# Class-balancing switch:
#    0 -> balance the classes so they hold a similar number of images
#   any other value (e.g. -1) -> leave the class distribution untouched
BALAN = 0
# Tag embedded in the names of the generated .npy files.
prop2 = 'balan' if BALAN == 0 else 'nobalan'

# Minimum image size accepted by the filter:
#   224 for vgg16 and efficientnet
#   299 for xception
MIN_SIZE = 299

# Number of target classes produced by the label regrouping step.
CLASSES = 3

In [None]:
from google.colab import drive

# Mount Google Drive; everything below is located under the mount point.
drive.mount('/gdrive')

# Dataset locations, all derived from the Drive root.
ROOT = '/gdrive/My Drive'
AFFECTNET_PATH = ROOT + '/AffectNet'
IMG_ROOT = AFFECTNET_PATH + '/Manually_Annotated_Images'
TPATH = IMG_ROOT + '/training.csv'
VPATH = IMG_ROOT + '/validation.csv'
#AUTOMATICALLY_PATH = '/gdrive/My Drive/AffectNet/Automatically_Annotated_Images/automatically_annotated.csv'
#AUTOMATICALLY_IMG_ROOT = '/gdrive/My Drive/AffectNet/Automatically_Annotated_Images'

# Fraction of the samples held out for validation when splitting.
PER_VAL = 0.20


Mounted at /gdrive


In [None]:
def balanceador_clases(x_train, x_val, y_train, y_val):
    """Merge train/validation data, optionally balance the classes, and re-split.

    The four inputs are concatenated into a single pool. When ``BALAN == 0``
    every class with more images than the target size is randomly subsampled
    down to that size; classes at or below the target are kept whole. The pool
    is then shuffled and split again using ``PER_VAL`` as the validation
    fraction.

    Args:
        x_train, x_val: arrays of image paths.
        y_train, y_val: arrays of integer class labels in ``range(CLASSES)``.

    Returns:
        Tuple ``(x_train, x_val, y_train, y_val)`` with the new split.
    """
    filenames, labels = concatenar_train_val(x_train, x_val, y_train, y_val)

    if BALAN == 0:
        num_labels = numero_de_imagenes_por_etiqueta(labels)
        smallest = int(min(num_labels))
        # Target per-class size: smallest class count rounded to the nearest
        # thousand (this may round up, in which case that class is kept whole).
        size = int(around(smallest, decimals=-3))
        if size == 0:
            # Rounding collapsed the target to zero for a small class; fall
            # back to the real smallest count instead of emptying the dataset.
            size = smallest

        per_class_idx = []
        for i in range(CLASSES):
            class_idx = asarray(labels == i).nonzero()[0]
            if num_labels[i] > size:
                # replace=False: sampling with replacement (the numpy default)
                # would duplicate images, and duplicates can end up on both
                # sides of the train/validation split below.
                class_idx = choice(class_idx, size=size, replace=False)
            per_class_idx.append(class_idx)

        new_idx = concatenate(per_class_idx)
        filenames, labels = filenames[new_idx], labels[new_idx]

    filenames_shuffled, labels_shuffled = shuffle(filenames, labels)
    x_train, x_val, y_train, y_val = train_test_split(
        filenames_shuffled, labels_shuffled, test_size=PER_VAL)
    return x_train, x_val, y_train, y_val

def leer_CSV():
    """Load the training and validation CSV files and extract the needed columns.

    The training file is read with ``header=0`` (its first row is treated as a
    header and replaced by the names below); the validation file is read with
    ``header=None`` (no header row). Image paths are prefixed with
    ``IMG_ROOT``.

    Returns:
        Tuple ``(train_dir, val_dir, train_width, train_height, val_width,
        val_height, labels_train, labels_val)`` of pandas Series.
    """
    columnas = ['file_path',
                'face_x',
                'face_y',
                'face_width',
                'face_height',
                'facial_landmarks',
                'expression',
                'valence',
                'arousal']

    raw_train = read_csv(TPATH, header=0, names=array(columnas))
    raw_val = read_csv(VPATH, header=None, names=array(columnas))

    # Turn the relative paths stored in the CSV into full paths.
    train_dir = IMG_ROOT + '/' + raw_train['file_path']
    val_dir = IMG_ROOT + '/' + raw_val['file_path']

    return (train_dir, val_dir,
            raw_train['face_width'], raw_train['face_height'],
            raw_val['face_width'], raw_val['face_height'],
            raw_train['expression'], raw_val['expression'])

def filtrado(train_dir, train_height, labels_train, val_dir, labels_val,
             val_height):
    """Optionally drop small images, then pool, shuffle and re-split the data.

    Must be called after the CSV files have been read. When ``FILTRAR == 0``
    only the samples whose face height is at least ``MIN_SIZE`` are kept (only
    the height is compared). In every case the paths are converted to a
    ``str`` array and the labels to ``uint8``, so both modes return the same
    dtypes and the saved ``.npy`` files never contain object arrays.

    Args:
        train_dir, val_dir: image paths for each set.
        train_height, val_height: face heights aligned with the paths.
        labels_train, labels_val: expression labels aligned with the paths.

    Returns:
        Tuple ``(x_train, x_val, y_train, y_val)`` from a fresh split that uses
        ``PER_VAL`` as the validation fraction.
    """
    if FILTRAR == 0:
        train_ok = train_height >= MIN_SIZE
        val_ok = val_height >= MIN_SIZE
        train_dir, labels_train = train_dir[train_ok], labels_train[train_ok]
        val_dir, labels_val = val_dir[val_ok], labels_val[val_ok]

    # Same dtype normalisation regardless of whether filtering was applied.
    x_train = asarray(train_dir, dtype=str)
    y_train = asarray(labels_train, dtype='uint8')
    x_val = asarray(val_dir, dtype=str)
    y_val = asarray(labels_val, dtype='uint8')

    filenames, labels = concatenar_train_val(x_train, x_val, y_train, y_val)
    filenames_shuffled, labels_shuffled = shuffle(filenames, labels)
    x_train, x_val, y_train, y_val = train_test_split(
        filenames_shuffled, labels_shuffled, test_size=PER_VAL)
    return x_train, x_val, y_train, y_val

def es_cuadrado(train_width, train_height, val_width, val_height):
    """Return True only if width equals height for every training and validation image."""
    # The validation set is only inspected when the training set is all square.
    if any(train_width != train_height):
        return False
    return not any(val_width != val_height)

def concatenar_train_val(x_train, x_val, y_train, y_val):
    """Join the training and validation data into one pair of arrays.

    Returns:
        Tuple ``(x, y)`` where ``x`` is ``x_train`` followed by ``x_val`` and
        ``y`` is ``y_train`` followed by ``y_val``, both joined along axis 0.
    """
    merged_x = concatenate((x_train, x_val), axis=0)
    merged_y = concatenate((y_train, y_val), axis=0)
    return merged_x, merged_y

def numero_de_imagenes_por_etiqueta(labels):
    """Count how many samples carry each label.

    The labels are one-hot encoded over ``CLASSES`` columns and the columns are
    added up, giving one count per class. ``sum`` here is the numpy function
    imported at the top of the file, not the builtin.
    """
    one_hot = to_categorical(labels, num_classes=CLASSES, dtype='uint32')
    return sum(one_hot, axis=0)

def save_generate_train_val_sets(x_train, x_val, y_train, y_val):
    """Write the train/validation data to ``.npy`` files and return it.

    Four files are saved under ``AFFECTNET_PATH``; their names encode the
    number of classes, the filter/balance tags and ``MIN_SIZE``. Labels are
    stored one-hot encoded as ``uint8``.

    Returns:
        Tuple ``(x_train, x_val, y_train_one_hot, y_val_one_hot)``.
    """
    # Part of the file name shared by all four files.
    sufijo = (str(CLASSES) + 'classes_' + prop1 + '_' + prop2
              + '_size=' + str(MIN_SIZE) + '.npy')

    def _ruta(prefijo):
        # Full output path for one of the four arrays.
        return AFFECTNET_PATH + prefijo + sufijo

    save(_ruta('/x_train_data_'), x_train)
    save(_ruta('/x_val_data_'), x_val)

    y_train_cat = to_categorical(y_train, num_classes=CLASSES, dtype='uint8')
    save(_ruta('/y_train_data_'), y_train_cat)

    y_val_cat = to_categorical(y_val, num_classes=CLASSES, dtype='uint8')
    save(_ruta('/y_val_data_'), y_val_cat)

    return x_train, x_val, y_train_cat, y_val_cat

def despreciar_clases(x_train, x_val, y_train, y_val, num_classes=None):
    """Regroup the original expression codes into 2 or 3 classes and drop the rest.

    With 3 classes: codes 1 and 3 become class 0 ("positive"), codes 2, 4, 5,
    6 and 7 become class 1 ("negative") and code 0 becomes class 2
    ("neutral"). With 2 classes: code 1 becomes class 0 and code 2 becomes
    class 1. Samples whose code is not listed for the chosen mode are removed.

    The input arrays are not modified; remapped copies are returned.

    Args:
        x_train, x_val: arrays of image paths.
        y_train, y_val: arrays of original expression codes.
        num_classes: 2 or 3. Defaults to the module-level ``CLASSES``.

    Returns:
        Tuple ``(x_train, x_val, y_train, y_val)`` holding only the kept
        samples, with the labels already remapped.

    Raises:
        ValueError: if the number of classes is neither 2 nor 3.
    """
    if num_classes is None:
        num_classes = CLASSES

    # grupos[k] lists the original codes that are merged into new class k.
    if num_classes == 3:
        grupos = ((1, 3), (2, 4, 5, 6, 7), (0,))
    elif num_classes == 2:
        grupos = ((1,), (2,))
    else:
        raise ValueError('despreciar_clases solo admite 2 o 3 clases, '
                         'se recibio: ' + str(num_classes))

    def _reagrupar(paths, labels):
        # Remap one (paths, labels) pair and keep only the mapped samples.
        nuevas = labels.copy()
        conservar = None
        for nueva_etiqueta, codigos in enumerate(grupos):
            # Masks are always built from the untouched original labels, so an
            # earlier assignment can never be picked up by a later group.
            mascara = labels == codigos[0]
            for codigo in codigos[1:]:
                mascara = mascara | (labels == codigo)
            nuevas[mascara] = nueva_etiqueta
            conservar = mascara if conservar is None else (conservar | mascara)
        return paths[conservar], nuevas[conservar]

    x_train_out, y_train_out = _reagrupar(x_train, y_train)
    x_val_out, y_val_out = _reagrupar(x_val, y_val)
    return x_train_out, x_val_out, y_train_out, y_val_out

In [None]:
# Step 1: read both CSV files.
print('Cargando CSV')
(train_dir, val_dir, train_width, train_height,
 val_width, val_height, labels_train, labels_val) = leer_CSV()

# Report whether every image has equal width and height.
print('Imagenes cuadradas'
      if es_cuadrado(train_width, train_height, val_width, val_height)
      else 'Imagenes no cuadradas')

# Step 2: size filtering (controlled by FILTRAR) and a first re-split.
print('Filtrado')
x_train, x_val, y_train, y_val = filtrado(
    train_dir, train_height, labels_train, val_dir, labels_val, val_height)

# Step 3: regroup the labels into CLASSES classes, then balance them (BALAN).
x_train, x_val, y_train, y_val = despreciar_clases(
    x_train, x_val, y_train, y_val)
x_train, x_val, y_train, y_val = balanceador_clases(
    x_train, x_val, y_train, y_val)

# Step 4: print a summary and write the final split to disk.
print('Guardando...')
print("Imagenes por etiquetas: ")
print(numero_de_imagenes_por_etiqueta(concatenate((y_train, y_val), axis=0)))
print("Imagenes de entrenamiento: " + str(len(x_train)))
print("Imagenes de validación: " + str(len(x_val)))
x_train, x_val, y_train, y_val = save_generate_train_val_sets(
    x_train, x_val, y_train, y_val)


Cargando CSV
Imagenes cuadradas
Filtrado
Guardando...
Imagenes por etiquetas: 
[32000 31676 32000]
Imagenes de entrenamiento: 76540
Imagenes de validación: 19136
