## **1. IMPORTS Y CONFIGURACIÓN**
Importamos las librerías necesarias para el procesamiento de imágenes, redes neuronales y entrenamiento distribuido.

In [1]:
import os
import io
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms, datasets, models
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

from pyspark.sql import SparkSession
from pyspark.ml.torch.distributor import TorchDistributor

## **2. FUNCIÓN DE ENTRENAMIENTO DISTRIBUIDO**
Esta función entrena un modelo ResNet18 de manera distribuida usando PyTorch y Spark.
Se hace uso de AMP (Automatic Mixed Precision) para optimizar el rendimiento en GPUs.

In [2]:
def train_fn():
    """
    Fine-tune a pretrained ResNet18 (2-class head) with data-parallel training.

    The function is meant to be started once per process by a launcher such
    as TorchDistributor/torchrun, which must export RANK and WORLD_SIZE
    (LOCAL_RANK is read when present and defaults to 0). Each process:

    1. joins the default process group (NCCL on GPU, Gloo on CPU),
    2. reads its shard of the ImageFolder dataset through a DistributedSampler,
    3. trains the model wrapped in DistributedDataParallel so gradients are
       averaged across ranks on every step,
    4. uses AMP (autocast + GradScaler) only when CUDA is available.

    Returns:
        bytes | None: the serialized ``state_dict`` of the unwrapped model on
        rank 0, ``None`` on every other rank.

    Raises:
        KeyError: if RANK or WORLD_SIZE is missing from the environment.
    """
    # Everything is imported locally so the function is self-contained when it
    # is shipped to the worker processes (no reliance on notebook globals).
    import io
    import os

    import torch
    import torch.distributed as dist
    import torch.nn as nn
    import torch.optim as optim
    from torch.nn.parallel import DistributedDataParallel as DDP
    from torch.utils.data import DataLoader
    from torch.utils.data.distributed import DistributedSampler
    from torchvision import transforms, datasets, models

    print("=== DISTRIBUTED RESNET18 TRAINING (AMP + SHARDING) ===")

    # ========================================
    # A. DISTRIBUTED METADATA
    # ========================================

    # rank = id of this process, world_size = total number of processes.
    rank = int(os.environ["RANK"])
    world_size = int(os.environ["WORLD_SIZE"])
    # Index of this process on its own machine; used to pick the GPU.
    local_rank = int(os.environ.get("LOCAL_RANK", 0))
    print(f"[Worker {rank}] World size: {world_size}")

    # Dataset root; the path must be readable from every worker.
    dataset_path = "/mnt/spark_data/DATASET-RUIDO"
    print(f"[Worker {rank}] Dataset path: {dataset_path}")

    use_cuda = torch.cuda.is_available()
    if use_cuda:
        # Bind this process to its GPU before any CUDA/NCCL work happens.
        torch.cuda.set_device(local_rank)

    # Join the process group so DDP can all-reduce gradients. Without this the
    # ranks would train unrelated models on their shards. Only tear the group
    # down at the end if this function was the one that created it.
    owns_process_group = not dist.is_initialized()
    if owns_process_group:
        dist.init_process_group(backend="nccl" if use_cuda else "gloo")

    try:
        # ========================================
        # B. IMAGE TRANSFORMS
        # ========================================

        # Resize, random horizontal flip, tensor conversion and normalization
        # with the mean/std values listed below.
        train_tf = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize(
                mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225]
            )
        ])

        # ============================
        # C. DATA LOADING
        # ============================

        dataset = datasets.ImageFolder(root=dataset_path, transform=train_tf)

        # ============================
        # D. DISTRIBUTED SAMPLER
        # ============================

        # Each rank iterates over its own subset of the dataset indices.
        sampler = DistributedSampler(
            dataset,
            num_replicas=world_size,
            rank=rank,
            shuffle=True
        )

        dataloader = DataLoader(
            dataset,
            batch_size=32,
            sampler=sampler,
            num_workers=4,
            pin_memory=use_cuda  # pinned memory only helps host-to-GPU copies
        )

        print(f"[Worker {rank}] Total images loaded: {len(dataset)}")

        # ================================
        # E. DEVICE
        # ================================

        # "cuda" resolves to the device selected by set_device() above.
        device = torch.device("cuda" if use_cuda else "cpu")
        print(f"[Worker {rank}] Training on: {device}")

        # ============================
        # F. PRETRAINED MODEL
        # ============================

        # ImageNet-pretrained ResNet18 with the final layer replaced by a
        # 2-output linear head.
        model = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)
        model.fc = nn.Linear(model.fc.in_features, 2)
        model = model.to(device)
        model.train()

        # Wrap in DDP so every backward() averages gradients across ranks and
        # all replicas stay identical.
        model = DDP(model, device_ids=[local_rank] if use_cuda else None)

        # ================================
        # G. CRITERION, OPTIMIZER AND AMP
        # ================================

        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=1e-4)

        # With enabled=False the scaler and autocast become pass-throughs, so
        # the same loop also runs on the CPU fallback.
        scaler = torch.cuda.amp.GradScaler(enabled=use_cuda)

        # ================================
        # H. TRAINING LOOP
        # ================================

        EPOCHS = 3
        print(f"[Worker {rank}] Starting training for {EPOCHS} epochs")

        for epoch in range(EPOCHS):
            # Re-seed the sampler so each epoch uses a different shuffle.
            sampler.set_epoch(epoch)
            # Sum of the per-batch losses seen by this rank (not an average).
            total_loss = 0.0

            for imgs, labels in dataloader:
                imgs = imgs.to(device, non_blocking=True)
                labels = labels.to(device, non_blocking=True)

                optimizer.zero_grad()

                # Mixed-precision forward pass (no-op when CUDA is absent).
                with torch.cuda.amp.autocast(enabled=use_cuda):
                    outputs = model(imgs)
                    loss = criterion(outputs, labels)

                # Scaled backward pass; DDP all-reduces gradients here.
                scaler.scale(loss).backward()
                scaler.step(optimizer)
                scaler.update()

                total_loss += loss.item()

            print(f"[Worker {rank}] Epoch {epoch+1}/{EPOCHS} - Loss: {total_loss:.4f}")
            if use_cuda:
                torch.cuda.synchronize()

        print(f"[Worker {rank}] Training finished!")

        # ============================
        # I. RETURN THE MODEL
        # ============================

        # Only rank 0 serializes the weights. model.module is the unwrapped
        # network, so the keys carry no "module." prefix and load directly
        # into a plain ResNet18.
        if rank == 0:
            buffer = io.BytesIO()
            torch.save(model.module.state_dict(), buffer)
            buffer.seek(0)
            return buffer.getvalue()

        return None
    finally:
        # Release communication resources even if training raised.
        if owns_process_group and dist.is_initialized():
            dist.destroy_process_group()

