Entrenando la arquitectura ResNet

Keras tiene a nuestra disposición la arquitectura ResNet. Vamos a entrenar este modelo. Debido a que vamos a usar la definida en Keras (aunque podríamos crearla directamente), debemos aumentar el tamaño de las imágenes a 48 píxeles. Para ello, usaremos el siguiente código:

Importando Librerías

In [None]:
import numpy as np
from scipy import misc
from PIL import Image
import glob
import matplotlib.pyplot as plt
import scipy.misc
from matplotlib.pyplot import imshow
%matplotlib inline
from IPython.display import SVG
import cv2
import seaborn as sn
import pandas as pd
import pickle
from keras import layers
from keras.layers import Flatten, Input, Add, Dense, Activation, ZeroPadding2D, BatchNormalization, Flatten, Conv2D, AveragePooling2D, MaxPooling2D, GlobalMaxPooling2D, Dropout
from keras.models import Sequential, Model, load_model
from keras.preprocessing import image
from keras.preprocessing.image import load_img
from keras.preprocessing.image import img_to_array
from keras.applications.imagenet_utils import decode_predictions
from keras.utils import layer_utils, np_utils
from keras.utils.data_utils import get_file
from keras.applications.imagenet_utils import preprocess_input
from keras.utils.vis_utils import model_to_dot
from keras.utils.vis_utils import plot_model
from keras.initializers import glorot_uniform
from keras import losses
import keras.backend as K
from keras.callbacks import ModelCheckpoint
from sklearn.metrics import confusion_matrix, classification_report
import tensorflow as tf

Preparando el conjunto de datos Como antes, usaremos el conjunto de datos CIFAR-100, que, como ya dijimos, consta de 600 imágenes por cada clase de un total de 100 clases. Se divide en 500 imágenes para entrenamiento y 100 imágenes para validación por cada clase. Las 100 clases están agrupadas en 20 superclases. Cada imagen tiene una etiqueta "fina" (la clase, de entre las 100, a la que pertenece) y una etiqueta "gruesa" (correspondiente a su superclase).

In [None]:
from keras.datasets import cifar100

(x_train_original, y_train_original), (x_test_original, y_test_original) = cifar100.load_data(label_mode='fine')

Actualmente, hemos descargado los datasets de entrenamiento y validación. x_train_original y x_test_original son los conjuntos de datos con lás imágenes de entrenamiento y validación respectivamente, mientras que y_train_original y y_test_original son los datasets con las etiquetas.

Veamos la forma de y_train_original:
    array([[19], [29], [ 0], ..., [ 3], [ 7], [73]])  
    
Como se puede ver, se trata de un array donde cada número se corresponde con la etiqueta concreta. Lo primero que hay que hacer es convertir este array en su versión one-hot-encoding    

In [None]:
y_train = np_utils.to_categorical(y_train_original, 100)
y_test = np_utils.to_categorical(y_test_original, 100)

Bien, representa la imagen en los 3 canales RGB de 256 píxeles. Vamos a verla:

In [None]:
imgplot = plt.imshow(x_train_original[3])
plt.show()

Lo que haremos a continuación, es normalizar las imágenes. Esto es, dividiremos cada elemento de x_train_original y xtestoriginal por el numero de píxeles, es decir, 255. Con esto obtenemos que el array comprenderá valores de entre 0 y 1. Con esto el entrenamiento suele aportar mejores resultados.

In [None]:
x_train = x_train_original/255  
x_test = x_test_original/255  

Preparando el entorno
El siguiente paso es definir ciertos parámetros sobre el experimento en Keras. Lo primero será especificar a Keras dónde se encuentran los canales. En un array de imágenes, pueden venir como último indice o como el primero. Esto se conoce como canales primero (channels first) o canales al final (channels last). En nuestro caso, vamos a definirlos al final.

In [None]:
K.set_image_data_format('channels_last')  

Lo siguiente que vamos a especificar es la fase del experimento. En este caso, la fase será de entrenamiento:

In [None]:
K.set_learning_phase(True)  

In [None]:
x_train_original[0]

Entrenando la arquitectura ResNet
Keras tiene a nuestra disposición ésta arquitectura, pero tiene el problema que, por defecto, el tamaño de las imágenes debe ser mayor a 187 píxeles, por lo que definiremos una arquitectura más pequeña.

