*<small>Last updated: 2026-02-19 22:39:14 UTC | Student Version (No Solutions)</small>*

**Student Version** &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; **Instructor Version**

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/gist/mtgca/7d52f0b7b63c6317f0151fe1505d85c7/ComplexCNN.ipynb) &nbsp;&nbsp;&nbsp; [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/mtgca/DL-Labs/blob/main/01%20CNNs/ComplexCNN.ipynb)

# MobileNetV2 usando PyTorch Lightning
## Objetivos

- Importar un modelo CNN complejo
- Entrenar el modelo para clasificación de imágenes


## Instalar e importar bibliotecas


In [None]:
!pip install lightning

In [None]:
import numpy as np
import os
%matplotlib inline
import matplotlib.pyplot as plt
import time
import pandas as pd

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
import pytorch_lightning as pl
import torchmetrics

from collections import Counter
from torchvision import datasets
from torchvision import transforms
from torch.utils.data import DataLoader
from torch.utils.data.dataset import random_split

from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.loggers import CSVLogger
from pytorch_lightning.callbacks import TQDMProgressBar, RichProgressBar

torch.set_float32_matmul_precision('medium')

# Performance Optimization Through Environment Detection
The fundamental difference between VSCode remote containers and Google Colab lies in how they communicate with the user interface. In VSCode remote environments, your code executes inside a Docker container or remote server, and all output—including progress bar updates, print statements, and data transfers—must traverse the network to reach your local VSCode client. When PyTorch Lightning's progress bar updates hundreds of times per epoch (once per batch), these frequent UI updates create massive network overhead, with each update requiring a round-trip between the container and your client. Similarly, spawning multiple data loader worker processes in a remote container creates severe inter-process communication (IPC) bottlenecks, as each worker must serialize and transfer data through multiple layers. In contrast, Google Colab runs in a browser with a local connection to its backend servers, making these operations far less costly.

This environment detection code reads the machine's DMI product name to decide whether the notebook is running on Google Colab, and adjusts the progress-bar setting accordingly. When the product name does not identify a Google Compute Engine VM (a VSCode remote container, an SSH host, or a local machine), the environment is treated as VSCode Remote and `ENABLE_PROGRESS_BAR` is set to `False`, which eliminates the hundreds of progress-bar round-trips per epoch described above. On Google Colab the progress bar stays enabled. The other settings that influence this overhead are fixed values defined in later cells and are not changed by the detection code: the number of data loader workers (`NUM_WORKERS`; setting it to 0 eliminates the multi-process overhead and lets the GPU handle parallelism instead) and the logging frequency (`log_every_n_steps` in the `Trainer`). Together, these changes can transform training from 10-100x slower than native to near-native GPU speed—typically resulting in a 5-10x performance improvement. Because the detection is automatic, you can run the same notebook in both environments without manual configuration changes.

In [None]:
import os
import sys

def is_remote_vscode(dmi_path='/sys/class/dmi/id/product_name'):
    """Guess whether the notebook is running outside Google Colab.

    The DMI product name is read from ``dmi_path``. A value that mentions
    "Google" or "Compute Engine" is taken to be a Google Colab VM; anything
    else, including an unreadable or missing file, is reported as VSCode
    Remote/local.

    Args:
        dmi_path: File holding the DMI product name. The default is the
            Linux sysfs location.

    Returns:
        False when the product name identifies Google Compute Engine
        (Google Colab), True in every other case.
    """
    try:
        with open(dmi_path, 'r', encoding='utf-8') as f:
            product = f.read().strip()
    except (OSError, UnicodeDecodeError):
        # Missing, unreadable or undecodable DMI info: assume local/non-Colab
        return True

    print("DMI:", product)
    # Google Colab runs on Google Compute Engine
    is_colab = 'Google' in product or 'Compute Engine' in product
    return not is_colab

# Run the detection once; later cells read IS_REMOTE / ENABLE_PROGRESS_BAR
IS_REMOTE = is_remote_vscode()
print(f"Environment detected: {'VSCode Remote' if IS_REMOTE else 'Google Colab'}")

