# <font color='blue'>Data Science Academy</font>
# <font color='blue'>Deep Learning Para Aplicações de IA com PyTorch e Lightning</font>

## <font color='blue'>Mini-Projeto 5</font>
## <font color='blue'>Segmentação de Imagens Médicas com Inteligência Artificial</font>

![DSA](imagens/MP5.png)

## Instalando e Carregando os Pacotes

In [None]:
# Versão da Linguagem Python
from platform import python_version
print('Versão da Linguagem Python Usada Neste Jupyter Notebook:', python_version())

In [None]:
# Para atualizar um pacote, execute o comando abaixo no terminal ou prompt de comando:
# pip install -U nome_pacote

# Para instalar a versão exata de um pacote, execute o comando abaixo no terminal ou prompt de comando:
# !pip install nome_pacote==versão_desejada

# Depois de instalar ou atualizar o pacote, reinicie o jupyter notebook.

# Instala o pacote watermark. 
# Esse pacote é usado para gravar as versões de outros pacotes usados neste jupyter notebook.
!pip install -q -U watermark

In [None]:
!pip install -q opencv-python==4.8.0.76

In [None]:
!pip install -q torch==2.0.0

In [None]:
!pip install -q torchvision==0.15.1

Albumentations é uma biblioteca Python para aumento de imagens (dataset augmentation). O aumento de imagem é usado em tarefas de aprendizado profundo e visão computacional para aumentar a qualidade dos modelos treinados. O objetivo do aumento de imagem é criar novas amostras de treinamento a partir dos dados existentes.

In [None]:
!pip install -q albumentations==0.4.6

In [None]:
# Imports
import os
import cv2
import time
import sklearn
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import albumentations as A
import albumentations.augmentations.transforms as AT
from torch.utils.data import Dataset, DataLoader
from albumentations.pytorch import ToTensorV2, ToTensor
from sklearn.model_selection import train_test_split
from torchvision.models import resnext50_32x4d
from mpl_toolkits.axes_grid1 import ImageGrid
from tqdm.notebook import tqdm
import warnings
warnings.filterwarnings('ignore')

In [None]:
# Versões dos pacotes usados neste jupyter notebook
%reload_ext watermark
%watermark -a "Data Science Academy" --iversions

## Carregando e Organizando as Imagens

In [None]:
# Caminho dos dados
pasta_imagens = "dados"

In [None]:
# Lista para os dados organizados
dados = []

In [None]:
# Loop para iterar através de cada diretório ou arquivo em pasta_imagens
for dir_ in os.listdir(pasta_imagens):
    
    # Concatena o caminho base com o nome do diretório ou arquivo para formar o caminho completo
    dir_path = os.path.join(pasta_imagens, dir_)
    
    # Verifica se o caminho é um diretório
    if os.path.isdir(dir_path):
        
        # Loop para iterar através de cada arquivo dentro do diretório
        for filename in os.listdir(dir_path):
            
            # Concatena o caminho do diretório com o nome do arquivo para formar o caminho completo do arquivo
            img_path = os.path.join(dir_path, filename)
            
            # Anexa o nome do diretório e o caminho completo do arquivo à lista "dados"
            dados.append([dir_, img_path])
    
    # Se o caminho não é um diretório, imprime uma mensagem de informação
    else:
        print(f"Isso não é uma pasta --> {dir_path}")

In [None]:
# Cria o dataframe
df_imagens = pd.DataFrame(dados, columns = ["nome_pasta", "caminho_imagem"])

In [None]:
# Amostra do dataframe
df_imagens.head()

In [None]:
# Caminho para a imagem de índice 2
df_imagens.caminho_imagem[2]

In [None]:
# Dataframes de imagens e máscaras
df_imgs = df_imagens[~df_imagens["caminho_imagem"].str.contains("mask")]
df_masks = df_imagens[df_imagens["caminho_imagem"].str.contains("mask")]

In [None]:
# Lista de imagens. O pandas trunca o nome e não podemos ver o final.
df_imgs.caminho_imagem[0:5]

In [None]:
# Não trunca os nomes
pd.set_option('display.max_colwidth', None)

In [None]:
# Lista de imagens. Observe o nome de algumas imagens com apenas 1 dígito.
df_imgs.caminho_imagem[0:5]

In [None]:
# Lista de máscaras. Observe o nome de algumas máscaras com apenas 1 dígito.
df_masks.caminho_imagem[0:5]

In [None]:
# Uma imagem
df_imgs.caminho_imagem[2]

In [None]:
len(df_imgs.caminho_imagem[2])

In [None]:
# Uma máscara
df_masks.caminho_imagem[1]

In [None]:
len(df_masks.caminho_imagem[1])

In [None]:
# Parâmetros para extrair caminho da imagem e nome da imagem e máscara de traz para frente
caracteres_nome_imagem = 50
fim_imagem = 4
fim_mask = 9

In [None]:
# Slice
df_imgs['caminho_imagem'].str[0:caracteres_nome_imagem][0:5]

In [None]:
# Testando
df_imgs['caminho_imagem'].str[caracteres_nome_imagem : -fim_imagem][0:5]

In [None]:
# Testando
df_masks['caminho_imagem'].str[caracteres_nome_imagem : -fim_mask][0:5]

In [None]:
# Ordena imagens e máscaras obtendo os nomes de trás para frente
imgs = sorted(df_imgs["caminho_imagem"].values, key = lambda x: int(x[caracteres_nome_imagem : -fim_imagem]))
masks = sorted(df_masks["caminho_imagem"].values, key = lambda x: int(x[caracteres_nome_imagem : -fim_mask]))

In [None]:
# Lista de imagens
imgs

In [None]:
# Lista de máscaras
masks

In [None]:
# Verifica se o caminho foi extraído corretamente
idx = random.randint(0, len(imgs)-1)
print(f"Imagem:  *{imgs[idx]}*\nMáscara: *{masks[idx]}*")

