In [3]:
import torch
import torch.nn as nn

# Definición de la clase UNet (la versión adaptada del artículo base de referencia)
class UNet(nn.Module):
    def __init__(self):
        super(UNet, self).__init__()
        
        self.conv1 = self._double_conv(1, 64) # Primer bloque de convolución: convierte de 1 canal (imagen en escala de grises) a 64 canales
        self.pool1 = nn.MaxPool2d(2)    # MaxPooling: reduce el tamaño espacial a la mitad
        
        self.conv2 = self._double_conv(64, 128) # Segundo bloque de convolución: convierte de 64 a 128 canales
        self.drop1 = nn.Dropout()   # Dropout para regularización (reduce el overfitting)
        self.pool2 = nn.MaxPool2d(2) # MaxPooling: reduce el tamaño espacial a la mitad nuevamente
        
        self.conv3 = self._double_conv(128, 256)  # Tercer bloque de convolución (parte más profunda de la red): convierte de 128 a 256 canales
        
        self.upconv1 = nn.Sequential(  # Primera capa de deconvolución para la expansión (Decoder)
          nn.Dropout(), # Dropout para regularización
          nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2),  # Deconvolución para aumentar el tamaño espacial
          nn.ReLU(inplace=True)  # Activación ReLU
        )
        self.conv4 = self._double_conv(256, 128)   # Bloque de convolución para procesar los datos combinados tras la concatenación
        
        self.upconv2 = nn.Sequential(  # Segunda capa de deconvolución
          nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2), # Deconvolución
          nn.ReLU(inplace=True) # Activación ReLU
        )        
        self.conv5 = self._double_conv(128, 64)  # Bloque de convolución para procesar los datos combinados tras la segunda concatenación

        self.output = nn.Conv2d(64, 1, kernel_size=1)  # Capa final de convolución: reduce a un solo canal de salida
        
    def _double_conv(self, in_channels, out_channels):   #Función que nos permite realizar dos convoluciones seguidas con activaciones ReLU.
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True)
        )
        
    def forward(self, x):   # Función que representa el flujo de la arquitectura
        conv1 = self.conv1(x)  
        pool1 = self.pool1(conv1)
        
        conv2 = self.conv2(pool1)
        drop1 = self.drop1(conv2)
        pool2 = self.pool2(drop1)
        
        conv3 = self.conv3(pool2)
        
        upconv1 = self.upconv1(conv3)
        cat1 = torch.cat([upconv1, conv2], dim=1) # Primera concatenación con características del Encoder
        conv4 = self.conv4(cat1)
        
        upconv2 = self.upconv2(conv4)
        cat2 = torch.cat([upconv2, conv1], dim=1) # Segunda concatenación con características del Encoder
        conv5 = self.conv5(cat2)
        
        output = self.output(conv5)
        
        return torch.sigmoid(output)  # Salida: pasa por una convolución y luego se aplica una función sigmoide
      
model_unet = UNet()

In [None]:
import torch
import torch.nn as nn

