<a href="https://colab.research.google.com/github/laurefindele-o-catto/ML-Projects/blob/main/CIFAR-10/Model%20using%20resnet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import torch
import torchvision as tv
import torchvision.transforms as T
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import torch.nn.functional as F
import numpy as np
import PIL
from PIL import Image
from pathlib import Path
import seaborn as sns
import sys
import math
import time
import random

In [4]:
device = "cuda" if torch.cuda.is_available() else "cpu"
print("Using device:", device)
print("GPU name:", torch.cuda.get_device_name(0) if torch.cuda.is_available() else "No GPU")

Using device: cpu
GPU name: No GPU


**Helper Functions**

In [3]:
def mixup_data(x, y, alpha = 0.1):
  if alpha <= 0:
    return x, y, y, 1.0

  lam = np.random.beta(alpha, alpha)
  batch_size = x.size(0)
  index = torch.randperm(batch_size).to(x.device)
  mixed_x = lam*x + (1-lam) * x[index, :]
  y_a, y_b = y, y[index]

  return mixed_x, y_a, y_b, lam

def cutmix_data(x, y, alpha = 1.0):
  if alpha <= 0:
    return x, y, y, 1.0

  lam = np.random.beta(alpha, alpha)
  batch_size, _, H, W = x.size()
  index = torch.randperm(batch_size).to(x.device)

  cut_rat = np.sqrt(1. - lam)
  cut_w, cut_h = int(W * cut_rat), int(H * cut_rat)
  cx, cy = np.random.randint(W), np.random.randint(H)
  x1, x2 = np.clip(cx - cut_w // 2, 0, W), np.clip(cx + cut_w // 2, 0, W)
  y1, y2 = np.clip(cy - cut_h // 2, 0, H), np.clip(cy + cut_h // 2, 0, H)

  x[:, :, y1:y2, x1:x2] = x[index, :, y1:y2, x1:x2]

  lam = 1 - ((x2 - x1) * (y2-y1) / (W*H) )
  y_a, y_b = y, y[index]

  return x, y_a, y_b, lam


def mixup_cutmix_criterion(criterion, pred, y_a, y_b, lam):
  return lam * criterion(pred, y_a) + (1 - lam)*criterion(pred, y_b)

**Model Using Resnet**

ResNet Layers
- BasicBlock: Each block has two 3×3 convolutions with BatchNorm + ReLU, plus a skip connection that adds the input back to the output.
- Skip connections: Solve the vanishing gradient problem by letting gradients flow directly backward.
- ResNet18 structure:
- Conv1: 3×3 conv (CIFAR version) → 64 channels
- Stage 1: 2 blocks, 64 channels
- Stage 2: 2 blocks, 128 channels (downsample)
- Stage 3: 2 blocks, 256 channels (downsample)
- Stage 4: 2 blocks, 512 channels (downsample)
- Global Average Pooling → Fully Connected layer (10 classes for CIFAR‑10).

Early layers learn edges/textures, middle layers learn parts (wheels, eyes), deeper layers learn object semantics.

In [4]:
class BasicBlock(nn.Module):
  expansion = 1
  def __init__(self, in_ch, out_ch, stride = 1):
    super().__init__()
    self.conv1 = nn.Conv2d(in_ch, out_ch, kernel_size=3, stride = stride, padding = 1, bias  = False)
    self.bn1 = nn.BatchNorm2d(out_ch)
    self.conv2 = nn.Conv2d(out_ch, out_ch, kernel_size = 3, stride = 1, padding = 1, bias = False)
    self.bn2 = nn.BatchNorm2d(out_ch)

    self.downsample = None
    if stride != 1 or in_ch != out_ch:
      self.downsample = nn.Sequential(
          nn.Conv2d(in_ch, out_ch, kernel_size = 1, stride = stride, bias = False),
          nn.BatchNorm2d(out_ch)
      )

  def forward(self, x):
    identity = x  #save input for skip connection

    out = F.relu(self.bn1(self.conv1(x)), inplace = True)
    out = self.bn2(self.conv2(out))

    if self.downsample is not None:
      identity = self.downsample(x)

    out = F.relu(out + identity, inplace = True)  #residual connection

    return out

In [5]:
 class ResNet_CIFAR(nn.Module):
    def __init__(self, block=BasicBlock, layers=(2,2,2,2), num_classes=10):
        super().__init__()
        # CIFAR stem: 3x3 conv, stride 1, no maxpool
        self.in_ch = 64
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1   = nn.BatchNorm2d(64)
        self.layer1 = self._make_layer(block, 64,  layers[0], stride=1)
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)
        self.avgpool = nn.AdaptiveAvgPool2d((1,1))
        self.fc = nn.Linear(512*block.expansion, num_classes)

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def _make_layer(self, block, out_ch, blocks, stride):
        layers = [block(self.in_ch, out_ch, stride)]
        self.in_ch = out_ch * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.in_ch, out_ch, stride=1))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)), inplace=True)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        return self.fc(x)

def resnet18_cifar(num_classes=10):
    return ResNet_CIFAR(BasicBlock, (2,2,2,2), num_classes)

