<a href="https://colab.research.google.com/github/rrfsantos/Projeto-Redes-Neurais-OCT-Images/blob/main/Convolutional_OCT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Convolutional Neural Network, Data Augmentation, Transfer Learning
### Task: Automated methods to detect and classify human diseases from medical images.
### Dataset:  Labeled Optical Coherence Tomography (OCT) Images for Classification - Kermany, Daniel; Zhang, Kang; Goldbaum, Michael (2018), “Labeled Optical Coherence Tomography (OCT) and Chest X-Ray Images for Classification”, Mendeley Data, v2 http://dx.doi.org/10.17632/rscbjbr9sj.2

In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf
from imutils import paths
import cv2
import glob
import matplotlib.pyplot as plt
from collections import Counter
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import VGG16
from tensorflow.keras.applications import Xception
from tensorflow.keras.layers import AveragePooling2D
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Input
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from keras.utils import plot_model
from keras.utils import to_categorical
from keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint

## Visualização dos dados


In [None]:
# Disabled setup step: installs the Kaggle CLI. Uncomment only when the dataset has to be downloaded.
#!pip install -q kaggle

In [None]:
# Disabled setup step: opens the Colab upload dialog.
# Presumably used to upload kaggle.json, which the following cell copies into ~/.kaggle — confirm.
#from google.colab import files
#files.upload()

In [None]:
# Disabled setup step: creates ~/.kaggle, copies kaggle.json into it and restricts the file
# to owner read/write (mode 600).
#!mkdir -p ~/.kaggle
#!cp kaggle.json ~/.kaggle/
#!chmod 600 ~/.kaggle/kaggle.json

In [None]:
# Disabled setup step: downloads the paultimothymooney/kermany2018 dataset into /content.
#!kaggle datasets download -d paultimothymooney/kermany2018 -p /content

In [None]:
!unzip /content/kermany2018.zip

In [None]:
# Disabled cleanup step: recursively deletes /content/oct2017.
# Note the lowercase name: the paths used by the rest of the notebook point at /content/OCT2017.
#import shutil
#shutil.rmtree('/content/oct2017')

### Visualização de 4 imagens de cada classe do dataset

In [None]:
# Dataset roots (Colab layout) and the four class folder names, in label order.
train_path = '/content/OCT2017/train/'
test_path = '/content/OCT2017/test/'

classes = ['CNV', 'DME', 'DRUSEN', 'NORMAL']

# One list of file paths per class folder under the training root.
cnv_images, dme_images, drusen_images, normal_images = (
    glob.glob(f'{train_path}{class_name}/*', recursive=True)
    for class_name in classes
)

# The first four files of every class, concatenated in class order (16 paths in total when
# every class folder holds at least four files). Used by the preview grid.
images = [
    *cnv_images[:4],
    *dme_images[:4],
    *drusen_images[:4],
    *normal_images[:4],
]

In [None]:
# Preview grid: one row per class, four sample images per row.
columns = 4
rows = 4
fig, grid = plt.subplots(rows, columns, figsize=(15, 15))

# grid.flat walks the axes row by row, matching the order of `images`.
for i, ax in enumerate(grid.flat):
    img = plt.imread(images[i])
    class_row, position_in_row = divmod(i, 4)
    if position_in_row == 0:
        # Label only the first image of each row with its class name.
        ax.set_ylabel(classes[class_row], fontsize=16)
    ax.imshow(img, cmap='jet')
plt.show()

### Verificação da distribuição das classes no dataset --> O dataset está desbalanceado!

In [None]:
# Number of training files found for each class.
total_cnv_samples, total_dme_samples, total_drusen_samples, total_normal_samples = (
    len(file_list)
    for file_list in (cnv_images, dme_images, drusen_images, normal_images)
)

# Counts in the same order as `classes`.
sample_distribution = [
    total_cnv_samples,
    total_dme_samples,
    total_drusen_samples,
    total_normal_samples,
]

# Bar chart of samples per class; the axes span the whole figure area.
fig = plt.figure()
ax = fig.add_axes((0, 0, 1, 1))
ax.bar(classes, sample_distribution)
plt.show()