In [None]:
def CustomResNet50(include_top=True, input_tensor=None, input_shape=(32,32,3), pooling=None, classes=100):  
    if input_tensor is None:
        img_input = Input(shape=input_shape)
    else:
        if not K.is_keras_tensor(input_tensor):
            img_input = Input(tensor=input_tensor, shape=input_shape)
        else:
            img_input = input_tensor
    if K.image_data_format() == 'channels_last':
        bn_axis = 3
    else:
        bn_axis = 1

    x = ZeroPadding2D(padding=(2, 2), name='conv1_pad')(img_input)

    x = resnet50.conv_block(x, 3, [32, 32, 64], stage=2, block='a')
    x = resnet50.identity_block(x, 3, [32, 32, 64], stage=2, block='b')
    x = resnet50.identity_block(x, 3, [32, 32, 64], stage=2, block='c')

    x = resnet50.conv_block(x, 3, [64, 64, 256], stage=3, block='a', strides=(1, 1))
    x = resnet50.identity_block(x, 3, [64, 64, 256], stage=3, block='b')
    x = resnet50.identity_block(x, 3, [64, 64, 256], stage=3, block='c')

    x = resnet50.conv_block(x, 3, [128, 128, 512], stage=4, block='a')
    x = resnet50.identity_block(x, 3, [128, 128, 512], stage=4, block='b')
    x = resnet50.identity_block(x, 3, [128, 128, 512], stage=4, block='c')
    x = resnet50.identity_block(x, 3, [128, 128, 512], stage=4, block='d')

    x = resnet50.conv_block(x, 3, [256, 256, 1024], stage=5, block='a')
    x = resnet50.identity_block(x, 3, [256, 256, 1024], stage=5, block='b')
    x = resnet50.identity_block(x, 3, [256, 256, 1024], stage=5, block='c')
    x = resnet50.identity_block(x, 3, [256, 256, 1024], stage=5, block='d')
    x = resnet50.identity_block(x, 3, [256, 256, 1024], stage=5, block='e')
    x = resnet50.identity_block(x, 3, [256, 256, 1024], stage=5, block='f')

    x = resnet50.conv_block(x, 3, [512, 512, 2048], stage=6, block='a')
    x = resnet50.identity_block(x, 3, [512, 512, 2048], stage=6, block='b')
    x = resnet50.identity_block(x, 3, [512, 512, 2048], stage=6, block='c')

    x = AveragePooling2D((1, 1), name='avg_pool')(x)

    if include_top:
        x = Flatten()(x)
        x = Dense(classes, activation='softmax', name='fc1000')(x)
    else:
        if pooling == 'avg':
            x = GlobalAveragePooling2D()(x)
        elif pooling == 'max':
            x = GlobalMaxPooling2D()(x)

    # Ensure that the model takes into account
    # any potential predecessors of `input_tensor`.
    if input_tensor is not None:
        inputs = get_source_inputs(input_tensor)
    else:
        inputs = img_input
    # Create model.
    model = Model(inputs, x, name='resnet50')

    return model

Compilamos como hasta ahora...

In [None]:
def create_custom_resnet50():  
  model = CustomResNet50(include_top=True, input_tensor=None, input_shape=(32,32,3), pooling=None, classes=100)

  return model

custom_resnet50_model = create_custom_resnet50()  
custom_resnet50_model.compile(loss='categorical_crossentropy', optimizer='sgd', metrics=['acc', 'mse'])  

Una vez hecho esto, vamos a ver un resumen del modelo creado.

In [None]:
custom_resnet50_model.summary()

Recordemos que la arquitectura VGG-16 tenía aproximadamente 34 millones de parámetros a entrenar. Esto quiere decir que hemos aumentado la profundidad pero hemos reducido el número de parámetros a entrenar.

Bien, dicho esto, pasamos a entrenar el modelo.

In [None]:
crn50 = custom_resnet50_model.fit(x=x_train, y=y_train, batch_size=32, epochs=10, verbose=1, validation_data=(x_test, y_test), shuffle=True)

Veamos las métricas obtenidas para el entrenamiento y validación gráficamente.

In [None]:
plt.figure(0)  
plt.plot(crn50.history['acc'],'r')  
plt.plot(crn50.history['val_acc'],'g')  
plt.xticks(np.arange(0, 11, 2.0))  
plt.rcParams['figure.figsize'] = (8, 6)  
plt.xlabel("Num of Epochs")  
plt.ylabel("Accuracy")  
plt.title("Training Accuracy vs Validation Accuracy")  
plt.legend(['train','validation'])

