# Importar librerías

In [None]:
import tensorflow as tf
import json
import numpy as np
from matplotlib import pyplot as plt
import albumentations as alb
import cv2
import os
import time
import uuid

# Recolectar imágenes con OpenCV

Definir el número de imágenes a generar inicialmente.

In [None]:
PATH_IMAGENES = os.path.join('data','imagenes')
n_imagenes = 90

Crear carpeta para guardar imágenes.

In [None]:
try:
    os.makedirs(PATH_IMAGENES)
except:
    pass

- Abrir videocamara con OpenCV.
- Capturar *n_imagenes* imágenes, una cada medio segundo.

In [None]:
cap = cv2.VideoCapture(0)
for imgnum in range(n_imagenes):
    print('Imagen {}'.format(imgnum))
    ret, frame = cap.read()
    imgname = os.path.join(PATH_IMAGENES,f'{str(uuid.uuid1())}.jpg')
    cv2.imwrite(imgname, frame)
    cv2.imshow('frame', frame)
    time.sleep(0.5)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()

# Crear anotaciones con LabelMe

Crear carpeta para guardar anotaciones.

In [None]:
try:
    os.makedirs(os.path.join("data", "labels"))
except:
    pass

Abrir programa LabelMe para crear manualmente las anotaciones.

- Permite hacer rectángulo (*box*) fácilmente sobre imagen y colocarle *label* (etiqueta).
- Guarda *json* por cada imagen con las coordenadas del rectángulo y el *label*.

> Si no está instalado, ejecutar comando *pip install labelme*.

In [None]:
!labelme

# Separar en conjuntos de entrenamiento, validación y prueba

Crear carpetas para imagenes y anotaciones.

In [None]:
for carpeta in ['train','test','val']:
    try: 
        os.makedirs(os.path.join("data", carpeta))
        os.makedirs(os.path.join("data", carpeta, "imagenes"))
        os.makedirs(os.path.join("data", carpeta, "labels"))
    except:
        pass

Separar imagenes en conjuntos de entrenamiento, validación y prueba.

In [None]:
porcentaje_val = 0.15
porcentaje_test = 0.15
imagenes = np.array(os.listdir(PATH_IMAGENES))

shuffled_indices = np.random.permutation(len(imagenes))

n_val = int(len(imagenes) * porcentaje_val)
n_test = int(len(imagenes) * porcentaje_test)
indices_val = shuffled_indices[: n_val]
indices_test = shuffled_indices[n_val : n_test + n_val]
indices_train = shuffled_indices[n_test + n_val :]

imagenes_train = imagenes[indices_train]
imagenes_val = imagenes[indices_val]
imagenes_test = imagenes[indices_test]

Mover las imágenes a las carpetas correspondientes.

In [None]:
for imagen in imagenes_train:
    path_inicial = os.path.join(PATH_IMAGENES, imagen)
    path_final = os.path.join("data", "train", "imagenes", imagen)
    os.replace(path_inicial, path_final)
    
for imagen in imagenes_val:
    path_inicial = os.path.join(PATH_IMAGENES, imagen)
    path_final = os.path.join("data", "val", "imagenes", imagen)
    os.replace(path_inicial, path_final)
    
for imagen in imagenes_test:
    path_inicial = os.path.join(PATH_IMAGENES, imagen)
    path_final = os.path.join("data", "test", "imagenes", imagen)
    os.replace(path_inicial, path_final)

Mover las anotaciones correspondientes.

In [None]:
for carpeta in ['train','test','val']:
    for imagen in os.listdir(os.path.join('data', carpeta, 'imagenes')):
        
        nombre = imagen.split('.')[0]+'.json'
        path_label = os.path.join('data','labels', nombre)
        if os.path.exists(path_label): 
            nuevo_path_label = os.path.join('data', carpeta ,'labels', nombre)
            os.replace(path_label, nuevo_path_label)    

# Aumento de datos (Data augmentation)

Crear carpetas para las imágenes y las etiquetas "aumentadas".

In [None]:
for carpeta in ['train','test','val']:
    try: 
        os.makedirs(os.path.join("aug_data", carpeta))
        os.makedirs(os.path.join("aug_data", carpeta, "imagenes"))
        os.makedirs(os.path.join("aug_data", carpeta, "labels"))
    except:
        pass

