# Transfer Learning com VGG16 para detecção de pneumonia

Este notebook utiliza a técnica de Transfer Learning com a arquitetura VGG16 para detectar pneumonia em imagens de raios-X.
***
Repositório utilizado como inspiração:
- [Metastasis Breast Cancer Detection](https://github.com/danyllosilva/MBCD-BK-PC-MSc)

# Importação das bibliotecas

In [None]:
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
import albumentations as A
import glob
import keras

from sklearn.metrics import *
from sklearn.model_selection import train_test_split
from functools import partial
from mlxtend.plotting import plot_confusion_matrix

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.callbacks import CSVLogger
from tensorflow.keras.callbacks import ReduceLROnPlateau

import warnings
warnings.filterwarnings('ignore')

# Importação do dataset

In [None]:
# Root folder of the dataset; the "train" and "test" folders are resolved under it.
main_path = '../dataset/'

train_path, test_path = (os.path.join(main_path, split) for split in ("train", "test"))

# Glob patterns of the two class folders, appended to each split folder below.
class_patterns = ("/NORMAL/*.jpeg", "/PNEUMONIA/*.jpeg")

train_normal, train_pneumonia = (glob.glob(train_path + pattern) for pattern in class_patterns)
test_normal, test_pneumonia = (glob.glob(test_path + pattern) for pattern in class_patterns)

In [None]:
def _labelled_frame(normal_paths, pneumonia_paths):
    """Return ``(paths, frame)`` for one split.

    ``paths`` lists the Normal files followed by the Pneumonia files; ``frame``
    has one row per file with its class name in 'label' and its path in 'image'.
    """
    paths = list(normal_paths) + list(pneumonia_paths)
    labels = np.concatenate([['Normal'] * len(normal_paths),
                             ['Pneumonia'] * len(pneumonia_paths)])
    frame = pd.DataFrame(labels, columns=['label'])
    frame['image'] = list(paths)
    return paths, frame


train_list, df_train = _labelled_frame(train_normal, train_pneumonia)
test_list, df_test = _labelled_frame(test_normal, test_pneumonia)

In [None]:
# Display the training DataFrame as the cell output.
df_train

In [None]:
# Display the test DataFrame as the cell output.
df_test

In [None]:
# Report how many images each split contains (one DataFrame row per image).
# f-strings replace the '% s' %-format, whose space flag has no effect on strings;
# the printed text is unchanged.
print(f'Total training images: {df_train.shape[0]}')
print(f'Total testing images: {df_test.shape[0]}')

In [None]:
# Merge both splits into one table so they can be re-split below.
# ignore_index=True renumbers the rows; without it the row labels of df_train
# and df_test (both starting at 0) would be duplicated in the result.
df_total = pd.concat([df_train, df_test], ignore_index=True)

In [None]:
# Display the merged DataFrame as the cell output.
df_total

# Divisão dos conjuntos de treino e teste (80/20)

In [None]:
# 80/20 split of the merged table.
# Stratifying on the label keeps the Normal/Pneumonia ratio the same in both
# splits, and the fixed seed makes the split (and every metric derived from it)
# reproducible between runs.
df_train, df_test = train_test_split(
    df_total,
    test_size=0.2,
    stratify=df_total['label'],
    random_state=42,
)

In [None]:
# Number of training rows per class.
df_train.label.value_counts()

In [None]:
# Number of test rows per class.
df_test.label.value_counts()

# Definição de constantes

In [None]:
# Side length, in pixels, of the square images produced by the input pipeline.
img_size = 224
# Number of images per batch in the tf.data pipelines.
batch_size = 32
# Activation of the single-unit output layer of the model.
activation = 'sigmoid'
# Number of training epochs passed to model.fit.
epochs = 15
# Learning-rate schedule callback: watches val_loss with factor=0.5, patience=7
# and a lower bound of 1e-6 on the learning rate.
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=7, min_lr=1e-6, verbose=1)

# Preparando para o treinamento