# Definición de la clase SegNet (la versión adaptada del artículo base de referencia)
class SegNet(nn.Module):
    def __init__(self, num_classes):
        super(SegNet, self).__init__()

        self.conv1_1 = nn.Conv2d(1, 64, 3, padding=1) # Primer bloque de convolución: convierte de 1 canal (imagen en escala de grises) a 64 canales
        self.conv1_2 = nn.Conv2d(64, 128, 3, padding=1) # Convierte de 64 a 128 canales
        self.max_pooling1 = nn.MaxPool2d(2, stride=2, return_indices=True) # MaxPooling con índices para "unpooling"
        self.conv2_1 = nn.Conv2d(128, 256, 3, padding=1) # Segundo bloque de convolución: de 128 a 256 canales
        self.conv2_2 = nn.Conv2d(256, 512, 3, padding=1) # de 256 a 512 canales
        self.max_pooling2 = nn.MaxPool2d(2, stride=2, return_indices=True) # Segundo MaxPooling con índices
        self.conv3_1 = nn.Conv2d(512, 512, 3, padding=1)  # Tercer bloque de convolución:mantiene 512 canales
        self.conv3_2 = nn.Conv2d(512, 512, 3, padding=1)  
        self.max_pooling3 = nn.MaxPool2d(2, stride=2, return_indices=True) # Tercer MaxPooling con índices

        # Decoder
        self.max_unpooling1 = nn.MaxUnpool2d(2, stride=2)  # Primer "unpooling" para revertir el último MaxPooling
        self.conv4_1 = nn.Conv2d(512, 512, 3, padding=1)  # Convolución posterior al "unpooling"
        self.conv4_2 = nn.Conv2d(512, 512, 3, padding=1) # Segunda convolución del bloque
        self.max_unpooling2 = nn.MaxUnpool2d(2, stride=2) # Segundo "unpooling"
        self.conv5_1 = nn.Conv2d(512, 256, 3, padding=1) # Convolución posterior, de 512 a 256 canales
        self.conv5_2 = nn.Conv2d(256, 128, 3, padding=1) # Segunda convolución del bloque, de 256 a 128 canales
        self.max_unpooling3 = nn.MaxUnpool2d(2, stride=2) # Tercer "unpooling"
        self.conv6_1 = nn.Conv2d(128, 64, 3, padding=1) # Convolución posterior, de 128 a 64 canales
        self.conv6_2 = nn.Conv2d(64, num_classes, 3, padding=1) # Última convolución, de 64 a num_classes

        # Capas necesarias en ambas etapas
        self.relu = nn.ReLU()  # Activación ReLU
        self.sigmd = nn.Sigmoid() #Activación sigmoide
        self.batchn4 = nn.BatchNorm2d(512) # Normalización para canales de 512
        self.batchn3 = nn.BatchNorm2d(256) # Normalización para canales de 256
        self.batchn2 = nn.BatchNorm2d(128) # Normalización para canales de 128
        self.batchn1 = nn.BatchNorm2d(64) # Normalización para canales de 64
        self.batchn0 = nn.BatchNorm2d(1) # Normalización para canal de entrada

    def forward(self, img):   # Flujo de la arquitectura
        img = self.conv1_1(img)
        img = self.batchn1(img)
        img = self.relu(img)
        img = self.conv1_2(img)
        img = self.batchn2(img)
        img = self.relu(img)
        img, ind1 = self.max_pooling1(img)

        img = self.conv2_1(img)
        img = self.batchn3(img)
        img = self.relu(img)
        img = self.conv2_2(img)
        img = self.batchn4(img)
        img = self.relu(img)
        img, ind2 = self.max_pooling2(img)

        img = self.conv3_1(img)
        img = self.batchn4(img)
        img = self.relu(img)
        img = self.conv3_2(img)
        img = self.batchn4(img)
        img = self.relu(img)
        img, ind3 = self.max_pooling3(img)

        img = self.max_unpooling1(img, ind3)
        img = self.conv4_1(img)
        img = self.batchn4(img)
        img = self.relu(img)
        img = self.conv4_2(img)
        img = self.batchn4(img)
        img = self.relu(img)

        img = self.max_unpooling2(img, ind2)
        img = self.conv5_1(img)
        img = self.batchn3(img)
        img = self.relu(img)
        img = self.conv5_2(img)
        img = self.batchn2(img)
        img = self.relu(img)

        img = self.max_unpooling3(img, ind1)
        img = self.conv6_1(img)
        img = self.batchn1(img)
        img = self.relu(img)
        img = self.conv6_2(img)
        img = self.sigmd(img)

        return img

num_classes = 1   # Número de clases en la salida
model_segnet = SegNet(num_classes)

In [None]:
import torch
import torch.nn as nn

