<a href="https://colab.research.google.com/github/vicentcamison/idal_ia3/blob/main/2%20Aprendizaje%20profundo%20(I)/Sesion%204/nodulos_pulmon_EJERCICIO.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<font color="#CA0032"><h1 align="left">**Redes convolucionales (CNNs)**</h1></font>

<font color="#6E6E6E"><h1 align="left">**Predicción de malignidad en nódulos pulmonares**</h1></font>

<h2 align="left">Manuel Sánchez-Montañés</h2>

<font color="#6E6E6E"><h2 align="left">manuel.smontanes@gmail.com</h2></font>

In [None]:
COLAB = False

In [None]:
# Base de datos y kernel de lectura y visualización de la base de datos obtenidos de:
#
# https://www.kaggle.com/kmader/lungnodemalignancy

import cv2 # pip install opencv-python
import os
import numpy as np
from tqdm import tqdm # pip install tqdm
import matplotlib.pyplot as plt
#from skimage.util.montage import montage2d # pip install scikit-image
import h5py
import urllib
from sklearn.metrics import confusion_matrix

%matplotlib inline

## **Carga de datos**

In [None]:
if COLAB:
    from google_drive_downloader import GoogleDriveDownloader as gdd
    gdd.download_file_from_google_drive(file_id='1Ai3qxn-N-9T5wTblXNgVTIHlvwrvIXXg',
                                        dest_path='./all_patches.hdf5')

with h5py.File('all_patches.hdf5', 'r') as luna_h5:
    all_slices  = luna_h5['ct_slices'].value
    all_classes = luna_h5['slice_class'].value
    print('shape of all_slices:', all_slices.shape)
    print('shape of classes:   ', all_classes.shape)

In [None]:
!ls

In [None]:
np.unique(all_classes)

In [None]:
def mi_montage2d(data):
    n = len(data)
    lado = int(np.ceil(np.sqrt(n)))
    alto1, ancho1 = data.shape[1:]
    aux = np.zeros((lado*alto1, lado*ancho1))
    for i,image in enumerate(data):
        row = i // lado
        col = i % lado
        aux[(row*alto1):((row+1)*alto1), (col*ancho1):((col+1)*ancho1)] = image
    return aux

In [None]:
def draw_borders(ax, ntiles, tile_width, tile_height, color='r'):
    
    aux1 = int(np.ceil(np.sqrt(ntiles)))
    
    npixels_y = tile_height*aux1
    for i in range(aux1-1):
        aux2 = (i+1)*tile_width - 0.5
        ax.plot([aux2, aux2], [0, npixels_y - 1], color)
        
    npixels_x = tile_width*aux1
    for i in range(aux1-1):
        aux2 = (i+1)*tile_height - 0.5
        ax.plot([0, npixels_x - 1], [aux2, aux2], color)

In [None]:
size = 9
fig, (ax1, ax2) = plt.subplots(1,2,figsize = (16, 8))
plt_args = dict(cmap = 'bone', vmin = -600, vmax = 300)
#plt_args = dict(cmap = 'bwr', vmin = -600, vmax = 300)
ax1.imshow(mi_montage2d(all_slices[np.random.choice(np.where(all_classes==1)[0],size=size)]), **plt_args)
ax1.set_title('some malignant tiles (random sample)')
draw_borders(ax1, size, all_slices.shape[1], all_slices.shape[2], 'r')

ax2.imshow(mi_montage2d(all_slices[np.random.choice(np.where(all_classes==0)[0],size=size)]), **plt_args)
ax2.set_title('some benign tiles (random sample)')
draw_borders(ax2, size, all_slices.shape[1], all_slices.shape[2], 'r');

## **Partición y reformateo de datos**

Partiremos los datos en bases de datos de entrenamiento y validación. Recodificaremos la clase con la técnica one-hot.
Se estandarizarán los datos de la imagen.

In [None]:
all_slices.shape

In [None]:
print(all_slices.min(), all_slices.max(), all_slices.mean(), all_slices.std())

In [None]:
plt.figure(figsize=(10,3))
plt.hist(all_slices.reshape(-1), bins=100) #, density=True)
plt.xlabel('Intensidades de los pixels', fontsize=14)
plt.show()

In [None]:
plt.hist(all_classes)
plt.xlabel('Clases', fontsize=14)
plt.show()