In [None]:
# The paths stored in df_train.image come from glob patterns built on the
# dataset directory, so they already start with it. Joining image_dir onto them
# again produced "../dataset/../dataset/..." paths, so they are used as-is.
image_dir = "../dataset/"
image_paths = df_train.image.copy()
print(image_dir)
print(image_paths)

In [None]:
# Encode the class names as integers: 'Normal' -> 0, 'Pneumonia' -> 1.
df_train['label_int'] = df_train['label'].map({'Normal': 0, 'Pneumonia': 1})

# Display the DataFrame with the new column as the cell output.
df_train

In [None]:
# Encode the class names as integers: 'Normal' -> 0, 'Pneumonia' -> 1.
df_test['label_int'] = df_test['label'].map({'Normal': 0, 'Pneumonia': 1})

# Display the DataFrame with the new column as the cell output.
df_test

# Carregando os Dados de Treinamento e Teste com TensorFlow

In [None]:
# Each dataset element is an (image path, integer label) pair taken from one
# DataFrame row; the images themselves are read later, in the map stage.
train_columns = (df_train.image, df_train.label_int)
load_train = tf.data.Dataset.from_tensor_slices(train_columns)

test_columns = (df_test.image, df_test.label_int)
load_test = tf.data.Dataset.from_tensor_slices(test_columns)

# Data augmentation

In [None]:
def load_image(path, label):
    """Read the file at ``path`` and decode it as a 3-channel JPEG.

    The label is returned untouched so the function can be mapped over a
    dataset of (path, label) pairs.
    """
    raw_bytes = tf.io.read_file(path)
    return tf.io.decode_jpeg(raw_bytes, channels=3), label

def augument_image(image):
    """Randomly augment one image, then resize it and scale it by 1/255.

    The function is invoked through ``tf.numpy_function`` (see
    ``augumentor_function``). A new albumentations pipeline is built on every
    call.

    Args:
        image: The decoded image to augment.

    Returns:
        A float32 tensor resized to ``img_size`` x ``img_size`` and divided by 255.
    """
    pipeline = A.Compose([
        A.HorizontalFlip(p=0.5),
        A.Rotate(p=0.5, limit=15),
        A.RandomBrightnessContrast(
            p=0.5,
            brightness_limit=(-0.2, 0.2),
            contrast_limit=(-0.1, 0.1),
            brightness_by_max=True,
        ),
        A.RandomResizedCrop(
            p=0.8,
            height=img_size,
            width=img_size,
            scale=(0.8, 1.0),
            ratio=(0.05, 1.1),
            interpolation=0,
        ),
        A.Blur(blur_limit=(1, 1)),
    ])

    transformed = pipeline(image=image)["image"]
    as_float = tf.cast(transformed, tf.float32)
    # The crop above is only applied with p=0.8, so resize unconditionally.
    return tf.image.resize(as_float, [img_size, img_size]) / 255

def augumentor_function(image, label):
    """Run ``augument_image`` on ``image`` from inside a tf.data pipeline.

    Returns the augmented float32 image, with its static shape declared as
    (img_size, img_size, 3), together with the unchanged label.
    """
    result = tf.numpy_function(func=augument_image, inp=[image], Tout=tf.float32)
    # Declare the static shape that the rest of the pipeline and the model expect.
    result.set_shape((img_size, img_size, 3))
    return result, label

In [None]:
def _shuffled_augmented_batches(source, buffer_size):
    """Shuffle ``source``, load and augment every image, then batch and prefetch."""
    autotune = tf.data.experimental.AUTOTUNE
    shuffled = source.shuffle(buffer_size)
    decoded = shuffled.map(load_image, num_parallel_calls=autotune)
    augmented = decoded.map(augumentor_function, num_parallel_calls=autotune)
    return augmented.batch(batch_size).prefetch(autotune)


# NOTE(review): the test pipeline is shuffled and randomly augmented exactly
# like the training one.
train_dataset = _shuffled_augmented_batches(load_train, len(df_train))
test_dataset = _shuffled_augmented_batches(load_test, len(df_test))

train_dataset

# Construindo o modelo

