In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.utils.data import random_split

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import os
from pathlib import Path
from collections import Counter

from PIL import Image

from torchinfo import summary
from torchvision.transforms import v2 as T

from utils import (
    train,
    model_calassification_report,
    plot_taining
)

from typing import Literal, List

---

In [2]:
# Transforms base (sin augmentation) para validaciÃ³n
img_transform = T.Compose([
    T.ToImage(),
    T.Resize((572, 572)),
    T.ToDtype(torch.float32, scale=True)
])

mask_transform = T.Compose([
    T.ToImage(),
    T.Resize((572, 572)),
    T.Grayscale(num_output_channels=1),
    T.ToDtype(torch.float32, scale=False),
])


In [19]:
# Creamos la clase que nos permita cargar test dataset
class SegmentationTestDataset(Dataset):
    def __init__(self, images_dir, img_transform=None):
        self.images_dir = images_dir
        self.img_transform = img_transform

        self.images = sorted(os.listdir(images_dir))

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img_path = os.path.join(self.images_dir, self.images[idx])

        img  = Image.open(img_path).convert("RGB")

        if self.img_transform:
            img = self.img_transform(img)

        return img, self.images[idx]  # retornamos tambiÃ©n el nombre del archivo

In [14]:
class DoubleConv(nn.Module):
    """
    Bloque de doble convoluciÃ³n: [Conv â†’ BN â†’ ReLU] x2
    
    SegÃºn el paper U-Net: 2x (3x3 conv + ReLU)
    AÃ±adimos Batch Normalization para estabilidad
    """
    def __init__(self, in_ch: int, out_ch: int, use_bn: bool = True):
        super().__init__()
        layers = []
        
        # Primera convoluciÃ³n
        layers.append(nn.Conv2d(in_ch, out_ch, kernel_size=3, padding=1, bias=not use_bn))
        if use_bn:
            layers.append(nn.BatchNorm2d(out_ch))
        layers.append(nn.ReLU(inplace=True))
        
        # Segunda convoluciÃ³n
        layers.append(nn.Conv2d(out_ch, out_ch, kernel_size=3, padding=1, bias=not use_bn))
        if use_bn:
            layers.append(nn.BatchNorm2d(out_ch))
        layers.append(nn.ReLU(inplace=True))
        
        self.block = nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.block(x)

print("âœ… DoubleConv definido")

âœ… DoubleConv definido


In [15]:
class Down(nn.Module):
    """
    Downsampling block: MaxPool â†’ DoubleConv
    """
    def __init__(self, in_ch: int, out_ch: int, use_bn: bool = True):
        super().__init__()
        self.pool = nn.MaxPool2d(2)
        self.conv = DoubleConv(in_ch, out_ch, use_bn=use_bn)

    def forward(self, x):
        x = self.pool(x)
        x = self.conv(x)
        return x

print("âœ… Down block definido")

âœ… Down block definido