In [None]:
# Dataframe final
df_final = pd.DataFrame({"paciente": df_imgs.nome_pasta.values, "caminho_imagem": imgs, "caminho_mascara": masks})

In [None]:
# Shape
df_final.shape

In [None]:
# Amostra
df_final.head()

In [None]:
# Define a função que aceita um caminho para uma máscara como argumento e retorna o diagnóstico
def func_diagnostico(mask_path):
    
    # Carrega a máscara usando OpenCV e encontra o valor máximo de pixel na imagem
    val = np.max(cv2.imread(mask_path))
    
    # Verifica se o valor máximo na imagem de máscara é maior que 0 (se for zero a máscara está vazia)
    if val > 0: 
        return 1  # Se sim, retorna 1, indicando uma "condição positiva" (por exemplo, presença de área de interesse)
    else: 
        return 0  # Se não, retorna 0, indicando uma "condição negativa" (por exemplo, ausência de área de interesse)

In [None]:
# Aplica a função e extrai o diagnóstico
df_final["diagnostico"] = df_final["caminho_mascara"].apply(lambda x: func_diagnostico(x))

In [None]:
df_final.shape

In [None]:
df_final.sample(10)

In [None]:
print("Total de Pacientes: ", len(set(df_final.paciente)))
print("Total de Imagens: ", len(df_final))

## Análise Exploratória e Visualização das Imagens

In [None]:
# Inicia a plotagem do gráfico de barras usando os valores da coluna 'diagnostico' do DataFrame 'df_final'
ax = df_final.diagnostico.value_counts().plot(kind = "bar", 
                                            stacked = True, 
                                            figsize = (12, 6), 
                                            color = ["red", "green"])

# Define os rótulos do eixo X como "Positivo" e "Negativo", com rotação de 45 graus e tamanho de fonte 12
ax.set_xticklabels(["Positivo", "Negativo"], rotation = 45, fontsize = 12)

# Define o rótulo do eixo Y como "Total Imagens" com tamanho de fonte 12
ax.set_ylabel("Total Images", fontsize = 12)

# Define o título do gráfico, o tamanho da fonte do título e a posição vertical do título
ax.set_title("Proporção de Registros Por Diagnóstico", fontsize = 18, y = 1.05)

# Loop para anotar cada barra do gráfico com o número total de ocorrências para cada diagnóstico
for i, rows in enumerate(df_final.diagnostico.value_counts().values):
    ax.annotate(int(rows), 
                xy = (i, rows-12), 
                rotation = 0, 
                color = "white", 
                ha = "center", 
                verticalalignment = 'bottom', 
                fontsize = 15, 
                fontweight = "bold")
    
# Adiciona um texto ao gráfico indicando o número total de imagens
ax.text(1.2, 2550, f"Total de {len(df_final)} Imagens", 
        size = 15,
        color = "black",
        ha = "center", 
        va = "center",
        bbox = dict(boxstyle = "round", fc = ("lightblue")))

In [None]:
# Retorna quantas imagens diagnosticadas e não diagnosticadas cada paciente possui
df_final.groupby(["paciente", "diagnostico"])["diagnostico"].size()

In [None]:
# Agrupa pacientes por diagnóstico e preenche valores NA
pacientes_por_diagnostico = df_final.groupby(["paciente", "diagnostico"])["diagnostico"].size().unstack().fillna(0)

In [None]:
# Ajusta os nomes das colunas
pacientes_por_diagnostico.columns = ["Positivo", "Negativo"]

In [None]:
# Visualiza as primeiras linhas
pacientes_por_diagnostico.head()

In [None]:
# Plot
ax = pacientes_por_diagnostico.plot(kind = "bar", 
                                    stacked = True, 
                                    figsize = (18,10), 
                                    color = ["blue", "magenta"], 
                                    alpha = 0.85)
ax.grid(False)
ax.set_xlabel('Pacientes',fontsize = 20)
ax.set_ylabel('Total de Imagens', fontsize = 20)
ax.legend(fontsize = 20, loc = "upper left")
ax.set_title("Distribuição dos Dados Agrupados Por Paciente e Diagnóstico", fontsize = 25, y = 1.005)

> Vamos agora visualizar imagens e máscaras.

In [None]:
# Separa amostras com diagnóstico positivo e negativo
sample_pos = df_final[df_final["diagnostico"] == 1].sample(5).caminho_imagem.values
sample_neg = df_final[df_final["diagnostico"] == 0].sample(5).caminho_imagem.values

In [None]:
# Lista para as amostras de imagens
sample_imgs = []

In [None]:
# Tamanho de cada imagem
IMG_SIZE = 512

In [None]:
# Carrega as imagens e separa pelo diagnóstico
for i, (pos, neg) in enumerate(zip(sample_pos, sample_neg)):
    pos = cv2.resize(cv2.imread(pos), (IMG_SIZE, IMG_SIZE))
    neg = cv2.resize(cv2.imread(neg), (IMG_SIZE, IMG_SIZE))
    sample_imgs.extend([pos, neg])

In [None]:
# Converte as amostras para array 
sample_yes_arr = np.vstack(np.array(sample_imgs[::2]))
sample_no_arr = np.vstack(np.array(sample_imgs[1::2]))

In [None]:
# Matriz de 2560 imagens, com 512 pixels de tamanho e 3 canais de cores
sample_yes_arr.shape

In [None]:
# Máscaras
sample_df = df_final[df_final["diagnostico"] == 1].sample(5).values

In [None]:
sample_imgs = []

In [None]:
# Loop para buscar imagens e máscaras na amostra
for i, data in enumerate(sample_df):
    img = cv2.resize(cv2.imread(data[1]), (IMG_SIZE, IMG_SIZE))
    mask = cv2.resize(cv2.imread(data[2]), (IMG_SIZE, IMG_SIZE))
    sample_imgs.extend([img, mask])

In [None]:
# Converte para array
sample_img_arr = np.hstack(sample_imgs[::2])
sample_mask_arr = np.hstack(sample_imgs[1::2])