In [None]:
# Model input shape (height, width, channels). Derived from img_size so the
# model always matches the size the input pipeline resizes images to.
input_shape = (img_size, img_size, 3)

def model_vgg16(input_shape, activation):
    """Build a single-output classifier on top of an ImageNet-pretrained VGG16.

    The VGG16 network is created without its top layers and marked as not
    trainable; a global average pooling layer and a one-unit dense layer are
    stacked on its last layer's output.

    Args:
        input_shape: Shape of the input images passed to VGG16.
        activation: Activation of the one-unit output layer.

    Returns:
        A ``keras.Model`` going from the VGG16 input to the new output layer.
    """
    backbone = tf.keras.applications.vgg16.VGG16(
        weights='imagenet',
        include_top=False,
        input_shape=input_shape,
    )

    pooled = layers.GlobalAveragePooling2D()(backbone.layers[-1].output)
    output = layers.Dense(1, activation=activation)(pooled)
    print('Output shape: ', output.shape)

    # Only the backbone is frozen; the pooling/dense head is not part of it.
    backbone.trainable = False

    return keras.Model(inputs=backbone.input, outputs=output)

# Build the model and print its summary.
# NOTE(review): `model` is used by later cells, so it only exists when this
# code runs with __name__ == "__main__".
if __name__ == "__main__":
    model = model_vgg16(input_shape, activation)
    model.summary()
    print('Input shape: ', input_shape)

# Plot das métricas durante o treinamento

In [None]:
def Plot_Train(hlist, start=1):
    """Plot the training (and, when logged, validation) curve of every metric.

    Args:
        hlist: Non-empty list of objects exposing a ``history`` dict that maps a
            metric name to its per-epoch values, such as the value returned by
            ``model.fit``. The runs are concatenated in list order.
        start: 1-based epoch from which the curves are drawn.
    """
    # Stitch the runs together, metric by metric.
    history = {
        key: [value for h in hlist for value in h.history[key]]
        for key in hlist[0].history
    }

    epoch_range = range(start, len(history['loss']) + 1)
    s = slice(start - 1, None)

    # Pick metrics by name instead of assuming that the first half of the keys
    # are training metrics: extra keys without a 'val_' twin (for example a
    # logged learning rate) would otherwise shift or break the selection.
    train_keys = [k for k in history if not k.startswith('val_')]
    paired = [k for k in train_keys if 'val_' + k in history]
    # Without validation data, fall back to plotting the training curves alone.
    metrics = paired or train_keys

    plt.figure(figsize=[14, 4])
    for position, k in enumerate(metrics, start=1):
        plt.subplot(1, len(metrics), position)
        plt.plot(epoch_range, history[k][s], label='Training')
        if 'val_' + k in history:
            plt.plot(epoch_range, history['val_' + k][s], label='Test')
        plt.xlabel('Epoch')
        plt.ylabel(k)
        plt.title(k)
        plt.grid()
        plt.legend()

    plt.tight_layout()
    plt.show()

# Realizando o treinamento do modelo

In [None]:
# Per-class weights passed to model.fit: class 0 ('Normal') is weighted 1.2,
# class 1 ('Pneumonia') is weighted 1.0.
class_weights = {0: 1.2, 1: 1.0}

In [None]:
def fine_tune_vgg16(model, num_layers_to_unfreeze=4):
    """Mark the last ``num_layers_to_unfreeze`` layers of ``model`` as trainable.

    Args:
        model: Object exposing a ``layers`` sequence whose items have a
            ``trainable`` attribute.
        num_layers_to_unfreeze: How many layers to unfreeze, counted from the
            end. ``0`` leaves the model untouched; a value larger than the
            number of layers unfreezes all of them.

    Returns:
        The same ``model`` object, modified in place.

    Raises:
        ValueError: If ``num_layers_to_unfreeze`` is negative.
    """
    if num_layers_to_unfreeze < 0:
        raise ValueError("num_layers_to_unfreeze must be non-negative")
    if num_layers_to_unfreeze == 0:
        # layers[-0:] is layers[0:], which would unfreeze every layer.
        return model

    for layer in model.layers[-num_layers_to_unfreeze:]:
        layer.trainable = True
    return model