# The progress bar stays on only when the environment was not flagged as
# remote, so its updates are not sent over the network
ENABLE_PROGRESS_BAR = not IS_REMOTE
print(f"Progress bar enabled: {ENABLE_PROGRESS_BAR}")

## Definición de hiperparámetros de la red

In [None]:
# Training schedule
NUM_EPOCHS = 60
LEARNING_RATE = 1e-3

# Data loading
BATCH_SIZE = 32
NUM_WORKERS = 4

# Number of target classes
CLASES = 10

## Preparación de la base de datos: CIFAR-10

### CIFAR-10 contiene 60k imágenes RGB de 32x32x3 píxeles distribuidas en 10 clases.

*   Grupo de entrenamiento: 50K imágenes
*   Grupo de evaluación: 10k imágenes

In [None]:
# Download the dataset (download=True) into ./data and wrap both splits;
# every image goes through the ToTensor transform.

train_dataset = datasets.CIFAR10(
    root="./data",
    train=True,
    download=True,
    transform=transforms.ToTensor(),
)

test_dataset = datasets.CIFAR10(
    root="./data",
    train=False,
    download=True,
    transform=transforms.ToTensor(),
)

In [None]:
# Build the data loaders for both datasets.
# drop_last=False keeps the final batch even when the number of samples is not
# divisible by the batch size (drop_last=True would discard it).

train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=NUM_WORKERS,
    drop_last=False,
)

test_loader = DataLoader(
    test_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKERS,
    drop_last=False,
)

In [None]:
# Report how many samples each of the two datasets contains
print("Número de muestras de entrenamiento:", len(train_dataset))
print("Número de muestras de evaluación:", len(test_dataset))

In [None]:
print("Número de iteraciones por época:", len(train_loader))
# Batch size 32, drop_last=False: ceil(50000 / 32) = 1563 iterations
# (1562 full batches plus a final batch of 16 samples)
# Batch size 16: 50000 / 16 = 3125 iterations

In [None]:
# Visualize one batch of training images

# Fetch only the first batch from the loader
images, labels = next(iter(train_loader))

plt.figure(figsize=(8, 8))
plt.axis("off")
plt.title("Imágenes de entrenamiento")
# make_grid tiles up to the first 64 images of the batch into a single image;
# np.transpose with (1, 2, 0) moves its first axis to the end before display
plt.imshow(np.transpose(torchvision.utils.make_grid(
    images[:64],
    padding=2,
    normalize=True),
    (1, 2, 0)))
plt.show()

In [None]:
# Shape of the batch of images held in `images`
print("Dimensión de entrada:", images.shape) # (batch_size, channels_in, H, W)

### Definición de CIFAR-10 DataModule para Lightning

