# Project members

**Sali Raffaele**:
- 📧 [raffaele.sali@studio.unibo.it](mailto:raffaele.sali@studio.unibo.it)
- Student Number: `0001167817`

**Zanotti Niccolò**:
- 📧 [niccolo.zanotti@studio.unibo.it](mailto:niccolo.zanotti@studio.unibo.it)
- Student Number: `0001121646`

**Zocco Ramazzo Marco**:
- 📧 [marco.zoccoramazzo@studio.unibo.it](mailto:marco.zoccoramazzo@studio.unibo.it)
- Student Number: `0001198289`

# Assignment Module 2: Pet Classification

The goal of this assignment is to implement a neural network that classifies images of 37 breeds of cats and dogs from the [Oxford-IIIT-Pet dataset](https://www.robots.ox.ac.uk/~vgg/data/pets/). The assignment is divided into two parts: first, you will be asked to implement from scratch your own neural network for image classification; then, you will fine-tune a pretrained network provided by PyTorch.

## Setup and dependencies installation

In the following, we will assume that you have:
- created a local Python virtual environment - either with the Python [venv](https://docs.python.org/3/library/venv.html) module or via [uv](https://github.com/astral-sh/uv) (preferred) - with the `ipykernel` or `jupyter` package pre-installed to start the Jupyter kernel;
- `git` installed on your machine;
- a working internet connection.

We will now download the `pyproject.toml` file specifying the project dependencies.

In [None]:
from pathlib import Path


def get_project_root() -> Path:
    """Locate the project root by searching upwards for a marker file.

    The current working directory is checked first, then each of its
    ancestors from nearest to farthest. The first directory containing any of
    the marker files is returned; if none does, the current working directory
    is returned as a fallback.
    """
    marker_names = ("assignment2.ipynb",)
    cwd = Path.cwd()

    for candidate in (cwd, *cwd.parents):
        if any((candidate / marker).exists() for marker in marker_names):
            return candidate

    return cwd


PROJECT_ROOT: Path = get_project_root()

In [None]:
import urllib.request

# GitHub "owner/name" slug of the project repository and the commit the
# dependency files are pinned to (both are interpolated into the raw URLs below).
PROJECT_REPO: str = "niccolozanotti/ipcv-assignments"
COMMIT_HASH: str = "9f1f600af59401673e2e816b12d1ae740dc4386b"

# Raw-content URLs of the dependency specification and its lockfile at that commit
pyproject_url = (
    f"https://raw.githubusercontent.com/{PROJECT_REPO}/{COMMIT_HASH}/pyproject.toml"
)
lockfile_url = f"https://raw.githubusercontent.com/{PROJECT_REPO}/{COMMIT_HASH}/uv.lock"

# Download both files into the project root, overwriting any existing copies.
# NOTE(review): the trailing `;` presumably suppresses the notebook's echo of
# the last expression's value - confirm before removing it.
urllib.request.urlretrieve(pyproject_url, PROJECT_ROOT / "pyproject.toml")
urllib.request.urlretrieve(lockfile_url, PROJECT_ROOT / "uv.lock");

If using [uv](https://github.com/astral-sh/uv) (recommended) you can now install the dependencies to a local virtual environment at `.venv` simply via
```sh
uv sync --extra assignment2
```

If not, the same can be achieved with the usual python [venv](https://docs.python.org/3/library/venv.html):
```sh
python3 -m venv .venv
source .venv/bin/activate
(.venv) pip install ".[assignment2]" 
```

Make sure to do the above and *restart the kernel* if necessary before proceeding.

In [None]:
import json
import logging
import shutil
import subprocess
from contextlib import nullcontext  # Allows conditional 'with' statements
from dataclasses import dataclass
from typing import List, Tuple

import matplotlib.pyplot as plt
import pandas as pd
import torch
import torch.nn.functional as F
import torchvision.transforms as T
from graphviz import Source as draw_diagram
from PIL import Image
from torch import Tensor, nn
from torch.utils.data import DataLoader, Dataset
from torchvision import models
from torchvision.models import ResNet18_Weights
from torchvision.transforms import v2

## Dataset

The following cells contain the code to download and access the dataset you will be using in this assignment. Note that, although this dataset features each and every image from [Oxford-IIIT-Pet](https://www.robots.ox.ac.uk/~vgg/data/pets/), it uses a different train-val-test split than the original authors.

In [None]:
# Branch of the project repository that hosts the dataset, and the clone URL
BRANCH_NAME: str = "dataset/assignment2"
REPO_URL: str = f"https://github.com/{PROJECT_REPO}.git"

# Scratch clone location and final dataset location, both under the project root
temp_dir: Path = PROJECT_ROOT / "temp_repo"
dataset_path: Path = PROJECT_ROOT / "dataset"

# The download is skipped whenever the dataset folder already exists; its
# contents are not validated.
if dataset_path.exists():
    print(f"'{dataset_path.name}' folder already exists locally. Skipping download.")
else:
    try:
        print(
            f"Downloading dataset at {PROJECT_REPO}/{BRANCH_NAME} via git sparse checkout..."
        )

        # Shallow, blob-less, sparse clone of the dataset branch into temp_dir
        clone_cmd = [
            "git",
            "clone",
            "--filter=blob:none",
            "--sparse",
            "--depth",
            "1",
            "--branch",
            BRANCH_NAME,
            REPO_URL,
            str(temp_dir),
        ]
        subprocess.run(clone_cmd, check=True, capture_output=True, text=True)

        # Restrict the sparse checkout to the 'dataset' folder, which fetches its contents
        sparse_cmd = ["git", "-C", str(temp_dir), "sparse-checkout", "set", "dataset"]
        subprocess.run(sparse_cmd, check=True, capture_output=True, text=True)

        source_dataset_path: Path = temp_dir / "dataset"

        # Move the checked-out folder to its final location under the project root
        if source_dataset_path.exists():
            shutil.move(source_dataset_path, dataset_path)
            print("Dataset successfully downloaded.")
        else:
            print(
                f"Error: Could not find the 'dataset' folder inside the cloned repo at '{temp_dir}'."
            )

    # Only non-zero git exit codes are handled here; the error is printed and
    # not re-raised, so execution continues without a dataset folder.
    except subprocess.CalledProcessError as e:
        print(f"Git command failed: {e.stderr}")

    finally:
        # Clean up the scratch clone whether or not the download succeeded
        if temp_dir.exists():
            shutil.rmtree(temp_dir, ignore_errors=True)

In [None]:
class OxfordPetDataset(Dataset):
    """Oxford-IIIT-Pet images for one split, read from the local dataset folder.

    Image names and labels come from ``annotations/<split>.txt`` under the
    module-level ``dataset_path``. Each non-empty line holds an image name and
    a 1-based class label separated by whitespace. Images are read on demand
    from ``images/<name>.jpg``.
    """

    def __init__(self, split: str, transform=None) -> None:
        """Index the given split.

        Args:
            split: Name of the annotation file without ``.txt`` (e.g. ``"train"``).
            transform: Optional callable applied to each RGB PIL image.
        """
        super().__init__()

        self.root = dataset_path
        self.split = split
        self.names, self.labels = self._get_names_and_labels()
        self.transform = transform

    def __len__(self) -> int:
        """Return the number of samples in the split."""
        return len(self.labels)

    def __getitem__(self, idx) -> Tuple[Tensor, int]:
        """Return ``(image, label)`` for sample ``idx``.

        The image is whatever ``transform`` returns; without a transform it is
        the RGB ``PIL.Image`` itself rather than a tensor.
        """
        img_path = self.root / "images" / f"{self.names[idx]}.jpg"
        img = Image.open(img_path).convert("RGB")
        label = self.labels[idx]

        if self.transform is not None:
            img = self.transform(img)

        return img, label

    def get_num_classes(self) -> int:
        """Return the largest 0-based label in this split plus one.

        Raises:
            ValueError: If the split contains no samples.
        """
        return max(self.labels) + 1

    def _get_names_and_labels(self) -> Tuple[List[str], List[int]]:
        """Parse the split's annotation file into image names and 0-based labels.

        Blank lines are skipped and fields may be separated by any run of
        whitespace.

        Raises:
            ValueError: If a non-empty line does not hold exactly two fields or
                its label is not an integer.
        """
        names: List[str] = []
        labels: List[int] = []

        annotation_file = self.root / "annotations" / f"{self.split}.txt"
        with open(annotation_file, encoding="utf-8") as f:
            for line in f:
                fields = line.split()
                if not fields:
                    # Tolerate empty lines, e.g. a trailing newline at end of file
                    continue
                name, label = fields
                names.append(name)
                # Labels are 1-based on disk; shift them to start at 0
                labels.append(int(label) - 1)

        return names, labels

In [None]:
# Sanity check: load the training split without any transform and inspect one sample
train_dataset1 = OxfordPetDataset(split="train")
print(len(train_dataset1))
img, label = train_dataset1[0]
# With no transform the sample is the PIL image itself, so `.size` is its PIL size attribute
print(img.size, label)

In [None]:
def breed_from_name(name):
    """Return the breed part of an image name: everything before the last ``_``.

    A name without any underscore yields an empty string.
    """
    breed, _, _ = name.rpartition("_")
    return breed

In [None]:
# Images if we don't apply transformations
# Show the first six untransformed training images in a 2x3 grid
plt.figure(figsize=(12, 6))
for i in range(6):
    img, _ = train_dataset1[i]
    name = train_dataset1.names[i]

    plt.subplot(2, 3, i + 1)
    plt.imshow(img)
    # Title each panel with the breed encoded in the file name
    plt.title(breed_from_name(name))
    plt.axis("off")

plt.tight_layout()
plt.show()

In [None]:
# Images if we apply transformations
# Training pipeline: resize, then random crop / flip / colour jitter as
# augmentation, then tensor conversion and per-channel normalization.
# NOTE(review): the mean/std values are presumably the usual ImageNet channel
# statistics - confirm.
train_transform = T.Compose(
    [
        T.Resize((256, 256)),
        T.RandomCrop(224),
        T.RandomHorizontalFlip(p=0.5),
        T.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
        T.ToTensor(),
        T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
)

# Evaluation pipeline: deterministic resize to the crop size, same normalization
val_transform = T.Compose(
    [
        T.Resize((224, 224)),
        T.ToTensor(),
        T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
)

# Only the training split is augmented; validation and test share val_transform
train_dataset = OxfordPetDataset(split="train", transform=train_transform)
test_dataset = OxfordPetDataset(split="test", transform=val_transform)
validation_dataset = OxfordPetDataset(split="val", transform=val_transform)
print("Number of samples - train:", len(train_dataset))
print("Number of classes - train:", train_dataset.get_num_classes())
print("Number of samples - test:", len(test_dataset))
print("Number of classes - test:", test_dataset.get_num_classes())
print("Number of samples - validation:", len(validation_dataset))
print("Number of classes - validation:", validation_dataset.get_num_classes())

In [None]:
logging.getLogger("matplotlib").setLevel(logging.ERROR)


def show_samples(dataset, n=6):
    """Plot the first ``n`` samples of ``dataset`` in a two-row grid.

    Args:
        dataset: Dataset whose items are ``(image_tensor, label)`` pairs and
            which exposes a ``names`` list; each image tensor is permuted with
            ``(1, 2, 0)`` before being drawn.
        n: Number of samples to show, capped at ``len(dataset)``. Nothing is
            drawn when this is zero or negative.
    """
    n = min(n, len(dataset))
    if n <= 0:
        return

    # Round the column count up so an odd ``n`` (including 1) still fits in two rows
    n_cols = (n + 1) // 2

    plt.figure(figsize=(12, 6))
    for i in range(n):
        img, _ = dataset[i]
        # Move the first axis last, the layout imshow draws
        img = img.permute(1, 2, 0)

        plt.subplot(2, n_cols, i + 1)
        plt.imshow(img)
        plt.title(breed_from_name(dataset.names[i]))
        plt.axis("off")

    plt.tight_layout()
    plt.show()


show_samples(train_dataset)

In [None]:
# Mini-batch loaders: only the training loader shuffles; all use batches of 32
# and two worker processes.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=2)

val_loader = DataLoader(validation_dataset, batch_size=32, shuffle=False, num_workers=2)

test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=2)

In [None]:
# Pull a single training batch to check its shape and value range after normalization
images, labels = next(iter(train_loader))
print(images.shape)
print(images.min(), images.max())

In [None]:
# Image example after transformations
def denormalized_img(tensor, mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
    """Undo per-channel normalization by returning ``tensor * std + mean``.

    Args:
        tensor: Image tensor laid out as (C, H, W), or a batch (N, C, H, W).
        mean: Per-channel means that were subtracted during normalization.
        std: Per-channel standard deviations that were divided by.

    Returns:
        A new tensor with the same shape as ``tensor``.
    """
    # Shaping the statistics as (C, 1, 1) lets broadcasting cover both the
    # single-image and the batched layout without a separate branch.
    mean_t = torch.tensor(mean, device=tensor.device).view(-1, 1, 1)
    std_t = torch.tensor(std, device=tensor.device).view(-1, 1, 1)
    return tensor * std_t + mean_t

# Pick six random training indices and show the augmented samples in a 2x3 grid
index = torch.randperm(len(train_dataset))[:6]
plt.figure(figsize=(15, 10))
for ind, k in enumerate(index):
  img, label = train_dataset[k]
  # Undo the normalization, then permute the axes with (1, 2, 0) for imshow
  img = denormalized_img(img).permute(1, 2, 0)
  plt.subplot(2,3, ind+1)
  plt.imshow(img)
  plt.title(f"Label: {label} - {breed_from_name(train_dataset.names[k])}")
  plt.axis("off")

## Part 1: design your own network

Your goal is to implement a convolutional neural network for image classification and train it from scratch on `OxfordPetDataset`. You should consider yourselves satisfied once you obtain a classification accuracy on the test split of ~60%. You are free to achieve this however you want, except for a few rules you must follow:

- Compile this notebook by displaying the results obtained by the best model you found throughout your experimentation; then show how, by removing some of its components, its performance drops. In other words, do an *ablation study* to prove that your design choices have a positive impact on the final result.

- Do not instantiate an off-the-shelf PyTorch network. Instead, construct your network as a composition of existing PyTorch layers. In more concrete terms, you can use e.g. `torch.nn.Linear`, but you cannot use e.g. `torchvision.models.alexnet`.

- Show your results and ablations with plots, tables, images, etc. — the clearer, the better.

Don't be too concerned with your model performance: the ~60% is just to give you an idea of when to stop. Keep in mind that a thoroughly justified model with lower accuracy will be rewarded more points than a poorly experimentally validated model with higher accuracy.

## **NOTE:**
- Several strategies and network architectures were explored, with the main objective of exploiting the components and architectures presented during the course. More advanced networks, such as EfficientNet, were also developed, achieving results very close to those of a simpler ResNet-inspired network. For this reason, the network that better leverages the course topics was ultimately chosen.

## **Common Pipeline**

**Includes:**
- **Accuracy function:** takes into account different output structure when MixUp is employed or not
- **Precision, Recall and F1 computation function**
- **Evaluation function**
- **Residual block class:** defines the structure of the residual blocks, taking into account the presence or not of the Batch Normalization layers
- **Convolutional block class:** defines the structure of the convolutional blocks, which are employed when trying to evidence the impact of Residual blocks
- **Network class:** defines the structure of the CNN, taking into account the use of Stem layers, Batch Normalization layers, Residual layers, Pooling layers and Dropout, to measure the impact of these components
- **TrainConfig class:** defines some core parameters for the training process (like number of epochs, starting learning rate, MixUp parameter, label smoothing factor) and takes into account the use of Label Smoothing, Learning Rate Scheduler and MixUp data augmentation technique
- **build_training_components function:** defines the structure of criterion, optimizer, scheduler and mixup according to boolean and values set in the TrainConfig class
- **Train process function:** defines the pipeline for training the model, minimizing the loss function and updating weights, storing the model with the highest accuracy obtained in validation set
- **Plot functions**

In [None]:
def accuracy(outputs, labels):
    """Count the correct top-1 predictions in a batch.

    Args:
        outputs: Per-class scores; the prediction is the argmax over dim 1.
        labels: Class indices, or a 2-D tensor of one-hot / soft labels (the
            MixUp case), which is reduced to hard labels by argmax over dim 1.

    Returns:
        A 0-dim tensor holding the number of correct predictions (a count, not
        a ratio).
    """
    targets = labels.argmax(dim=1) if labels.ndim == 2 else labels
    predicted = outputs.argmax(dim=1)
    return (predicted == targets).sum()

In [None]:
def compute_precision_recall_f1(preds, labels, num_classes):
    """Compute macro-averaged precision, recall and F1 over ``num_classes`` classes.

    Each class ``0 .. num_classes - 1`` is scored one-vs-rest and the three
    metrics are averaged with equal weight per class. A small epsilon in every
    denominator keeps classes without predictions or samples at 0 instead of
    dividing by zero.

    Args:
        preds: Tensor of predicted class indices.
        labels: Tensor of true class indices, same shape as ``preds``.
        num_classes: Number of classes to average over.

    Returns:
        ``(precision, recall, f1)`` as Python floats.
    """
    eps = 1e-8

    def macro_average(per_class_values):
        return torch.mean(torch.stack(per_class_values)).item()

    precisions, recalls, f1_scores = [], [], []

    for cls in range(num_classes):
        predicted_cls = preds == cls
        actual_cls = labels == cls

        tp = (predicted_cls & actual_cls).sum().float()
        fp = (predicted_cls & ~actual_cls).sum().float()
        fn = (~predicted_cls & actual_cls).sum().float()

        precision = tp / (tp + fp + eps)
        recall = tp / (tp + fn + eps)

        precisions.append(precision)
        recalls.append(recall)
        f1_scores.append(2 * precision * recall / (precision + recall + eps))

    return (
        macro_average(precisions),
        macro_average(recalls),
        macro_average(f1_scores),
    )

In [None]:
def evaluate(model, loader, criterion, device):
    """Evaluate ``model`` on every batch of ``loader`` without tracking gradients.

    Args:
        model: Network to evaluate; it is switched to eval mode.
        loader: Iterable of ``(images, labels)`` batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        device: Device each batch is moved to.

    Returns:
        ``(avg_loss, avg_acc, precision, recall, f1)``: the sample-weighted
        mean loss, the top-1 accuracy, and the macro-averaged precision,
        recall and F1, where the number of classes is the width of the model
        output.
    """
    model.eval()

    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    pred_batches, target_batches = [], []

    with torch.no_grad():
        for images, labels in loader:
            images, labels = images.to(device), labels.to(device)
            batch_size = labels.size(0)

            outputs = model(images)
            loss = criterion(outputs, labels)

            # Weight the batch loss by the batch size so the final mean is per sample
            loss_sum += loss.item() * batch_size
            n_correct += accuracy(outputs, labels)
            n_seen += batch_size

            pred_batches.append(outputs.argmax(dim=1))
            # One-hot / soft labels (2-D) are reduced to hard class indices
            target_batches.append(
                labels.argmax(dim=1) if labels.ndim == 2 else labels
            )

    avg_loss = loss_sum / n_seen
    avg_acc = (n_correct.float() / n_seen).item()

    precision, recall, f1 = compute_precision_recall_f1(
        torch.cat(pred_batches),
        torch.cat(target_batches),
        num_classes=outputs.size(1),
    )

    return avg_loss, avg_acc, precision, recall, f1

In [None]:
class ResidualBlock(nn.Module):
    """Two 3x3 convolutions with an additive shortcut, followed by ReLU.

    The main branch is conv -> norm -> ReLU -> conv -> norm. The shortcut is the
    input itself, or a 1x1 convolution plus norm when the stride or the channel
    count changes. ``norm`` is ``BatchNorm2d`` when ``use_batchnorm`` is True
    and ``Identity`` otherwise.
    """

    def __init__(self, in_channels, out_channels, stride=1, use_batchnorm=True):
        super().__init__()

        def make_norm():
            if use_batchnorm:
                return nn.BatchNorm2d(out_channels)
            return nn.Identity()

        # Main branch; only the first convolution applies the stride
        self.conv1 = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=3,
            stride=stride,
            padding=1,
            bias=False,
        )
        self.bn1 = make_norm()
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(
            out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False
        )
        self.bn2 = make_norm()

        # Project the shortcut only when its shape would not match the main branch
        needs_projection = stride != 1 or in_channels != out_channels
        self.skip = (
            nn.Sequential(
                nn.Conv2d(
                    in_channels, out_channels, kernel_size=1, stride=stride, bias=False
                ),
                make_norm(),
            )
            if needs_projection
            else None
        )

    def forward(self, x):
        """Return ``relu(main_branch(x) + shortcut(x))``."""
        out = self.conv1(x)
        out = self.relu(self.bn1(out))
        out = self.bn2(self.conv2(out))

        shortcut = x if self.skip is None else self.skip(x)

        out += shortcut
        return self.relu(out)

In [None]:
class ConvBlock(nn.Module):
    """Plain 3x3 conv -> norm -> ReLU unit without a skip connection.

    ``norm`` is ``BatchNorm2d`` when ``use_batchnorm`` is True and ``Identity``
    otherwise. The convolution uses padding 1 and no bias.
    """

    def __init__(self, in_channels, out_channels, stride=1, use_batchnorm=True):
        super().__init__()
        conv = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=3,
            stride=stride,
            padding=1,
            bias=False,
        )
        norm = nn.BatchNorm2d(out_channels) if use_batchnorm else nn.Identity()
        self.block = nn.Sequential(conv, norm, nn.ReLU())

    def forward(self, x):
        """Apply the conv -> norm -> ReLU sequence to ``x``."""
        return self.block(x)

In [None]:
class Net(nn.Module):
    """ResNet-inspired CNN whose components can be switched off for ablations.

    Layout: stem -> four stages of two blocks each (64, 128, 256 and 512
    channels, with stages 2-4 downsampling by stride 2) -> global average
    pooling -> two-layer classifier head.

    Args:
        n_classes: Number of output logits.
        use_stem: If True, use the three-convolution stem whose first
            convolution has stride 2; otherwise a single stride-1 convolution
            lifts the input to 64 channels.
        use_residuals: Build the stages from ``ResidualBlock`` (True) or from
            plain ``ConvBlock`` (False).
        use_batchnorm: Insert ``BatchNorm2d`` after convolutions. When False,
            ``nn.Identity`` takes its place everywhere, including both stem
            variants.
        use_pooling: Use an ``nn.AdaptiveAvgPool2d`` module for the global
            pooling step; when False the spatial mean is taken inline.
        use_dropout: Apply ``Dropout(p=0.2)`` inside the classifier head.
    """

    def __init__(
        self,
        n_classes,
        use_stem=True,
        use_residuals=True,
        use_batchnorm=True,
        use_pooling=True,
        use_dropout=True,
    ):
        super().__init__()

        # Read by _make_stage, so it must be set before the stages are built
        self.use_residuals = use_residuals

        self.stem = self._make_stem(use_stem, use_batchnorm)

        # Four stages; every stage after the first halves the spatial resolution
        self.stage1 = self._make_stage(
            64, 64, num_blocks=2, stride=1, use_batchnorm=use_batchnorm
        )
        self.stage2 = self._make_stage(
            64, 128, num_blocks=2, stride=2, use_batchnorm=use_batchnorm
        )
        self.stage3 = self._make_stage(
            128, 256, num_blocks=2, stride=2, use_batchnorm=use_batchnorm
        )
        self.stage4 = self._make_stage(
            256, 512, num_blocks=2, stride=2, use_batchnorm=use_batchnorm
        )

        # Global pooling and classifier head
        self.pool = nn.AdaptiveAvgPool2d((1, 1)) if use_pooling else None
        self.fc = nn.Sequential(
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Dropout(p=0.2) if use_dropout else nn.Identity(),
            nn.Linear(256, n_classes),
        )

    @staticmethod
    def _make_stem(use_stem, use_batchnorm):
        """Build the input stem, honouring ``use_batchnorm`` in both variants."""

        def norm(channels):
            return nn.BatchNorm2d(channels) if use_batchnorm else nn.Identity()

        if use_stem:
            # Full stem: three 3x3 convolutions, the first one downsampling by 2.
            # The layer positions inside the Sequential stay the same with or
            # without batch norm, because Identity fills the norm slots.
            return nn.Sequential(
                nn.Conv2d(3, 32, kernel_size=3, stride=2, padding=1, bias=False),
                norm(32),
                nn.ReLU(inplace=True),
                nn.Conv2d(32, 32, kernel_size=3, stride=1, padding=1, bias=False),
                norm(32),
                nn.ReLU(inplace=True),
                nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1, bias=False),
                norm(64),
                nn.ReLU(inplace=True),
            )

        # Minimal stem: only channel lifting, no downsampling or pooling
        return nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False),
            norm(64),
            nn.ReLU(),
        )

    def _make_stage(self, in_channels, out_channels, num_blocks, stride, use_batchnorm):
        """Stack ``num_blocks`` blocks; only the first changes channels and stride."""
        block_cls = ResidualBlock if self.use_residuals else ConvBlock

        layers = [block_cls(in_channels, out_channels, stride, use_batchnorm)]
        layers.extend(
            block_cls(out_channels, out_channels, 1, use_batchnorm)
            for _ in range(1, num_blocks)
        )
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return the class logits for a batch of images."""
        x = self.stem(x)

        x = self.stage1(x)
        x = self.stage2(x)
        x = self.stage3(x)
        x = self.stage4(x)

        if self.pool is not None:
            x = self.pool(x)
        else:
            # NOTE(review): a mean over the two spatial axes gives the same
            # values as AdaptiveAvgPool2d((1, 1)), so use_pooling=False does not
            # change what the network computes - confirm this ablation is
            # intended to be a no-op.
            x = x.mean(dim=(2, 3), keepdim=True)

        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x

In [None]:
@dataclass
class TrainConfig:
    """Hyperparameters and feature switches for one training run."""

    # Optimisation
    num_epochs: int = 100
    lr: float = 1e-3
    weight_decay: float = 1e-2

    # Feature switches and their parameters; each value below a switch is only
    # meant to take effect when that switch is enabled.
    use_scheduler: bool = False
    use_label_smoothing: bool = False
    label_smoothing: float = 0.1

    use_mixup: bool = False
    mixup_alpha: float = 0.2

    # Where to store a local checkpoint; None means no local checkpoint path is set
    save_path: str | None = None

In [None]:
class NoMixUp:
    """Identity stand-in for a MixUp transform: hands the batch back untouched."""

    def __call__(self, x, y):
        unchanged_batch = (x, y)
        return unchanged_batch

In [None]:
def build_training_components(model, train_loader, train_dataset, config: TrainConfig):
    """Create the loss, optimizer, scheduler and MixUp transform for a run.

    Args:
        model: Network whose parameters the optimizer updates.
        train_loader: Training loader; its length is the number of scheduler
            steps per epoch.
        train_dataset: Training dataset; its ``get_num_classes()`` is passed to
            MixUp.
        config: Settings selecting and parameterising each component.

    Returns:
        ``(criterion, optimizer, scheduler, mixup)``. ``scheduler`` is None
        when scheduling is disabled, and ``mixup`` is a ``NoMixUp`` instance
        when MixUp is disabled.
    """
    # A smoothing factor of 0.0 gives plain cross-entropy
    smoothing = config.label_smoothing if config.use_label_smoothing else 0.0
    criterion = nn.CrossEntropyLoss(label_smoothing=smoothing)

    optimizer = torch.optim.AdamW(
        model.parameters(),
        lr=config.lr,
        weight_decay=config.weight_decay,
    )

    if config.use_scheduler:
        # The schedule is defined per optimizer step, hence steps_per_epoch
        scheduler = torch.optim.lr_scheduler.OneCycleLR(
            optimizer,
            max_lr=config.lr,
            epochs=config.num_epochs,
            steps_per_epoch=len(train_loader),
            pct_start=0.1,
            anneal_strategy="cos",
        )
    else:
        scheduler = None

    mixup = (
        v2.MixUp(
            alpha=config.mixup_alpha,
            num_classes=train_dataset.get_num_classes(),
        )
        if config.use_mixup
        else NoMixUp()
    )

    return criterion, optimizer, scheduler, mixup

The `USE_MLFLOW` flag determines whether the training metrics, hyperparameters, model architectures and artifacts of all PyTorch experiments, along with system usage metrics, [are logged to the MLflow instance](https://mlflow.org/docs/latest/ml/deep-learning/pytorch/).

In [None]:
# Master switch for experiment tracking
USE_MLFLOW = True

if USE_MLFLOW:
    # Imported only when tracking is enabled, so the name `mlflow` exists only in that case
    import mlflow
    mlflow.set_tracking_uri("https://mlflow.niccolozanotti.com")
    mlflow.set_experiment("ipcv-pet-classification-exp")
    print("MLflow logging is ENABLED.")
else:
    print("MLflow logging is DISABLED. Models will be saved locally.")

In [None]:
def train_model(
    model_name,
    model,
    train_loader,
    val_loader,
    criterion,
    optimizer,
    scheduler,
    mixup,
    device,
    config,
    use_mlflow=True,
):
    """Train ``model`` for ``config.num_epochs`` epochs and keep the best weights.

    After every epoch the model is evaluated on ``val_loader``. Whenever the
    validation accuracy improves, the weights are saved to ``config.save_path``
    (if set) and, when ``use_mlflow`` is True, the model is logged to MLflow.
    An MLflow run must already be active when ``use_mlflow`` is True.

    Args:
        model_name: Label used in the final summary line.
        model: Network to train (already on ``device``).
        train_loader: Loader yielding training ``(images, labels)`` batches.
        val_loader: Loader used for the per-epoch validation pass.
        criterion: Loss function.
        optimizer: Optimizer stepped once per batch.
        scheduler: Optional LR scheduler, stepped once per batch when not None.
        mixup: Callable ``(images, labels) -> (images, labels)``; applied only
            when ``config.use_mixup`` is True.
        device: Device the batches are moved to.
        config: Training settings (``num_epochs``, ``use_mixup``, ``save_path``).
        use_mlflow: Whether to log metrics and the best model to MLflow.

    Returns:
        ``(best_val_acc, history)`` where ``history`` maps each metric name to
        a list with one entry per epoch.
    """
    metric_names = (
        "train_loss", "val_loss",
        "train_acc", "val_acc",
        "val_precision", "val_recall", "val_f1",
        "lr",
    )
    history = {name: [] for name in metric_names}

    best_val_acc = 0.0
    apply_mixup = config.use_mixup and mixup is not None

    for epoch in range(config.num_epochs):
        model.train()
        running_loss, running_correct, n_seen = 0.0, 0, 0

        for images, labels in train_loader:
            images = images.to(device)
            labels = labels.to(device)

            if apply_mixup:
                images, labels = mixup(images, labels)

            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # The scheduler advances per batch, not per epoch
            if scheduler is not None:
                scheduler.step()

            batch_size = images.size(0)
            running_loss += loss.item() * batch_size
            running_correct += accuracy(outputs, labels)
            n_seen += batch_size

        # Epoch-level training metrics
        train_loss = running_loss / n_seen
        train_acc = (running_correct.float() / n_seen).item()

        # Validation pass
        val_loss, val_acc, val_precision, val_recall, val_f1 = evaluate(
            model, val_loader, criterion, device
        )

        current_lr = optimizer.param_groups[0]["lr"]

        # One record per epoch, shared by the local history and MLflow
        epoch_metrics = {
            "train_loss": train_loss,
            "train_acc": train_acc,
            "val_loss": val_loss,
            "val_acc": val_acc,
            "val_precision": val_precision,
            "val_recall": val_recall,
            "val_f1": val_f1,
            "lr": current_lr,
        }
        for name, value in epoch_metrics.items():
            history[name].append(value)

        if use_mlflow:
            mlflow.log_metrics(epoch_metrics, step=epoch)

        print(f"Epoch [{epoch + 1}/{config.num_epochs}] | "
              f"Train Loss: {train_loss:.3f}, Train Acc: {train_acc:.3f} | "
              f"Val Loss: {val_loss:.3f}, Val Acc: {val_acc:.3f} | LR: {current_lr:.2e}")

        # Nothing to save unless validation accuracy improved
        if val_acc <= best_val_acc:
            continue

        best_val_acc = val_acc

        # Local checkpoint
        if config.save_path is not None:
            torch.save(model.state_dict(), config.save_path)

        # Upload the model artifact to the active MLflow run
        if use_mlflow:
            mlflow.pytorch.log_model(model, artifact_path="best_model")

    print(f"[{model_name}] Finished! Best Validation Accuracy: {best_val_acc:.3f}\n")
    return best_val_acc, history

In [None]:
def plot_accuracy(history):
    """Plot the training and validation accuracy curves stored in ``history``.

    Reads ``history["train_acc"]`` and ``history["val_acc"]`` and shows them
    in a new figure.
    """
    _, ax = plt.subplots()
    for key in ("train_acc", "val_acc"):
        ax.plot(history[key])
    ax.set_xlabel("Epoch")
    ax.set_ylabel("Accuracy")
    ax.set_title("Training vs Validation Accuracy")
    ax.legend(["Train", "Validation"])
    ax.grid(True)
    plt.show()


def plot_loss(history):
    """Plot the training and validation loss curves stored in ``history``.

    Reads ``history["train_loss"]`` and ``history["val_loss"]`` and shows them
    in a new figure.
    """
    _, ax = plt.subplots()
    for key in ("train_loss", "val_loss"):
        ax.plot(history[key])
    ax.set_xlabel("Epoch")
    ax.set_ylabel("Loss")
    ax.set_title("Training vs Validation Loss")
    ax.legend(["Train", "Validation"])
    ax.grid(True)
    plt.show()


def plot_learning_rate(history):
    """Plot the learning rate stored in ``history["lr"]`` in a new figure.

    ``train_model`` appends one learning-rate value per epoch (read after the
    epoch's last optimizer step), so the x-axis counts epochs.
    """
    plt.figure()
    plt.plot(history["lr"])
    # One point per epoch, not per optimizer step
    plt.xlabel("Epoch")
    plt.ylabel("Learning Rate")
    plt.title("Learning Rate Schedule")
    plt.grid(True)
    plt.show()

In [None]:
# Files under the project root from which earlier ablation results (CSV) and
# training histories (JSON) are reloaded when they exist.
ABLATION_CSV_PATH: Path = PROJECT_ROOT / "ablation_test_results.csv"
ABLATION_JSON_PATH: Path = PROJECT_ROOT / "ablation_histories.json"

In [None]:
# Load previous CSV results if available
# Resume from earlier results when the CSV exists: `results` becomes a list
# with one dict per CSV row. Note that `df_results` is only defined in that case.
if ABLATION_CSV_PATH.exists():
    df_results = pd.read_csv(ABLATION_CSV_PATH)
    results = df_results.to_dict(orient="records")
else:
    results = []

# Load previous histories if available; otherwise start with an empty mapping
if ABLATION_JSON_PATH.exists():
    with open(ABLATION_JSON_PATH, "r") as f:
        all_histories = json.load(f)
else:
    all_histories = {}

## **Model variant: Full model (baseline)**

**Architecture**
- Convolutional neural network implementing architectures and strategies faced during the course.
- A convolutional stem composed of three 3×3 convolutions (the first with stride 2, the others with stride 1), each followed by Batch Normalization and ReLU activation.
- Four sequential stages operating at increasing feature dimensions (64 → 128 → 256 → 512).
- Each stage consists of two residual blocks with identity skip connections.
- When spatial resolution or channel dimensions change, skip connections are adapted using a 1×1 convolution followed by Batch Normalization.
- Global feature aggregation is performed using adaptive average pooling.
- The classifier head consists of two fully connected layers with ReLU activation and dropout.

**Normalization and regularization**
- Batch Normalization is applied after every convolution, including within residual branches and skip connections.
- Dropout is applied in the classifier to reduce overfitting.

**Training setup**
- Optimized using AdamW with weight decay.
- Learning rate scheduling is enabled via OneCycleLR with cosine annealing.
- Cross-entropy loss with label smoothing is used.
- MixUp data augmentation is applied during training.
- Gradient norm clipping is used to improve training stability.

**Purpose**
- This configuration serves as the baseline model against which all ablation studies are compared.


**NOTE:** All ablation variants modify a single component at a time while keeping the remaining architecture and training configuration identical to the baseline model.


In [None]:
# Graphviz (DOT) description of the baseline network. The spatial sizes follow
# the layers of `Net` for a 224x224 input: the stem's first convolution has
# stride 2 (224 -> 112), stage 1 keeps the resolution (stride 1), and stages
# 2-4 each halve it (56, 28, 14). The string is raw so that `\n` reaches
# Graphviz as a label line break.
network_diagram = r"""
digraph ImprovedNet {
    rankdir=TB;
    fontname="Helvetica";
    node [shape=record, fontname="Helvetica"];

    Input [
        label="Input\n3×224×224"
    ];

    Stem [
        label="{Stem|
        Conv3×3, s=2, p=1|
        BN + ReLU|
        Conv3×3, s=1, p=1|
        BN + ReLU|
        Conv3×3, s=1, p=1|
        BN + ReLU|
        Output: 64×112×112}"
    ];

    Stage1 [
        label="{Stage 1:
        2×ResidualBlock|
        RB1:
        Conv3×3 → BN → ReLU +
        Conv3×3 → BN
        + Identity|
        RB2:
        Conv3×3 → BN → ReLU +
        Conv3×3 → BN
        + Identity|
        Output: 64x112x112}"
    ];

    Stage2 [
        label="{Stage 2:
        2×ResidualBlock|
        RB1:
        Conv3×3, s=2 + BN + ReLU +
        Conv3×3 + BN
        + Skip Conv1×1, s=2 + BN|
        Output: 128x56x56|
        RB2:
        Conv3×3 + BN + ReLU +
        Conv3×3 + BN|
        Output: 128x56x56}"
    ];

    Stage3 [
        label="{Stage 3:
        2×ResidualBlock|
        RB1:
        Conv3×3, s=2 + BN + ReLU +
        Conv3×3 + BN
        + Skip Conv1×1, s=2 + BN|
        Output: 256x28x28|
        RB2:
        Conv3×3 + BN + ReLU +
        Conv3×3 + BN|
        Output: 256x28x28}"
    ];

    Stage4 [
        label="{Stage 4:
        2×ResidualBlock|
        RB1:
        Conv3×3, s=2 + BN + ReLU +
        Conv3×3 + BN
        + Skip Conv1×1, s=2 + BN|
        Output: 512x14x14|
        RB2:
        Conv3×3 + BN + ReLU +
        Conv3×3 + BN|
        Output: 512x14x14}"
    ];

    Pool [
        label="AdaptiveAvgPool\n512×1×1"
    ];

    FC [
        label="{Classifier|
        Linear 512→256 + ReLU|
        Dropout p=0.2|
        Linear 256→N classes}"
    ];

    Output [
        label="Output\nN classes"
    ];

    Input -> Stem -> Stage1 -> Stage2 -> Stage3 -> Stage4 -> Pool -> FC -> Output;
}
"""
draw_diagram(network_diagram)

We now choose the right GPU-acceleration backend to speed up our model(s) training. The following code chunk allows for CUDA (NVIDIA chips backend) or MPS (Apple Silicon chips backend) devices with fallback to CPU. 
The outputs in this notebook were obtained by running it for $\sim 5$ hours on a [NVIDIA L40 GPU](https://www.nvidia.com/en-us/data-center/l40/). You can find the used sbatch script [here](https://github.com/niccolozanotti/ipcv-assignments/blob/main/scripts/run-task2.sbatch).

In [None]:
# Prefer CUDA, then Apple's MPS backend, and fall back to the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")

print(f"Using device: {device}")

In [None]:
def run_ablation_experiment(
    model_name,
    model_kwargs=None,
    config_kwargs=None,
    custom_train_dataset=None,
    custom_train_loader=None,
):
    """
    Run a single ablation experiment: build, train, evaluate, log, save and plot.

    Args:
        model_name: Name of the experiment. Used as the MLflow run name, as the
            key in the global `results` / `all_histories` containers and to
            derive the default checkpoint file name.
        model_kwargs: Optional extra keyword arguments forwarded to `Net`.
        config_kwargs: Optional overrides applied on top of the default
            `TrainConfig` arguments defined in this function.
        custom_train_dataset: Optional training dataset replacing the global
            `train_dataset`.
        custom_train_loader: Optional training loader replacing the global
            `train_loader`.

    Returns:
        A `(model, history)` tuple: the model with the checkpoint stored at
        `config.save_path` loaded into it, and the history returned by
        `train_model`.

    Side effects:
        Replaces the entry for `model_name` in the global `results` list,
        stores the history in the global `all_histories` dict, rewrites
        `ABLATION_CSV_PATH` and `ABLATION_JSON_PATH`, and draws the accuracy,
        loss and learning-rate plots. When `USE_MLFLOW` is set, parameters,
        metrics and the model are also logged to an MLflow run.
    """
    model_kwargs = model_kwargs or {}
    config_kwargs = config_kwargs or {}

    # Compare against None explicitly: datasets and loaders define __len__, so
    # a truthiness test would silently discard an empty custom object and fall
    # back to the global one.
    ds_train = custom_train_dataset if custom_train_dataset is not None else train_dataset
    dl_train = custom_train_loader if custom_train_loader is not None else train_loader

    # 1. Open the MLflow run (or a no-op context when tracking is disabled)
    run_context = mlflow.start_run(run_name=model_name) if USE_MLFLOW else nullcontext()

    with run_context:

        # Initialize model with dynamic arguments
        model = Net(n_classes=ds_train.get_num_classes(), **model_kwargs).to(device)

        total_params = sum(p.numel() for p in model.parameters())
        trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
        print(f"=== Running: {model_name} ===")
        print(f"Total parameters: {total_params:,} | Trainable: {trainable_params:,}")

        if USE_MLFLOW:
            mlflow.log_param("total_params", total_params)
            # Logged as well so ablation runs expose the same parameters as the
            # fine-tuning runs.
            mlflow.log_param("trainable_params", trainable_params)

        # Setup config: defaults first, then caller-provided overrides
        default_config = {
            "use_scheduler": True,
            "use_label_smoothing": True,
            "use_mixup": True,
            "save_path": f"best_{model_name.replace(' ', '_')}.pth",
        }
        default_config.update(config_kwargs)
        config = TrainConfig(**default_config)

        if USE_MLFLOW:
            mlflow.log_params(vars(config))

        # Build training components
        criterion, optimizer, scheduler, mixup = build_training_components(
            model=model,
            train_loader=dl_train,
            train_dataset=ds_train,
            config=config,
        )

        # 2. Train the model (the returned best accuracy is not used further here)
        best_acc, history = train_model(
            model_name=model_name,
            model=model,
            train_loader=dl_train,
            val_loader=val_loader,
            criterion=criterion,
            optimizer=optimizer,
            scheduler=scheduler,
            mixup=mixup,
            device=device,
            config=config,
            use_mlflow=USE_MLFLOW,
        )

        # 3. Reload the checkpoint written at config.save_path and evaluate it
        #    on the test split
        model.load_state_dict(torch.load(config.save_path, map_location=device))
        model.to(device)

        test_loss, test_acc, test_prec, test_recall, test_f1 = evaluate(
            model, test_loader, criterion, device
        )

        print(f"[{model_name}] Test Loss: {test_loss:.3f} | Test Acc: {test_acc:.3f}")

        # 4. Log to MLflow
        if USE_MLFLOW:
            mlflow.log_metrics({
                "test_loss": test_loss, "test_accuracy": test_acc,
                "test_precision": test_prec, "test_recall": test_recall, "test_f1": test_f1,
            })
            mlflow.pytorch.log_model(model, artifact_path="model")

    # --- Local saving and plotting ---

    # Update global results list (removes previous run of the same model if it exists)
    global results, all_histories
    results = [r for r in results if r["Model"] != model_name]
    results.append({
        "Model": model_name, "Test Loss": test_loss, "Test Accuracy": test_acc,
        "Test Precision": test_prec, "Test Recall": test_recall, "Test F1": test_f1,
    })

    all_histories[model_name] = history

    # Save to disk
    pd.DataFrame(results).round(4).to_csv(ABLATION_CSV_PATH, index=False)
    with open(ABLATION_JSON_PATH, "w", encoding="utf-8") as f:
        json.dump(all_histories, f, indent=4)

    # Plot
    plot_accuracy(history)
    plot_loss(history)
    plot_learning_rate(history)

    return model, history

In [None]:
# Model 1: reference run using the default model and training options
run_ablation_experiment(model_name="Baseline")

## **Model variant: No Batch Normalization**

**Change**
- Removed all Batch Normalization layers from the network, including those in residual skip connections.

**Purpose**
- To assess the contribution of Batch Normalization to training stability and final performance.


In [None]:
# Model 2: the network is built with `use_batchnorm=False`
run_ablation_experiment(
    model_name="No BatchNorm",
    model_kwargs={"use_batchnorm": False},
)

## **Model variant: No Pooling**

**Change**
- Removed all pooling operations from the network, including max pooling in the stem and global average pooling before the classifier.

**Purpose**
- To evaluate the role of spatial downsampling and global feature aggregation in representation learning and classification performance.

In [None]:
# Model 3: the network is built with `use_pooling=False`
run_ablation_experiment(
    model_name="No Pooling",
    model_kwargs={"use_pooling": False},
)

## **Model variant: No Dropout**

**Change**
- Removed dropout from the classifier head while keeping the rest of the architecture unchanged.

**Purpose**
- To assess the impact of dropout-based regularization on overfitting and generalization performance.

In [None]:
# Model 4: the network is built with `use_dropout=False`
run_ablation_experiment(
    model_name="No Dropout",
    model_kwargs={"use_dropout": False},
)

## **Model variant: No MixUp**

**Change**
- Disabled MixUp data augmentation during training, using only standard input–label pairs.

**Purpose**
- To measure the effect of MixUp on model robustness and generalization compared to standard supervised training.

In [None]:
# Model 5: the training configuration overrides `use_mixup` to False
run_ablation_experiment(
    model_name="No MixUp",
    config_kwargs={"use_mixup": False},
)

## **Model variant: No Data Augmentation**

**Change**
- Disabled all data augmentation techniques during training, including MixUp and any other stochastic input transformations.

**Purpose**
- To isolate the contribution of data augmentation to model generalization and performance on unseen data.

In [None]:
# Model 6: needs its own dataset and loader, because the training images only
# go through resize, tensor conversion and normalization; MixUp is disabled too.
train_transform2 = T.Compose(
    [
        T.Resize((224, 224)),
        T.ToTensor(),
        T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
)
train_dataset2 = OxfordPetDataset(split="train", transform=train_transform2)
train_loader2 = DataLoader(
    dataset=train_dataset2,
    batch_size=32,
    shuffle=True,
    num_workers=2,
)

run_ablation_experiment(
    model_name="No Augmentation",
    config_kwargs={"use_mixup": False},
    custom_train_dataset=train_dataset2,
    custom_train_loader=train_loader2,
)

## **Model variant: No Stem Layer**

**Change**
- Removed the convolutional stem (7×7 convolution, Batch Normalization, ReLU, and max pooling), feeding inputs directly into the first stage of the network.

**Purpose**
- To investigate the importance of early-stage feature extraction and aggressive spatial downsampling.

In [None]:
# Model 7: the network is built with `use_stem=False`
run_ablation_experiment(
    model_name="No StemLayer",
    model_kwargs={"use_stem": False},
)

## **Model variant: No Residual Blocks**

**Change**
- Replaced all residual blocks with plain convolutional blocks, removing skip connections while preserving depth and channel dimensions.

**Purpose**
- To evaluate the contribution of residual connections to optimization stability and final accuracy.

In [None]:
# Model 8: the network is built with `use_residuals=False`
run_ablation_experiment(
    model_name="No Residual",
    model_kwargs={"use_residuals": False},
)

## **Model variant: No Label Smoothing**

**Change**
- Disabled label smoothing in the cross-entropy loss, using hard one-hot target labels during training.

**Purpose**
- To assess the effect of label smoothing on model calibration and generalization.

In [None]:
# Model 9: the training configuration overrides `use_label_smoothing` to False
run_ablation_experiment(
    model_name="No LabelSmoothing",
    config_kwargs={"use_label_smoothing": False},
)

## **Model variant: No Learning Rate Scheduler**

**Change**
- Disabled the learning rate scheduler, training the model with a constant learning rate throughout all epochs.

**Purpose**
- To evaluate the impact of dynamic learning rate scheduling on convergence speed and final performance.

In [None]:
# Model 10: the training configuration overrides `use_scheduler` to False
run_ablation_experiment(
    model_name="No LR Scheduler",
    config_kwargs={"use_scheduler": False},
)

## **RECAP**



In [None]:
# Order in which the ablation runs are drawn in the recap plots. Each entry is
# the `model_name` passed to `run_ablation_experiment` for that variant, i.e.
# the key under which its history is stored in `all_histories`.
model_order = [
    "Baseline",
    "No BatchNorm",
    "No Pooling",
    "No Dropout",
    "No MixUp",
    "No Augmentation",
    "No StemLayer",
    "No Residual",
    "No LabelSmoothing",
    "No LR Scheduler",
]

In [None]:
# Overlay the validation-accuracy curves of all ablation runs recorded so far.
plt.figure()

# Each curve carries its own label, so legend entries cannot get out of sync
# with the plotted lines. Runs without a recorded history are skipped. The loop
# variable is deliberately not called `model`, to avoid rebinding a
# notebook-level name to a string.
for run_name in model_order:
    if run_name in all_histories:
        plt.plot(all_histories[run_name]["val_acc"], label=run_name)

plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.title("Validation Accuracy - Ablation Study")
# Legend placed outside the axes, to the right of the plot
plt.legend(loc="center left", bbox_to_anchor=(1, 0.5))
plt.grid(True)
plt.show()

In [None]:
# Overlay the training-loss curves of all ablation runs recorded so far.
plt.figure()

# Each curve carries its own label, so legend entries cannot get out of sync
# with the plotted lines. Runs without a recorded history are skipped. The loop
# variable is deliberately not called `model`, to avoid rebinding a
# notebook-level name to a string.
for run_name in model_order:
    if run_name in all_histories:
        plt.plot(all_histories[run_name]["train_loss"], label=run_name)

plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Training Loss - Ablation Study")
# Legend placed outside the axes, to the right of the plot
plt.legend(loc="center left", bbox_to_anchor=(1, 0.5))
plt.grid(True)
plt.show()

## Part 2: fine-tune an existing network

Your goal is to fine-tune a pretrained ResNet-18 model on `OxfordPetDataset`. Use the implementation provided by PyTorch, i.e. the opposite of part 1. Specifically, use the PyTorch ResNet-18 model pretrained on ImageNet-1K (V1). Divide your fine-tuning into two parts:

2A. First, fine-tune the ResNet-18 with the same training hyperparameters you used for your best model in part 1.

2B. Then, tweak the training hyperparameters in order to increase the accuracy on the test split. Justify your choices by analyzing the training plots and/or citing sources that guided you in your decisions — papers, blog posts, YouTube videos, or whatever else you may find useful. You should consider yourselves satisfied once you obtain a classification accuracy on the test split of ~90%.

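To avoid repeating code, every fine-tuning stage is run through the helper function below, which handles training, evaluation, MLflow logging, local saving, and plotting.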

In [None]:
def run_finetuning_stage(
    model_name,
    model,
    config,
    criterion,
    optimizer,
    scheduler,
    mixup_fn,
    custom_mlflow_params=None
):
    """
    Run one fine-tuning stage on an already-built model.

    The caller supplies the model together with its loss, optimizer, scheduler
    and MixUp callable. This helper trains on the global `train_loader` /
    `val_loader`, reloads the checkpoint stored at `config.save_path`,
    evaluates it on `test_loader`, records the outcome in the global
    `results` / `all_histories` containers (also written to
    `ABLATION_CSV_PATH` and `ABLATION_JSON_PATH`) and draws the training plots.
    When `USE_MLFLOW` is set, parameters, metrics and the model are logged to
    an MLflow run named `model_name`; `custom_mlflow_params`, if given, is
    logged as additional parameters.

    Returns:
        The test accuracy of the reloaded checkpoint.
    """
    global results, all_histories

    # Either a real MLflow run or a do-nothing context manager
    if USE_MLFLOW:
        tracking_ctx = mlflow.start_run(run_name=model_name)
    else:
        tracking_ctx = nullcontext()

    with tracking_ctx:
        # Collect (size, trainable?) once, then derive both counts from it
        param_info = [(p.numel(), p.requires_grad) for p in model.parameters()]
        total_params = sum(size for size, _ in param_info)
        trainable_params = sum(size for size, needs_grad in param_info if needs_grad)

        banner = "=" * 50
        print("\n" + banner)
        print(f"=== Running: {model_name} ===")
        print(f"Total params: {total_params:,} | Trainable params: {trainable_params:,}")
        print(banner)

        if USE_MLFLOW:
            mlflow.log_param("total_params", total_params)
            mlflow.log_param("trainable_params", trainable_params)
            mlflow.log_params(vars(config))
            if custom_mlflow_params:
                mlflow.log_params(custom_mlflow_params)

        # Training; the best accuracy returned alongside the history is unused
        _best_acc, history = train_model(
            model_name=model_name,
            model=model,
            train_loader=train_loader,
            val_loader=val_loader,
            criterion=criterion,
            optimizer=optimizer,
            scheduler=scheduler,
            mixup=mixup_fn,
            device=device,
            config=config,
            use_mlflow=USE_MLFLOW,
        )

        # Evaluate the checkpoint saved at config.save_path, not the last epoch
        checkpoint = torch.load(config.save_path, map_location=device)
        model.load_state_dict(checkpoint)
        model.to(device)

        test_metrics = evaluate(model, test_loader, criterion, device)
        test_loss, test_acc, test_prec, test_recall, test_f1 = test_metrics

        print(f"[{model_name}] Test Loss: {test_loss:.3f} | Test Acc: {test_acc:.3f}")

        if USE_MLFLOW:
            mlflow.log_metrics({
                "test_loss": test_loss,
                "test_accuracy": test_acc,
                "test_precision": test_prec,
                "test_recall": test_recall,
                "test_f1": test_f1,
            })
            mlflow.pytorch.log_model(model, artifact_path="model")

    # Replace any earlier entry for this model with the fresh summary row
    summary_row = {
        "Model": model_name,
        "Test Loss": test_loss,
        "Test Accuracy": test_acc,
        "Test Precision": test_prec,
        "Test Recall": test_recall,
        "Test F1": test_f1,
    }
    results = [row for row in results if row["Model"] != model_name]
    results.append(summary_row)
    all_histories[model_name] = history

    # Persist the summary table and all histories
    pd.DataFrame(results).round(4).to_csv(ABLATION_CSV_PATH, index=False)
    with open(ABLATION_JSON_PATH, "w") as out_file:
        json.dump(all_histories, out_file, indent=4)

    for draw in (plot_accuracy, plot_loss, plot_learning_rate):
        draw(history)

    # Returned so that Parts 2A and 2B can be compared later
    return test_acc

In [None]:
# ==============================================================================
# Part 2A: Fine-tune ResNet-18 (Same hyperparameters as Part 1)
# ==============================================================================

# Model: ImageNet-pretrained ResNet-18 whose final layer is replaced by a new
# linear layer with one output per class of the training dataset
resnet18_2a = models.resnet18(weights=ResNet18_Weights.IMAGENET1K_V1)
resnet18_2a.fc = nn.Linear(
    in_features=resnet18_2a.fc.in_features,
    out_features=train_dataset.get_num_classes(),
)
resnet18_2a = resnet18_2a.to(device)

# Training components, derived from the configuration
config_2a = TrainConfig(
    use_scheduler=True,
    use_label_smoothing=True,
    use_mixup=True,
    save_path="best_resnet18_2a.pth",
)
crit_2a, opt_2a, sched_2a, mix_2a = build_training_components(
    model=resnet18_2a,
    train_loader=train_loader,
    train_dataset=train_dataset,
    config=config_2a,
)

# Train, evaluate and keep the test accuracy for the final comparison
test_acc_2a = run_finetuning_stage(
    model_name="ResNet-18 Fine-tuned (Part 2A)",
    model=resnet18_2a,
    config=config_2a,
    criterion=crit_2a,
    optimizer=opt_2a,
    scheduler=sched_2a,
    mixup_fn=mix_2a,
)

## Part 2B: Fine-tune ResNet-18 with optimized hyperparameters

**Changes from Part 2A:**
A two-stage fine-tuning strategy was used: first only the classifier head is trained, then the full network is fine-tuned. Specifically:

- **Different classifier layer**: dropout with $p=0.4$ was added in front of the linear layer of the classifier
- **Different learning rate behaviour and scheduler**: a cosine-annealing LR scheduler was used, with a smaller learning rate once the backbone is unfrozen ($lr=10^{-3}$ when fine-tuning only the head, $lr=10^{-5}$ when fine-tuning all layers)
- **Smaller label smoothing**: a label smoothing of 0.05 was used
- **Freeze backbone initially**: only the classifier is trained at first; the backbone is then unfrozen and the whole network is trained

**Justification:**
 - Using a two-stage fine-tuning strategy follows standard transfer learning practice: pretrained CNN backbones learn general visual features that transfer across tasks, making head-only training effective for initial adaptation (*"How transferable are features in deep neural networks?", Yosinski et al., 2014*; PyTorch Transfer Learning Tutorial).
 - After initial convergence, unfreezing the entire network with a lower learning rate allows task-specific refinement while preserving useful pretrained representations (*"Hands-on Transfer Learning with Python: Implement advanced deep learning and neural network models using TensorFlow and Keras", Sarkar, Bali, 2018*).
 - Techniques such as weight decay, cosine LR scheduling and label smoothing have been shown to improve generalization in modern CNNs (*"Bag of Tricks for Image Classification with Convolutional Neural Networks", He et al., 2019*).
 - The pretrained features are already well-optimized for image classification; aggressive updates can degrade them, which is why a much smaller learning rate is used once the backbone is unfrozen.

In [None]:
# ==============================================================================
# Part 2B - Stage 1: Fine-tune only the head (Optimized HPs)
# ==============================================================================

# Setup Model
resnet18_2b = models.resnet18(weights=ResNet18_Weights.IMAGENET1K_V1)
# The right-hand side is evaluated before the assignment, so `fc.in_features`
# still refers to the pretrained head; this mirrors Part 2A instead of
# hard-coding the feature width.
resnet18_2b.fc = nn.Sequential(
    nn.Dropout(p=0.4, inplace=False),
    nn.Linear(
        in_features=resnet18_2b.fc.in_features,
        out_features=train_dataset.get_num_classes(),
        bias=True,
    ),
)
resnet18_2b = resnet18_2b.to(device)

# Freeze base, unfreeze head
resnet18_2b.requires_grad_(False)
resnet18_2b.fc.requires_grad_(True)

# Setup Components
# NOTE(review): criterion, optimizer and scheduler are built by hand below, so
# the `use_*` flags presumably only matter for logging / `train_model` - confirm.
config_2b_head = TrainConfig(use_scheduler=True, use_label_smoothing=True, use_mixup=False, save_path="best_resnet18_2b_head.pth")
config_2b_head.num_epochs = 100

crit_2b = nn.CrossEntropyLoss(label_smoothing=0.05)
# Only the head's parameters are handed to the optimizer
opt_fc = torch.optim.AdamW(resnet18_2b.fc.parameters(), lr=1e-3, weight_decay=1e-3)
# Tie the cosine period to the configured number of epochs so the two values
# cannot drift apart
sched_fc = torch.optim.lr_scheduler.CosineAnnealingLR(opt_fc, T_max=config_2b_head.num_epochs)

# Run
run_finetuning_stage(
    "ResNet-18 FT Head (Part 2B)", resnet18_2b, config_2b_head, crit_2b, opt_fc, sched_fc, NoMixUp(),
    custom_mlflow_params={"stage": "head_only"}
)

In [None]:
# ==============================================================================
# Part 2B - Stage 2: Fine-tune all layers (Optimized HPs)
# ==============================================================================

# Unfreeze all layers. `resnet18_2b` is the model from Stage 1; the helper
# reloaded the Stage 1 checkpoint into it before returning, so training
# resumes from those weights.
resnet18_2b.requires_grad_(True)

# Setup Components
config_2b_full = TrainConfig(use_scheduler=True, use_label_smoothing=True, use_mixup=False, save_path="best_resnet18_2b_full.pth")
config_2b_full.num_epochs = 100

# All parameters are optimized now, with a much smaller learning rate than in Stage 1
opt_full = torch.optim.AdamW(resnet18_2b.parameters(), lr=1e-5, weight_decay=1e-3)
# Tie the cosine period to the configured number of epochs so the two values
# cannot drift apart
sched_full = torch.optim.lr_scheduler.CosineAnnealingLR(opt_full, T_max=config_2b_full.num_epochs)

# Run (reuses the `crit_2b` loss defined in the Stage 1 cell)
test_acc_2b2 = run_finetuning_stage(
    "ResNet-18 FT Full (Part 2B)", resnet18_2b, config_2b_full, crit_2b, opt_full, sched_full, NoMixUp(),
    custom_mlflow_params={"stage": "full_network"}
)

In [None]:
# ==============================================================================
# Summary comparison: Part 2A vs Part 2B
# ==============================================================================
# One print call; `sep="\n"` puts every argument on its own line.
print(
    "\n" + "=" * 50,
    "Part 2 Summary: ResNet-18 Fine-tuning Results",
    "=" * 50,
    "Part 2A (same hyperparameters as Part 1):",
    f"  Test Accuracy: {test_acc_2a:.3f}",
    "\nPart 2B (optimized hyperparameters for transfer learning):",
    f"  Test Accuracy: {test_acc_2b2:.3f}",
    f"\nImprovement: {(test_acc_2b2 - test_acc_2a) * 100:.2f} percentage points",
    sep="\n",
)