## Model

In [2]:
from keras.models import Model
from keras.layers import (
    Input,
    Conv2D,
    UpSampling2D,
    BatchNormalization,
    Activation,
    Add,
    Concatenate
)


def res_block(x, nb_filters, strides):
    # Residual path
    res_path = BatchNormalization()(x)
    res_path = Activation("relu")(res_path)
    res_path = Conv2D(nb_filters[0], (3, 3), padding='same', strides=strides[0])(res_path)

    res_path = BatchNormalization()(res_path)
    res_path = Activation("relu")(res_path)
    res_path = Conv2D(nb_filters[1], (3, 3), padding='same', strides=strides[1])(res_path)

    # Shortcut path
    shortcut = Conv2D(nb_filters[1], (1, 1), strides=strides[0])(x)
    shortcut = BatchNormalization()(shortcut)

    # Merge
    return Add()([shortcut, res_path])


def encoder(x):
    to_decoder = []

    # Initial residual block
    main_path = Conv2D(64, (3, 3), padding='same')(x)
    main_path = BatchNormalization()(main_path)
    main_path = Activation("relu")(main_path)

    main_path = Conv2D(64, (3, 3), padding='same')(main_path)
    shortcut = Conv2D(64, (1, 1))(x)
    shortcut = BatchNormalization()(shortcut)

    main_path = Add()([shortcut, main_path])
    to_decoder.append(main_path)

    # Downsampling residual blocks
    main_path = res_block(main_path, [128, 128], [(2, 2), (1, 1)])
    to_decoder.append(main_path)

    main_path = res_block(main_path, [256, 256], [(2, 2), (1, 1)])
    to_decoder.append(main_path)

    return to_decoder


def decoder(x, from_encoder):
    # First up + concat + residual
    main_path = UpSampling2D((2, 2))(x)
    main_path = Concatenate(axis=3)([main_path, from_encoder[2]])
    main_path = res_block(main_path, [256, 256], [(1, 1), (1, 1)])

    # Second
    main_path = UpSampling2D((2, 2))(main_path)
    main_path = Concatenate(axis=3)([main_path, from_encoder[1]])
    main_path = res_block(main_path, [128, 128], [(1, 1), (1, 1)])

    # Third
    main_path = UpSampling2D((2, 2))(main_path)
    main_path = Concatenate(axis=3)([main_path, from_encoder[0]])
    main_path = res_block(main_path, [64, 64], [(1, 1), (1, 1)])

    return main_path


def build_res_unet(input_shape):
    inputs = Input(shape=input_shape)

    # Encoder
    to_decoder = encoder(inputs)

    # Bottleneck
    x = res_block(to_decoder[2], [512, 512], [(2, 2), (1, 1)])

    # Decoder
    x = decoder(x, to_decoder)

    # Final segmentation output
    outputs = Conv2D(1, (1, 1), activation='sigmoid')(x)

    return Model(inputs=inputs, outputs=outputs)

## Util

In [6]:
import os
import numpy as np
import tensorflow as tf
from keras import Sequential
from keras.layers import RandomFlip, RandomRotation, RandomTranslation, RandomZoom
from keras.utils import Sequence
from keras.preprocessing.image import load_img, img_to_array
from keras.config import floatx

smooth = 1.


def dice_coef(y_true, y_pred):
    y_true_f = tf.reshape(y_true, [-1])
    y_pred_f = tf.reshape(y_pred, [-1])
    intersection = tf.reduce_sum(y_true_f * y_pred_f)
    return (2. * intersection + smooth) / (
        tf.reduce_sum(y_true_f) + tf.reduce_sum(y_pred_f) + smooth
    )


def dice_coef_loss(y_true, y_pred):
    return -dice_coef(y_true, y_pred)