plt.figure(1)  
plt.plot(crn50.history['loss'],'r')  
plt.plot(crn50.history['val_loss'],'g')  
plt.xticks(np.arange(0, 11, 2.0))  
plt.rcParams['figure.figsize'] = (8, 6)  
plt.xlabel("Num of Epochs")  
plt.ylabel("Loss")  
plt.title("Training Loss vs Validation Loss")  
plt.legend(['train','validation'])

plt.show()  

Matriz de confusión
Pasemos ahora a ver la matriz de confusión y las métricas de Accuracy, Recall y F1-score.

Vamos a hacer una predicción sobre el dataset de validación y, a partir de ésta, generamos la matriz de confusión y mostramos las métricas mencionadas anteriormente.

In [None]:
crn50_pred = custom_resnet50_model.predict(x_test, batch_size=32, verbose=1)  
crn50_predicted = np.argmax(crn50_pred, axis=1)

crn50_cm = confusion_matrix(np.argmax(y_test, axis=1), crn50_predicted)

# Visualizing of confusion matrix
crn50_df_cm = pd.DataFrame(crn50_cm, range(100), range(100))  
plt.figure(figsize = (20,14))  
sn.set(font_scale=1.4) #for label size  
sn.heatmap(crn50_df_cm, annot=True, annot_kws={"size": 12}) # font size  
plt.show() 

Y por último, mostramos las métricas

In [None]:
crn50_report = classification_report(np.argmax(y_test, axis=1), crn50_predicted)  
print(crn50_report)

Curva ROC (tasas de verdaderos positivos y falsos positivos)
Vamos a codificar la curva ROC.

In [None]:
rom sklearn.datasets import make_classification  
from sklearn.preprocessing import label_binarize  
from scipy import interp  
from itertools import cycle

n_classes = 100

from sklearn.metrics import roc_curve, auc

# Plot linewidth.
lw = 2

# Compute ROC curve and ROC area for each class
fpr = dict()  
tpr = dict()  
roc_auc = dict()  
for i in range(n_classes):  
    fpr[i], tpr[i], _ = roc_curve(y_test[:, i], crn50_pred[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])

# Compute micro-average ROC curve and ROC area
fpr["micro"], tpr["micro"], _ = roc_curve(y_test.ravel(), crn50_pred.ravel())  
roc_auc["micro"] = auc(fpr["micro"], tpr["micro"])

# Compute macro-average ROC curve and ROC area

# First aggregate all false positive rates
all_fpr = np.unique(np.concatenate([fpr[i] for i in range(n_classes)]))

# Then interpolate all ROC curves at this points
mean_tpr = np.zeros_like(all_fpr)  
for i in range(n_classes):  
    mean_tpr += interp(all_fpr, fpr[i], tpr[i])

# Finally average it and compute AUC
mean_tpr /= n_classes

fpr["macro"] = all_fpr  
tpr["macro"] = mean_tpr  
roc_auc["macro"] = auc(fpr["macro"], tpr["macro"])

# Plot all ROC curves
plt.figure(1)  
plt.plot(fpr["micro"], tpr["micro"],  
         label='micro-average ROC curve (area = {0:0.2f})'
               ''.format(roc_auc["micro"]),
         color='deeppink', linestyle=':', linewidth=4)

plt.plot(fpr["macro"], tpr["macro"],  
         label='macro-average ROC curve (area = {0:0.2f})'
               ''.format(roc_auc["macro"]),
         color='navy', linestyle=':', linewidth=4)

colors = cycle(['aqua', 'darkorange', 'cornflowerblue'])  
for i, color in zip(range(n_classes-97), colors):  
    plt.plot(fpr[i], tpr[i], color=color, lw=lw,
             label='ROC curve of class {0} (area = {1:0.2f})'
             ''.format(i, roc_auc[i]))

plt.plot([0, 1], [0, 1], 'k--', lw=lw)  
plt.xlim([0.0, 1.0])  
plt.ylim([0.0, 1.05])  
plt.xlabel('False Positive Rate')  
plt.ylabel('True Positive Rate')  
plt.title('Some extension of Receiver operating characteristic to multi-class')  
plt.legend(loc="lower right")  
plt.show()


