In [None]:
# Importação de bibliotecas
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '1'
import tensorflow as tf
import tensorflow_addons as tfa
import numpy as np
import sklearn
import matplotlib.pyplot as plt
import sklearn as skl
from datetime import datetime
from tensorflow.keras import layers
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, TensorBoard
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report
import pandas as pd
import keras
%matplotlib inline
from tensorflow.keras.preprocessing.image import img_to_array, load_img
# pip3 install adabelief-tf==0.2.0 #Instala o otimizador AdaBelief de https://github.com/juntang-zhuang/Adabelief-Optimizer#2-tensorflow-implementation-eps-of-adabelief-in-tensorflow-is-larger-than-in-pytorch-same-for-adam (posteriormente este otimizador foi adicionado ao Tensorflow addons)
from adabelief_tf import AdaBeliefOptimizer
from matplotlib.pyplot import *

In [None]:
# Verificação se a GPU está disponível
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

In [None]:
# Indicação do armazenamento local dos subdiretórios de treino, validação, teste, checkpoint(callback utilizado para salvar o modelo ao longo do treinamento) e logs(utilizados pelo tensorboard)
train_path = '/tf/tcc/Projeto_Pos/dataset/train'
validation_path = '/tf/tcc/Projeto_Pos/dataset/val'
test_path = '/tf/tcc/Projeto_Pos/dataset/test'
checkpoint_path = '/tf/tcc/checkpoint_model'
logdir = '/tf/tcc/tensorboard_logs/'  + datetime.now().strftime("%d%m%Y-%H%M%S")

size=128 # tamanho da imagem
batch=128 # tamanho do batch


train_ds = tf.keras.utils.image_dataset_from_directory(
  train_path,
  batch_size=batch,
  image_size=(size, size),
  seed=123
)

val_ds = tf.keras.utils.image_dataset_from_directory(
  validation_path,
  batch_size=batch,  
  image_size=(size, size),
  seed=123
)

test_ds = tf.keras.utils.image_dataset_from_directory(
  test_path,
  batch_size=batch,
  image_size=(size, size),
  seed=123
)

In [None]:
# Visualização de 9 exemplos e seus respectivos labels
class_names = test_ds.class_names
print(class_names)

plt.figure(figsize=(10, 10))
for images, labels in train_ds.take(1):
  for i in range(9):
    ax = plt.subplot(3, 3, i + 1)
    plt.imshow(images[i].numpy().astype("uint8"))
    plt.title(class_names[labels[i]])
    plt.axis("off")

In [None]:
# Pré-busca em buffer com parâmetros automatizados no treino, validação e teste
AUTOTUNE = tf.data.AUTOTUNE

train_ds = train_ds.cache().prefetch(buffer_size=AUTOTUNE)
val_ds = val_ds.cache().prefetch(buffer_size=AUTOTUNE)
test_ds = test_ds.cache().prefetch(buffer_size=AUTOTUNE)

In [None]:
callback=[
    EarlyStopping(monitor='val_loss', patience=10), # Interrompe o treinamento se a validation loss não diminui após 10 épocas
    TensorBoard(log_dir=logdir), # Salva os logs do treinamento em formato visualizável pelo Tensorboard
    tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=3, min_lr=1e-7), # Aguarda 3 épocas em que a validation loss não diminua até que mude o learning rate em um fator 0.2 até atingit 0,0000001
    ModelCheckpoint(checkpoint_path, monitor='val_loss', save_best_only=True, mode='min', verbose=0) # Salva o melhor modelo até o momento, sendo o com menor validation_loss
]


# triangular_cyclical_lr=tfa.optimizers.TriangularCyclicalLearningRate(initial_learning_rate=1e-1, maximal_learning_rate=1e-3, step_size=224, scale_mode='cycle')


model = tf.keras.Sequential([
  layers.Rescaling(1./255),
  layers.Conv2D(64, 3, padding='same', activation=tfa.activations.mish),
  layers.BatchNormalization(),
  layers.MaxPooling2D(),
  layers.Conv2D(32, 3, padding='same', activation=tfa.activations.mish),
  layers.BatchNormalization(),
  layers.MaxPooling2D(),
  layers.Conv2D(16, 3, padding='same', activation=tfa.activations.mish),
  layers.BatchNormalization(),
  layers.MaxPooling2D(),
  layers.Conv2D(8, 3, padding='same', activation=tfa.activations.mish),
  layers.BatchNormalization(),
  layers.MaxPooling2D(),
  layers.Flatten(),
  layers.Dropout(0.3),
  layers.Dense(256, activation=tfa.activations.mish),
  layers.BatchNormalization(),
  layers.Dropout(0.7),
  layers.Dense(3)
])


model.compile(
  optimizer = AdaBeliefOptimizer(learning_rate=1e-3, epsilon=1e-5, rectify=True, print_change_log = False),
  loss=tf.losses.SparseCategoricalCrossentropy(from_logits=True),
  metrics=['accuracy', tfa.metrics.CohenKappa(num_classes=3, sparse_labels=True)]
)



model.fit(
  train_ds,
  validation_data=val_ds,
  callbacks=[callback],
  epochs=100
)


In [None]:
model.evaluate(test_ds) #Avalia o modelo no dataset de teste

In [None]:
model.summary() #Mostra o modelo final e seus parâmetros

In [None]:
# Gera uma Matriz de Confusão
y_pred = model.predict(test_ds)
predicted_categories = tf.argmax(y_pred, axis=1) #y_pred
true_categories = tf.concat([y for x, y in test_ds], axis=0) #y_true
labels = ['COVID19', 'Normal', 'Pneumonia']

