<h1 style="text-align:center;">Practical session 13</h1>
<h2 style="text-align:center;">Biomedical Data Science</h2>
<h3 style="text-align:center;">Lucas Fayolle & Jose Valero</h3>

# Libraries

In [1]:
import numpy as np
import random
from scipy import ndimage

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# Data

In [2]:
@tf.function
def rotate(volume):
    """Rotate the volume by a few degrees"""
    def scipy_rotate(volume):
        angles = [-20, -10, -5, 5, 10, 20]
        angle = random.choice(angles)
        volume = ndimage.rotate(volume, angle, reshape=False)
        volume[volume < 0] = 0
        volume[volume > 1] = 1
        return volume

    augmented_volume = tf.numpy_function(scipy_rotate, [volume], tf.float32)
    augmented_volume.set_shape(volume.shape)
    return augmented_volume


def train_preprocessing(volume, label):
    """Process training data by rotating and adding a channel."""
    volume = rotate(volume)
    volume = tf.expand_dims(volume, axis=3)
    return volume, label


def validation_preprocessing(volume, label):
    """Process validation data by only adding a channel."""
    volume = tf.expand_dims(volume, axis=3)
    return volume, label

In [None]:
# np.savez("processed_data.npz", x_train=x_train, y_train=y_train, x_val=x_val, y_val=y_val)

In [3]:
data = np.load("processed_data.npz")
x_train = data["x_train"]
y_train = data["y_train"]
x_val = data["x_val"]
y_val = data["y_val"]

In [4]:
train_loader = tf.data.Dataset.from_tensor_slices((x_train, y_train))
validation_loader = tf.data.Dataset.from_tensor_slices((x_val, y_val))

batch_size = 2
train_dataset = (
    train_loader.shuffle(len(x_train))
    .map(train_preprocessing)
    .batch(batch_size)
    .prefetch(2)
)
validation_dataset = (
    validation_loader.shuffle(len(x_val))
    .map(validation_preprocessing)
    .batch(batch_size)
    .prefetch(2)
)

I0000 00:00:1734968667.506334     836 gpu_device.cc:2022] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 5563 MB memory:  -> device: 0, name: NVIDIA GeForce RTX 4060, pci bus id: 0000:01:00.0, compute capability: 8.9


In [None]:
len(train_dataset)

In [None]:
len(validation_dataset)

In [None]:
def train_model(model, train_dataset, validation_dataset, epochs=20):
    initial_learning_rate = 0.0001
    lr_schedule = keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate, decay_steps=100000, decay_rate=0.96, staircase=True
    )
    model.compile(
        loss="binary_crossentropy",
        optimizer=keras.optimizers.Adam(learning_rate=lr_schedule),
        metrics=["acc"],
    )

    # Define callbacks
    checkpoint_cb = keras.callbacks.ModelCheckpoint(
        f"{model.name}.keras", save_best_only=True
    )
    early_stopping_cb = keras.callbacks.EarlyStopping(monitor="val_acc", patience=5)

    # Train the model
    history = model.fit(
        train_dataset,
        validation_data=validation_dataset,
        epochs=epochs,
        shuffle=True,
        verbose=2,
        callbacks=[checkpoint_cb, early_stopping_cb],
    )

    return history

In [None]:
def plot_training_history(history):
    fig, ax = plt.subplots(1, 2, figsize=(20, 5))
    ax = ax.ravel()

    for i, metric in enumerate(["acc", "loss"]):
        ax[i].plot(history.history[metric])
        ax[i].plot(history.history["val_" + metric])
        ax[i].set_title(f"Model {metric}")
        ax[i].set_xlabel("Epochs")
        ax[i].set_ylabel(metric.capitalize())
        ax[i].legend(["Train", "Validation"])

    plt.show()

In [None]:
def make_predictions(model, x_val, class_names):
    predictions = model.predict(np.expand_dims(x_val[0], axis=0))[0]
    scores = [1 - predictions[0], predictions[0]]

    for score, name in zip(scores, class_names):
        print(
            f"This model is {100 * score:.2f}% confident that the CT scan is {name}."
        )

In [None]:
def run_experiment_and_save(
    model_fn, 
    config_name, 
    config_description, 
    train_dataset, 
    validation_dataset, 
    epochs=20,
    results_list=None
):
    """
    model_fn: función que construye y retorna un modelo
    config_name: nombre corto de la configuración (ej: 'extra_dense')
    config_description: descripción más larga
    train_dataset, validation_dataset: tus datasets
    epochs: número de épocas
    results_list: lista donde se guardará la info del experimento
    """
    model = model_fn()
    print(f"\nEntrenando configuración: {config_name}")
    model.summary()

    history = train_model(model, train_dataset, validation_dataset, epochs=epochs)

    final_train_acc = history.history["acc"][-1]
    final_train_loss = history.history["loss"][-1]
    final_val_acc   = history.history["val_acc"][-1]
    final_val_loss  = history.history["val_loss"][-1]

    results_list.append({
        "timestamp": datetime.datetime.now().isoformat(),
        "config_name": config_name,
        "config_description": config_description,
        "final_train_acc": final_train_acc,
        "final_train_loss": final_train_loss,
        "final_val_acc": final_val_acc,
        "final_val_loss": final_val_loss,
    })

    print(f"Finalizado entrenamiento de {config_name}.")