# Definición de la clase UNetCompleto (la versión original de Ronneberger et al.) 
class UNetCompleto(nn.Module):    #Es igual que el UNet de antes pero con más capas convolucionales
    def __init__(self, num_classes=1):
        super(UNetCompleto, self).__init__()

        # Ruta Contractiva
        self.conv1 = self._double_conv(1, 64) 
        self.pool1 = nn.MaxPool2d(2)           
        
        self.conv2 = self._double_conv(64, 128)
        self.pool2 = nn.MaxPool2d(2)           
        
        self.conv3 = self._double_conv(128, 256)
        self.pool3 = nn.MaxPool2d(2)          
        
        self.conv4 = self._double_conv(256, 512)
        self.pool4 = nn.MaxPool2d(2)           
        
        self.bottleneck = self._double_conv(512, 1024)
        
        # Ruta Expansiva
        self.up6 = nn.ConvTranspose2d(1024, 512, kernel_size=2, stride=2) 
        self.conv6 = self._double_conv(1024, 512)                        
        
        self.up7 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2)   
        self.conv7 = self._double_conv(512, 256)                         
        
        self.up8 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2) 
        self.conv8 = self._double_conv(256, 128)                          
        
        self.up9 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        self.conv9 = self._double_conv(128, 64)                         
        
        self.final_conv = nn.Conv2d(64, num_classes, kernel_size=1)      
    def _double_conv(self, in_channels, out_channels):
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )
        
    def forward(self, x):
        conv1 = self.conv1(x)
        pool1 = self.pool1(conv1)
        
        conv2 = self.conv2(pool1)
        pool2 = self.pool2(conv2)
        
        conv3 = self.conv3(pool2)
        pool3 = self.pool3(conv3)
        
        conv4 = self.conv4(pool3)
        pool4 = self.pool4(conv4)
        
        bottleneck = self.bottleneck(pool4)

        up6 = self.up6(bottleneck)
        up6 = torch.cat([up6, conv4], dim=1) 
        conv6 = self.conv6(up6)
        
        up7 = self.up7(conv6)
        up7 = torch.cat([up7, conv3], dim=1)
        conv7 = self.conv7(up7)
        
        up8 = self.up8(conv7)
        up8 = torch.cat([up8, conv2], dim=1)
        conv8 = self.conv8(up8)
        
        up9 = self.up9(conv8)
        up9 = torch.cat([up9, conv1], dim=1)
        conv9 = self.conv9(up9)
        
        # Capa final
        output = self.final_conv(conv9)
        return torch.sigmoid(output)

model_unet_completo = UNetCompleto(num_classes=1)

In [None]:
from torchinfo import summary

summary(model_unet, input_size=(8, 1, 256, 256)) # 1,861,697 params entrenables
summary(model_segnet, input_size=(8, 1, 256, 256)) # 12,540,291 parametros entrenables

In [None]:
# Definimos las rutas de los directorios de imágenes y máscaras
images_dir = 'data/covid19/im/256'
masks_dir = 'data/covid19/mask/binary/256'
# images_dir = 'data/retina/images'
# masks_dir = 'data/retina/masks'

from torchvision.transforms import v2 # Importamos el módulo de transformaciones para preprocesamiento


# Cconjunto de transformaciones para las imágenes y máscaras
transform = v2.Compose([
    v2.Resize((256, 256)),
    v2.RandomVerticalFlip(p=0.5),            
    v2.RandomHorizontalFlip(p=0.5),          
    v2.RandomAffine(degrees=0, shear=[-10, 10, -10, 10]),
    v2.ToImage(),
    v2.ToDtype(torch.float32, scale=True)
])

dataset = SegmentationDataset(images_dir, masks_dir, transform=transform)  # Creamos un dataset de segmentación con las imágenes, máscaras y transformaciones definidas
data_loader = DataLoader(dataset, batch_size=8, shuffle=True)  # Cargamos el dataset completo en un DataLoader para iterar sobre los datos en lotes (batches) de manera eficiente

# Dividimos el dataset en conjuntos de entrenamiento, validación y prueba
train_size = int(0.72 * len(dataset)) # 0.72
val_size = int(0.1 * len(dataset)) # 0.10
test_size = len(dataset) - train_size - val_size # 0.18
train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size]) # Realizamos la partición utilizando random_split