- Se crea objeto "augmentor" con parametros a usar para cambiar imagen original.
- Se pasan también las coordenadas del rectángulo y las labels para que las modifique acorde a la transformación.

In [None]:
augmentor = alb.Compose([alb.RandomCrop(width=450, height=450), 
                         alb.HorizontalFlip(p=0.5), 
                         alb.RandomBrightnessContrast(p=0.2),
                         alb.RandomGamma(p=0.2), 
                         alb.RGBShift(p=0.2), 
                         alb.VerticalFlip(p=0.5)], 
                       bbox_params=alb.BboxParams(format='albumentations', 
                                                  label_fields=['class_labels']))

- Se aplica la transformación definida en *augmentor* *n_multiplicacion* veces sobre cada imagen.
- Se obtienen *n_multiplicacion* nuevos *datapoint* por cada *datapoint* original.

In [None]:
n_multiplicacion = 60
for carpeta in ['train','test','val']: 
    for imagen in os.listdir(os.path.join('data', carpeta, 'imagenes')):
        img = cv2.imread(os.path.join('data', carpeta, 'imagenes', imagen))

        coords = [0,0,0.00001,0.00001]
        label_path = os.path.join('data', carpeta, 'labels', f'{imagen.split(".")[0]}.json')
        if os.path.exists(label_path):
            with open(label_path, 'r') as f:
                label = json.load(f)

            coords[0] = label['shapes'][0]['points'][0][0]
            coords[1] = label['shapes'][0]['points'][0][1]
            coords[2] = label['shapes'][0]['points'][1][0]
            coords[3] = label['shapes'][0]['points'][1][1]
            coords = list(np.divide(coords, [640,480,640,480]))

        try: 
            for x in range(n_multiplicacion):
                augmented = augmentor(image=img, bboxes=[coords], class_labels=['face'])
                cv2.imwrite(os.path.join('aug_data', carpeta, 'imagenes', f'{imagen.split(".")[0]}.{x}.jpg'), augmented['image'])

                anotacion = {}
                anotacion['image'] = imagen

                if os.path.exists(label_path):
                    if len(augmented['bboxes']) == 0: 
                        anotacion['bbox'] = [0,0,0,0]
                        anotacion['class'] = 0 
                    else: 
                        anotacion['bbox'] = augmented['bboxes'][0]
                        anotacion['class'] = 1
                else: 
                    anotacion['bbox'] = [0,0,0,0]
                    anotacion['class'] = 0 


                with open(os.path.join('aug_data', carpeta, 'labels', f'{imagen.split(".")[0]}.{x}.json'), 'w') as f:
                    json.dump(anotacion, f)

        except Exception as e:
            print(e)

# Procesamiento de datos

### Procesamiento de imágenes

- Se cargan las imágenes desde su *path* a un tensor de TensorFlow.
- Se escalan a *120 x 120* pixeles.
- Se escalan los valores de cada pixel entre 0 y 1.


In [None]:
def load_image(x): 
    byte_img = tf.io.read_file(x)
    img = tf.io.decode_jpeg(byte_img)
    return img

In [None]:
train_imagenes = tf.data.Dataset.list_files('aug_data\\train\\imagenes\\*.jpg', shuffle=False)
train_imagenes = train_imagenes.map(load_image)
train_imagenes = train_imagenes.map(lambda x: tf.image.resize(x, (120,120)))
train_imagenes = train_imagenes.map(lambda x: x/255)

val_imagenes = tf.data.Dataset.list_files('aug_data\\val\\imagenes\\*.jpg', shuffle=False)
val_imagenes = val_imagenes.map(load_image)
val_imagenes = val_imagenes.map(lambda x: tf.image.resize(x, (120,120)))
val_imagenes = val_imagenes.map(lambda x: x/255)

test_imagenes = tf.data.Dataset.list_files('aug_data\\test\\imagenes\\*.jpg', shuffle=False)
test_imagenes = test_imagenes.map(load_image)
test_imagenes = test_imagenes.map(lambda x: tf.image.resize(x, (120,120)))
test_imagenes = test_imagenes.map(lambda x: x/255)