In [None]:
# Plot
fig = plt.figure(figsize = (25., 25.))
grid = ImageGrid(fig, 111,  nrows_ncols = (2, 1), axes_pad = 0.1)
grid[0].imshow(sample_img_arr)
grid[0].set_title("Imagens", fontsize = 25)
grid[0].axis("off")
grid[0].grid(False)
grid[1].imshow(sample_mask_arr)
grid[1].set_title("Máscaras", fontsize = 25, y = 0.9)
grid[1].axis("off")
grid[1].grid(False)
plt.show()

## Dataset Augmentation e DataLoader

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
device

In [None]:
# Se CUDA estiver instalado, verifica a GPU
if torch.cuda.is_available():
    print('Número de GPUs:', torch.cuda.device_count())
    print('Modelo da GPU:',torch.cuda.get_device_name(0))
    print('Total de Memória da GPU [GB]:',torch.cuda.get_device_properties(0).total_memory/1e9)

In [None]:
# Define a classe para criar o dataset
class PreparaDataset:
    
    # Método construtor para inicializar os atributos da classe
    def __init__(self, df, transforms):
        
        # DataFrame contendo informações sobre as imagens e máscaras
        self.df = df  
        
        # Transformações a serem aplicadas nas imagens e máscaras
        self.transforms = transforms  

    # Método para obter um item específico do conjunto de dados usando um índice
    def __getitem__(self, idx):
        
        # Lê a imagem da localização especificada na coluna 1 do DataFrame
        image = cv2.imread(self.df.iloc[idx, 1])
        
        # Lê a máscara da localização especificada na coluna 2 do DataFrame
        # '0' indica que a imagem é lida em escala de cinza
        mask = cv2.imread(self.df.iloc[idx, 2], 0)
        
        # Aplica as transformações especificadas em 'self.transforms' tanto na imagem quanto na máscara
        augmented = self.transforms(image = image, mask = mask)
        
        # Atualiza a imagem e a máscara com as versões transformadas
        image = augmented["image"]
        mask = augmented["mask"]
        
        # Retorna a imagem e a máscara como uma tupla
        return image, mask

    # Método para obter o número total de itens no conjunto de dados
    def __len__(self):
        
        # Retorna o número de linhas no DataFrame
        return len(self.df)  

In [None]:
# Tamanho do patch
PATCH_SIZE = 128

In [None]:
# Aqui definimos uma sequência de transformações a serem aplicadas nas imagens
transforms = A.Compose([
    
    # Redimensiona a imagem para o tamanho especificado (PATCH_SIZE x PATCH_SIZE)
    A.Resize(width = PATCH_SIZE, height = PATCH_SIZE, p = 1.0),
    
    # Espelha a imagem horizontalmente com probabilidade de 0,5
    A.HorizontalFlip(p = 0.5),
    
    # Espelha a imagem verticalmente com probabilidade de 0,5
    A.VerticalFlip(p = 0.5),
    
    # Gira a imagem aleatoriamente em 90 graus com probabilidade de 0,5
    A.RandomRotate90(p = 0.5),
    
    # Transpõe a imagem (inverte altura e largura) com probabilidade de 0,5
    A.Transpose(p = 0.5),
    
    # Realiza pequenos deslocamentos, redimensionamentos e rotações na imagem com probabilidade de 0,25
    A.ShiftScaleRotate(shift_limit = 0.01, scale_limit = 0.04, rotate_limit = 0, p = 0.25),

    # Normaliza a imagem para ter uma média de 0 e um desvio padrão de 1
    A.Normalize(p = 1.0),
    
    # Converte a imagem para um tensor do PyTorch
    ToTensor(),
])

In [None]:
# Cria amostra de treino e validação
df_treino, df_valid = train_test_split(df_final, stratify = df_final.diagnostico, test_size = 0.1)

In [None]:
# Reset do índice
df_treino = df_treino.reset_index(drop = True)

In [None]:
# Reset do índice
df_valid = df_valid.reset_index(drop = True)

In [None]:
# Divide dados de treino em treino e teste
df_treino, df_teste = train_test_split(df_treino, stratify = df_treino.diagnostico, test_size = 0.12)

In [None]:
# Reset do índice
df_treino = df_treino.reset_index(drop = True)

In [None]:
print(f"Treino: {df_treino.shape} \nValid: {df_valid.shape} \nTeste: {df_teste.shape}")

In [None]:
# Aplica as transformações em treino
dataset_treino = PreparaDataset(df_treino, transforms = transforms)

In [None]:
print(len(dataset_treino))

In [None]:
# Cria o dataloader de treino
dl_treino = DataLoader(dataset_treino, batch_size = 26, shuffle = True)

In [None]:
# Aplica as transformações em validação
dataset_valid = PreparaDataset(df_valid, transforms = transforms)

In [None]:
print(len(dataset_valid))

In [None]:
# Cria o dataloader de validação
dl_valid = DataLoader(dataset_valid, batch_size = 26, num_workers = 2, shuffle = True)

In [None]:
# Aplica as transformações em teste
dataset_teste = PreparaDataset(df_teste, transforms = transforms)

In [None]:
print(len(dataset_teste))

In [None]:
# Cria o dataloader de teste
dl_teste = DataLoader(dataset_teste, batch_size = 26, num_workers = 2, shuffle = True)

> Vamos definir a função para exibir imagens após a aplicação das transformações.