## Data Augmentation

In [None]:
# Training configuration shared by the data generators and both models.
batchSize = 32             # images per batch
width, height = 150, 150   # size every image is resized to before entering the network
init_lr = 1e-3             # initial learning rate handed to Adam
epochs = 50                # maximum number of epochs (early stopping may end training sooner)

In [None]:
# Fraction of every class folder held out for validation. Both generators below must use the
# same value so that their 'training' and 'validation' subsets stay complementary.
validation_split = 0.1

# Training pipeline: rescale pixels to [0, 1] and apply random rotation / zoom / shifts.
trainDataGen = ImageDataGenerator(
    rescale=1./255,
    rotation_range=9,
    zoom_range=0.2,
    width_shift_range=0.2,
    height_shift_range=0.2,
    validation_split=validation_split
)

# Validation pipeline: rescale only. Drawing the validation subset from the augmenting
# generator would randomly distort the validation images, which makes val_loss noisy and
# not comparable between epochs (and val_loss drives every callback in this notebook).
validationDataGen = ImageDataGenerator(
    rescale=1./255,
    validation_split=validation_split
)

# NOTE: Keras documents target_size as (height, width); passing (width, height) is harmless
# here only because both values are equal.
trainGenerator = trainDataGen.flow_from_directory(
    train_path,
    target_size=(width, height),
    batch_size=batchSize,
    class_mode='categorical',
    shuffle=True,
    subset='training'
)

# shuffle=False keeps the validation batches in a fixed order.
validationGenerator = validationDataGen.flow_from_directory(
    train_path,
    target_size=(width, height),
    batch_size=batchSize,
    class_mode='categorical',
    shuffle=False,
    subset='validation'
)

In [None]:
# Dataset balancing: no samples are added; instead every class gets a weight so that the
# unbalanced data does not bias training. The weights are later passed to the model's fit call.
counter = Counter(trainGenerator.classes)
max_val = float(max(counter.values()))

# Weight of a class = size of the largest class / size of this class.
class_weights = {}
for class_id, num_images in counter.items():
    class_weights[class_id] = max_val / num_images

weights = np.array(list(class_weights.values()), dtype=float)

# Class distribution after scaling every count by its class weight.
fig = plt.figure()
ax = fig.add_axes((0, 0, 1, 1))
ax.bar(classes, np.multiply(sample_distribution, weights))
plt.show()


# VGG16

## Construção do modelo

In [None]:
# Load VGG16 pre-trained on ImageNet without its fully connected (FC) top layers, so that a
# new classification head can be attached.
# NOTE(review): the generators only rescale pixels to [0, 1]; VGG16's own preprocess_input is
# not applied — confirm this is intended.
baseModel = VGG16(weights="imagenet", include_top=False, input_tensor=Input(shape=(width, height, 3)))

# New fully connected "top" of the model, stacked on the convolutional base.
headModel = baseModel.output
headModel = AveragePooling2D(pool_size=(4, 4))(headModel)
headModel = Flatten(name="flatten")(headModel)
headModel = Dense(512, activation="relu")(headModel)
headModel = Dropout(0.5)(headModel)
headModel = Dense(64, activation="relu")(headModel)
headModel = Dropout(0.5)(headModel)
headModel = Dense(4, activation="softmax")(headModel)  # four categories

# Full network: pre-trained base followed by the new head.
modelV = Model(inputs=baseModel.input, outputs=headModel)

# Freeze every layer of the base so only the new head is updated during training.
for layer in baseModel.layers:
    layer.trainable = False

# Compile the model. `learning_rate` replaces the deprecated `lr` alias.
# NOTE(review): `decay` is only understood by the legacy Keras optimizers — confirm against
# the TensorFlow version in use.
opt = Adam(learning_rate=init_lr, decay=init_lr / epochs)
modelV.compile(loss="categorical_crossentropy", optimizer=opt, metrics=["accuracy"])

plot_model(modelV)

## Treinamento do modelo