# Salva a Matriz de Confusão normalizada
cm = confusion_matrix(true_categories, predicted_categories, normalize='true')
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)
disp.plot(cmap=plt.cm.Greens, colorbar=False)
plt.title('Normalized Confusion Matrix')
plt.savefig("ds_normalized_matrix.png", format="png", dpi=300)
plt.show()

In [None]:
# Mostra o score de Precision, Recall, F1-Score, Macro e Weighted Average no dataset de teste
print(classification_report(true_categories, predicted_categories, target_names=labels))

In [None]:
# Imagens carregadas para utilização no Grad-CAM

image1 = "/tf/tcc/Projeto_Pos/dataset/test/Normal/18579_test.png" 
image2 = "/tf/tcc/Projeto_Pos/dataset/test/Normal/16324_test.png" 
image3 = "/tf/tcc/Projeto_Pos/dataset/test/Normal/18079_test.png" 

image4 = "/tf/tcc/Projeto_Pos/dataset/test/Pneumonia/4374_test.png" 
image5 = "/tf/tcc/Projeto_Pos/dataset/test/Pneumonia/11444_test.png"  
image6 = "/tf/tcc/Projeto_Pos/dataset/test/Pneumonia/8985_test.png"  

image7 = "/tf/tcc/Projeto_Pos/dataset/test/COVID19/1_test.png" 
image8 = "/tf/tcc/Projeto_Pos/dataset/test/COVID19/2158_test.png"
image9 = "/tf/tcc/Projeto_Pos/dataset/test/COVID19/2769_test.png"


In [None]:
# Código do Grad-CAM, adaptado de https://www.kaggle.com/databeru/fish-classifier-grad-cam-viz-acc-99-89


img_path = image9
size=128

# Carrega a imagem de entrada e a formata para ser recebida pelo modelo
def get_img_array(img_path, target_size):
    img = load_img(
    img_path, target_size=target_size)
    array = img_to_array(img)
    array = np.expand_dims(array, axis=0)
    return array

img_tensor = get_img_array(img_path, target_size=(128, 128))

def make_gradcam_heatmap(img_array, model, last_conv_layer_name, pred_index=None):
    # É criado um novo modelo que mapeia as ativações da última camada convolucional
    grad_model = tf.keras.models.Model(
        [model.inputs], [model.get_layer(last_conv_layer_name).output, model.output]
    )

    # O gradiente da classe com maior probabilidade é calculado
    with tf.GradientTape() as tape:
        last_conv_layer_output, preds = grad_model(img_array)
        if pred_index is None:
            pred_index = tf.argmax(preds[0])
        class_channel = preds[:, pred_index]

    # Gradiente do neuron de saída relativo ao feature map da última camada convolucional
    grads = tape.gradient(class_channel, last_conv_layer_output)

    # Vetor em que cada entrada consiste na intensidade média do gradiente relativo a um canal específico do feature map
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))

    # Cada canal do mapa de ativação é multiplicado pelo "grau de importância"do canal em relação à classe com maior probabilidade, e ao final todos os canais são sumados de modo a obter o heatmap(mapa de calor)
    last_conv_layer_output = last_conv_layer_output[0]
    heatmap = last_conv_layer_output @ pooled_grads[..., tf.newaxis]
    heatmap = tf.squeeze(heatmap)

    # O heatmap tem sua escala normalizada entre 0 & 1
    heatmap = tf.maximum(heatmap, 0) / tf.math.reduce_max(heatmap)
    return heatmap.numpy()

def save_and_display_gradcam(img_path, heatmap, cam_path="2769.png", alpha=0.4):
    # Carrega a imagem
    img = tf.keras.preprocessing.image.load_img(img_path)
    img = tf.keras.preprocessing.image.img_to_array(img)

    # Redimensiona o heatmap em um intervalo de 0 a 255
    heatmap = np.uint8(255 * heatmap)

    # Usa o mapa de cores "jet" na colorização do mapa de calor
    jet = cm.get_cmap("jet")

    # Utiliza valores RGB no mapa de cores/mapa de calor
    jet_colors = jet(np.arange(256))[:, :3]
    jet_heatmap = jet_colors[heatmap]

    # Gera a imagem com o heatmap RGB colorizado
    jet_heatmap = tf.keras.preprocessing.image.array_to_img(jet_heatmap)
    jet_heatmap = jet_heatmap.resize((img.shape[1], img.shape[0]))
    jet_heatmap = tf.keras.preprocessing.image.img_to_array(jet_heatmap)

    # Sobrepõe o heatmap à imagem de entrada
    superimposed_img = jet_heatmap * alpha + img
    superimposed_img = tf.keras.preprocessing.image.array_to_img(superimposed_img)

    # Salva a imagem com o heatmap sobreposto
    superimposed_img.save(cam_path)

    # Retorna o resultado do Grad-CAM aplicado à imagem de entrada
    return cam_path

last_conv_layer_name = "conv2d_3"
img_size = (128,128)

# Remove a função de ativação "softmax" da última camada 
model.layers[-1].activation = None

img_array = img_tensor
make_gradcam_heatmap(img_array, model, last_conv_layer_name)
heatmap = make_gradcam_heatmap(img_array, model, last_conv_layer_name)
cam_path = save_and_display_gradcam(img_path, heatmap)
imshow(plt.imread(cam_path))
plt.tight_layout()
plt.axis("off")
plt.show()