<h1>Modelo con transferencia de aprendizaje (VGG16) para OCT macular</h1>

# Importación de librerías

In [None]:
# Importación de librerias
import numpy as np
import pandas as pd
import random
import sys
import cv2 as cv
import os
import re
import shutil
from shutil import rmtree
import keras
from keras.models import load_model
from keras.metrics import SensitivityAtSpecificity, Precision, Recall, SpecificityAtSensitivity
from keras.preprocessing.image import ImageDataGenerator
from keras import optimizers
from keras.models import Sequential
from keras.layers import Dropout, Flatten, Dense, Activation
from keras.layers import Convolution2D, MaxPooling2D
from keras import backend as K
from keras import applications
from sklearn.metrics import confusion_matrix
import itertools
from sklearn.metrics import classification_report
import matplotlib.pyplot as plt
import seaborn as sns

# Listado de imagenes de cada directorio

In [None]:
files_namesCNV = os.listdir("UNIDA/CNV")
files_namesDME = os.listdir("UNIDA/DME")
files_namesDRUSEN = os.listdir("UNIDA/DRUSEN")
files_namesNORMAL = os.listdir("UNIDA/NORMAL")

# Creación de arreglo con los listados de imagenes de cada clase

In [None]:
lectura = np.array([files_namesCNV,files_namesDME,files_namesDRUSEN,files_namesNORMAL])

# Rutas de conjuntos

In [None]:
rutain = ['UNIDA/CNV','UNIDA/DME','UNIDA/DRUSEN','UNIDA/NORMAL']
rutaout = ['train/CNV','train/DME','train/DRUSEN','train/NORMAL']
rutaoutVal = ['val/CNV','val/DME','val/DRUSEN','val/NORMAL']
rutaoutTest = ['test/CNV','test/DME','test/DRUSEN','test/NORMAL']

# Lista de variables para CNN y Kfold cross valitadion


In [None]:
epocas = 55
longitud, altura = 496, 496
batch_size = 32
filtrosConv1 = 256
tamano_filtro1 = (3, 3)
tamano_pool = (2, 2)
clases = 4
lr = 0.0001
num_folds = 5

# Función de generación números aleatorios

In [None]:
def semilla(total):
    f=list(range(total))
    orden=random.sample(f,total)
    return orden

# Ruta de directorios validación cruzada

In [None]:
# Cargue de las imagenes..Ruta
rutatrain = ['K1/train/',
        'K2/train/',
        'K3/train/',
        'K4/train/',
        'K5/train/']

rutaval = ['K1/val/',
        'K2/val/',
        'K3/val/',
        'K4/val/',
        'K5/val/']

rutatest = ['K1/test/',
        'K2/test/',
        'K3/test/',
        'K4/test/',
        'K5/test/']

# Activación de parámetro de multiproceso

In [None]:
import multiprocessing
manager = multiprocessing.Manager()
queue = manager.Queue(maxsize=80000)

# Creación de vectores para acumular las métricas

In [None]:
# Vectores de acumulación
acc_per_fold = []
loss_per_fold = []
se_per_fold = []
sp_per_fold = []
rc_per_fold = []
pc_per_fold = []

# Creación de modelo de transferencia de aprendizaje

In [None]:
# Generación del modelo
def modelo():
    vgg=applications.vgg16.VGG16(weights='imagenet',
                    include_top=False,
                    input_shape=(496, 496, 3)) # carga el modelo VGG16 en la variable vgg
    cnn=Sequential()               # crea un modelo vacio del tipo secuencial
    for layer in vgg.layers:       # Transfiere las capas de vgg al modelo secuencial vacio cnn
        cnn.add(layer)
    cnn.pop()                      # Elimina la última capa de predicción
    for layer in cnn.layers:       # Recorre y hace que no vuelvan a entrenar
        layer.trainable=False
      # Se adicionan capas no entrenadas
    cnn.add(MaxPooling2D(pool_size=tamano_pool))
    cnn.add(Convolution2D(filtrosConv1, tamano_filtro1, padding ="same", activation='relu'))
    cnn.add(Convolution2D(filtrosConv1, tamano_filtro1, padding ="same", activation='relu'))
    cnn.add(Convolution2D(filtrosConv1, tamano_filtro1, padding ="same", activation='relu'))
    cnn.add(MaxPooling2D(pool_size=tamano_pool))
    cnn.add(Flatten())
    cnn.add(Dense(256, activation='relu'))
    cnn.add(Dropout(0.5))
    cnn.add(Dense(clases, activation='softmax')) # agrega capa de 4 neuronas para las 4 clases a clasificar

    return cnn