In [None]:
# Callbacks (all of them watch the validation loss)
# Stop after 10 epochs without improvement and roll the model back to its best weights, so
# the evaluation cells score the best epoch rather than the last one.
es = EarlyStopping(patience=10, monitor="val_loss", restore_best_weights=True)
# Lower the learning rate after 5 stagnant epochs. The patience must be smaller than the
# early-stopping patience, otherwise the reduction only fires when training is about to stop.
rlr = ReduceLROnPlateau(monitor='val_loss', patience=5)
# Keep the best model seen so far on disk.
mc = ModelCheckpoint(filepath='best.h5', monitor='val_loss', save_best_only=True)

In [None]:
# Number of full batches per epoch; the final partial batch is dropped by the floor division.
stepsValidation = validationGenerator.samples // batchSize
stepsTraining = trainGenerator.samples // batchSize

# Model.fit accepts generators directly; fit_generator is deprecated.
# class_weight compensates for the unbalanced class distribution.
historyV = modelV.fit(
    trainGenerator,
    steps_per_epoch=stepsTraining,
    epochs=epochs,
    validation_data=validationGenerator,
    validation_steps=stepsValidation,
    class_weight=class_weights,
    callbacks=[es, rlr, mc]
)

## Avaliação do Modelo

In [None]:
# Per-epoch metrics recorded while fitting the VGG16 model.
curves = historyV.history
acc, val_acc = curves['accuracy'], curves['val_accuracy']
loss, val_loss = curves['loss'], curves['val_loss']

num_epochs = range(len(acc))

# First figure: training (red) against validation (blue) accuracy.
fig_acc, ax_acc = plt.subplots(figsize=(7, 7))
ax_acc.plot(num_epochs, acc, 'r', label='Training accuracy')
ax_acc.plot(num_epochs, val_acc, 'b', label='Validation accuracy')
ax_acc.set_title('Training and validation accuracy')
ax_acc.legend()

# Second figure: training (red) against validation (blue) loss.
fig_loss, ax_loss = plt.subplots(figsize=(7, 7))
ax_loss.plot(num_epochs, loss, 'r', label='Training Loss')
ax_loss.plot(num_epochs, val_loss, 'b', label='Validation Loss')
ax_loss.set_title('Training and validation loss')
ax_loss.legend()

## Avaliação do modelo com a base de teste

In [None]:
# Load the test set into memory: x_test holds the images scaled to [0, 1] and y_test holds
# the integer label of each image (its position in `classes`).
test_images_path = []
x_test = []
y_test = []
for class_id, class_name in enumerate(classes):
    # Walking one class folder at a time ties the label to the folder by construction, so an
    # image can never be stored without a label (which would misalign x_test and y_test).
    for image_path in sorted(glob.glob(test_path + class_name + '/*.jpeg')):
        img = cv2.imread(image_path)
        if img is None:
            # cv2.imread signals an unreadable file by returning None instead of raising.
            raise ValueError('Could not read test image: ' + image_path)
        # OpenCV decodes to BGR; the Keras generators used for training feed RGB.
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        # cv2.resize takes (columns, rows). The notebook uses `width` as the row count
        # (see the model Input shape), hence the (height, width) order here.
        img = cv2.resize(img, (height, width))
        test_images_path.append(image_path)
        x_test.append(img / 255.0)
        y_test.append(class_id)
y_test = np.array(y_test)
x_test = np.array(x_test)
print(x_test.shape, y_test.shape)

In [None]:
# Evaluate the VGG16 model on the test set and draw its confusion matrix.
labels = ('CNV', 'DME', 'DRUSEN', 'NORMAL')
n_classes = len(labels)  # previously used by the annotation loop without being defined

# num_classes keeps the one-hot width fixed even if the highest label is absent from y_test.
y_test_cat = tf.keras.utils.to_categorical(y_test, num_classes=n_classes)
loss_and_metrics = modelV.evaluate(x_test, y_test_cat)

# Predicted class = index of the largest softmax output.
y_pred = modelV.predict(x_test)
y_pred = np.argmax(y_pred,axis=1)

