In [1]:
import os

import numpy as np

from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset

### Modelo

Pelo que entendi, U-Net é uma rede neural convolucional totalmente convolucional (FCN) usada em segmentação semântica de imagens afim de prever uma classe para cada pixel da imagem.

A arquitetura segue o formato de “U”, composta por duas partes principais:

- Caminho de contração (encoder):
- Camadas `DoubleConv` e `Down` reduzem progressivamente a resolução espacial (com MaxPool2d) enquanto aumentam o número de canais.
- Extrai características de alto nível.

Caminho de expansão (decoder):
- Camadas `Up` aumentam a resolução com `Upsample` (ou `ConvTranspose2d`) e combinam (concatenação) os recursos do encoder via skip connections.
- Permite que a rede recupere detalhes espaciais perdidos na compressão.

Camada de saída (`OutConv`):
- Usa `Conv2d(kernel_size=1)` para mapear o resultado final para n_classes, gerando o mapa de segmentação.

O modelo recebe uma imagem, comprime suas informações em níveis profundos de abstração e reconstrói uma saída pixel a pixel.

In [3]:
class DoubleConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.double_conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        return self.double_conv(x)

class Down(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.maxpool_conv = nn.Sequential(
            nn.MaxPool2d(2),
            DoubleConv(in_channels, out_channels)
        )

    def forward(self, x):
        return self.maxpool_conv(x)

class Up(nn.Module):
    def __init__(self, in_channels, out_channels, bilinear=True):
        super().__init__()

        if bilinear:
            self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
            self.conv = DoubleConv(in_channels, out_channels)
        else:
            self.up = nn.ConvTranspose2d(in_channels , in_channels // 2, kernel_size=2, stride=2)
            self.conv = DoubleConv(in_channels, out_channels)


    def forward(self, x1, x2):
        x1 = self.up(x1)
        
        diffY = x2.size()[2] - x1.size()[2]
        diffX = x2.size()[3] - x1.size()[3]

        x1 = nn.functional.pad(x1, [diffX // 2, diffX - diffX // 2, diffY // 2, diffY - diffY // 2])

        x = torch.cat([x2, x1], dim=1)
        return self.conv(x)

class OutConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(OutConv, self).__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1)

    def forward(self, x):
        return self.conv(x)

class UNet(nn.Module):
    def __init__(self, n_channels, n_classes, bilinear=True):
        super(UNet, self).__init__()
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.bilinear = bilinear

        self.inc = DoubleConv(n_channels, 64)
        self.down1 = Down(64, 128)
        self.down2 = Down(128, 256)
        self.down3 = Down(256, 512)
        factor = 2 if bilinear else 1
        self.down4 = Down(512, 1024 // factor)
        self.up1 = Up(1024, 512 // factor, bilinear)
        self.up2 = Up(512, 256 // factor, bilinear)
        self.up3 = Up(256, 128 // factor, bilinear)
        self.up4 = Up(128, 64, bilinear)
        self.outc = OutConv(64, n_classes)

    def forward(self, x):
        x1 = self.inc(x)
        x2 = self.down1(x1)
        x3 = self.down2(x2)
        x4 = self.down3(x3)
        x5 = self.down4(x4)
        x = self.up1(x5, x4)
        x = self.up2(x, x3)
        x = self.up3(x, x2)
        x = self.up4(x, x1)
        logits = self.outc(x)
        return logits

### Geração de Mapas de Saliência

O mapa de saliência é a própria probabilidade máxima, que indica a "confiança" do modelo em sua segmentação.<br>
Áreas com alta probabilidade máxima são consideradas mais "saliêntes" para a decisão de segmentação.

In [4]:
class SaliencyMapGenerator:
    def __init__(self, model):
        self.model = model

    def generate(self, input_tensor):
        with torch.no_grad():
            logits = self.model(input_tensor)

        probabilities = torch.softmax(logits, dim=1)   # Converte logits para probabilidades (softmax)   
        max_probs, _ = torch.max(probabilities, dim=1) # Encontra a probabilidade máxima para cada pixel
        saliency_map = max_probs                       # Normaliza o mapa de saliência (0-1)

        return saliency_map.cpu().numpy()

### Identificação de Áreas Relevantes e Re-treinamento

A função `identify_relevant_areas` identifica os pixels mais relevantes com base no mapa de saliência e retorna uma máscara binária.

A função `apply_spatial_reduction` simula a redução espacial (como no NeuLens) aplicando uma máscara. 

A máscara é (B, H, W). Expande para (B, 1, H, W) para multiplicação e no final simplesmente retorna a máscara para ser usada como peso na função de perda

Em um cenário real, isso envolveria processar apenas os patches relevantes com uma sub-rede de alta resolução e o restante com uma sub-rede de baixa resolução/compressão. Aqui, vamos simular o "reuso de dados" ponderando a perda durante o re-treinamento.

In [5]:
def identify_relevant_areas(saliency_map, threshold=0.8):
    relevant_mask = (saliency_map > threshold).astype(np.uint8)
    return relevant_mask

def apply_spatial_reduction(image_tensor, relevant_mask):
    mask_tensor = torch.from_numpy(relevant_mask).unsqueeze(1).float().to(image_tensor.device)
    return mask_tensor

### Função de Perda Ponderada

Função de perda que aplica os pesos espaciais calculado a partir da máscara de saliência para dar mais importância às áreas relevantes.

- prediction: (B, C, H, W)
- target: (B, H, W)
- weight_mask: (B, 1, H, W) - Máscara de pesos (saliência)

Calcula a perda média apenas sobre os pixels relevantes (onde weight_mask > 0)

Para evitar divisão por zero, adicionamos um pequeno epsilon

Retorna a média normal se não houver pesos

In [None]:
class WeightedCrossEntropyLoss(nn.Module):
    def __init__(self):
        super().__init__()
        self.base_loss = nn.CrossEntropyLoss(reduction='none')

    def forward(self, prediction, target, weight_mask):
        loss_per_pixel = self.base_loss(prediction, target)     # (B, H, W)
        weighted_loss = loss_per_pixel * weight_mask.squeeze(1) # (B, H, W)
        total_weight = weight_mask.sum()
        if total_weight == 0:
            return weighted_loss.mean()

        return weighted_loss.sum() / total_weight

### Simulação de Dados e Treinamento

Aqui temos a parte da montagem do Dataset de simulação para segmentação que retorna um tensor de imagem e um tensor de máscara de segmentação.

In [7]:
class DummySegmentationDataset(Dataset):
    def __init__(self, num_samples=100, img_size=128, n_classes=2):
        self.num_samples = num_samples
        self.img_size = img_size
        self.n_classes = n_classes
        self.transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
        ])

    def __len__(self):
        return self.num_samples

    def __getitem__(self, idx):
        # Imagem simulada (3 canais)
        image = np.random.rand(self.img_size, self.img_size, 3).astype(np.float32)
        image = Image.fromarray((image * 255).astype(np.uint8))
        image_tensor = self.transform(image)

        # Máscara de segmentação simulada (n_classes)
        # O target para CrossEntropyLoss deve ser (H, W) com valores de classe (0 a n_classes-1)
        mask = np.random.randint(0, self.n_classes, (self.img_size, self.img_size), dtype=np.int64)
        mask_tensor = torch.from_numpy(mask)

        return image_tensor, mask_tensor

def train_one_epoch(model, dataloader, optimizer, criterion, device, iteration=0, saliency_generator=None, threshold=0.8):
    model.train()
    running_loss = 0.0
    for images, masks in dataloader:
        images = images.to(device)
        masks = masks.to(device)

        optimizer.zero_grad()

        # Passo 1: Treinamento com segmentação semântica (normal ou ponderado)
        outputs = model(images)

        if iteration == 0:
            # Iteração 0: Treinamento tradicional (Passo 1 do seu pipeline)
            loss = nn.CrossEntropyLoss()(outputs, masks)
            weight_mask = None
        else:
            # Iterações > 0: Treinamento ponderado (Passo 4 do seu pipeline)
            # 2. Forma o mapa de saliência
            saliency_map_np = saliency_generator.generate(images)

            # 3. Identifica as áreas mais relevantes
            relevant_mask_np = identify_relevant_areas(saliency_map_np, threshold)

            # 4. Aplica a redução espacial (cria a máscara de pesos)
            weight_mask = apply_spatial_reduction(images, relevant_mask_np)
            weight_mask = weight_mask.to(device)

            # Usa a função de perda ponderada
            loss = criterion(outputs, masks, weight_mask)

        loss.backward()
        optimizer.step()

        running_loss += loss.item() * images.size(0)

    return running_loss / len(dataloader.dataset)

def evaluate_model(model, dataloader, device):
    model.eval()
    total_correct = 0
    total_pixels = 0
    with torch.no_grad():
        for images, masks in dataloader:
            images = images.to(device)
            masks = masks.to(device)

            outputs = model(images)
            _, predicted = torch.max(outputs, 1)

            total_pixels += masks.numel()
            total_correct += (predicted == masks).sum().item()

    accuracy = total_correct / total_pixels
    return accuracy

def iterative_training_pipeline(model, train_loader, val_loader, device, num_iterations=5, epochs_per_iteration=5, saliency_threshold=0.8):
    """
    Implementa o pipeline de treinamento iterativo.
    """
    print(f"Iniciando pipeline de treinamento iterativo em {device}...")

    # Otimizador e Critério de Perda
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    weighted_criterion = WeightedCrossEntropyLoss()
    saliency_generator = SaliencyMapGenerator(model)

    best_accuracy = 0.0

    for iteration in range(num_iterations):
        print(f"\n--- Iteração {iteration} ---")

        if iteration == 0:
            print("Fase 1: Treinamento inicial com segmentação semântica (tradicional).")
        else:
            print("Fase 2-5: Re-treinamento com pesos de saliência.")

        for epoch in range(epochs_per_iteration):
            loss = train_one_epoch(
                model,
                train_loader,
                optimizer,
                weighted_criterion,
                device,
                iteration=iteration,
                saliency_generator=saliency_generator,
                threshold=saliency_threshold
            )
            print(f"  Época {epoch+1}/{epochs_per_iteration}, Perda: {loss:.4f}")

        # Avaliação
        accuracy = evaluate_model(model, val_loader, device)
        print(f"  Acurácia de Validação (Pixel-wise): {accuracy:.4f}")

        # 5. Está bom? (Critério de parada: melhoria ou número máximo de iterações)
        if accuracy > best_accuracy:
            best_accuracy = accuracy
            print("  Melhoria na acurácia. Continuar.")
        else:
            # Critério de parada simplificado: se a acurácia não melhorar
            print("  Acurácia não melhorou. Fim do treinamento iterativo.")
            # Se você quisesse um critério mais robusto, poderia verificar se
            # a acurácia se estabilizou ou se o número máximo de iterações foi atingido.
            # Aqui, vamos continuar até o final das iterações para demonstração.
            # break # Descomente para parada antecipada

    print("\nPipeline de treinamento concluído.")
    print(f"Melhor Acurácia de Validação: {best_accuracy:.4f}")

### Execução Principal

- N_CLASSES -> Número de classes de segmentação
- NUM_ITERATIONS -> Número de iterações do pipeline (Passos 2-5)
- EPOCHS_PER_ITERATION -> Número de épocas por iteração

In [8]:
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
N_CLASSES = 5
IMG_SIZE = 128
BATCH_SIZE = 4
NUM_SAMPLES = 100
NUM_ITERATIONS = 3
EPOCHS_PER_ITERATION = 2

model = UNet(n_channels=3, n_classes=N_CLASSES).to(DEVICE)

train_dataset = DummySegmentationDataset(num_samples=NUM_SAMPLES, img_size=IMG_SIZE, n_classes=N_CLASSES)
val_dataset = DummySegmentationDataset(num_samples=20, img_size=IMG_SIZE, n_classes=N_CLASSES)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

iterative_training_pipeline(
    model,
    train_loader,
    val_loader,
    DEVICE,
    num_iterations=NUM_ITERATIONS,
    epochs_per_iteration=EPOCHS_PER_ITERATION
)

Iniciando pipeline de treinamento iterativo em cuda...

--- Iteração 0 ---
Fase 1: Treinamento inicial com segmentação semântica (tradicional).
  Época 1/2, Perda: 1.6167
  Época 2/2, Perda: 1.6097
  Acurácia de Validação (Pixel-wise): 0.1998
  Melhoria na acurácia. Continuar.

--- Iteração 1 ---
Fase 2-5: Re-treinamento com pesos de saliência.
  Época 1/2, Perda: 0.0000
  Época 2/2, Perda: 0.0000
  Acurácia de Validação (Pixel-wise): 0.1999
  Melhoria na acurácia. Continuar.

--- Iteração 2 ---
Fase 2-5: Re-treinamento com pesos de saliência.
  Época 1/2, Perda: 0.0000
  Época 2/2, Perda: 0.0000
  Acurácia de Validação (Pixel-wise): 0.1995
  Acurácia não melhorou. Fim do treinamento iterativo.

Pipeline de treinamento concluído.
Melhor Acurácia de Validação: 0.1999


O código gerado simula o pipeline proposto:
1. Treinamento inicial (Iteração 0) com perda CrossEntropy padrão.
2. Nas iterações seguintes, o SaliencyMapGenerator simula a criação do mapa de saliência.
3. A função identify_relevant_areas simula a identificação das áreas mais relevantes.
4. A função apply_spatial_reduction simula a criação de uma máscara de pesos.
5. A WeightedCrossEntropyLoss usa essa máscara para ponderar a perda, dando mais foco às áreas 'saliêntes' no re-treinamento.

Este é um esqueleto funcional que precisa ser adaptado com dados reais e uma implementação de saliência mais sofisticada (e.g., Grad-CAM) para um caso de uso prático.