K.clear_session()

# visualización del modelo y sus parámetros

In [None]:
cnn=modelo()
cnn.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 block1_conv1 (Conv2D)       (None, 496, 496, 64)      1792      
                                                                 
 block1_conv2 (Conv2D)       (None, 496, 496, 64)      36928     
                                                                 
 block1_pool (MaxPooling2D)  (None, 248, 248, 64)      0         
                                                                 
 block2_conv1 (Conv2D)       (None, 248, 248, 128)     73856     
                                                                 
 block2_conv2 (Conv2D)       (None, 248, 248, 128)     147584    
                                                                 
 block2_pool (MaxPooling2D)  (None, 124, 124, 128)     0         
                                                                 
 block3_conv1 (Conv2D)       (None, 124, 124, 256)     2

# Funciones de creación y eliminación de directorios

In [None]:
def eliminar(directorio):
    estado=os.path.exists(directorio)
    if estado == True:
        rmtree(directorio)

def crear(Rct,Rcv,Rcp):
    Rt=Rct
    Rv=Rcv
    Rp=Rcp
    for i in range(0,5):
        os.makedirs(Rt[i]+'/CNV', exist_ok = True)
        os.makedirs(Rt[i]+'/DME', exist_ok = True)
        os.makedirs(Rt[i]+'/DRUSEN', exist_ok = True)
        os.makedirs(Rt[i]+'/NORMAL', exist_ok = True)
        os.makedirs(Rv[i]+'/CNV', exist_ok = True)
        os.makedirs(Rv[i]+'/DME', exist_ok = True)
        os.makedirs(Rv[i]+'/DRUSEN', exist_ok = True)
        os.makedirs(Rv[i]+'/NORMAL', exist_ok = True)
        os.makedirs(Rp[i]+'/CNV', exist_ok = True)
        os.makedirs(Rp[i]+'/DME', exist_ok = True)
        os.makedirs(Rp[i]+'/DRUSEN', exist_ok = True)
        os.makedirs(Rp[i]+'/NORMAL', exist_ok = True)

# Arraglo para ruta de k-fold

In [None]:
rutak=["K1","K2","K3","K4","K5"]

# Función para graficar matriz de confusión

In [None]:
def plot_confusion_matrix(cm,
                          target_names,
                          title='Confusion matrix',
                          cmap=None,
                          normalize=True):
    """
    given a sklearn confusion matrix (cm), make a nice plot

    Arguments
    ---------
    cm:           confusion matrix from sklearn.metrics.confusion_matrix

    target_names: given classification classes such as [0, 1, 2]
                  the class names, for example: ['high', 'medium', 'low']

    title:        the text to display at the top of the matrix

    cmap:         the gradient of the values displayed from matplotlib.pyplot.cm
                  see http://matplotlib.org/examples/color/colormaps_reference.html
                  plt.get_cmap('jet') or plt.cm.Blues

    normalize:    If False, plot the raw numbers
                  If True, plot the proportions

    Usage
    -----
    plot_confusion_matrix(cm           = cm,                  # confusion matrix created by
                                                              # sklearn.metrics.confusion_matrix
                          normalize    = True,                # show proportions
                          target_names = y_labels_vals,       # list of names of the classes
                          title        = best_estimator_name) # title of graph

    Citiation
    ---------
    http://scikit-learn.org/stable/auto_examples/model_selection/plot_confusion_matrix.html

    """
    import matplotlib.pyplot as plt
    import numpy as np
    import itertools

    accuracy = np.trace(cm) / float(np.sum(cm))
    misclass = 1 - accuracy

    if cmap is None:
        cmap = plt.get_cmap('Blues')

    plt.figure(figsize=(8, 6))
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()

    if target_names is not None:
        tick_marks = np.arange(len(target_names))
        plt.xticks(tick_marks, target_names, rotation=45)
        plt.yticks(tick_marks, target_names)

    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]


    thresh = cm.max() / 1.5 if normalize else cm.max() / 2
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        if normalize:
            plt.text(j, i, "{:0.4f}".format(cm[i, j]),
                     horizontalalignment="center",
                     color="white" if cm[i, j] > thresh else "black")
        else:
            plt.text(j, i, "{:,}".format(cm[i, j]),
                     horizontalalignment="center",
                     color="white" if cm[i, j] > thresh else "black")


    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label\naccuracy={:0.4f}; misclass={:0.4f}'.format(accuracy, misclass))
    plt.show()