class PASCALVOCIterator(Sequence):
    """
    Iterator adaptado a la estructura de `dataset/` proporcionada:
    - dataset/train/sat: imágenes (.tiff)
    - dataset/train/map: máscaras (grayscale, .tif)
    - dataset/valid/sat y dataset/valid/map para validación (opcional)

    No requiere `ImageSets` ni `train.txt`. Empareja por nombre de archivo.
    """

    IMG_EXTS = (".tiff",)  # imágenes en sat
    MASK_EXTS = (".tif",)  # máscaras en map

    def __init__(self, directory, split="train",
                 target_size=(256, 256), color_mode='grayscale',
                 batch_size=32, shuffle=True, seed=None,
                 interpolation='nearest'):

        self.directory = directory
        self.split = split  # 'train' | 'valid' | 'test'
        self.target_size = tuple(target_size)
        self.color_mode = color_mode
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.seed = seed
        self.interpolation = interpolation

        # Augmentación (solo para train)
        self.augmentation = Sequential([
            RandomFlip("vertical"),
            RandomFlip("horizontal"),
            RandomRotation(0.1),
            RandomTranslation(0.1, 0.1),
            RandomZoom(0.1)
        ]) if split == "train" else None

        # Rutas según estructura entregada
        self.images_path = os.path.join(directory, split, "sat")
        self.masks_path = os.path.join(directory, split, "map")

        for item in [self.images_path, self.masks_path]:
            assert os.path.exists(item), f"Path does not exist: {item}"

        # Listamos imágenes y emparejamos con máscaras por nombre
        img_files = [f for f in os.listdir(self.images_path) if f.lower().endswith(self.IMG_EXTS)]
        img_files.sort()

        self.filenames = []
        self.masks = []
        missing = 0
        for fname in img_files:
            base = os.path.splitext(fname)[0]
            # Buscamos máscara con extensiones esperadas
            mask_candidates = [
                os.path.join(self.masks_path, base + ext) for ext in self.MASK_EXTS
            ]
            mask_path = next((p for p in mask_candidates if os.path.exists(p)), None)
            if mask_path is not None:
                self.filenames.append(os.path.join(self.images_path, fname))
                self.masks.append(mask_path)
            else:
                missing += 1

        self.samples = len(self.filenames)
        if self.samples == 0:
            raise ValueError(
                f"No se encontraron pares imagen-máscara en '{self.images_path}' (.tiff) y '{self.masks_path}' (.tif). "
                f"Verifica que los nombres de archivo coincidan por basename."
            )

        # Índices barajados
        self.indices = np.arange(self.samples)
        if self.shuffle:
            if self.seed is not None:
                np.random.seed(self.seed)
                
            np.random.shuffle(self.indices)

    def __len__(self):
        return int(np.ceil(self.samples / self.batch_size))

    def __getitem__(self, idx):
        batch_indices = self.indices[idx * self.batch_size:(idx + 1) * self.batch_size]
        return self._generate_batch(batch_indices)

    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indices)

    def _generate_batch(self, batch_indices):
        batch_x = []
        batch_y = []

        for i in batch_indices:
            # Imagen (cargamos como grayscale para que coincida con input_shape)
            img_path = self.filenames[i]
            img = load_img(
                img_path,
                color_mode=self.color_mode,  # 'grayscale' por defecto
                target_size=self.target_size,
                interpolation=self.interpolation
            )
            x = img_to_array(img) / 255.0

            # Augmentación (solo en train)
            if self.augmentation is not None:
                x = self.augmentation(x, training=True)

            # Máscara
            mask_path = self.masks[i]
            mask = load_img(
                mask_path,
                color_mode='grayscale',
                target_size=self.target_size,
                interpolation=self.interpolation
            )
            y = img_to_array(mask) / 255.0

            batch_x.append(x)
            batch_y.append(y)

        batch_x = np.array(batch_x, dtype=floatx())
        batch_y = np.array(batch_y, dtype=floatx())

        return batch_x, batch_y

## Train

In [4]:
import os
import datetime

from keras.optimizers import Adadelta
from keras.callbacks import ModelCheckpoint, TensorBoard

# Hyper parameters
model_name = "./res_unet_"
input_shape = (512, 512, 1)  # Grayscale for segmentation
dataset_folder = "dataset"  # Provided structure: train/valid/test with sat/map
batch_size = 2

# Archivos de guardado
timestamp = datetime.datetime.now().strftime("_%d_%m_%y_%H_%M_%S")
model_file = model_name + timestamp + ".keras"             # modelo completo
weights_file = model_name + timestamp + ".weights.h5"      # solo pesos

model = build_res_unet(input_shape=input_shape)
optimizer = Adadelta()
model.compile(optimizer=optimizer, loss=dice_coef_loss, metrics=[dice_coef])

os.makedirs("models", exist_ok=True)

# Guardar el mejor modelo completo (.keras)
checkpoint_model = ModelCheckpoint(
    filepath=os.path.join("models", model_file),
    monitor="loss",
    save_best_only=True,
    save_weights_only=False,
    verbose=1
)