y_actu = pd.Series(y_test, name='Actual')
y_pred = pd.Series(y_pred, name='Predicted')
df_confusion = pd.crosstab(y_actu, y_pred)

# crosstab drops classes that never occur; force the full n_classes x n_classes layout so the
# annotation loop below can address every cell.
class_ids = list(range(n_classes))
df_confusion = (
    df_confusion
    .reindex(index=class_ids, columns=class_ids, fill_value=0)
    .rename_axis(index='Actual', columns='Predicted')
)

# Row-normalise: share of each actual class assigned to each predicted class.
# `df / series` would align the row totals with the *columns*; div(..., axis=0) divides each
# row by its own total. Rows without samples become 0 instead of NaN.
df_conf_norm = df_confusion.div(df_confusion.sum(axis=1), axis=0).fillna(0)
print(df_confusion)
print(df_conf_norm)

fig = plt.figure(figsize=(20, 20))
# Draw into the figure created above rather than assuming it is figure number 1.
plt.matshow(df_confusion, cmap=plt.get_cmap('Blues'), fignum=fig.number)  # imshow
plt.colorbar()
tick_marks = np.arange(len(labels))
plt.xticks(tick_marks, labels,fontsize=16, rotation=60)
plt.yticks(tick_marks, labels, fontsize=16)
thresh = 0.6  # normalised share above which the cell text switches to white

# Write the row-normalised percentage into every cell (x = predicted class, y = actual class).
for col in range(n_classes):
    for row in range(n_classes):
        share = df_conf_norm.iat[row, col]
        plt.text(col, row, "{:0.2f}%".format(share * 100),
                 horizontalalignment='center',
                 color='white' if share > thresh else 'black',
                 fontsize=16)

# plt.tight_layout()
plt.ylabel(df_confusion.index.name, fontsize=16)
plt.xlabel(df_confusion.columns.name,fontsize=16)
plt.show()

# Xception

In [None]:
# Load Xception pre-trained on ImageNet without its fully connected (FC) top layers.
# The previous version created an Xception instance that was never used and then built
# `modelX` on a second VGG16, so this section trained VGG16 again. The base is now Xception,
# fed with the same input size that the data generators produce.
# NOTE(review): the generators only rescale pixels to [0, 1]; Xception's own preprocess_input
# is not applied — confirm this is intended.
baseModel = Xception(include_top=False, weights="imagenet", input_tensor=Input(shape=(width, height, 3)))

# New fully connected "top" of the model, stacked on the Xception base.
headModel = baseModel.output
headModel = AveragePooling2D(pool_size=(4, 4))(headModel)
headModel = Flatten(name="flatten")(headModel)
headModel = Dense(512, activation="relu")(headModel)
headModel = Dropout(0.5)(headModel)
headModel = Dense(64, activation="relu")(headModel)
headModel = Dropout(0.5)(headModel)
headModel = Dense(4, activation="softmax")(headModel)  # four categories

# Full network: pre-trained base followed by the new head.
modelX = Model(inputs=baseModel.input, outputs=headModel)

# Freeze every layer of the base so only the new head is updated during training.
for layer in baseModel.layers:
    layer.trainable = False

# Compile the model. `learning_rate` replaces the deprecated `lr` alias.
# NOTE(review): `decay` is only understood by the legacy Keras optimizers — confirm against
# the TensorFlow version in use.
opt = Adam(learning_rate=init_lr, decay=init_lr / epochs)
modelX.compile(loss="categorical_crossentropy", optimizer=opt, metrics=["accuracy"])

plot_model(modelX)

## Treinando o Modelo 

In [None]:
# Callbacks (all of them watch the validation loss)
# Stop after 10 epochs without improvement and roll the model back to its best weights, so
# the evaluation cells score the best epoch rather than the last one.
es = EarlyStopping(patience=10, monitor="val_loss", restore_best_weights=True)
# Lower the learning rate after 5 stagnant epochs. The patience must be smaller than the
# early-stopping patience, otherwise the reduction only fires when training is about to stop.
rlr = ReduceLROnPlateau(monitor='val_loss', patience=5)
# Separate checkpoint file for this model, so the checkpoint written while training the VGG16
# model ('best.h5') is not overwritten.
mc = ModelCheckpoint(filepath='best_xception.h5', monitor='val_loss', save_best_only=True)