unicos, counts = np.unique(all_classes, return_counts=True)
print(unicos)
print(counts)

In [None]:
prioris_clases = counts / sum(counts)

In [None]:
print(prioris_clases.round(2))

In [None]:
from sklearn.model_selection import train_test_split
from keras.utils.np_utils import to_categorical

#X_vec = (np.expand_dims(all_slices,-1) - np.mean(all_slices))/np.std(all_slices)
X_vec = np.expand_dims(all_slices,-1)
print(all_slices.shape)
X_vec.shape

In [None]:
y_vec = to_categorical(all_classes)
X_train, X_test, y_train, y_test = train_test_split(X_vec, y_vec, 
                                                   train_size = 0.75,
                                                   random_state = 1, 
                                                   stratify = all_classes)

mean_train = np.mean(X_train)
std_train  = np.std(X_train)

X_train = (X_train - mean_train) / std_train
X_test  = (X_test  - mean_train) / std_train

In [None]:
import pickle

with open('mi_fichero.b', 'wb') as f:
    pickle.dump(mean_train, f)
    pickle.dump(std_train, f)

In [None]:
# En explotación:
with open('mi_fichero.b', 'rb') as f:
    a = pickle.load(f)
    b = pickle.load(f)

In [None]:
print(mean_train, std_train)
print(a,b)

In [None]:
plt.figure(figsize=(10,3))
plt.hist(X_train.reshape(-1), bins=100, density=True)
plt.show()

## **Modelo Dummy (baseline)**

In [None]:
np.shape(y_train)

In [None]:
X_tr = np.zeros((np.shape(X_train)[0], np.shape(X_train)[1]*np.shape(X_train)[2]))
for i in range(np.shape(X_train)[0]):
    X_tr[i] = X_train[i].flatten()
X_te = np.zeros((np.shape(X_test)[0], np.shape(X_test)[1]*np.shape(X_test)[2]))
for i in range(np.shape(X_test)[0]):
    X_te[i] = X_test[i].flatten()

print("X_train shape:", np.shape(X_train))
print("X_tr shape:   ", np.shape(X_tr))
print("X_test shape: ", np.shape(X_test))
print("X_te shape:   ", np.shape(X_te))

In [None]:
from sklearn.dummy import DummyClassifier

clf = DummyClassifier(strategy="most_frequent")
clf.fit(X_tr, np.argmax(y_train,1))
print('Train accuracy:', clf.score(X_tr, np.argmax(y_train,1)))
print('Test accuracy :', clf.score(X_te, np.argmax(y_test,1)))

In [None]:
try:
    from scikitplot.metrics import plot_confusion_matrix
except:
    !pip install scikit-plot
    from scikitplot.metrics import plot_confusion_matrix

plot_confusion_matrix(y_test.argmax(1), clf.predict(X_te),
                      x_tick_rotation=60, figsize=(5,5),
                      text_fontsize='large');

In [None]:
from sklearn.metrics import classification_report, roc_curve, auc

print('')
print(classification_report(np.argmax(y_test,1),
                            clf.predict(X_te)))

fpr, tpr, thresholds = roc_curve(np.argmax(y_test, 1), clf.predict_proba(X_te)[:,1])
fig, ax1 = plt.subplots(1,1)
ax1.plot(fpr, tpr, 'r-.', label = 'Simple model (%2.2f)' % auc(fpr, tpr))
ax1.set_xlabel('False Positive Rate')
ax1.set_ylabel('True Positive Rate')
ax1.plot(fpr, fpr, 'b-', label = 'Random Guess')
ax1.legend()
plt.show()

## **Modelo shallow: regresión logística**

In [None]:
X_train.shape

In [None]:
from sklearn.linear_model import LogisticRegression

model2 = LogisticRegression()
model2.fit(X_tr, np.argmax(y_train,1))

In [None]:
print('Train accuracy:', model2.score(X_tr, np.argmax(y_train,1)))
print('Test accuracy :', model2.score(X_te, np.argmax(y_test,1)))

In [None]:
np.argmax(y_test,1)

In [None]:
plot_confusion_matrix(y_test.argmax(1), model2.predict(X_te),
                      x_tick_rotation=60, figsize=(5,5),
                      text_fontsize='large');

In [None]:
from sklearn.metrics import classification_report, roc_curve, auc