In [6]:
CIFAR10_MEAN = (0.4914, 0.4822, 0.4456)
CIFAR10_STD = (0.2023, 0.1994, 0.2010)

transform_train = T.Compose([
    T.RandomCrop(32, padding = 4),
    T.RandomHorizontalFlip(),
    T.ToTensor(),
    T.Normalize(CIFAR10_MEAN, CIFAR10_STD)
])

transform_test = T.Compose([
    T.ToTensor(),
    T.Normalize(CIFAR10_MEAN, CIFAR10_STD)
])

trainset = tv.datasets.CIFAR10(root='./data', train = True, download = True, transform = transform_train)
testset = tv.datasets.CIFAR10(root = './data', train = False, download = True, transform = transform_test)

train_loader = torch.utils.data.DataLoader(trainset, batch_size = 128, shuffle = True, num_workers = 2, pin_memory = True)
test_loader = torch.utils.data.DataLoader(testset, batch_size = 256, shuffle = False, num_workers = 2, pin_memory = True)

100%|██████████| 170M/170M [00:03<00:00, 46.6MB/s]


**Training Setup**

In [None]:
def accuracy(logits, targets):
  return (logits.argmax(1) == targets).float().mean().item() * 100.0

def train_resnet(model, train_loader, test_loader, epochs = 200, base_lr = 0.1, weight_decay = 5e-4, label_smoothing = 0.1, device = device, save_dir="checkpoints"):
  model = model.to(device)

  criterion = nn.CrossEntropyLoss(label_smoothing=label_smoothing)
  optimizer = torch.optim.SGD(model.parameters(), lr = base_lr, momentum=0.9, weight_decay = weight_decay, nesterov = True)
  scheduler = torch.optimc.lr_scheduler.CosineAnnealingLR(optimizer, T_max = epochs, eta_min = base_lr*1e-2)

  train_hist, test_hist = [], []

  for epoch in range(1, epochs+1):
    model.train()
    total, correct, running = 0, 0, 0.0
    for x, y in train_loader:
      x, y = x.to(device), y.to(device)
      optimizer.zero_grad()
      logits = model(x)
      loss = criterion(logits, y)
      loss.backward()
      optimizer.step()

      running += loss.item() * x.size(0)
      total += x.size(0)
      correct += (logits.argmax(1) == y).sum().item()

    train_loss = running/total
    train_acc = 100.0 * correct/total

    model.eval()
    total, correct, runnin = 0, 0, 0.0

    with torch.no_grad():
      for x, y in test_loader:
        x, y = x.to(device), y.to(device)
        logits = model(x)
        loss = criterion(logits, y)
        running += loss.item() * x.size(0)
        total += x.size(0)
        correct += (logits.argmax(1) == y).sum().item()

    test_loss = running/total
    test_acc = 100.0*correct/total

    scheduler.step()
    train_hist.append(train_acc)
    test_hist.append(test_acc)
    print(f"Epoch [{epoch:3d}/{epochs}] LR {scheduler.get_last_lr()[0]:.5f} | Train Acc: {train_acc:.2f}% | Test Acc: {test_acc:.2f}%")

  return train_hist, test_hist

In [7]:
import os

def train_resnet_adv(model, train_loader, test_loader, epochs=200, base_lr=0.1,
                     weight_decay=5e-4, label_smoothing=0.1,
                     use_mixup=False, use_cutmix=False, alpha=1.0, device=None, save_dir = "checkpoints"):
    device = device or ("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    criterion = nn.CrossEntropyLoss(label_smoothing=label_smoothing)
    optimizer = torch.optim.SGD(model.parameters(), lr=base_lr, momentum=0.9,
                                weight_decay=weight_decay, nesterov=True)

    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max = epochs, eta_min = base_lr*1e-2)

    os.makedirs(save_dir, exist_ok=True)
    train_hist, test_hist = [], []
    best_acc = 0.0   # track best test accuracy

    for epoch in range(1, epochs+1):
        model.train()
        total, correct = 0, 0

        mix_count, cut_count, none_count = 0, 0, 0

        if epoch < int(0.8*epochs):   #first 80% epochs
          p_mix, p_cut, p_none = 0.4, 0.4, 0.2
          alpha_now = alpha
        else:
          p_mix, p_cut, p_none = 0.15, 0.15, 0.70
          alpha_now = 0.2


        for x, y in train_loader:
            x, y = x.to(device), y.to(device)

            r = random.random()

            # Apply MixUp or CutMix
            if r < p_mix:
                x, y_a, y_b, lam = mixup_data(x, y, alpha_now)
                mix_count += 1
            elif r < p_mix + p_cut:
                x, y_a, y_b, lam = cutmix_data(x, y, alpha_now)
                cut_count += 1
            else:
                y_a, y_b, lam = y, y, 1.0
                none_count += 1

            optimizer.zero_grad()
            logits = model(x)
            loss = mixup_cutmix_criterion(criterion, logits, y_a, y_b, lam)
            loss.backward()
            optimizer.step()

            total += y.size(0)
            correct += (logits.argmax(1) == y).sum().item()


        scheduler.step()
        train_acc = 100.0 * correct / total

        # Evaluate
        model.eval()
        total, correct = 0, 0
        with torch.no_grad():
            for x, y in test_loader:
                x, y = x.to(device), y.to(device)
                logits = model(x)
                total += y.size(0)
                correct += (logits.argmax(1) == y).sum().item()
        test_acc = 100.0 * correct / total

        train_hist.append(train_acc)
        test_hist.append(test_acc)
        print(f"Epoch [{epoch:3d}/{epochs}] "
              f"LR {scheduler.get_last_lr()[0]:.5f} | "
              f"Train Acc: {train_acc:.2f}% | Test Acc: {test_acc:.2f}% | "
              f"MixUp: {mix_count}, CutMix: {cut_count}, None: {none_count}")


        # Save latest checkpoint every N epochs (resume safety)
        if epoch % 10 == 0:
            torch.save({
              "epoch": epoch,
              "model_state": model.state_dict(),
              "optimizer_state": optimizer.state_dict(),
              "scheduler_state": scheduler.state_dict(),
              "train_hist": train_hist,
              "test_hist": test_hist,
              "best_acc": best_acc,
            }, os.path.join(save_dir, f"checkpoint_epoch{epoch}.pth"))
        print(f"💾 Saved checkpoint at epoch {epoch}")


        if test_acc > best_acc:
            best_acc = test_acc
            torch.save({
                "epoch": epoch,
                "model_state": model.state_dict(),
                "optimizer_state": optimizer.state_dict(),
                "scheduler_state": scheduler.state_dict(),
                "train_hist": train_hist,
                "test_hist": test_hist,
                "best_acc": best_acc,
            }, os.path.join(save_dir, "best_model.pth"))

            print(f"🌟 New best model saved at epoch {epoch} with acc {best_acc:.2f}%")

    return train_hist, test_hist