# Montecarlo y K-fold

In [None]:
# Montecarlo
for m in range(1,11):
    aleatorios=semilla(8616)

    kf1=aleatorios[0:1551]
    kf2=aleatorios[1551:3102]
    kf3=aleatorios[3102:4653]
    kf4=aleatorios[4653:6204]
    kf5=aleatorios[6204:7755]
    K=np.array([kf1,kf2,kf3,kf4,kf5])
    t1=aleatorios[1551:7755]
    t2=np.concatenate((kf1, aleatorios[3102:7755]))
    t3=np.concatenate((aleatorios[0:3102], aleatorios[4653:7755]))
    t4=np.concatenate((aleatorios[0:4653], aleatorios[6204:7755]))
    t5=aleatorios[0:6204]
    tk=np.array([t1,t2,t3,t4,t5])

    # Eliminación y creación de directorios para k-fold
    for g in range(0,5):
        eliminar(rutak[g])
    crear(rutatrain,rutaval,rutatest)

    # Creación de conjunto de entrenamiento, validación y prueba
    for j in range(len(rutain)):
        files_names=lectura[j]
        for p in range(0,5):
                for t in K[p]:
                    image_path = str(rutain[j]) + "/" + str(files_names[t])
                    nombre=str(files_names[t]).rsplit('.', 1)[0]
                    outputk1="K"+str(p+1)+"/"+rutaoutVal[j]+"/"+nombre+".jpeg"
                    imagen = cv.imread(image_path)
                    cv.imwrite(outputk1,imagen)
                for h in tk[p]:
                    image_path = str(rutain[j]) + "/" + str(files_names[aleatorios[h]])
                    nombre=str(files_names[aleatorios[h]]).rsplit('.', 1)[0]
                    outputTrain="K"+str(p+1)+"/"+rutaout[j]+"/"+nombre+".jpeg"
                    imagen = cv.imread(image_path)
                    cv.imwrite(outputTrain,imagen)
                for k in range(round(len(aleatorios)*0.9),len(aleatorios)):
                    image_path = str(rutain[j]) + "/" + str(files_names[aleatorios[k]])
                    nombre=str(files_names[aleatorios[k]]).rsplit('.', 1)[0]
                    outputTest="K"+str(p+1)+"/"+rutaoutTest[j]+"/"+nombre+".jpeg"
                    imagen = cv.imread(image_path)
                    cv.imwrite(outputTest,imagen)
        print("clase "+str(j) +" terminada")
    print("Conjuntos train, val y test fueron creados")

    # K-fold Cross Validation
    ruta=0
    for fold_no in range(1,num_folds+1):

        #Preparamos nuestras imagenes de entrenamiento con diferentes transformaciones para generalizar el aprendizaje
        entrenamiento_datagen = ImageDataGenerator(
            rescale=1. / 255,
            shear_range=0.2,
            zoom_range=0.2,
            horizontal_flip=True)

        #Preparamos nuestras imagenes de validación
        valid_datagen = ImageDataGenerator(rescale=1. / 255)

        for i in range(1,3):
            #Se leen las imagenes de entrenamiento del directorio
            entrenamiento_generador = entrenamiento_datagen.flow_from_directory(
                rutatrain[ruta],
                target_size=(altura, longitud),
                batch_size=batch_size,
                class_mode='categorical')

            #Se leen las imagenes de validación del directorio
            validacion_generador = valid_datagen.flow_from_directory(
                rutaval[ruta],
                target_size=(altura, longitud),
                batch_size=batch_size,
                class_mode='categorical')

        #Se generan los pasos de entrenamiento y validación
        pasos = entrenamiento_generador.n // (entrenamiento_generador.batch_size*7)
        validation_steps = validacion_generador.n // (validacion_generador.batch_size*5)

        #Se compila el modelo
        cnn=modelo()
        #cnn.summary()
        cnn.compile(optimizer = 'adam',
        loss='categorical_crossentropy',
        metrics=['accuracy','mse',Recall(),Precision()])


        # Genera impresión de cada iteración de k fold
        print('------------------------------------------------------------------------')
        print(f'Entrenamiento para K-fold {fold_no} ...')

        # Entrenamiento del modelo
        history = cnn.fit(
            entrenamiento_generador,
            steps_per_epoch=pasos,
            epochs=epocas,
            validation_data=validacion_generador,
            validation_steps=validation_steps)

        # Guarda el modelo y pesos entrenados en una carpeta por cada iteración
        target_dir = 'modelos/'+str(m)+"/K"+str(fold_no)
        if not os.path.exists(target_dir):
            os.mkdir(target_dir)
            cnn.save(target_dir+'/K'+str(fold_no)+'.h5')
            cnn.save_weights(target_dir+'/Kp'+str(fold_no)+'.h5')


        # Generación de métricas
        scores = cnn.evaluate(validacion_generador, steps=194, verbose=2, callbacks=None)
        print(f'Score para fold {fold_no}: {cnn.metrics_names[0]} of {scores[0]}; {cnn.metrics_names[1]} of {scores[1]*100}%; {cnn.metrics_names[3]} of {scores[3]*100}%; {cnn.metrics_names[4]} of {scores[4]*100}%')
        acc_per_fold.append(scores[1] * 100)
        loss_per_fold.append(scores[0])
        se_per_fold.append(scores[3])
        sp_per_fold.append(scores[4])

        #Confution Matrix and Classification Report
        Y_pred = cnn.predict(validacion_generador)
        y_pred = np.argmax(Y_pred, axis=1)

        print('******** Confusion Matrix para K-fold '+str(fold_no)+ '********')
        cm=confusion_matrix(validacion_generador.classes, y_pred)
        print(cm)
        print('Classification Report')
        target_names = ['CNV', 'DME', 'DRUSSEN', 'NORMAL']
        print(classification_report(validacion_generador.classes, y_pred, target_names=target_names))

        plot_confusion_matrix(cm,
        normalize    = False,
        target_names = ['CNV', 'DME', 'DRUSSEN', 'NORMAL'],
        title        = "Confusion Matrix")
        plt.savefig(target_dir+'/cm'+str(fold_no)+'.svg')

        # Visualización history o proceso de entrenamiento

        # Plot history: Loss
        plt.figure(figsize=(15,7))
        plt.subplot(121)
        plt.plot(history.history['loss'])
        plt.plot(history.history['val_loss'])
        plt.title('Validation loss history')
        plt.ylabel('Loss value')
        plt.xlabel('No. epoch')
        plt.legend(['loss', 'val_loss'], loc='upper right')
        plt.grid(linestyle=':')

        # Plot history: Accuracy
        plt.subplot(122)
        plt.plot(history.history['accuracy'])
        plt.plot(history.history['val_accuracy'])
        plt.title('Validation accuracy history')
        plt.ylabel('Accuracy value')
        plt.xlabel('No. epoch')
        plt.legend(['accuracy', 'Val_accuracy'], loc='upper left')
        plt.grid(linestyle=':')
        plt.show()
        plt.savefig(target_dir+"/TrainVsVal"+str(fold_no)+".svg", format="svg")


        # Plot history: loss vs Accuracy
        plt.figure(figsize=(8, 7))
        plt.plot(history.history['val_loss'])
        plt.plot(history.history['val_accuracy'])
        plt.title('Validation loss and accuracy history')
        plt.xlabel('No. epoch')
        plt.ylabel('Loss value')
        plt.legend(['val_loss', 'val_accuracy'], loc='best')
        plt.grid(linestyle=':')
        plt.twinx()
        plt.ylabel('Accuracy value')
        plt.show()
        plt.savefig(target_dir+"/ValVsVal"+str(fold_no)+".svg", format="svg")


        #Preparamos nuestras imagenes de test
        test_datagen = ImageDataGenerator(rescale=1. / 255)
        #Se leen las imagenes de test del directorio
        test_generador = test_datagen.flow_from_directory(
            data_test,
            target_size=(altura, longitud),
            batch_size=batch_size,
            class_mode='categorical',
            shuffle=False)

        #predicción conjunto test
        t_pred = cnn.predict(test_generador)
        t_pred = np.argmax(t_pred, axis=1)

        print('Confusion Matrix')
        cm=confusion_matrix(test_generador.classes, t_pred)
        print(cm)
        print('Classification Report')
        target_names = ['CNV', 'DME', 'DRUSSEN', 'NORMAL']
        print(classification_report(test_generador.classes, t_pred, target_names=target_names))

        plot_confusion_matrix(cm,
        normalize    = False,
        target_names = ['CNV', 'DME', 'DRUSSEN', 'NORMAL'],
        title        = "Confusion Matrix")
        plt.savefig(target_dir+"/cmtest"+str(fold_no)+".svg", format="svg")


    # Promedio de los score
    print('------------------------------------------------------------------------')
    print('Score for fold')
    for i in range(0, len(acc_per_fold)-1):
        print('------------------------------------------------------------------------')
        print(f'> Fold {i+1} - Loss: {loss_per_fold[i]} - Accuracy: {acc_per_fold[i]}% - Recall: {rc_per_fold[i]}% - Precision: {pc_per_fold[i]}%')
        print('------------------------------------------------------------------------')

    # Tabla de los promedios de métricas de cada iteración de la validación
    print('Average scores for all folds:')
    print(f'> Accuracy: {np.mean(acc_per_fold)} (+- {np.std(acc_per_fold)})')
    print(f'> Loss: {np.mean(loss_per_fold)}')
    print(f'> Recall: {np.mean(rc_per_fold)} (+- {np.std(rc_per_fold)})')
    print(f'> Precision: {np.mean(pc_per_fold)} (+- {np.std(pc_per_fold)})')
    print('------------------------------------------------------------------------')
    print('Montecarlo '+str(m))