In [None]:
# Função
def mostra_imagens(inputs, nrows = 5, ncols = 5, norm = True):
    
    # Cria uma figura para o plot das imagens
    plt.figure(figsize = (10, 10))
    
    # Ajusta o espaço entre subplots
    plt.subplots_adjust(wspace = 0., hspace = 0.)
    
    # Inicializa o contador para subplots
    i_ = 0
    
    # Limita o número de entradas para no máximo 25
    if len(inputs) > 25:
        inputs = inputs[:25]
    
    # Loop para iterar através de todas as imagens na lista de entrada
    for idx in range(len(inputs)):
        
        # Normaliza as imagens se 'norm' for True
        if norm:
            
            # Transpõe a imagem e converte para o formato NumPy
            img = inputs[idx].numpy().transpose(1, 2, 0)
            
            # Define a média e o desvio padrão para normalização
            mean = [0.485, 0.456, 0.406]
            std = [0.229, 0.224, 0.225]
            
            # Desnormaliza a imagem
            img = (img * std + mean).astype(np.float32)
        
        # Caso contrário, apenas converte para o formato NumPy sem normalização
        else:
            img = inputs[idx].numpy().astype(np.float32)
            img = img[0, :, :]
        
        # Adiciona um subplot na posição i_ + 1
        plt.subplot(nrows, ncols, i_ + 1)
        
        # Plota a imagem em escala de cinza se ela tiver menos de 3 dimensões
        if len(img.shape) < 3:
            plt.imshow(img, cmap = "gray")
        
        # Caso contrário, plota a imagem colorida
        else:
            plt.imshow(img)
        
        # Remove os eixos do subplot
        plt.axis('off')
        
        # Incrementa o contador de subplots
        i_ += 1
    
    # Exibe a figura com todos os subplots
    return plt.show()

In [None]:
# Verifica se o script está sendo executado como o programa principal
if __name__ == '__main__':
    
    # Obtém o próximo lote de imagens e máscaras do dataloader de treinamento
    images, masks = next(iter(dl_treino))
    
    # Exibe as dimensões das matrizes de imagens e máscaras
    print(images.shape, masks.shape)
    
    # Usa a função 'mostra_imagens' para exibir as imagens
    # Aqui, a normalização é assumida como verdadeira por padrão
    mostra_imagens(images)
    
    # Usa a função 'mostra_imagens' para exibir as máscaras
    # Neste caso, 'norm' é definido como falso para evitar a normalização
    mostra_imagens(masks, norm = False)

## Modelagem Para Segmentação de Imagens com Arquitetura U-Net

A arquitetura U-Net é um tipo de rede neural convolucional que foi inicialmente projetada para tarefas de segmentação semântica em imagens biomédicas. A estrutura foi apresentada por Olaf Ronneberger, Philipp Fischer e Thomas Brox em 2015, e desde então tem sido adaptada e utilizada em várias outras tarefas de segmentação e análise de imagens. Link do paper de pesquisa:

https://arxiv.org/abs/1505.04597

**Estrutura Básica**

A U-Net tem uma estrutura simétrica que se assemelha a um "U", razão pela qual é chamada de U-Net. A rede é composta por duas partes principais:

Codificador (Downsampling Path): A primeira metade da rede consiste em várias camadas convolucionais e camadas de pooling para reduzir as dimensões espaciais da imagem de entrada. Isso permite que a rede capture as características contextuais da imagem. Geralmente, cada etapa de downsampling é composta por duas convoluções seguidas por uma operação de pooling (geralmente max pooling).

Decodificador (Upsampling Path): A segunda metade da rede faz o oposto do codificador. Ela recebe a saída do codificador e aumenta suas dimensões espaciais usando camadas de convolução transposta (ou upsampling). Para capturar informações de localização com precisão, as saídas de algumas das camadas do codificador são concatenadas com as entradas das camadas correspondentes no decodificador.

**Características Importantes**

Skip Connections: Uma característica marcante da U-Net é o uso de conexões de salto (skip connections) entre as camadas do codificador e do decodificador. Isso permite que a rede preserve informações de localização, que são fundamentais para tarefas como segmentação de imagem.

Camadas Totalmente Conectadas: Diferentemente de outras redes neurais convolucionais, a U-Net geralmente não contém camadas totalmente conectadas, o que a torna mais eficiente em termos de uso de memória.

Campo Receptivo Grande: Devido às múltiplas camadas de downsampling e upsampling, a U-Net possui um campo receptivo grande, permitindo que capture mais contexto em torno de cada pixel.

Treinamento com Poucos Dados: Uma das principais vantagens da U-Net é sua eficácia mesmo quando treinada com uma quantidade relativamente pequena de dados anotados.

**Aplicações**

Embora inicialmente projetada para imagens biomédicas, a arquitetura U-Net foi adotada em diversas áreas, como detecção de objetos, visão robótica, análise de imagens médicas não biomédicas e até mesmo em processamento de linguagem natural.

Referência:

https://arxiv.org/abs/1505.04597

![DSA](imagens/unet.jpg)

In [None]:
# Define uma função para criar uma operação de convolução dupla
def double_conv(in_channels, out_channels):
    
    # Retorna uma sequência de operações em uma rede neural (nn.Sequential)
    return nn.Sequential(
        
        # Primeira convolução: recebe 'in_channels' e retorna 'out_channels'
        # O kernel é de tamanho 3x3 e o padding é 1 para manter as dimensões
        nn.Conv2d(in_channels, out_channels, 3, padding = 1),
        
        # Aplica a função de ativação ReLU in-place após a primeira convolução
        nn.ReLU(inplace = True),
        
        # Segunda convolução: recebe 'out_channels' (saída da primeira convolução) e também retorna 'out_channels'
        # O kernel é de tamanho 3x3 e o padding é 1 para manter as dimensões
        nn.Conv2d(out_channels, out_channels, 3, padding=1),
        
        # Aplica a função de ativação ReLU in-place após a segunda convolução
        nn.ReLU(inplace = True)
    )