### Procesamiento de etiquetas

- Se cargan los archivos de anotaciones formato *json*.
- Se extraen la etiqueta y las coordenadas de los rectángulos y se crea en tensor a partir de ellos.

In [None]:
def load_labels(label_path):
    with open(label_path.numpy(), 'r', encoding = "utf-8") as f:
        label = json.load(f)
        
    return [label['class']], label['bbox']

In [None]:
train_labels = tf.data.Dataset.list_files('aug_data\\train\\labels\\*.json', shuffle=False)
train_labels = train_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))

val_labels = tf.data.Dataset.list_files('aug_data\\val\\labels\\*.json', shuffle=False)
val_labels = val_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))

test_labels = tf.data.Dataset.list_files('aug_data\\test\\labels\\*.json', shuffle=False)
test_labels = test_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))

### Unir dataset

Se unen los tensores correspondientes a las imágenes con los tensores de las etiquetas, por cada conjunto.

In [None]:
train = tf.data.Dataset.zip((train_imagenes, train_labels))
train = train.shuffle(5000)
train = train.batch(8)
train = train.prefetch(4)

val = tf.data.Dataset.zip((val_imagenes, val_labels))
val = val.shuffle(1000)
val = val.batch(8)
val = val.prefetch(4)

test = tf.data.Dataset.zip((test_imagenes, test_labels))
test = test.shuffle(1300)
test = test.batch(8)
test = test.prefetch(4)

# Creación de modelo

### Importación de capas

In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, Dense, GlobalMaxPooling2D
from tensorflow.keras.applications import VGG16

### Descarga de VGG16

Se descarga la red neuronal convolucional VGG16 ya preentrenada, sin las capas finales.

In [None]:
vgg = VGG16(include_top=False)

### Construir el modelo

El modelo consiste en dos partes:
   - Clasificación: Para determinar la etiqueta de la detección.
   - Regresión: Para predecir las coordenadas del rectángulo donde se encuentra el objeto.
   
Cada parte se basa en las *feature* entregadas por la red VGG16 y agrega capas sobre esta.
   - Clasificación: Agrega una capa de *Max Pooling*, luego una capa oculta *fully connected* y finalmente la capa de salida con una neurona y activación sigmoidea.
   - Regresión: Agrega una capa de *Max Pooling*, luego una capa oculta *fully connected* y finalmente la capa de salida con 4 neuronas de salida y función de activación sigmoidea.
   
   


In [None]:
def build_model(): 
    capa_entrada = Input(shape=(120,120,3))
    
    vgg = VGG16(include_top=False)(capa_entrada)

    # Modelo para clasificación
    capa_pooling_clasificador = GlobalMaxPooling2D()(vgg)
    capa_oculta_clasificador = Dense(2048, activation='relu')(capa_pooling_clasificador)
    salida_clasificador = Dense(1, activation='sigmoid')(capa_oculta_clasificador)
    
    # Modelo para predicción de coordenadas del rectángulo
    capa_pooling_regresor = GlobalMaxPooling2D()(vgg)
    capa_oculta_regresor = Dense(2048, activation='relu')(capa_pooling_regresor)
    salida_regresor = Dense(4, activation='sigmoid')(capa_oculta_regresor)
    
    modelo = Model(inputs=capa_entrada, outputs=[salida_clasificador, salida_regresor])
    return modelo

In [None]:
modelo = build_model()

### Definir optimizador y paramétros de entrenamiento

Se usa Adam como optimizador.

In [None]:
batches_por_epoca = len(train)
lr_decay = (1./0.75 -1)/batches_por_epoca

In [None]:
optimizador = tf.keras.optimizers.Adam(learning_rate=0.0001)

### Definir función de loss para regresión

Se usa la función de pérdida:

$$loss = \sum (x - \hat{x})^2 + (y - \hat{y})^2 + (w - \hat{w})^2 + (h - \hat{h})^2$$

- Donde *x* corresponde a la primera coordenada del punto superior izquierdo del rectángulo.
- Donde *y* corresponde a la segunda coordenada del punto superior izquierdo del rectángulo.
- Donde *w* corresponde al ancho del rectángulo.
- Donde *h* corresponde al alto del rectángulo.