print('')
print(classification_report(np.argmax(y_test,1),
                            model2.predict(X_te)))

fpr, tpr, thresholds = roc_curve(np.argmax(y_test, 1), model2.predict_proba(X_te)[:,1])
fig, ax1 = plt.subplots(1,1)
ax1.plot(fpr, tpr, 'r-.', label = 'Simple model (%2.2f)' % auc(fpr, tpr))
ax1.set_xlabel('False Positive Rate')
ax1.set_ylabel('True Positive Rate')
ax1.plot(fpr, fpr, 'b-', label = 'Random Guess')
ax1.legend()
plt.show()

# **Definición de la arquitectura CNN**

Entrenaremos el modelo desde cero. Otra posibilidad es utilizar modelos pre entrenados (transfer learning).

In [None]:
from keras.models import Sequential, load_model, Model
from keras.layers import Dense, Dropout, Flatten, Conv2D, MaxPooling2D, Activation
from keras import backend as K
from keras import regularizers
from keras.losses import binary_crossentropy
from keras import optimizers
from keras.callbacks import ModelCheckpoint

from IPython.display import SVG
from keras.utils.vis_utils import model_to_dot

In [None]:
print("X_train shape:", np.shape(X_train))
print("X_test shape: ", np.shape(X_test))
print("y_train shape:", np.shape(y_train))
print("y_test shape: ", np.shape(y_test))

In [None]:
y_train[:5]

In [None]:
y_test

In [None]:
X_tr.shape

In [None]:
X_train.shape

In [None]:
input_shape = X_train.shape[1:]
input_shape

In [None]:
y_train.shape

In [None]:
num_classes = 2

In [None]:
from keras.layers import AveragePooling2D, Dropout

In [None]:
# Arquitectura de la red
# Rellenar arquitectura:

model = ...

model.compile(loss='categorical_crossentropy',
              metrics=['accuracy'])

In [None]:
model.summary()

In [None]:
# graphviz
from IPython.display import SVG
from keras.utils.vis_utils import model_to_dot
SVG(model_to_dot(model, show_shapes=True, dpi=72
                 ).create(prog='dot', format='svg'))

# **Entrenamiento del modelo CNN**

In [None]:
from matplotlib.ticker import MaxNLocator

def grafica_entrenamiento(tr_acc, val_acc):
    ax=plt.figure(figsize=(10,4)).gca()
    plt.plot(1+np.arange(len(tr_acc)), 100*np.array(tr_acc))
    plt.plot(1+np.arange(len(val_acc)), 100*np.array(val_acc))
    plt.title('tasa de acierto del modelo (%)', fontsize=18)
    plt.ylabel('tasa de acierto (%)', fontsize=18)
    plt.xlabel('epoca', fontsize=18)
    plt.legend(['entrenamiento', 'validacion'], loc='upper left')
    ax.xaxis.set_major_locator(MaxNLocator(integer=True))
    plt.show()

In [None]:
acum_tr_acc = []
acum_val_acc = []

In [None]:
from IPython.display import clear_output

In [None]:
LOAD_MODEL = False

Ntr = 4000

epochs = 100
batch_size = 16

X_tr  = X_train[:Ntr]
y_tr  = y_train[:Ntr]
X_val = X_train[Ntr:]
y_val = y_train[Ntr:]

if not LOAD_MODEL:
    filepath="model_current_best_v2.h5"
    
    checkpoint = ModelCheckpoint(filepath, monitor='val_accuracy', verbose=1,
                                 save_best_only=True,
                                 mode='max') # graba sólo los que mejoran en validación
    callbacks_list = [checkpoint]
    
    for e in range(epochs):
        history = model.fit(X_tr, y_tr,
                            batch_size=batch_size,
                            epochs=1,
                            callbacks=callbacks_list,
                            verbose=1,
                            validation_data=(X_val, y_val))
        
        acum_tr_acc = acum_tr_acc + history.history['accuracy']
        acum_val_acc = acum_val_acc + history.history['val_accuracy']
        
        if len(acum_tr_acc) > 1:
            clear_output()
            grafica_entrenamiento(acum_tr_acc, acum_val_acc)

# **Resultados obtenidos por el modelo CNN**