In [None]:
# Define a classe UNet, herdando de nn.Module (PyTorch)
class UNet(nn.Module):

    # Construtor da classe
    def __init__(self, n_classes):
        
        # Chama o construtor da classe pai (nn.Module)
        super().__init__()
                
        # Camadas de convolução descendentes (Encoder)
        self.conv_down1 = double_conv(3, 64)
        self.conv_down2 = double_conv(64, 128)
        self.conv_down3 = double_conv(128, 256)
        self.conv_down4 = double_conv(256, 512)
        
        # Camada de Max pooling para reduzir dimensões
        self.maxpool = nn.MaxPool2d(2)
        
        # Camada de Upsample para aumentar dimensões (Decoder)
        self.upsample = nn.Upsample(scale_factor = 2, mode = 'bilinear', align_corners = True)
        
        # Camadas de convolução ascendentes (Decoder)
        self.conv_up3 = double_conv(256 + 512, 256)
        self.conv_up2 = double_conv(128 + 256, 128)
        self.conv_up1 = double_conv(128 + 64, 64)
        
        # Última camada de convolução para ajustar o número de classes
        self.last_conv = nn.Conv2d(64, n_classes, kernel_size = 1)

    # Método para a passagem para a frente (forward pass)
    def forward(self, x):
        
        # Convoluções descendentes e Max pooling
        conv1 = self.conv_down1(x)
        x = self.maxpool(conv1)
        
        conv2 = self.conv_down2(x)
        x = self.maxpool(conv2)
        
        conv3 = self.conv_down3(x)
        x = self.maxpool(conv3)
        
        # Última convolução descendente
        x = self.conv_down4(x)
        
        # Inicia o processo de upsampling (decodificação)
        x = self.upsample(x)
        
        # Concatenação e convolução ascendente
        x = torch.cat([x, conv3], dim = 1)
        x = self.conv_up3(x)
        
        x = self.upsample(x)
        
        x = torch.cat([x, conv2], dim = 1)
        x = self.conv_up2(x)
        
        x = self.upsample(x)
        
        x = torch.cat([x, conv1], dim = 1)
        x = self.conv_up1(x)
        
        # Última camada de convolução e ativação
        out = self.last_conv(x)
        
        # Aplica a função de ativação sigmóide para normalizar a saída
        out = torch.sigmoid(out)
        
        # Retorna a saída
        return out

In [None]:
# Cria instância da classe e manda para o device
modelo_unet_padrao = UNet(n_classes = 1).to(device)

In [None]:
# Arquitetura final
modelo_unet_padrao.parameters

## Métricas de Avaliação Para Segmentação de Imagens

O Coeficiente de Dice é uma métrica estatística utilizada para avaliar a semelhança entre dois conjuntos. O coeficiente é amplamente usado em diversas áreas, incluindo ecologia, informática e, mais notavelmente, em processamento de imagem e aprendizado de máquina para tarefas como segmentação de imagem.

In [None]:
# Função para métrica de segmentação
def dice_coef_metric(inputs, target):
    
    # Calcula a interseção entre o alvo (target) e as entradas (inputs), multiplicando-os 
    # elemento a elemento e somando o resultado.
    # Multiplica por 2.0 para seguir a fórmula do coeficiente de Dice.
    intersection = 2.0 * (target * inputs).sum()

    # Calcula a união entre o alvo e as entradas, somando todos os elementos de cada um.
    union = target.sum() + inputs.sum()

    # Verifica se tanto o alvo quanto as entradas são vetores de zeros.
    # Se forem, o coeficiente de Dice é definido como 1.0 nesse caso.
    if target.sum() == 0 and inputs.sum() == 0:
        return 1.0
    
    # Calcula e retorna o coeficiente de Dice usando a fórmula: 2 * |X ∩ Y| / (|X| + |Y|)
    return intersection / union

A função dice_coef_loss abaixo é uma versão modificada do coeficiente de Dice, adaptada para ser usada como uma função de perda em algoritmos de aprendizado de máquina. A ideia é minimizar essa perda durante o treinamento para que a rede neural gere segmentações que são o mais próximo possível dos rótulos verdadeiros.

A adição do termo smooth é uma técnica comum para evitar divisão por zero e para suavizar o gradiente, tornando o treinamento mais estável. O termo é adicionado tanto no numerador quanto no denominador da fração para manter a simetria.

In [None]:
# Função para o erro de segmentação
def dice_coef_loss(inputs, target):
    
    # Adiciona um valor suavizador ("smooth") para evitar divisão por zero
    smooth = 1.0 

    # Calcula a interseção entre a entrada e o alvo, multiplicando-os elemento a elemento e somando o resultado.
    # Multiplica por 2.0 para seguir a fórmula do coeficiente de Dice.
    # Adiciona o valor de "smooth" para suavização.
    intersection = 2.0 * (target * inputs).sum() + smooth

    # Calcula a união entre o alvo e as entradas, somando todos os elementos de cada um.
    # Adiciona o valor de "smooth" para suavização.
    union = target.sum() + inputs.sum() + smooth

    # Calcula a perda como 1 menos o coeficiente de Dice.
    # O objetivo durante o treinamento é minimizar essa perda, o que maximiza o coeficiente de Dice.
    return 1 - (intersection / union)

A função bce_dice_loss abaixo combina duas métricas de perda comuns em tarefas de segmentação de imagem: a perda de coeficiente de Dice e a perda de entropia cruzada binária (BCE, do inglês "Binary Cross Entropy").

**Dice Loss**: dice_coef_loss(inputs, target) computa a perda baseada no coeficiente de Dice, que é útil para medir a similaridade entre a saída prevista (inputs) e a verdadeira (target).

**BCE Loss**: nn.BCELoss() inicializa a perda de entropia cruzada binária, que é comumente usada para problemas de classificação binária. A função bce_loss(inputs, target) então calcula esta perda entre as entradas e os alvos.

**Combinação**: O valor retornado é a soma das duas perdas. Isso é feito para aproveitar os benefícios de ambas as métricas: enquanto a BCE é eficaz para a classificação pixel a pixel, o coeficiente de Dice leva em consideração a relação espacial entre os pixels.

Esta abordagem combinada é frequentemente mais robusta do que usar qualquer uma das métricas isoladamente.