batch_size = 8  # Definimos el tamaño de lote (batch size)

# Creamos DataLoaders para cada conjunto
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

# Imprimimos información sobre la distribución de los datos
print(f"Dataset (total): {len(dataset)} samples")
print(f"Train dataset: {len(train_dataset)} samples")
print(f"Validation dataset: {len(val_dataset)} samples")
print(f"Test dataset: {len(test_dataset)} samples")

In [None]:
import matplotlib.pyplot as plt
import random

fig, axs = plt.subplots(2, 2, figsize=(10, 10))

for i, (image, mask) in enumerate(random.sample(list(dataset), 2)):
  axs[i, 0].imshow(image.squeeze(0), cmap='gray')
  axs[i, 0].set_title(f'Imagen')
  axs[i, 0].axis('off')
  
  axs[i, 1].imshow(mask.squeeze(0), cmap='gray')
  axs[i, 1].set_title(f'Máscara')
  axs[i, 1].axis('off')
  
plt.tight_layout()
plt.show()

In [None]:
def entrenar(model, device, train_loader, val_loader, epochs=10, lr=0.001, weight_decay=0.0001, step_size=12, gamma=0.95):  #Función para entrenar un modelo de segmentación utilizando PyTorch.
   # Parámetros: modelo a entrenar, dispositivo a utilizar, DataLoader para los datos de entrenamiento, DataLoader para los datos de validación, nº de épocas de entrenamiento, tasa de aprendizaje inicial, factor de penalización para la regularización L2, nº de épocas después de las cuales se reduce la tasa de aprendizaje, factor multiplicativo para ajustar la tasa de aprendizaje
    model = model.to(device) # Enviamos el modelo al dispositivo (GPU o CPU)
    print("training on: ", device)
    criterion = nn.BCELoss()   # Definimos la función de pérdida basada en el error cuadrático para problemas de segmentación binaria
    optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay) # Optimización con Adam
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=step_size, gamma=gamma) # Scheduler para ajustar dinámicamente la tasa de aprendizaje
    
    val_losses, train_losses = [], []  # Inicializamos listas para almacenar las pérdidas de entrenamiento y validación
    
    for epoch in range(epochs):  # Por cada época
        model.train() # Ponemos el modelo en modo de entrenamiento
        train_loss = 0.0  # Inicializamos la pérdida de entrenamiento en 0
        for images, masks in train_loader:  # Iteramos sobre los lotes de datos de entrenamiento
            images, masks = images.to(device), masks.to(device)  # Enviamos las imágenes y máscaras al dispositivo
            
            outputs = model(images)     # Obtenemos las predicciones del modelo          
            loss = criterion(outputs, masks)  # Calculamos la pérdida entre las predicciones y las máscaras verdaderas

             # Realizamos la retropropagación y actualización de pesos
            optimizer.zero_grad() # Limpiamos los gradientes acumulados
            loss.backward() # Calculamos los gradientes
            optimizer.step() # Actualizamos los pesos
            
            train_loss += loss.item()   # Acumulamos la pérdida de entrenamiento

        # Evaluación del modelo en el conjunto de validación
        model.eval() # Ponemos el modelo en modo de evaluación 
        val_loss = 0.0  # Inicializamos la pérdida de validación en 0
        for images, masks in val_loader:
            images, masks = images.to(device), masks.to(device) # Enviamos los datos al dispositivo
            outputs = model(images)    # Obtenemos las predicciones del modelo
            loss = criterion(outputs, masks)   # Calculamos la pérdida de validación
            val_loss += loss.item()

         # Calculamos las pérdidas promedio de entrenamiento y validación
        avg_train_loss = train_loss / len(train_loader)
        avg_val_loss = val_loss / len(val_loader)

         # Almacenamos las pérdidas promedio
        train_losses.append(avg_train_loss)
        val_losses.append(avg_val_loss)
        print(f"Epoch {epoch+1}/{epochs}, Train Loss: {avg_train_loss:.4f}, Validation Loss: {avg_val_loss:.4f}")

        
        scheduler.step()   # Actualizamos la tasa de aprendizaje según el scheduler
    return (train_losses, val_losses) # Devolvemos las pérdidas para análisis posterior