In [16]:
class Up(nn.Module):
    """
    Upsampling block: Up-conv â†’ Concat con skip â†’ DoubleConv
    
    Args:
        in_ch: Canales de entrada (de la capa profunda)
        out_ch: Canales de salida
        bilinear: Si True usa interpolaciÃ³n bilinear, si False usa ConvTranspose2d
    """
    def __init__(self, in_ch: int, out_ch: int, bilinear: bool = True, use_bn: bool = True):
        super().__init__()
        
        if bilinear:
            # Upsampling bilinear + conv 1x1 para reducir canales
            self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
            self.reduce = nn.Conv2d(in_ch, in_ch // 2, kernel_size=1)
        else:
            # ConvTranspose2d (up-conv aprendida)
            self.up = nn.ConvTranspose2d(in_ch, in_ch // 2, kernel_size=2, stride=2)
            self.reduce = nn.Identity()
        
        # DoubleConv despuÃ©s de concatenar
        # in_ch // 2 (de up) + out_ch (de skip connection)
        self.conv = DoubleConv(in_ch // 2 + out_ch, out_ch, use_bn=use_bn)

    @staticmethod
    def _pad_to_match(x: torch.Tensor, ref: torch.Tensor) -> torch.Tensor:
        """Ajustar x para que coincida con el tamaÃ±o espacial de ref"""
        diff_y = ref.size(2) - x.size(2)
        diff_x = ref.size(3) - x.size(3)
        if diff_x == 0 and diff_y == 0:
            return x
        # Padding: [left, right, top, bottom]
        return F.pad(x, [diff_x // 2, diff_x - diff_x // 2, 
                        diff_y // 2, diff_y - diff_y // 2])

    def forward(self, x: torch.Tensor, skip: torch.Tensor) -> torch.Tensor:
        x = self.up(x)
        x = self.reduce(x)
        x = self._pad_to_match(x, skip)
        # Concatenar en dimensiÃ³n de canales
        x = torch.cat([skip, x], dim=1)
        x = self.conv(x)
        return x

print("âœ… Up block definido")

âœ… Up block definido


In [7]:
class OutConv(nn.Module):
  def __init__(self, in_ch: int, out_ch: int):
    super().__init__()
    self.conv = nn.Conv2d(in_ch, out_ch, kernel_size=1)

  def forward(self, x):
    return self.conv(x)

In [12]:
class UNet(nn.Module):
    """
    Arquitectura U-Net completa
    
    Args:
        in_channels: NÃºmero de canales de entrada (1 para grayscale, 3 para RGB)
        num_classes: NÃºmero de canales de salida (1 para segmentaciÃ³n binaria)
        base_ch: NÃºmero de filtros en el primer nivel (64 en paper original)
        depth: NÃºmero de niveles de pooling (4 en paper original)
        bilinear: Si True usa upsampling bilinear, si False usa ConvTranspose2d
        use_bn: Si True usa Batch Normalization
    """
    def __init__(self, in_channels=1, num_classes=1, base_ch=64, depth=4, 
                 bilinear=False, use_bn=True):
        super().__init__()
        
        # Calcular canales en cada nivel: [64, 128, 256, 512, 1024]
        chs = [base_ch * (2 ** i) for i in range(depth + 1)]
        
        # ========== ENCODER ==========
        # Primer bloque (sin maxpool)
        self.inc = DoubleConv(in_channels, chs[0], use_bn=use_bn)
        
        # Bloques descendentes con maxpool
        self.downs = nn.ModuleList()
        for i in range(depth):
            self.downs.append(Down(chs[i], chs[i + 1], use_bn=use_bn))
        
        # ========== DECODER ==========
        self.ups = nn.ModuleList()
        for i in reversed(range(depth)):
            self.ups.append(Up(chs[i + 1], chs[i], bilinear=bilinear, use_bn=use_bn))
        
        # ConvoluciÃ³n de salida
        self.outc = OutConv(chs[0], num_classes)
        
        # InicializaciÃ³n de pesos
        self._init_weights()
    
    def _init_weights(self):
        """InicializaciÃ³n Kaiming (He) para capas con ReLU"""
        for m in self.modules():
            if isinstance(m, (nn.Conv2d, nn.ConvTranspose2d)):
                nn.init.kaiming_normal_(m.weight, nonlinearity='relu')
                if m.bias is not None:
                    nn.init.zeros_(m.bias)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
    
    def forward(self, x):
        # ========== ENCODER ==========
        skips = []
        
        # Primer nivel
        x = self.inc(x)
        skips.append(x)
        
        # Descenso con skip connections
        for down in self.downs:
            x = down(x)
            skips.append(x)
        
        # ========== DECODER ==========
        # El Ãºltimo skip es el bottleneck, lo usamos como punto de partida
        x = skips.pop()
        
        # Ascenso con skip connections del encoder
        for up in self.ups:
            skip = skips.pop()
            x = up(x, skip)
        
        # ConvoluciÃ³n de salida
        return self.outc(x)

print("âœ… Arquitectura U-Net completa definida")

âœ… Arquitectura U-Net completa definida


In [10]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [17]:
model = UNet(
    in_channels=3,      
    num_classes=1,      # SegmentaciÃ³n binaria
    base_ch=64,         # Paper original
    depth=4,            # 4 niveles de pooling
    bilinear=True,     # Usar ConvTranspose2d
    use_bn=True         # Batch Normalization
).to(device)

In [18]:
model.load_state_dict(torch.load('unet_epoch_100.pth',map_location=torch.device('cpu')))
model.eval()

  model.load_state_dict(torch.load('unet_epoch_100.pth',map_location=torch.device('cpu')))


UNet(
  (inc): DoubleConv(
    (block): Sequential(
      (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU(inplace=True)
      (3): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (4): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (5): ReLU(inplace=True)
    )
  )
  (downs): ModuleList(
    (0): Down(
      (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (conv): DoubleConv(
        (block): Sequential(
          (0): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (2): ReLU(inplace=True)
          (3): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (4): Bat

In [20]:
# Instanciamos el dataset y dataloader de test
test_ds = SegmentationTestDataset(
    "test/images",
    img_transform=img_transform
)
test_loader = DataLoader(test_ds, batch_size=16, shuffle=False)

In [21]:
from PIL import Image
import numpy as np
import os

TEST_IMAGES_DIR = "test/images"  # ajustÃ¡ si tu carpeta es distinta

def resize_mask_to_original(pred_mask, img_name):
    """
    pred_mask: mÃ¡scara [H, W] en 0/1 (572x572)
    img_name: nombre del archivo en test/images
    """
    # Abrimos la imagen original para conocer el tamaÃ±o real (p.ej. 800x800)
    img_path = os.path.join(TEST_IMAGES_DIR, img_name)
    with Image.open(img_path) as im:
        w, h = im.size  # PIL da (width, height)

    # Convertimos la mÃ¡scara a imagen 0-255
    mask_img = Image.fromarray((pred_mask * 255).astype(np.uint8))

    # Redimensionamos al tamaÃ±o original usando NEAREST (no inventa grises)
    mask_img = mask_img.resize((w, h), resample=Image.NEAREST)

    # Volvemos a numpy 0/1
    mask_resized = (np.array(mask_img) > 0).astype(np.uint8)  # [h, w]
    return mask_resized

def rle_encode(mask):
    """
    mask: array 2D (H, W) con 0/1
    Devuelve string RLE en formato Kaggle, flatten(order='F')
    """
    pixels = mask.flatten(order='F')  # columna a columna
    pixels = np.concatenate([[0], pixels, [0]])
    runs = np.where(pixels[1:] != pixels[:-1])[0] + 1
    runs[1::2] = runs[1::2] - runs[::2]
    return ' '.join(str(x) for x in runs)

In [22]:
import pandas as pd

submission = []
model.eval()

with torch.no_grad():
    for images, img_names in test_loader:
        images = images.to(device)
        outputs = model(images)  # [B, 1, 572, 572]
        probs = torch.sigmoid(outputs)
        preds = (probs > 0.5).float()

        for i in range(images.size(0)):
            img_name = img_names[i]
            pred_mask_572 = preds[i, 0].cpu().numpy()  # [572, 572]

            # 1) Reescalar al tamaÃ±o original (800x800)
            pred_mask_orig = resize_mask_to_original(pred_mask_572, img_name)  # [H, W] original

            # 2) RLE en formato Kaggle (orden Fortran)
            rle_str = rle_encode(pred_mask_orig)

            submission.append({
                'id': img_name,      # ðŸ‘ˆ revisÃ¡ si Kaggle quiere "xxx.png" o solo "xxx"
                'rle_mask': rle_str
            })

submission_df = pd.DataFrame(submission)
submission_df.to_csv("submission.csv", index=False)
print("submission_4.csv guardado")

submission_4.csv guardado