In [None]:
if LOAD_MODEL:
    if COLAB:
        urllib.request.urlretrieve("https://drive.google.com/uc?export=download&id=1UKpxwZsfPAp0kHywBucwWBtkxYAXy0lq",
                                   "modelo_entrenado.h5")
    model = load_model('./modelo_entrenado.h5')
else:
    model = load_model('model_current_best_v2.h5')

In [None]:
model.summary()

In [None]:
score_tr = model.evaluate(X_tr, y_tr, verbose=0)
print('Train loss    :', score_tr[0])
print('Train accuracy:', score_tr[1])

score_val = model.evaluate(X_val, y_val, verbose=0)
print('Val loss    :', score_val[0])
print('Val accuracy:', score_val[1])

score_te = model.evaluate(X_test, y_test, verbose=0)
print('Test loss     :', score_te[0])
print('Test accuracy :', score_te[1])

In [None]:
model.predict(X_test[:5])

In [None]:
umbral_decision_maligno = 0.5 # 0.4, 0.3, 0.2 ...

In [None]:
alarmas = 1*(model.predict(X_test)[:,1]>umbral_decision_maligno)
alarmas

In [None]:
clases_test = y_test.argmax(1) # clase real test

In [None]:
confusion_matrix(clases_test, alarmas)

In [None]:
# en el siguiente cálculo y en adelante es equivalente
# a asumir que umbral_decision_maligno = 0.5
plot_confusion_matrix(y_test.argmax(1), model.predict(X_test).argmax(1),
                      x_tick_rotation=60, figsize=(5,5),
                      text_fontsize='large');

In [None]:
y_pred_proba = model.predict(X_test)
y_pred = np.argmax(y_pred_proba,1)
print('')
print(classification_report(clases_test, y_pred))

In [None]:
fpr, tpr, thresholds = roc_curve(clases_test, y_pred_proba[:,1])
fig, ax1 = plt.subplots(1,1)
ax1.plot(fpr, tpr, 'r-.', label = 'CNN (%2.2f)' % auc(fpr, tpr))
ax1.set_xlabel('False Positive Rate')
ax1.set_ylabel('True Positive Rate')
ax1.plot(fpr, fpr, 'b-', label = 'Random Guess')
ax1.legend();

# **Análisis de los pesos de la primera capa**

In [None]:
weights = model.get_weights()
print(np.shape(weights))
for i in range(len(weights)):
    print('shape of weights[%d]: ' % i, np.shape(weights[i]))

In [None]:
weights[0][:,:,0,0]

In [None]:
# kernels de la primera capa

nfilters = weights[0].shape[3]
lado = int(np.ceil(np.sqrt(nfilters)))
plt.subplots(lado,lado,figsize = (12, 15))

ma = abs(weights[0]).max()

for i in range(nfilters):
    kernel = weights[0][:,:,0,i]
    plt.subplot(lado,lado,i+1)
    plt.imshow(kernel, vmin=-ma, vmax=ma, cmap='bwr')
    plt.title('kernel %d' % i)


In [None]:
capa = 0 # 4
intermediate_layer_model = Model(inputs=model.input,
                                 outputs=model.layers[capa].output)
intermediate_layer_model.summary()
SVG(model_to_dot(intermediate_layer_model,show_shapes=True,dpi=72).create(prog='dot', format='svg'))

In [None]:
id_imagen = 1
imagen = X_test[id_imagen,:,:,0]
salidas_capa0 = intermediate_layer_model.predict(imagen.reshape([1,imagen.shape[0],
                                                                 imagen.shape[1],1]))

In [None]:
salidas_capa0.shape

In [None]:
plt.figure(figsize=(2,2))
plt.imshow(imagen, cmap='gray')
plt.title('input image', size=12)
plt.xticks(())
plt.yticks(())
plt.show()

plt.subplots(lado,lado,figsize = (12, 15))

ma = abs(salidas_capa0).max()

for i in range(salidas_capa0.shape[-1]):
    plt.subplot(lado,lado,i+1)
    plt.imshow(salidas_capa0[0,:,:,i], vmin=-ma, vmax=ma, cmap='bwr')
    plt.title('salida kernel %d' % i, fontsize=10)
plt.show()

### **¿A qué partes de la imagen de entrada es más sensible la salida de la red?**

### **GradCam:**