In [None]:
def residual_block_3d(x, filters, activation="relu"):
    shortcut = x
    x = layers.Conv3D(filters, kernel_size=3, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation(activation)(x)

    x = layers.Conv3D(filters, kernel_size=3, padding="same")(x)
    x = layers.BatchNormalization()(x)

    x = layers.Add()([shortcut, x])
    x = layers.Activation(activation)(x)
    return x

def inception_block_3d(x, filters, activation="relu"):
    branch1 = layers.Conv3D(filters, kernel_size=1, padding="same", activation=activation)(x)
    branch2 = layers.Conv3D(filters, kernel_size=3, padding="same", activation=activation)(x)
    branch3 = layers.Conv3D(filters, kernel_size=5, padding="same", activation=activation)(x)
    branch4 = layers.MaxPool3D(pool_size=3, strides=1, padding="same")(x)
    branch4 = layers.Conv3D(filters, kernel_size=1, padding="same", activation=activation)(branch4)
    x = layers.Concatenate(axis=-1)([branch1, branch2, branch3, branch4])
    return x

In [None]:
def build_3d_cnn(
    width=128,
    height=128,
    depth=64,
    initial_filters=[64, 64, 128, 256],
    extra_dense=False,
    two_conv_per_level=False,
    activation="relu",
    pooling="max",  # "max" o "avg"
    use_residual=False,
    use_inception=False,
    name="3dcnn"
):
    inputs = keras.Input((width, height, depth, 1))

    x = inputs
    for f in initial_filters:
        if use_residual:
            x = residual_block_3d(x, f, activation=activation)
        elif use_inception:
            x = inception_block_3d(x, f, activation=activation)
        else:
            x = layers.Conv3D(filters=f, kernel_size=3, activation=activation, padding="same")(x)
            if two_conv_per_level:
                x = layers.Conv3D(filters=f, kernel_size=3, activation=activation, padding="same")(x)

        if pooling == "avg":
            x = layers.AveragePooling3D(pool_size=2)(x)
        else:
            x = layers.MaxPool3D(pool_size=2)(x)

        x = layers.BatchNormalization()(x)

    x = layers.GlobalAveragePooling3D()(x)

    x = layers.Dense(units=512, activation=activation)(x)
    x = layers.Dropout(0.3)(x)

    if extra_dense:
        x = layers.Dense(units=100, activation="relu")(x)

    outputs = layers.Dense(units=1, activation="sigmoid")(x)

    model = keras.Model(inputs, outputs, name=name)
    return model

In [None]:
models_config = {
    "original": {
        "description": "Modelo base sin modificaciones",
        "params": {
            "extra_dense": False,
            "two_conv_per_level": False,
            "activation": "relu",
            "pooling": "max",
            "use_residual": False,
            "use_inception": False,
        },
    },
    "extra_dense": {
        "description": "Añade capa Dense de 100 unidades antes de la salida",
        "params": {
            "extra_dense": True,       # clave
            "two_conv_per_level": False,
            "activation": "relu",
            "pooling": "max",
            "use_residual": False,
            "use_inception": False,
        },
    },
    "two_conv_per_level": {
        "description": "Agrega una segunda capa Conv3D en cada bloque",
        "params": {
            "extra_dense": False,
            "two_conv_per_level": True,  # clave
            "activation": "relu",
            "pooling": "max",
            "use_residual": False,
            "use_inception": False,
        },
    },
    "relu_to_sigmoid": {
        "description": "Cambia la activación ReLU por sigmoid en todas las capas",
        "params": {
            "extra_dense": False,
            "two_conv_per_level": False,
            "activation": "sigmoid",  # clave
            "pooling": "max",
            "use_residual": False,
            "use_inception": False,
        },
    },
    "max_to_avg_pooling": {
        "description": "Usa AveragePooling3D en lugar de MaxPooling3D",
        "params": {
            "extra_dense": False,
            "two_conv_per_level": False,
            "activation": "relu",
            "pooling": "avg",   # clave
            "use_residual": False,
            "use_inception": False,
        },
    },
    "residual_blocks": {
        "description": "Reemplaza capas conv por bloques residuales",
        "params": {
            "extra_dense": False,
            "two_conv_per_level": False,
            "activation": "relu",
            "pooling": "max",
            "use_residual": True,   # clave
            "use_inception": False,
        },
    },
    "inception_blocks": {
        "description": "Reemplaza capas conv por bloques inception 3D",
        "params": {
            "extra_dense": False,
            "two_conv_per_level": False,
            "activation": "relu",
            "pooling": "max",
            "use_residual": False,
            "use_inception": True,  # clave
        },
    },
    # Podrías seguir agregando “residual_inception_blocks”, etc.
}

In [None]:
import pandas as pd
import datetime

def run_all_experiments(train_dataset, validation_dataset, epochs=20):
    results_list = []

    for config_name, cfg in models_config.items():
        print(f"\n=== Entrenando modelo '{config_name}' ===")
        params = cfg["params"]

        model = build_3d_cnn(
            width=128, 
            height=128, 
            depth=64,
            **params,          
            name=f"3dcnn_{config_name}"
        )
        model.summary()

        history = train_model(model, train_dataset, validation_dataset, epochs=epochs)

        final_train_acc = history.history["acc"][-1]
        final_val_acc   = history.history["val_acc"][-1]

        results_list.append({
            "timestamp": datetime.datetime.now().isoformat(),
            "config_name": config_name,
            "description": cfg["description"],
            "final_train_acc": final_train_acc,
            "final_val_acc": final_val_acc,
        })

        tf.keras.backend.clear_session()

    df = pd.DataFrame(results_list)
    # df.to_csv("experimentos.csv", index=False)
    return df

In [None]:
df = run_all_experiments(train_dataset, validation_dataset, epochs=2)