print("Entrenamiento y prueba mediante montecarlo=10 y validación cruzada=5 finalizó")


# Cargue del modelo para predicción independinete del montecarlo (prueba)

In [None]:
# Carga modelo
modelo = '/content/Drive/My Drive/IA/Proyectos/modelo/KV7/K2.h5'
pesos_modelo = '/content/Drive/My Drive/IA/Proyectos/modelo/KV7/Kp2.h5'
cnn = load_model(modelo)
cnn.load_weights(pesos_modelo)
#cnn.summary()

# Ruta de imagenes de test

In [None]:
data_test = '/content/Drive/My Drive/IA/Proyectos/test/'

# Preparación de imágenes de test

In [None]:
#Preparamos nuestras imagenes de test
test_datagen = ImageDataGenerator(rescale=1. / 255)
#Se leen las imagenes de test del directorio
test_generador = test_datagen.flow_from_directory(
  data_test,
  target_size=(altura, longitud),
  batch_size=batch_size,
  class_mode='categorical',
  shuffle=False)

Found 3448 images belonging to 4 classes.


# Cálculo de la predicción con conjunto test

In [None]:
#predicción conjunto test
t_pred = cnn.predict(test_generador)
t_pred = np.argmax(t_pred, axis=1)

# Matriz de confusión y reporte de métricas

In [None]:
print('Confusion Matrix')
cm=confusion_matrix(test_generador.classes, t_pred)
print(cm)
print('Classification Report')
target_names = ['CNV', 'DME', 'DRUSSEN', 'NORMAL']
print(classification_report(test_generador.classes, t_pred, target_names=target_names))

# Gráfica de la matriz de confusión

In [None]:
plot_confusion_matrix(cm,
normalize    = False,
target_names = ['CNV', 'DME', 'DRUSSEN', 'NORMAL'],
title        = "Confusion Matrix")
plt.savefig('/content/Drive/My Drive/IA/Proyectos/modelo/cmtest.svg')