# Zoom in view of the upper left corner.
plt.figure(2)  
plt.xlim(0, 0.2)  
plt.ylim(0.8, 1)  
plt.plot(fpr["micro"], tpr["micro"],  
         label='micro-average ROC curve (area = {0:0.2f})'
               ''.format(roc_auc["micro"]),
         color='deeppink', linestyle=':', linewidth=4)

plt.plot(fpr["macro"], tpr["macro"],  
         label='macro-average ROC curve (area = {0:0.2f})'
               ''.format(roc_auc["macro"]),
         color='navy', linestyle=':', linewidth=4)

colors = cycle(['aqua', 'darkorange', 'cornflowerblue'])  
for i, color in zip(range(10), colors):  
    plt.plot(fpr[i], tpr[i], color=color, lw=lw,
             label='ROC curve of class {0} (area = {1:0.2f})'
             ''.format(i, roc_auc[i]))

plt.plot([0, 1], [0, 1], 'k--', lw=lw)  
plt.xlabel('False Positive Rate')  
plt.ylabel('True Positive Rate')  
plt.title('Some extension of Receiver operating characteristic to multi-class')  
plt.legend(loc="lower right")  
plt.show()  

Salvaremos los datos del histórico de entrenamiento para compararlos con otros modelos. Además, vamos a salvar el modelo con los pesos entrenados para usarlos en el futuro.

In [None]:
#Modelo
custom_resnet50_model.save('crn50.h5')

#Histórico
with open('crn50_history.txt', 'wb') as file_pi:  
  pickle.dump(crn50.history, file_pi)

A continuación, vamos a comparar las métricas con los modelos anteriores (obviaremos el código que carga los datos de dichos modelos).

In [None]:
plt.figure(0)  
plt.plot(snn.history['val_acc'],'r')  
plt.plot(scnn.history['val_acc'],'g')  
plt.plot(vgg16.history['val_acc'],'b')  
plt.plot(vgg19.history['val_acc'],'y')  
plt.plot(vgg16Bis.history['val_acc'],'m')  
plt.plot(crn50.history['val_acc'],'gold')  
plt.xticks(np.arange(0, 11, 2.0))  
plt.rcParams['figure.figsize'] = (8, 6)  
plt.xlabel("Num of Epochs")  
plt.ylabel("Accuracy")  
plt.title("Simple NN Accuracy vs simple CNN Accuracy")  
plt.legend(['simple NN','CNN','VGG 16','VGG 19','Custom VGG','Custom ResNet'])  

In [None]:
plt.figure(0)  
plt.plot(snn.history['val_loss'],'r')  
plt.plot(scnn.history['val_loss'],'g')  
plt.plot(vgg16.history['val_loss'],'b')  
plt.plot(vgg19.history['val_loss'],'y')  
plt.plot(vgg16Bis.history['val_loss'],'m')  
plt.plot(crn50.history['val_loss'],'gold')  
plt.xticks(np.arange(0, 11, 2.0))  
plt.rcParams['figure.figsize'] = (8, 6)  
plt.xlabel("Num of Epochs")  
plt.ylabel("Loss")  
plt.title("Simple NN Loss vs simple CNN Loss")  
plt.legend(['simple NN','CNN','VGG 16','VGG 19','Custom VGG','Custom ResNet'])  

In [None]:
plt.figure(0)  
plt.plot(snn.history['val_mean_squared_error'],'r')  
plt.plot(scnn.history['val_mean_squared_error'],'g')  
plt.plot(vgg16.history['val_mean_squared_error'],'b')  
plt.plot(vgg19.history['val_mean_squared_error'],'y')  
plt.plot(vgg16Bis.history['val_mean_squared_error'],'m')  
plt.plot(crn50.history['val_mean_squared_error'],'gold')  
plt.xticks(np.arange(0, 11, 2.0))  
plt.rcParams['figure.figsize'] = (8, 6)  
plt.xlabel("Num of Epochs")  
plt.ylabel("Mean Squared Error")  
plt.title("Simple NN MSE vs simple CNN MSE")  
plt.legend(['simple NN','CNN','VGG 16','VGG 19','Custom VGG','Custom ResNet'])  

Conclusión sobre el trabajo realizado
Como se puede ver, la arquitectura marca un punto de inflexión. No sólo porque sea de los mejores resultados que las anteriores arquitecturas, sino también en los tiempos de entrenamiento, ya que permite aumentar las capas con un tiempo aceptable; y también en el número de parámetros, que se ha reducido considerablemente respecto a la arquitectura VGG.