## **3. CONFIGURACIÓN DE SPARK**
Creamos una sesión de Spark configurada para el entrenamiento distribuido con GPU.

In [3]:
# SPARK CONFIG
# Options are kept in one table and applied to the builder in order:
# two executors, one GPU per executor and per task, a GPU discovery script,
# and the NCCL_SOCKET_IFNAME environment variable exported to the executors.
spark_options = [
    ("spark.executor.instances", "2"),
    ("spark.executor.resource.gpu.amount", "1"),
    ("spark.executor.resource.gpu.discoveryScript", "/usr/local/bin/get-gpus.sh"),
    ("spark.task.resource.gpu.amount", "1"),
    ("spark.executorEnv.NCCL_SOCKET_IFNAME", "tailscale0"),
]

session_builder = SparkSession.builder.appName("BrainTumor-ResNet18-Distributed-IPYNB")
session_builder = session_builder.master("spark://100.108.67.1:7077")
for option_key, option_value in spark_options:
    session_builder = session_builder.config(option_key, option_value)

# Reuses an already-running session if there is one, otherwise creates it.
spark = session_builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/22 19:18:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Inicializamos Spark y lo dejamos listo para ejecutar el entrenamiento distribuido.

In [4]:

# Bare expression: the notebook renders the session object as this cell's output.
spark


## **4. EJECUCIÓN Y GUARDADO DEL MODELO DISTRIBUIDO**
Lanzamos el entrenamiento distribuido con `TorchDistributor` y guardamos en disco los pesos del modelo que devuelve el worker 0.