In [None]:
# determinar el dispositivo
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device {device}")

In [None]:
# entrenar el modelo adaptivo de U-Net
u1_t_losses, u1_v_losses = entrenar(model_unet, device, train_loader, val_loader, epochs=10)
torch.save(model_unet.state_dict(), "model_unet.pth")

In [None]:
# entrenar el modelo adaptivo de SegNet
s1_t_losses, s1_v_losses = entrenar(model_segnet, device, train_loader, val_loader, epochs=10)
torch.save(model_segnet.state_dict(), "model_segnet.pth")

In [None]:
# entrenar el modelo original de U-Net
u2_t_losses, u2_v_losses = entrenar(model_unet_completo, device, train_loader, val_loader, epochs=50)
torch.save(model_unet_completo.state_dict(), "model_unet_completo.pth")

In [None]:
import matplotlib.pyplot as plt

def plot_train_val_losses(u1_t_losses, u1_v_losses):
    plt.figure(figsize=(10, 6))
    plt.plot(u1_t_losses, label='Pérdida de entrenamiento')
    plt.plot(u1_v_losses, label='Pérdida de validación')
    plt.xlabel('Época')
    plt.ylabel('Pérdida')
    plt.legend()
    plt.show()

In [None]:
plot_train_val_losses(u1_t_losses, u1_v_losses)
plot_train_val_losses(s1_t_losses, s1_v_losses)
plot_train_val_losses(u2_t_losses, u2_v_losses)

In [None]:
def print_precisions(model, device, dataloader, model_name):
    model.eval()
    total, correct = 0, 0
    intersection, union = 0, 0

    with torch.no_grad():
        for images, masks in dataloader:
            images = images.to(device)
            masks = masks.to(device)

            outputs = model(images)
            predictions = (outputs > 0.5).float()

            # Calcular la precisión de píxeles
            correct += (predictions == masks).sum().item()
            total += masks.numel()
            
            # Calcular la IoU
            pred_mask = (predictions == 0)
            true_mask = (masks == 0)

            intersection += (pred_mask & true_mask).sum().item()
            union += (pred_mask | true_mask).sum().item()
            
    accuracy = correct / total
    iou = (intersection / union) if union > 0 else 0
    print(f"Modelo: {model_name} - Precisión de píxeles: {accuracy*100:.2f}% - IoU: {iou*100:.2f}%")

In [None]:
print_precisions(model_unet, device, test_loader, 'U-Net')  
print_precisions(model_segnet, device, test_loader, 'SegNet')
print_precisions(model_unet_completo, device, test_loader, 'U-Net Completo')  

In [None]:
from matplotlib import pyplot as plt

def display_predictions(model, device, dataloader): 
  model.eval()
  
  with torch.no_grad():
    fig, axs = plt.subplots(3, 3, figsize=(15, 15))
    
    for images, masks in dataloader:
      images, masks = images.to(device), masks.to(device)
      outputs = model(images)
      outputs = (outputs > 0.5).float()
      
      for i in range(3):
        image = images[i].squeeze().cpu()
        mask = masks[i].squeeze().cpu()
        pred_mask = outputs[i].squeeze().cpu()
        
        axs[i, 0].imshow(image, cmap='gray')
        axs[i, 0].set_title(f'Imagen')
        axs[i, 0].axis('off')
        
        axs[i, 1].imshow(mask, cmap='gray')
        axs[i, 1].set_title(f'Máscara')
        axs[i, 1].axis('off')
        
        axs[i, 2].imshow(pred_mask, cmap='gray')
        axs[i, 2].set_title(f'Predicción')
        axs[i, 2].axis('off')
        
      break
    
    plt.tight_layout()
    plt.show()

In [None]:
display_predictions(model_unet, device, test_loader)
display_predictions(model_segnet, device, test_loader)
display_predictions(model_unet_completo, device, test_loader)