(de https://medium.com/analytics-vidhya/visualizing-activation-heatmaps-using-tensorflow-5bdba018f759)

1- Calcular para una imagen la salida del modelo y la salida de la última capa convolucional

2- Encuentrar la neurona de salida más activa (que es la que determina la clase predicha)

3- Calcular el gradiente de dicha neurona de salida con respecto a la última capa convolucional

3- Promediar y pesar esto con la salida de la última capa convolucional

4- Normalizar entre 0 y 1 para visualizar

5- Convertir a RGB y superponerla a la imagen original

In [None]:
def find_ind_last_conv2D(model):
    ind_last_conv2D_layer = None
    for i,x in enumerate(model.layers):
        if x.__class__.__name__ == "Conv2D":
            ind_last_conv2D_layer = i
    return ind_last_conv2D_layer

In [None]:
ind_last_conv2D_layer = find_ind_last_conv2D(model)
ind_last_conv2D_layer

In [None]:
model.layers[ind_last_conv2D_layer]

In [None]:
import tensorflow as tf

In [None]:
def show_heatmap(model, im, es_maligna):
    imag = np.expand_dims(im, axis=0) # de 1 imagen pasamos a 1 conjunto de 1 imagen
        
    # The is the output feature map of the last convolutional layer
    last_conv_layer = model.layers[find_ind_last_conv2D(model)]
    
    # This is the gradient of the "benign" class with regard to
    # the output feature map of last convolutional layer
    with tf.GradientTape() as tape:
        iterate = tf.keras.models.Model([model.inputs], [model.output, last_conv_layer.output])
        model_out, last_conv_layer = iterate(imag)
        class_out = model_out[:, np.argmax(model_out[0])]
        grads = tape.gradient(class_out, last_conv_layer)

        # mean intensity of the gradient over a specific feature map channel:
        pooled_grads = K.mean(grads, axis=(0, 1, 2))
    
    heatmap = tf.reduce_mean(tf.multiply(pooled_grads, last_conv_layer), axis=-1)    
    heatmap = np.maximum(heatmap, 0) # se quitan los negativos (se ponen a 0)
    heatmap /= np.max(heatmap) # se normaliza entre 0 y 1
    heatmap = heatmap[0] # pasamos de 1 conjunto de 1 heatmap a 1 heatmap
        
    
    # We use cv2 to load the original image
    #img = cv2.imread(img_path)
    img = imag[0]
    
    img = np.zeros((im.shape[0],im.shape[1],3))
#    print(im.shape, imag.shape)
    for i in range(3):
        img[:,:,i] = imag[0,:,:,0]

    
    # We resize the heatmap to have the same size as the original image
    heatmap = cv2.resize(heatmap, (img.shape[1], img.shape[0]))
    
    # We convert the heatmap to RGB
    heatmap = np.uint8(255 * heatmap)
    
    # We apply the heatmap to the original image
    #heatmap = cv2.applyColorMap(heatmap, cv2.COLORMAP_JET)
    heatmap = cv2.applyColorMap(heatmap, cv2.COLORMAP_HOT)
    
    img = np.maximum(img, -2)
    img = np.minimum(img,  2)
    img = (img+2)/4;
    # 0.4 here is a heatmap intensity factor
    superimposed_img = heatmap * 0.2 / 255 + 0.8*img
    #print(heatmap.min(), heatmap.max(), heatmap.mean(), heatmap.std())
    #print(img.min(), img.max(), img.mean(), img.std())
    #print(superimposed_img.min(),  superimposed_img.max(),
    #      superimposed_img.mean(), superimposed_img.std())
    
    plt.figure(figsize=(15,5))
    plt.subplot(1,3,1)
    plt.imshow(img, vmin=0, vmax=1)
    plt.subplot(1,3,2)
    plt.imshow(heatmap, vmin=0, vmax=1)
    plt.colorbar()
    plt.subplot(1,3,3)
    plt.imshow(superimposed_img, vmin=0, vmax=1)
    plt.show()
    #print(np.shape(imag))
    print("- Probabilidad clase maligna:", model.predict(imag)[0][1])
    print("-", "Clase real:", "maligna" if es_maligna else "benigna")
    print("\n\n\n")
    return heatmap, superimposed_img

In [None]:
ind = 0
for i in range(ind, ind+10):
    heat_map, superimposed_img = show_heatmap(model, X_test[i], y_test[i,1])