# Unfreeze the last layers before compiling so the optimizer sees them as trainable.
model = fine_tune_vgg16(model)

# Keep only the best model seen so far on disk.
model_checkpoint = tf.keras.callbacks.ModelCheckpoint('vgg16_best_model.keras', save_best_only=True)

# Append the per-epoch metrics to a CSV log.
csv_logger = CSVLogger('cnn_vgg16_train_model_logs.csv', append=True)

model.compile(loss='binary_crossentropy', optimizer=tf.keras.optimizers.Adam(learning_rate=2e-5),
              metrics=['accuracy', tf.keras.metrics.Precision(name='Precision'),
                       tf.keras.metrics.Recall(name='Recall')])

# train_dataset is already batched by the input pipeline, so `batch_size` must
# not be passed to fit() as well.
train_history = model.fit(
    train_dataset,
    epochs=epochs,
    verbose=1,
    callbacks=[model_checkpoint, csv_logger, reduce_lr],
    validation_data=test_dataset,
    class_weight=class_weights
)

In [None]:
# Plot the metric curves of the training run above.
Plot_Train([train_history])

# Avaliando o modelo

In [None]:
def _load_for_eval(path, label):
    """Load one image with the training resize and 1/255 scaling, but no random augmentation."""
    image, label = load_image(path, label)
    image = tf.image.resize(tf.cast(image, tf.float32), [img_size, img_size]) / 255
    return image, label


# Evaluation pipeline: no shuffling, so predictions stay in df_test row order,
# and no random flips/rotations/crops, so the reported metrics are deterministic
# and measured on the unaltered test images.
test_data = load_test
test_dataset = (
                test_data
                .map(_load_for_eval, num_parallel_calls=tf.data.experimental.AUTOTUNE)
                .batch(batch_size)
                .prefetch(tf.data.experimental.AUTOTUNE)
                )

In [None]:
# Ground-truth integer labels of the test rows.
ytrue = df_test['label_int']
# Model outputs for every test image.
pred = model.predict(test_dataset, verbose=1)
# Threshold at 0.5: outputs above it become class 1, the rest class 0.
ypred = (pred > 0.5).astype(int)

In [None]:
# Side-by-side table of true and predicted labels.
# The second positional argument of pd.DataFrame is `index`, so the previous
# call used the true labels as row labels instead of as a column.
df_preds = pd.DataFrame({'ytrue': np.asarray(ytrue), 'ypred': np.ravel(ypred)})
df_preds.head(10)

In [None]:
# Confusion matrix of true vs. predicted labels, drawn with absolute counts
# and normalized values.
cm = confusion_matrix(ytrue, ypred)
plot_options = dict(show_absolute=True, show_normed=True, colorbar=True, cmap='Blues')
fig, ax = plot_confusion_matrix(conf_mat=cm, **plot_options)
plt.show()

In [None]:
# Display names for classes 0 and 1, in that order.
class_names = ['normal', 'pneumonia']
report = classification_report(ytrue, ypred, target_names=class_names)
print(report)

In [None]:
# Flatten the model outputs to one score per test image.
y_probs = pred.ravel()

# ROC points and area under the curve, computed from the scores.
auc_score = roc_auc_score(ytrue, y_probs)
fpr, tpr, _ = roc_curve(ytrue, y_probs)

plt.figure(figsize=(12, 8))
roc_ax = plt.gca()
roc_ax.plot(fpr, tpr, color='blue', label=f'AUC = {auc_score:.4f}')
# Diagonal reference line from (0, 0) to (1, 1).
roc_ax.plot([0, 1], [0, 1], color='red', linestyle='--')
roc_ax.set_xlabel('Taxa de Falsos Positivos (FPR)')
roc_ax.set_ylabel('Taxa de Verdadeiros Positivos (TPR)')
roc_ax.legend(loc='lower right')
roc_ax.grid(True)
plt.show()