In [5]:
print("Launching distributed training with AMP + SHARDING + 2 GPUs...")

# Two training processes on the cluster (not local mode), with GPUs enabled.
distributor = TorchDistributor(num_processes=2, local_mode=False, use_gpu=True)

# run() hands back whatever train_fn returned: the serialized weights or None.
model_bytes = distributor.run(train_fn)

if model_bytes is None:
    # Nothing came back from training, so there is nothing to write.
    print("Worker secundario: no devuelve modelo.")
else:
    # Destination file for the trained weights.
    out_path = "/home/piero/brain_resnet18.pt"

    # Write the raw bytes exactly as received.
    with open(out_path, "wb") as model_file:
        model_file.write(model_bytes)

    print(f"Modelo guardado correctamente en: {out_path}")

Launching distributed training with AMP + SHARDING + 2 GPUs...


Started distributed training with 2 executor processes
=== DISTRIBUTED RESNET18 TRAINING (AMP + SHARDING) ===              (0 + 2) / 2]
[Worker 0] World size: 2
[Worker 0] Dataset path: /mnt/spark_data/DATASET-RUIDO
=== DISTRIBUTED RESNET18 TRAINING (AMP + SHARDING) ===
[Worker 1] World size: 2
[Worker 1] Dataset path: /mnt/spark_data/DATASET-RUIDO
[Worker 0] Total images loaded: 5000                                (0 + 2) / 2]
[Worker 0] Training on: cuda
[Worker 0] Starting training for 3 epochs
[Worker 0] Epoch 1/3 - Loss: 13.4124
[Worker 0] Epoch 2/3 - Loss: 6.0668
[Worker 0] Epoch 3/3 - Loss: 1.9728                                 (0 + 2) / 2]
[Worker 0] Training finished!
[Worker 1] Total images loaded: 5000
[Worker 1] Training on: cuda
[Worker 1] Starting training for 3 epochs
[Worker 1] Epoch 1/3 - Loss: 14.0739                                (0 + 2) / 2]
[Worker 1] Epoch 2/3 - Loss: 3.5299                                 (0 + 2) / 2]
[Worker 1] Epoch 3/3 - Loss: 1.4003
[Worker

Modelo guardado correctamente en: /home/piero/brain_resnet18.pt


## **5. REGISTRAR EL EXPERIMENTO EN MLFLOW**

In [6]:
import dagshub
import mlflow

# Connect this notebook to the DagsHub repository with MLflow integration on.
dagshub_settings = {
    "repo_owner": 'picantitoDev',
    "repo_name": 'percepcion-proyecto',
    "mlflow": True,
}
dagshub.init(**dagshub_settings)

# Point MLflow at the repository's tracking server.
tracking_uri = "https://dagshub.com/picantitoDev/percepcion-proyecto.mlflow"
mlflow.set_tracking_uri(tracking_uri)

# Kept as the last expression so the notebook displays the Experiment object.
mlflow.set_experiment("ResNet18-Distributed-AMP")



Open the following link in your browser to authorize the client:
https://dagshub.com/login/oauth/authorize?state=6b16b3ed-cd23-4345-b005-36ff8d963e4f&client_id=32b60ba385aa7cecf24046d8195a71c07dd345d9657977863b52e7748e0f0f28&middleman_request_id=7bffd1fbbee7c3758a08393e9b05a8b5860d586823cf8b7d80485d1d17d02ff2




2025/11/22 19:24:04 INFO mlflow.tracking.fluent: Experiment with name 'ResNet18-Distributed-AMP' does not exist. Creating a new experiment.


<Experiment: artifact_location='mlflow-artifacts:/b4b56f5dabe94db9a5cf1af5119248d6', creation_time=1763857444386, experiment_id='0', last_update_time=1763857444386, lifecycle_stage='active', name='ResNet18-Distributed-AMP', tags={}>

In [17]:
import tempfile

from mlflow import MlflowClient
from mlflow.exceptions import MlflowException

if model_bytes is not None:
    print("Registrando en MLflow...")

    # Rebuild the network on CPU from the bytes returned by training:
    # a ResNet18 without pretrained weights and with a 2-output final layer.
    buffer = io.BytesIO(model_bytes)
    state_dict = torch.load(buffer, map_location="cpu")

    model = models.resnet18(weights=None)
    model.fc = nn.Linear(model.fc.in_features, 2)
    model.load_state_dict(state_dict)

    with mlflow.start_run() as run:
        # PARAMS
        # NOTE(review): these values are typed by hand; keep them in sync with
        # the constants used inside train_fn.
        mlflow.log_params({
            "batch_size": 32,
            "epochs": 3,
            "optimizer": "Adam",
            "lr": 1e-4,
            "model": "ResNet18",
            "distributed_world_size": 2,
            "amp": True,
        })

        # NOTE(review): the training call only returns the weights, so no
        # measured loss is available in this cell. The previous hard-coded
        # `final_loss = 0` was a fabricated value and is no longer logged.
        # TODO: return the last epoch loss from training and log it here.

        # LOG MODEL
        # Stage the model in a throw-away directory. save_model() needs a
        # path that does not exist yet, and a temporary directory avoids
        # deleting any unrelated "model" folder in the working directory.
        with tempfile.TemporaryDirectory() as staging_dir:
            local_model_dir = os.path.join(staging_dir, "model")
            mlflow.pytorch.save_model(model, path=local_model_dir)
            mlflow.log_artifacts(local_model_dir, artifact_path="model")

        print("Modelo registrado en MLflow correctamente.")

        # ==========================================
        # REGISTER THE MODEL IN DAGSHUB MODEL REGISTRY
        # ==========================================
        run_id = run.info.run_id
        model_name = "ResNet18"
        model_uri = f"runs:/{run_id}/model"

        client = MlflowClient()

        print(f"Registering model '{model_name}' from {model_uri} ...")

        try:
            client.create_registered_model(model_name)
            print(f"Created new registered model: {model_name}")
        except MlflowException as exc:
            # Only "already exists" is expected on re-runs. Anything else
            # (auth, network, permissions) must surface instead of being
            # reported as an existing model.
            if exc.error_code not in ("RESOURCE_ALREADY_EXISTS", "ALREADY_EXISTS"):
                raise
            print(f"Model {model_name} already exists, creating new version...")

        # Create a new version pointing at this run's logged artifacts.
        model_version = client.create_model_version(
            name=model_name,
            source=model_uri,
            run_id=run_id
        )

        print("Model registered successfully!")
        print(f"Model version: {model_version.version}")
        print("Model details:", model_version)

Registrando en MLflow...
Modelo registrado en MLflow correctamente.
Registering model 'ResNet18' from runs:/7caba809aabf4de6b9716fc2eea9c8cf/model ...
Created new registered model: ResNet18


2025/11/22 19:45:35 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: ResNet18, version 1


Model registered successfully!
Model version: 1
Model details: <ModelVersion: aliases=[], creation_timestamp=1763858735162, current_stage='None', deployment_job_state=<ModelVersionDeploymentJobState: current_task_name='', job_id='', job_state='DEPLOYMENT_JOB_CONNECTION_STATE_UNSPECIFIED', run_id='', run_state='DEPLOYMENT_JOB_RUN_STATE_UNSPECIFIED'>, description='', last_updated_timestamp=1763858735162, metrics=None, model_id=None, name='ResNet18', params=None, run_id='7caba809aabf4de6b9716fc2eea9c8cf', run_link='', source='runs:/7caba809aabf4de6b9716fc2eea9c8cf/model', status='READY', status_message=None, tags={}, user_id='', version='1'>
üèÉ View run angry-moth-625 at: https://dagshub.com/picantitoDev/percepcion-proyecto.mlflow/#/experiments/0/runs/7caba809aabf4de6b9716fc2eea9c8cf
üß™ View experiment at: https://dagshub.com/picantitoDev/percepcion-proyecto.mlflow/#/experiments/0


## **6. DETENER LA SESIÓN DE SPARK**


Finalmente, detenemos la sesión de Spark para liberar los recursos del cluster.

In [18]:
# Stop the Spark session created earlier in the notebook.
spark.stop()