# Guardar solo los mejores pesos (.weights.h5)
checkpoint_weights = ModelCheckpoint(
    filepath=os.path.join("models", weights_file),
    monitor="loss",
    save_best_only=True,
    save_weights_only=True,
    verbose=1
)

tensorboard = TensorBoard()

# Generadores
try:
    train_gen = PASCALVOCIterator(
        directory=dataset_folder,
        split="train",
        target_size=(input_shape[0], input_shape[1]),
        color_mode='grayscale',
        batch_size=batch_size,
        shuffle=True
    )
except ValueError as e:
    print(f"Error preparando train: {e}")
    raise

# Validación si existe
try:
    val_gen = PASCALVOCIterator(
        directory=dataset_folder,
        split="valid",
        target_size=(input_shape[0], input_shape[1]),
        color_mode='grayscale',
        batch_size=batch_size,
        shuffle=False
    )
    validation_data = val_gen
except (AssertionError, ValueError) as e:
    print(f"Validación no disponible: {e}")
    val_gen = None
    validation_data = None

# Steps
steps = len(train_gen)
if steps == 0:
    raise ValueError("El generador de entrenamiento no tiene muestras.")

# Entrenamiento
model.fit(
    train_gen,
    validation_data=validation_data,
    steps_per_epoch=steps,
    epochs=50,
    callbacks=[tensorboard, checkpoint_model, checkpoint_weights]
)