In [None]:
class CIFAR_DataModule(pl.LightningDataModule):
    """Lightning data module that serves CIFAR-10 as 224x224 normalized tensors.

    The original training set is divided into a training part (90%) and a
    validation part (the remainder). Training images are resized to 256x256
    and randomly cropped to 224x224; validation and test images are resized
    straight to 224x224, so both evaluation splits share the same
    deterministic preprocessing.

    Args:
        data_path: Root directory where the CIFAR-10 files are already stored.
        batch_size: Batch size used by every data loader.
    """

    # Normalization statistics; see
    # https://pytorch.org/hub/pytorch_vision_mobilenet_v2/ for the values
    _NORM_MEAN = (0.485, 0.456, 0.406)
    _NORM_STD = (0.229, 0.224, 0.225)

    def __init__(self, data_path="./", batch_size=32):
        super().__init__()
        self.data_path = data_path
        self.batch_size = batch_size

        self.train_transform = torchvision.transforms.Compose([
            torchvision.transforms.Resize((256, 256)),
            torchvision.transforms.RandomCrop((224, 224)),
            torchvision.transforms.ToTensor(),
            torchvision.transforms.Normalize(self._NORM_MEAN, self._NORM_STD)])

        self.test_transform = torchvision.transforms.Compose([
            torchvision.transforms.Resize((224, 224)),
            torchvision.transforms.ToTensor(),
            torchvision.transforms.Normalize(self._NORM_MEAN, self._NORM_STD)])

    def prepare_data(self):
        """Hook for one-time steps such as downloading the dataset.

        Intentionally does nothing: setup() opens the dataset with
        download=False, so the files must already exist under data_path.
        """
        return

    def setup(self, stage=None):
        """Load the datasets and split the training set into train/validation.

        Args:
            stage: Stage name passed by the Trainer; not used here.
        """
        # Two views of the same training files: one with the random-crop
        # training transform, one with the deterministic evaluation transform.
        train_augmented = datasets.CIFAR10(
            root=self.data_path,
            train=True,
            transform=self.train_transform,
            download=False,
        )
        train_plain = datasets.CIFAR10(
            root=self.data_path,
            train=True,
            transform=self.test_transform,
            download=False,
        )

        self.test = datasets.CIFAR10(
            root=self.data_path,
            train=False,  # False selects the test split
            transform=self.test_transform,
            download=False,
        )

        # Split the original training set into training and validation parts.
        # The validation length is the remainder, so the two lengths always
        # add up to the dataset size whatever rounding int() applies.
        n_total = len(train_augmented)
        n_train = int(n_total * 0.9)
        self.train, held_out = random_split(
            train_augmented, lengths=[n_train, n_total - n_train])

        # Serve the held-out indices through the deterministic view so the
        # validation images are not randomly cropped.
        self.valid = torch.utils.data.Subset(train_plain, held_out.indices)

        print("Muestras de entrenamiento:", len(self.train))
        print("Muestras de validación:", len(self.valid))
        print("Muestras de evaluación:", len(self.test))

    def _make_loader(self, dataset, *, shuffle, drop_last):
        """Build a DataLoader with the module's batch size and NUM_WORKERS workers."""
        return DataLoader(
            dataset=dataset,
            batch_size=self.batch_size,
            drop_last=drop_last,
            shuffle=shuffle,
            num_workers=NUM_WORKERS,
        )

    def train_dataloader(self):
        """Return the shuffled training loader (drop_last=True)."""
        return self._make_loader(self.train, shuffle=True, drop_last=True)

    def val_dataloader(self):
        """Return the validation loader (no shuffling, drop_last=False)."""
        return self._make_loader(self.valid, shuffle=False, drop_last=False)

    def test_dataloader(self):
        """Return the test loader (no shuffling, drop_last=False)."""
        return self._make_loader(self.test, shuffle=False, drop_last=False)

In [None]:
# DataModule initialization

torch.manual_seed(47)  # set a random seed so random initializations are reproducible
data_module = CIFAR_DataModule(data_path='./data', batch_size=BATCH_SIZE)

## Importar la arquitectura MobileNetV2

https://pytorch.org/hub/

In [None]:
# Fetch the MobileNetV2 architecture through torch.hub.
# NOTE(review): the hub ref is pinned to v0.11.0 while the `weights=` keyword
# belongs to newer torchvision releases — confirm which torchvision code
# actually serves this call.
pytorch_model = torch.hub.load('pytorch/vision:v0.11.0', 'mobilenet_v2', weights=None) # model without pre-training

Modificamos el número de clases de salida en MobileNetV2

In [None]:
# Display the loaded model as the cell output
pytorch_model

In [None]:
# Walk a dummy batch through the model piece by piece to see the tensor shape
# after each stage.

# Example input tensor with shape [32, 3, 224, 224]
input_tensor = torch.randn(32, 3, 224, 224)
# Forward pass through the feature extractor only (the classifier is excluded)
features_ex = pytorch_model.features(input_tensor)  # Shape: [32, 1280, 7, 7]
print('Salida del bloque features: ', features_ex.shape)
# Global average pooling reduces the spatial dimensions to 1x1: each 7x7
# feature map shrinks to a single value, the average of the original 7x7 area.
# For comparison, nn.AdaptiveAvgPool2d((2, 2)) would turn each 7x7 feature map
# into a 2x2 map: for each of the 1280 channels the 7x7 area is divided into
# four regions and each region is averaged, giving torch.Size([32, 1280, 2, 2]).
global_avg_pool = nn.AdaptiveAvgPool2d((1, 1)) # Layer definition
pooled_features = global_avg_pool(features_ex)  # Shape: [32, 1280, 1, 1]
print('Salida del bloque features after avg pooling: ', pooled_features.shape )
# Flatten the tensor. The second argument (1) is torch.flatten's start_dim:
# dimensions 1, 2 and 3 (1280, 1, 1) are merged into a single dimension of
# size 1280 * 1 * 1 = 1280, while the batch dimension is kept.
flattened_features = torch.flatten(pooled_features, 1)  # Shape: [32, 1280]
print('Salida 1D: ', flattened_features.shape )