In [None]:
# Função para o erro final de segmentação
def bce_dice_loss(inputs, target):
    
    # Calcula o valor da função de perda baseada no coeficiente de Dice
    dice_score = dice_coef_loss(inputs, target)
    
    # Inicializa a função de perda de entropia cruzada binária (BCE)
    bce_loss = nn.BCELoss()
    
    # Calcula o valor da função de perda de entropia cruzada binária para as entradas e alvos dados
    bce_score = bce_loss(inputs, target)
    
    # Soma as duas funções de perda (Dice e BCE) para obter uma métrica de perda combinada
    return bce_score + dice_score

In [None]:
# Testando
bce_dice_loss(torch.tensor([0.7, 1., 1.]), torch.tensor([1.,1.,1.]))

## Loop de Treinamento

### Early Stopping

Leia o manual em pdf no Capítulo 14.

In [None]:
# Define a classe EarlyStopping para interromper o treinamento quando não houver melhora
class EarlyStopping():

    # Inicializa a classe com parâmetros de tolerância e variação mínima (min_delta)
    def __init__(self, tolerance = 4, min_delta = 0):
        
        # Número máximo de épocas para tolerar sem melhora
        self.tolerance = tolerance  
        
        # A diferença mínima entre a perda de treino e validação para considerar como melhora
        self.min_delta = min_delta  
        
        # Contador para rastrear o número de épocas sem melhora
        self.counter = 0  
        
        # Sinalizador para indicar se o treinamento deve ser interrompido
        self.early_stop = False  

    # Método chamado em cada época para verificar as condições de parada antecipada
    def __call__(self, train_loss, validation_loss):
        
        # Verifica se a diferença entre a perda de validação e a perda de treino é maior que min_delta
        if (validation_loss - train_loss) > self.min_delta:
            
            # Incrementa o contador se a condição for atendida
            self.counter += 1  
            
            # Verifica se o contador atingiu o limite de tolerância
            if self.counter >= self.tolerance:
                
                # Ativa o sinalizador para interromper o treinamento
                self.early_stop = True  

In [None]:
# Cria instância da classe
early_stopping = EarlyStopping(tolerance = 4, min_delta = 9)

### Função Para Calcular o IoU

In [None]:
# Define uma função para calcular a métrica IOU (Intersection Over Union) usando um modelo e um DataLoader
def compute_iou(model, loader, threshold = 0.3):
    
    # Inicializa a variável de perda de validação
    valloss = 0
    
    # Desativa o cálculo de gradientes para economizar memória e acelerar os cálculos
    with torch.no_grad():
        
        # Itera sobre o DataLoader, obtendo batches de dados e seus respectivos rótulos (target)
        for i_step, (data, target) in enumerate(loader):
            
            # Move o tensor de dados para a GPU ou outro dispositivo especificado
            data = data.to(device)
            
            # Move o tensor de rótulos (target) para a GPU ou outro dispositivo especificado
            target = target.to(device)
            
            # Utiliza o modelo para gerar previsões
            outputs = model(data)

            # Copia as saídas para a CPU e converte para um array NumPy, desanexando do gráfico de computação
            out_cut = np.copy(outputs.data.cpu().detach())
            
            # Aplica um limiar para definir os pixels como pertencentes à classe de interesse ou não
            out_cut[np.nonzero(out_cut < threshold)] = 0.0
            
            # Aplica um limiar para definir os pixels como pertencentes à classe de interesse ou não
            out_cut[np.nonzero(out_cut >= threshold)] = 1.0

            # Calcula a métrica de perda usando coeficiente de Dice 
            picloss = dice_coef_metric(out_cut, target.data.cpu().numpy())
            
            # Acumula as perdas para posterior média
            valloss += picloss

    # Retorna a média da perda de validação
    return valloss / i_step

### Função Para Ajustar a Taxa de Aprendizado

In [None]:
# Define a função warmup_lr_scheduler para ajustar a taxa de aprendizado durante as iterações iniciais
def warmup_lr_scheduler(optimizer, warmup_iters, warmup_factor):
    
    # Define a função f(x) que modifica a taxa de aprendizado com base na iteração atual x
    def f(x):
        
        # Se a iteração atual for maior ou igual ao número de iterações de aquecimento, retorna 1
        # Isso significa que após warmup_iters iterações, a taxa de aprendizado não será mais ajustada
        if x >= warmup_iters:
            return 1
        
        # Calcula o fator alpha como a razão da iteração atual para as iterações de aquecimento totais
        alpha = float(x) / warmup_iters
        
        # Calcula o valor da taxa de aprendizado usando o fator de aquecimento e alpha
        # Durante o aquecimento, este valor será menor que 1, escalando a taxa de aprendizado original
        return warmup_factor * (1 - alpha) + alpha

    # Retorna um agendador de taxa de aprendizado que ajusta a taxa de aprendizado do otimizador usando f(x)
    return torch.optim.lr_scheduler.LambdaLR(optimizer, f)

### Função de Treino