- El "sombrero" hace referencia a la predicción.

In [None]:
def loss_regresion(y_true, yhat):            
    delta_coord = tf.reduce_sum(tf.square(y_true[:,:2] - yhat[:,:2]))
                  
    h_true = y_true[:,3] - y_true[:,1] 
    w_true = y_true[:,2] - y_true[:,0] 

    h_pred = yhat[:,3] - yhat[:,1] 
    w_pred = yhat[:,2] - yhat[:,0] 
    
    delta_size = tf.reduce_sum(tf.square(w_true - w_pred) + tf.square(h_true-h_pred))
    
    return delta_coord + delta_size

### Definir loss para clasificación
Se usa Entropía cruzada para la pérdida de clasificación.

In [None]:
loss_clasificacion = tf.keras.losses.BinaryCrossentropy()

# Entrenamiento del modelo

### Crear clase Modelo personalizada

Requiere definir los siguientes métodos:

- **compile**: Para declarar las funciones de *loss* y el optimizador a utilizar para entrenar.
- **train_step**: Para ajustar los pesos del modelo usando gradiente descendiente. Notar que se usa como función de *loss* una suma ponderada del *loss* de regresión y el de clasificación.
- **test_step**: Para realizar una predicciones sobre el conjunto de *test*. Guarda las *losses* en un diccionario.
- **call**: Para hacer predicciones con el modelo, se llama con el método *predict*.

In [None]:
class FaceTracker(Model): 
    def __init__(self, eyetracker,  **kwargs): 
        super().__init__(**kwargs)
        self.model = eyetracker

    def compile(self, optimizador, loss_clasificacion, loss_regresion, **kwargs):
        super().compile(**kwargs)
        self.loss_clasificacion = loss_clasificacion
        self.loss_regresion = loss_regresion
        self.opt = optimizador
    
    def train_step(self, batch, **kwargs): 
        
        X, y = batch
        
        with tf.GradientTape() as tape: 
            clases, coords = self.model(X, training=True)
            
            loss_clasificacion_batch = self.loss_clasificacion(y[0], clases)
            loss_regresion_batch = self.loss_regresion(tf.cast(y[1], tf.float32), coords)
            
            loss_total = loss_regresion_batch+0.5*loss_clasificacion_batch
            
            grad = tape.gradient(loss_total, self.model.trainable_variables)
        
        self.opt.apply_gradients(zip(grad, self.model.trainable_variables))
        
        return {"loss_total":loss_total, "loss_clasificacion":loss_clasificacion_batch, "loss_regresion":loss_regresion_batch}
    
    def test_step(self, batch, **kwargs): 
        X, y = batch
        
        clases, coords = self.model(X, training=False)
        
        loss_clasificacion_batch = self.closs(y[0], clases)
        loss_regresion_batch = self.lloss(tf.cast(y[1], tf.float32), coords)
        loss_total = loss_regresion_batch +0.5*loss_clasificacion_batch
        
        {"loss_total":loss_total, "loss_clasificacion":loss_clasificacion_batch, "loss_regresion":loss_regresion_batch}
    
        
    def call(self, X, **kwargs): 
        return self.model(X, **kwargs)

In [None]:
modelo = FaceTracker(modelo)

In [None]:
modelo.compile(optimizador, loss_clasificacion, loss_regresion)

### GPU
Verificar si se tiene GPU para entrenamiento.

In [None]:
print("Número de GPUs disponibles: ", len(tf.config.list_physical_devices('GPU')))

Limitar crecimiento de uso de memoría por GPU.

In [None]:
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus: 
    tf.config.experimental.set_memory_growth(gpu, True)

### Entrenar el modelo

- Se define el directorio para el *callback* de Tensorboard (Permite ver los resultados del entrenamiento en tiempo real).
- Se ejecuta el método *fit* para ajustar los pesos con los datos de entrenamiento.
- Se ingresa el conjunto de validación para ir calculando el *loss* tanto en entrenamiento como en validación para poder evaluar si existe *overfitting* o *underfittting*.

In [None]:
logdir='logs'

In [None]:
callback_tensorboard = tf.keras.callbacks.TensorBoard(log_dir=logdir)