# Where does the model itself apply the average pooling? Print the source of
# its forward methods to find out.
import inspect
print(inspect.getsource(pytorch_model.forward))
print(inspect.getsource(pytorch_model._forward_impl))

In [None]:
# Overwrite the network's classifier head, i.e. its last layer [-1].
# The input width is read from the layer being replaced (the 1280-element
# feature vector delivered by MobileNetV2) and the output width comes from
# CLASES (the number of CIFAR-10 classes), so neither value is hard-coded.
pytorch_model.classifier[-1] = torch.nn.Linear(
    in_features=pytorch_model.classifier[-1].in_features,
    out_features=CLASES,
)

## Definición del Módulo Lightning

In [None]:
class Lightning_CNN(pl.LightningModule):
    """LightningModule that trains a wrapped PyTorch classifier with cross-entropy.

    Args:
        model: PyTorch model whose output is used as per-class logits.
        learning_rate: Learning rate passed to the Adam optimizer.
        classes: Number of classes used to configure the accuracy metrics.
    """

    def __init__(self, model, learning_rate, classes):
        super().__init__()

        self.learning_rate = learning_rate
        # Wrapped PyTorch model
        self.model = model
        self.classes = classes

        # Save the hyperparameters in the log directory;
        # the model is left out
        self.save_hyperparameters(ignore=["model"])

        # One accuracy metric per data split
        self.train_acc = torchmetrics.Accuracy(num_classes=self.classes, task='multiclass')
        self.valid_acc = torchmetrics.Accuracy(num_classes=self.classes, task='multiclass')
        self.test_acc = torchmetrics.Accuracy(num_classes=self.classes, task='multiclass')

    # Defining the forward method is only necessary
    # if you want to use a Trainer's .predict() method (optional)
    def forward(self, x):
        """Return the wrapped model's output for ``x``."""
        return self.model(x)

    def _shared_step(self, batch):
        """Run the forward computation shared by train, validation and test.

        Args:
            batch: Pair ``(features, true_labels)``.

        Returns:
            Tuple ``(loss, true_labels, predicted_labels)``.
        """
        features, true_labels = batch
        logits = self(features)
        # cross_entropy takes logits and labels as input, not probabilities
        loss = torch.nn.functional.cross_entropy(logits, true_labels)
        # Softmax keeps the ordering of the logits, so the predicted class is
        # taken with argmax directly on the logits.
        predicted_labels = torch.argmax(logits, dim=1)

        return loss, true_labels, predicted_labels

    def training_step(self, batch, batch_idx):
        """Log the training loss and accuracy and return the loss."""
        loss, true_labels, predicted_labels = self._shared_step(batch)
        self.log("train_loss", loss)
        self.train_acc(predicted_labels, true_labels)
        self.log("train_acc", self.train_acc, on_epoch=True, on_step=False)

        return loss  # this is passed to the optimizer for training

    def validation_step(self, batch, batch_idx):
        """Log the validation loss and accuracy."""
        loss, true_labels, predicted_labels = self._shared_step(batch)
        self.log("valid_loss", loss)
        self.valid_acc(predicted_labels, true_labels)
        self.log("valid_acc", self.valid_acc, on_epoch=True, on_step=False, prog_bar=True)

    def test_step(self, batch, batch_idx):
        """Log the test accuracy; the loss is computed but not logged."""
        _, true_labels, predicted_labels = self._shared_step(batch)
        self.test_acc(predicted_labels, true_labels)
        self.log("test_acc", self.test_acc, on_epoch=True, on_step=False)

    def configure_optimizers(self):
        """Return an Adam optimizer over all parameters (weight_decay=1e-4)."""
        optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate, weight_decay=1e-4)
        return optimizer