In [None]:
# Função para treinar um modelo
def treina_modelo(model_name, model, train_loader, val_loader, train_loss, optimizer, lr_scheduler, num_epochs):
    
    # Mostra uma mensagem informando que o treinamento foi iniciado
    print(f"Iniciando o Treinamento do Modelo {model_name}")
    
    # Listas para armazenar histórico de perda e métricas
    loss_history = []
    train_history = []
    val_history = []
        
    # Loop sobre o número total de épocas
    for epoch in range(num_epochs):
        
        # Configura o modelo para o modo de treinamento
        model.train()
        
        # Lista para armazenar as perdas para esta época
        losses = []
        
        # Lista para armazenar as métricas IOU para esta época
        train_iou = []
        
        # Verifica se um programador de taxa de aprendizado foi fornecido
        if lr_scheduler:
            
            # Fator inicial para o aquecimento da taxa de aprendizado
            warmup_factor = 1.0 / 100
            
            # Número de iterações para o aquecimento
            warmup_iters = min(100, len(train_loader) - 1)
            
            # Aplica o aquecimento ao agendador da taxa de aprendizado
            lr_scheduler = warmup_lr_scheduler(optimizer, warmup_iters, warmup_factor)
        
        # Itera sobre o conjunto de treinamento
        for i_step, (data, target) in enumerate(tqdm(train_loader)):
            
            # Move os dados e os rótulos para o device
            data = data.to(device)
            target = target.to(device)
            
            # Realiza a inferência para gerar previsões
            outputs = model(data)
            
            # Binariza as saídas do modelo com base em um limiar
            out_cut = np.copy(outputs.data.cpu().numpy())
            out_cut[np.nonzero(out_cut < 0.5)] = 0.0
            out_cut[np.nonzero(out_cut >= 0.5)] = 1.0
            
            # Calcula a métrica DICE para as previsões
            train_dice = dice_coef_metric(out_cut, target.data.cpu().numpy())
            
            # Calcula a perda
            loss = train_loss(outputs, target)
            
            # Adiciona a perda e o IOU às listas correspondentes
            losses.append(loss.item())
            train_iou.append(train_dice)
            
            # Zera os gradientes acumulados
            optimizer.zero_grad()
            
            # Calcula os gradientes com base na perda
            loss.backward()
            
            # Atualiza os parâmetros do modelo
            optimizer.step()
    
            # Atualiza a taxa de aprendizado se um agendador foi fornecido
            if lr_scheduler:
                lr_scheduler.step()
                
        # Calcula a métrica IOU no conjunto de validação
        val_mean_iou = compute_iou(model, val_loader)
        
        # Armazena as métricas e a perda para esta época
        loss_history.append(np.array(losses).mean())
        train_history.append(np.array(train_iou).mean())
        val_history.append(val_mean_iou)
        
        # Aplica o critério de parada antecipada (early stopping)
        early_stopping(np.array(losses).mean(), val_mean_iou)
        
        # Verifica se o critério de parada antecipada foi atingido
        if early_stopping.early_stop:
            print("Early stopping na epoch:", i)
            break
        
        # Exibe as métricas e a perda para esta época
        print("Epoch [%d]" % (epoch))
        print("Erro Médio em Treino:", np.array(losses).mean(), 
              "\nDICE Médio em Treino:", np.array(train_iou).mean(), 
              "\nDICE Médio em Validação:", val_mean_iou)
        
    print("\nTreinamento Concluído!\n")
    
    # Retorna os históricos de perda e métricas
    return loss_history, train_history, val_history

### Otimizador

O otimizador torch.optim.Adamax é uma variação do otimizador Adam no PyTorch. Enquanto o otimizador Adam utiliza estimativas do primeiro momento (a média) e do segundo momento (a variância não centralizada) das gradientes, o Adamax apenas utiliza uma estimativa do infinito momento (norma máxima) das gradientes. Esta abordagem foi introduzida na mesma publicação original do Adam e é considerada uma variante que pode ser mais robusta em alguns cenários em relação às estimativas do segundo momento do otimizador Adam.

De maneira mais técnica, enquanto o otimizador Adam usa a norma L2 dos gradientes para escalar a taxa de aprendizado, o Adamax usa a norma L∞ (norma infinita).

Os hiperparâmetros para o otimizador Adamax são semelhantes aos do Adam. A fórmula de atualização para o Adamax é diferente, mas ele ainda possui parâmetros como a taxa de aprendizado (lr), os coeficientes de decaimento (betas) e um termo para estabilidade numérica (eps).

Na prática, embora o Adam seja mais popular e frequentemente mostre um desempenho melhor em uma ampla variedade de tarefas, o Adamax pode ser útil em cenários onde o Adam é instável ou não converge bem. Vale a pena experimentar ambas as variações quando você estiver otimizando um modelo de Deep Learning para entender qual funciona melhor para sua aplicação específica.

In [None]:
# Otimizador
unet_optimizer = torch.optim.Adamax(modelo_unet_padrao.parameters(), weight_decay = 1e-2)

### Treinamento

In [None]:
# Número de épocas
num_ep = 50

In [None]:
%%time
unet_loss_history, unet_train_history, unet_val_history = treina_modelo("UNet_Padrao", 
                                                                        modelo_unet_padrao, 
                                                                        dl_treino, 
                                                                        dl_valid, 
                                                                        bce_dice_loss, 
                                                                        unet_optimizer, 
                                                                        False, 
                                                                        num_ep)

### Avaliação do Modelo em Treino

In [None]:
# Função para plot do histórico de treino e validação
def plot_model_history(model_name, train_history, val_history, num_epochs):
    
    x = np.arange(num_epochs)
    fig = plt.figure(figsize = (10, 6))
    plt.plot(x, train_history, label = 'DICE em Treino', lw = 3, c = "springgreen")
    plt.plot(x, val_history, label = 'DICE em Validação', lw = 3, c = "deeppink")
    plt.title(f"{model_name}", fontsize = 15)
    plt.legend(fontsize = 12)
    plt.xlabel("Epoch", fontsize = 15)
    plt.ylabel("DICE", fontsize = 15)
    fn = str(int(time.time())) + ".png"
    plt.show()

In [None]:
plot_model_history("UNet_Padrao", unet_train_history, unet_val_history, num_ep)

## Avaliação nos Dados de Teste

In [None]:
# Calcula o IoU em teste
test_iou = compute_iou(modelo_unet_padrao, dl_teste)
print(f"""Modelo UNet Padrão\nMédia de IoU nas imagens de teste - {np.around(test_iou, 2)*100}%""")

In [None]:
# Extrai uma imagem de teste de forma aleatória. Extraímos imagens com diagnóstico igual a 1 (o que nos interessa)
amostra_teste = df_teste[df_teste["diagnostico"] == 1].sample(1).values[0]
print(amostra_teste)

# Carrega imagem e máscara
image = cv2.resize(cv2.imread(amostra_teste[1]), (128, 128))
mask = cv2.resize(cv2.imread(amostra_teste[2]), (128, 128))