In [None]:
hist = modelo.fit(train, epochs=10, validation_data=val, callbacks=[callback_tensorboard])

### Graficar losses en entrenamiento y en validación

- Se hacen 3 gráficos de *losses*: *loss* total, *loss* de clasificación y *loss* de regresión.
- Por cada uno se grafican tanto el *loss* de entrenamiento como el *loss* de validación.

In [None]:
fig, ax = plt.subplots(ncols=3, figsize=(20,5))

ax[0].plot(hist.history['loss_total'], color='teal', label='train loss')
ax[0].plot(hist.history['val_loss_total'], color='orange', label='val loss')
ax[0].title.set_text('Loss')
ax[0].legend()

ax[1].plot(hist.history['loss_clasificacion'], color='teal', label='train loss')
ax[1].plot(hist.history['val_loss_clasificacion'], color='orange', label='val loss')
ax[1].title.set_text('Loss de clasificación')
ax[1].legend()

ax[2].plot(hist.history['loss_regresion'], color='teal', label='train loss')
ax[2].plot(hist.history['val_loss_regresion'], color='orange', label='val loss')
ax[2].title.set_text('Loss de regresión')
ax[2].legend()

plt.show()

### Guardar el modelo

In [None]:
modelo.save('modelo.h5')

### Cargar el modelo

In [None]:
from tensorflow.keras.models import load_model
modelo = load_model('modelo.h5')

# Probar el modelo

###  Predicciones en el conjunto de prueba

Se carga un batch de conjunto de prueba y se hace predicción con el modelo obtenido.

In [None]:
test_data = test.as_numpy_iterator()
ejemplo_test = test_data.next()
prediccion = modelo.predict(ejemplo_test[0])

Se grafican 4 imágenes del batch de prueba y se dibuja encima el rectángulo predicho por el modelo.

In [None]:
fig, ax = plt.subplots(ncols=4, figsize=(20,20))
for idx in range(4): 
    ejemplo_imagen = ejemplo_test[0][idx]
    ejemplo_coords = prediccion[1][idx]
    
    if prediccion[0][idx] > 0.9:
        cv2.rectangle(ejemplo_imagen, 
                      tuple(np.multiply(ejemplo_coords[:2], [120,120]).astype(int)),
                      tuple(np.multiply(ejemplo_coords[2:], [120,120]).astype(int)), 
                            (255,0,0), 2)
    
    ax[idx].imshow(ejemplo_imagen)

# Implementar el modelo

- Se abre videocamera con OpenCV.
- Se toman los *frame* y se recortan, tomando los primeros *450 x 450* pixeles.
- Se reescala y normaliza el *frame* para poder ingresarlo al modelo.
- Se realiza predicción con el modelo.
- Se dibuja rectangulo con cv2.rectangle y label con cv2.putText.
- Se muestra el *frame* modificado en pantalla.
- Salir presionando "q".

In [None]:
cap = cv2.VideoCapture(0)
while cap.isOpened():
    _ , frame = cap.read()
    
    tamano_frame = [450, 450]
    print(frame.shape)
    frame = frame[0:450, 0:450,:]
    
    print(frame.shape)
    
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    img_escalada = tf.image.resize(rgb, (120,120))
    
    prediccion = modelo.predict(np.expand_dims(img_escalada/255,0))
    coords = prediccion[1][0]
    
    if prediccion[0] > 0.5: 
        # Recuadro de prediccion
        cv2.rectangle(frame, 
                      tuple(np.multiply(coords[:2], tamano_frame).astype(int)),
                      tuple(np.multiply(coords[2:], tamano_frame).astype(int)),
                            (255,0,0), 2)
        # Recuadro para label
        cv2.rectangle(frame, 
                      tuple(np.add(np.multiply(coords[:2], tamano_frame).astype(int), 
                                    [0,-30])),
                      tuple(np.add(np.multiply(coords[:2], tamano_frame).astype(int),
                                    [80,0])), 
                            (255,0,0), -1)
        
        # Texto con label de clasificación
        cv2.putText(frame, 'face', tuple(np.add(np.multiply(coords[:2], tamano_frame).astype(int),
                                               [0,-5])),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)
    
    cv2.imshow('FaceTracker', frame)
    
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()