## Entrenamiento del modelo

In [None]:
# Lightning module initialization

lightning_model = Lightning_CNN(pytorch_model, learning_rate=LEARNING_RATE, classes=CLASES)

# Keep the single best model (save_top_k=1), monitored on validation accuracy
# (mode="max"). Question for the reader: why not the training accuracy?
callback_check = ModelCheckpoint(save_top_k=1, mode="max", monitor="valid_acc")

# NOTE(review): callback_tqdm is not among the callbacks passed to the Trainer
# in the visible cells — confirm whether it is still needed.
callback_tqdm = RichProgressBar(leave=True)

# Logger configured with save_dir "logs/" and run name "complex-cnn-cifar"
logger = CSVLogger(save_dir="logs/", name="complex-cnn-cifar")

In [None]:
class CustomLoggingCallback(pl.Callback):
    """Print the batch size at training start and a one-line summary per epoch."""

    def on_train_epoch_start(self, trainer, pl_module):
        """Record the wall-clock time at which the epoch starts."""
        self.epoch_start_time = time.time()

    def on_train_start(self, trainer, pl_module):
        """Print the datamodule's batch size when one is available."""
        # trainer.datamodule may be None (or lack batch_size) when training is
        # started without a datamodule; in that case nothing is printed
        # instead of raising AttributeError.
        batch_size = getattr(trainer.datamodule, "batch_size", None)
        if batch_size is not None:
            print(f"Batch Size: {batch_size}")

    def on_train_epoch_end(self, trainer, pl_module):
        """Print the epoch number, the logged losses (if any) and the epoch duration."""
        epoch_duration = time.time() - self.epoch_start_time
        epoch = trainer.current_epoch
        train_loss = trainer.callback_metrics.get("train_loss")
        valid_loss = trainer.callback_metrics.get("valid_loss")

        # Losses missing from callback_metrics are simply left out of the line
        output_str = f"Epoch {epoch}: "
        if train_loss is not None:
            output_str += f"Train Loss: {train_loss:.4f}, "
        if valid_loss is not None:
            output_str += f"Valid Loss: {valid_loss:.4f}, "
        output_str += f"Time per Epoch: {epoch_duration:.2f} seconds"
        print(output_str)


custom_logger_callback = CustomLoggingCallback()

In [None]:
# Start training

trainer = pl.Trainer(
    max_epochs=NUM_EPOCHS,
    accelerator="auto",  # Uses GPUs or TPUs if available
    devices="auto",  # Uses all available GPUs/TPUs if applicable
    logger=logger,
    log_every_n_steps=100,
    enable_progress_bar=ENABLE_PROGRESS_BAR,
    callbacks=[callback_check, custom_logger_callback],
)

# Time the whole fit call and report it in minutes
start_time = time.time()
trainer.fit(model=lightning_model, datamodule=data_module)

runtime = (time.time() - start_time) / 60
print(f"Tiempo de entrenamiento en minutos: {runtime:.2f}")

## Graficamos las curvas de aprendizaje del modelo

In [None]:
# Load the metrics file from the trainer logger's log directory
metrics = pd.read_csv(f"{trainer.logger.log_dir}/metrics.csv")

# Collapse the log to one row per epoch by averaging every column within the
# epoch (NaN entries are skipped by mean()).
agg_col = "epoch"
df_metrics = metrics.groupby(agg_col, as_index=False).mean()

df_metrics[["train_loss", "valid_loss"]].plot(
    grid=True, legend=True, xlabel="Epoch", ylabel="Loss"
)
df_metrics[["train_acc", "valid_acc"]].plot(
    grid=True, legend=True, xlabel="Epoch", ylabel="ACC"
)

plt.show()

## Evaluamos el mejor modelo en el grupo de test

In [None]:
# Evaluate on the test split provided by the data module
trainer.test(model = lightning_model, datamodule = data_module, ckpt_path = 'best') # load the model's best checkpoint