# Faz a previsão (os pixels da máscara são a previsão do modelo)
pred = torch.tensor(image.astype(np.float32) / 255.).unsqueeze(0).permute(0,3,1,2)
pred = modelo_unet_padrao(pred.to(device))
pred = pred.detach().cpu().numpy()[0, 0, :, :]

# Plot
fig, ax = plt.subplots(nrows = 1, ncols = 2, figsize = (8, 8))

ax[0].imshow(image)
ax[0].set_title("Imagem Original")

ax[1].imshow(pred)
ax[1].set_title("Máscara Prevista")

plt.show()

## Modelagem com Arquitetura U-Net e ResNet Backbone

https://arxiv.org/pdf/2004.05645.pdf

https://arxiv.org/abs/2204.12084

In [None]:
# Classe ConvRelu
class ConvRelu(nn.Module):
    
    def __init__(self, in_channels, out_channels, kernel, padding):
        super().__init__()
        
        self.convrelu = nn.Sequential(nn.Conv2d(in_channels, 
                                                out_channels, 
                                                kernel, 
                                                padding = padding),
                                      nn.ReLU(inplace = True))
        
    def forward(self, x):
        x = self.convrelu(x)
        return x

In [None]:
# Classe DecoderBlock
class DecoderBlock(nn.Module):
    
    def __init__(self, in_channels, out_channels):
        
        super().__init__()
        
        self.conv1 = ConvRelu(in_channels, in_channels//4, 1, 0)
        
        self.deconv = nn.ConvTranspose2d(in_channels//4, 
                                         in_channels//4, 
                                         kernel_size = 4, 
                                         stride = 2, 
                                         padding = 1, 
                                         output_padding = 0)
        
        self.conv2 = ConvRelu(in_channels//4, out_channels, 1, 0)
        
    def forward(self, x):
        x = self.conv1(x)
        x = self.deconv(x)
        x = self.conv2(x)
        
        return x

In [None]:
# Classe ResNeXtUNet
class ResNeXtUNet(nn.Module):
    
    def __init__(self, n_classes = 1):
        super().__init__()
        
        self.base_model = resnext50_32x4d(pretrained = True)
        self.base_layers = list(self.base_model.children())
        filters = [4*64, 4*128, 4*256, 4*512]
        
        self.encoder0 = nn.Sequential(*self.base_layers[:3])
        self.encoder1 = nn.Sequential(*self.base_layers[4])
        self.encoder2 = nn.Sequential(*self.base_layers[5])
        self.encoder3 = nn.Sequential(*self.base_layers[6])
        self.encoder4 = nn.Sequential(*self.base_layers[7])
        
        self.decoder4 = DecoderBlock(filters[3], filters[2])
        self.decoder3 = DecoderBlock(filters[2], filters[1])
        self.decoder2 = DecoderBlock(filters[1], filters[0])
        self.decoder1 = DecoderBlock(filters[0], filters[0])
        
        self.last_conv0 = ConvRelu(256, 128, 3, 1)
        self.last_conv1 = nn.Conv2d(128, n_classes, 3, padding = 1)
        
    def forward(self, x):
        
        x = self.encoder0(x)
        e1 = self.encoder1(x)
        e2 = self.encoder2(e1)
        e3 = self.encoder3(e2)
        e4 = self.encoder4(e3)
        
        d4 = self.decoder4(e4) + e3
        d3 = self.decoder3(d4) + e2
        d2 = self.decoder2(d3) + e1
        d1 = self.decoder1(d2)
        
        out = self.last_conv0(d1)
        out = self.last_conv1(out)
        
        out = torch.sigmoid(out)
        
        return out

In [None]:
# Cria instância da classe
modelo_resnet_unet = ResNeXtUNet().to(device)

In [None]:
# Arquitetura final
modelo_resnet_unet.parameters

### Treinamento

In [None]:
# Otimizador
resnextunet_optimizer = torch.optim.Adamax(modelo_resnet_unet.parameters(), weight_decay = 1e-3)

In [None]:
# Número de épocas
num_ep = 50

In [None]:
%%time
resnextunet_lh, resnextunet_th, resnextunet_vh = treina_modelo("ResNeXtUNet", 
                                                               modelo_resnet_unet, 
                                                               dl_treino, 
                                                               dl_valid, 
                                                               bce_dice_loss, 
                                                               resnextunet_optimizer, 
                                                               False, 
                                                               num_ep)

### Avaliação do Modelo em Treino

In [None]:
plot_model_history("ResNeXtUNet", resnextunet_th, resnextunet_vh, num_ep)

## Avaliação nos Dados de Teste

In [None]:
# Calcula o IoU nos dados de teste
test_iou = compute_iou(modelo_resnet_unet, dl_teste)
print(f"""Modelo ResNeXtUNet\nMédia de IoU nos dados de teste - {np.around(test_iou, 2)*100}%""")

In [None]:
# Extrai uma imagem de teste de forma aleatória. Extraímos imagens com diagnóstico igual a 1 (o que nos interessa)
amostra_teste = df_teste[df_teste["diagnostico"] == 1].sample(1).values[0]
print(amostra_teste)

# Carrega imagem e máscara
image = cv2.resize(cv2.imread(amostra_teste[1]), (128, 128))
mask = cv2.resize(cv2.imread(amostra_teste[2]), (128, 128))

# Faz a previsão (os pixels da máscara são a previsão do modelo)
pred = torch.tensor(image.astype(np.float32) / 255.).unsqueeze(0).permute(0,3,1,2)
pred = modelo_resnet_unet(pred.to(device))
pred = pred.detach().cpu().numpy()[0, 0, :, :]

# Plot
fig, ax = plt.subplots(nrows = 1, ncols = 2, figsize = (8, 8))

ax[0].imshow(image)
ax[0].set_title("Imagem Original")

ax[1].imshow(pred)
ax[1].set_title("Máscara Prevista")

plt.show()

# Fim