In [None]:
model = resnet18_cifar()
# Randomize CutMix and MixUp
train_hist, test_hist = train_resnet_adv(model, train_loader, test_loader,
                                         epochs=200, base_lr=0.1,
                                         use_mixup=True, use_cutmix=True, alpha=1.0)

Epoch [  1/200] LR 0.09999 | Train Acc: 18.84% | Test Acc: 35.97% | MixUp: 166, CutMix: 149, None: 76
💾 Saved checkpoint at epoch 1
🌟 New best model saved at epoch 1 with acc 35.97%
Epoch [  2/200] LR 0.09998 | Train Acc: 30.12% | Test Acc: 45.79% | MixUp: 148, CutMix: 157, None: 86
💾 Saved checkpoint at epoch 2
🌟 New best model saved at epoch 2 with acc 45.79%
Epoch [  3/200] LR 0.09995 | Train Acc: 35.93% | Test Acc: 51.59% | MixUp: 139, CutMix: 161, None: 91
💾 Saved checkpoint at epoch 3
🌟 New best model saved at epoch 3 with acc 51.59%
Epoch [  4/200] LR 0.09990 | Train Acc: 39.21% | Test Acc: 60.36% | MixUp: 163, CutMix: 163, None: 65
💾 Saved checkpoint at epoch 4
🌟 New best model saved at epoch 4 with acc 60.36%
Epoch [  5/200] LR 0.09985 | Train Acc: 44.36% | Test Acc: 59.33% | MixUp: 164, CutMix: 153, None: 74
💾 Saved checkpoint at epoch 5
Epoch [  6/200] LR 0.09978 | Train Acc: 47.48% | Test Acc: 68.57% | MixUp: 160, CutMix: 157, None: 74
💾 Saved checkpoint at epoch 6
🌟 New be

**Confusion Matrix & Misclassifications**

**Discussion & Analysis**

We used:

 *Loss Function*
 - CrossEntropyLoss with Label Smoothing:
- Standard CE compares predicted logits vs. one‑hot labels.
- Label smoothing (e.g., 0.1) softens the target distribution: instead of [0,0,1,0,...], the true class gets 0.9 and others share 0.1.
- Benefits: prevents overconfidence, improves calibration, helps generalization.


*Optimizer*
- SGD with Momentum (0.9) + Nesterov:
- SGD updates weights in the direction of the gradient.
- Momentum accumulates past gradients → smoother, faster convergence.
- Nesterov momentum looks ahead, correcting overshoot.


*Learning Rate Scheduler*
- CosineAnnealingLR:
- Starts at base LR (0.1).
- Decays smoothly following a cosine curve toward eta_min (0.001).
- Prevents sudden drops, encourages better minima.


*Weight Decay*
- L2 regularization: adds a penalty proportional to the square of weights.
- Prevents weights from growing too large, reduces overfitting.
- Standard value 5e‑4.



*CutMix and MixUp*
- MixUp: blends two images and their labels linearly. Encourages smooth decision boundaries.
- CutMix: replaces a patch of one image with another, labels mixed by patch area. Preserves natural textures.
- MixUp teaches interpolation, CutMix teaches occlusion robustness. Alternating gives the model both benefits.
- Tapering: Strong mixing early (0.4, 0.4, 0.2), weaker late (0.15, 0.15, 0.70) so the model sharpens on real labels.