Epoch 1/50
[1m554/554[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 248ms/step - dice_coef: 0.0879 - loss: -0.0879
Epoch 1: loss improved from None to -0.08984, saving model to models/./res_unet__27_11_25_20_37_49.keras

Epoch 1: loss improved from None to -0.08984, saving model to models/./res_unet__27_11_25_20_37_49.weights.h5
[1m554/554[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m149s[0m 252ms/step - dice_coef: 0.0898 - loss: -0.0898 - val_dice_coef: 0.1213 - val_loss: -0.1213
Epoch 2/50
[1m554/554[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 243ms/step - dice_coef: 0.0908 - loss: -0.0908
Epoch 2: loss improved from -0.08984 to -0.09130, saving model to models/./res_unet__27_11_25_20_37_49.keras

Epoch 2: loss improved from -0.08984 to -0.09130, saving model to models/./res_unet__27_11_25_20_37_49.weights.h5
[1m554/554[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m136s[0m 246ms/step - dice_coef: 0.0913 - loss: -0.0913 - val_dice_coef: 0.1332 - val_loss: -0

<keras.src.callbacks.history.History at 0x77401e1975c0>

## Resultados

### 1. Model summary con capas y número de parámetros.

In [None]:
# Resumen del modelo
import os
import tensorflow as tf

try:
    model
except NameError:
    input_shape = (512, 512, 1)
    model = build_res_unet(input_shape=input_shape)

models_dir = "models"
loaded = False
if os.path.isdir(models_dir):
    weight_files = [f for f in os.listdir(models_dir) if f.endswith(".weights.h5")]
    candidate_path = None

    paths = [os.path.join(models_dir, f) for f in weight_files]
    candidate_path = max(paths, key=os.path.getmtime)
  
    model = build_res_unet(input_shape=(512, 512, 1))
    model.load_weights(candidate_path)
    loaded = True
    print(f"Cargados pesos entrenados desde: {candidate_path}")

print("Resumen del modelo:")
model.summary()

Cargados pesos entrenados desde: models/res_unet__27_11_25_20_37_49.weights.h5
Resumen del modelo:


## 2. Ejemplo de tensor de entrada: forma y distribución

Se toma un ejemplo del conjunto `valid` si existe; en su defecto, `train`. Se reporta la forma del tensor y estadísticas básicas (min, max, media, desviación estándar y percentiles).

In [7]:
# Forma y distribución del tensor de entrada
import numpy as np

# Preferimos 'valid', caemos a 'train' si no está disponible
try:
    gen = PASCALVOCIterator(
        directory="dataset",
        split="valid",
        target_size=(512, 512),
        color_mode='grayscale',
        batch_size=1,
        shuffle=False
    )
except (AssertionError, ValueError) as e:
    print(f"Validación no disponible: {e}")
    gen = PASCALVOCIterator(
        directory="dataset",
        split="train",
        target_size=(512, 512),
        color_mode='grayscale',
        batch_size=1,
        shuffle=False
    )

x_batch, y_batch = gen[0]
x = x_batch[0]
vals = x.flatten()

print(f"Forma del tensor de entrada: {x.shape}")
print(f"Min: {vals.min():.6f}  Max: {vals.max():.6f}")
print(f"Media: {vals.mean():.6f}  Std: {vals.std():.6f}")

percentiles = [0, 1, 5, 25, 50, 75, 95, 99, 100]
for p in percentiles:
    print(f"P{p}: {np.percentile(vals, p):.6f}")

Forma del tensor de entrada: (512, 512, 1)
Min: 0.047059  Max: 1.000000
Media: 0.385497  Std: 0.155685
P0: 0.047059
P1: 0.121569
P5: 0.184314
P25: 0.274510
P50: 0.352941
P75: 0.474510
P95: 0.694118
P99: 0.835294
P100: 1.000000


## 3. Logits de salida: explicación y valores

Elegimos trabajar con los logits de salida. En este modelo, la última capa es `Conv2D(1, (1, 1), activation='sigmoid')`. Los logits son los valores lineales antes de aplicar la sigmoide. Vemos que la sigmoide es invertible, por lo tanto podemos recuperar los logits aplicando la función `logit(p) = log(p / (1 - p))` sobre las probabilidades `p` predichas.

Interpretación:
- Los logits positivos indican alta confianza en clase/segmentación positiva.
- Los logits negativos indican alta confianza en clase/segmentación negativa.
- Valores cercanos a 0 reflejan incertidumbre (p ≈ 0.5).

In [8]:
# Cálculo de logits y activaciones relevantes
import numpy as np

# Asegurar batch de entrada
try:
    x_batch
except NameError:
    try:
        gen = PASCALVOCIterator(
            directory="dataset",
            split="valid",
            target_size=(512, 512),
            color_mode='grayscale',
            batch_size=1,
            shuffle=False
        )
    except (AssertionError, ValueError) as e:
        print(f"Validación no disponible: {e}")
        gen = PASCALVOCIterator(
            directory="dataset",
            split="train",
            target_size=(512, 512),
            color_mode='grayscale',
            batch_size=1,
            shuffle=False
        )
    x_batch, y_batch = gen[0]

try:
    model
except NameError:
    model = build_res_unet(input_shape=(512, 512, 1))

# Probabilidades y logits
p = model.predict(x_batch, verbose=0)
# Evitamos divisiones por cero
eps = 1e-7
logits = np.log(p + eps) - np.log(1 - p + eps)

Z = logits.flatten()
print(f"Logits forma: {logits.shape}")
print(f"Logits min: {Z.min():.6f}  max: {Z.max():.6f}")
print(f"Logits mean: {Z.mean():.6f}  std: {Z.std():.6f}")

# Mostramos los valores más extremos (activaciones más relevantes)
top_k = 10
idx_sorted = np.argsort(Z)
negatives = Z[idx_sorted[:top_k]]
positives = Z[idx_sorted[-top_k:]]
print(f"Top {top_k} logits más negativos:")
print(negatives)
print(f"Top {top_k} logits más positivos:")
print(positives)

# Mostramos las probabilidades correspondientes a esos extremos
P = p.flatten()
neg_probs = P[idx_sorted[:top_k]]
pos_probs = P[idx_sorted[-top_k:]]
print("Probs de esos logits negativos:")
print(neg_probs)
print("Probs de esos logits positivos:")
print(pos_probs)

Logits forma: (1, 512, 512, 1)
Logits min: -8.779273  max: 16.118095
Logits mean: 8.969067  std: 6.949588
Top 10 logits más negativos:
[-8.779273 -8.732031 -8.70512  -8.675623 -8.630904 -8.628486 -8.536283
 -8.535275 -8.50159  -8.483449]
Top 10 logits más positivos:
[16.118095 16.118095 16.118095 16.118095 16.118095 16.118095 16.118095
 16.118095 16.118095 16.118095]
Probs de esos logits negativos:
[0.00015377 0.00016121 0.00016561 0.00017057 0.00017837 0.0001788
 0.00019608 0.00019628 0.000203   0.00020672]
Probs de esos logits positivos:
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]


I0000 00:00:1766897737.779777     114 device_compiler.h:196] Compiled cluster using XLA!  This line is logged at most once for the lifetime of the process.