In [None]:
# Number of full batches per epoch; the final partial batch is dropped by the floor division.
stepsValidation = validationGenerator.samples // batchSize
stepsTraining = trainGenerator.samples // batchSize

# Model.fit accepts generators directly; fit_generator is deprecated.
# class_weight compensates for the unbalanced class distribution.
historyX = modelX.fit(
    trainGenerator,
    steps_per_epoch=stepsTraining,
    epochs=epochs,
    validation_data=validationGenerator,
    validation_steps=stepsValidation,
    class_weight=class_weights,
    callbacks=[es, rlr, mc]
)

## Avaliação do Modelo

In [None]:
# Per-epoch metrics recorded while fitting modelX.
curves = historyX.history
acc, val_acc = curves['accuracy'], curves['val_accuracy']
loss, val_loss = curves['loss'], curves['val_loss']

num_epochs = range(len(acc))

# First figure: training (red) against validation (blue) accuracy.
fig_acc, ax_acc = plt.subplots(figsize=(7, 7))
ax_acc.plot(num_epochs, acc, 'r', label='Training accuracy')
ax_acc.plot(num_epochs, val_acc, 'b', label='Validation accuracy')
ax_acc.set_title('Training and validation accuracy')
ax_acc.legend()

# Second figure: training (red) against validation (blue) loss.
fig_loss, ax_loss = plt.subplots(figsize=(7, 7))
ax_loss.plot(num_epochs, loss, 'r', label='Training Loss')
ax_loss.plot(num_epochs, val_loss, 'b', label='Validation Loss')
ax_loss.set_title('Training and validation loss')
ax_loss.legend()

## Avaliação do modelo com a base de teste

In [None]:
# Evaluate modelX on the test set and prepare its confusion matrix plot.
labels = ('CNV', 'DME', 'DRUSEN', 'NORMAL')
# NOTE(review): the rest of this cell lies outside the reviewed range; n_classes is defined
# here so that code annotating the matrix cells can size its loops.
n_classes = len(labels)

# num_classes keeps the one-hot width fixed even if the highest label is absent from y_test.
y_test_cat = tf.keras.utils.to_categorical(y_test, num_classes=n_classes)
# Score the model of this section (this line previously evaluated modelV).
loss_and_metrics = modelX.evaluate(x_test, y_test_cat)

# Predicted class = index of the largest softmax output.
y_pred = modelX.predict(x_test)
y_pred = np.argmax(y_pred,axis=1)

y_actu = pd.Series(y_test, name='Actual')
y_pred = pd.Series(y_pred, name='Predicted')
df_confusion = pd.crosstab(y_actu, y_pred)

# crosstab drops classes that never occur; force the full n_classes x n_classes layout.
class_ids = list(range(n_classes))
df_confusion = (
    df_confusion
    .reindex(index=class_ids, columns=class_ids, fill_value=0)
    .rename_axis(index='Actual', columns='Predicted')
)

# Row-normalise: share of each actual class assigned to each predicted class.
# `df / series` would align the row totals with the *columns*; div(..., axis=0) divides each
# row by its own total. Rows without samples become 0 instead of NaN.
df_conf_norm = df_confusion.div(df_confusion.sum(axis=1), axis=0).fillna(0)
print(df_confusion)
print(df_conf_norm)

fig = plt.figure(figsize=(20, 20))
# Draw into the figure created above rather than assuming it is figure number 1.
plt.matshow(df_confusion, cmap=plt.get_cmap('Blues'), fignum=fig.number)  # imshow
plt.colorbar()
tick_marks = np.arange(len(labels))
plt.xticks(tick_marks, labels,fontsize=16, rotation=60)
plt.yticks(tick_marks, labels, fontsize=16)
thresh = 0.6