In [36]:
import os, sys, time, random
import torch, torchvision
import numpy as np

In [37]:
# ---- Compute device ---------------------------------------------------------
# Use the GPU whenever PyTorch can see a CUDA device, otherwise fall back to CPU.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
print("CUDA available:", use_cuda)

# Describe GPU 0 (name, compute capability, memory) when CUDA is present.
if use_cuda:
    props = torch.cuda.get_device_properties(0)
    print("GPU:", torch.cuda.get_device_name(0))
    print("Compute capability:", f"{props.major}.{props.minor}")
    # total_memory divided by 1024**3 is reported as gigabytes, rounded to 2 decimals
    print("VRAM GB:", round(props.total_memory/1024**3, 2))

# ---- Reproducibility --------------------------------------------------------
# Seed every RNG this notebook touches: Python, NumPy, PyTorch CPU and (if any) CUDA.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
if use_cuda:
    torch.cuda.manual_seed_all(SEED)

# Turn the cuDNN autotuner off and request deterministic kernels.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

# ---- Output layout ----------------------------------------------------------
# outputs/run_<timestamp>/{checkpoints,logs} under the current working directory.
RUN_ID = time.strftime("%Y%m%d-%H%M%S")
BASE_DIR = os.getcwd()
OUTPUT_DIR = os.path.join(BASE_DIR, "outputs")
RUN_DIR = os.path.join(OUTPUT_DIR, f"run_{RUN_ID}")
CKPT_DIR = os.path.join(RUN_DIR, "checkpoints")
LOG_DIR = os.path.join(RUN_DIR, "logs")

# os.makedirs creates missing parents, so making the two leaf folders also
# materialises OUTPUT_DIR and RUN_DIR.
os.makedirs(CKPT_DIR, exist_ok=True)
os.makedirs(LOG_DIR, exist_ok=True)

# ---- Run manifest -----------------------------------------------------------
# Plain-text KEY=VALUE record of the software/hardware used for this run.
info_path = os.path.join(RUN_DIR, "RUN_INFO.txt")
with open(info_path, "w") as f:
    f.write(
        f"RUN_ID={RUN_ID}\n"
        f"Python={sys.version}\n"
        f"PyTorch={torch.__version__}\n"
        f"TorchVision={torchvision.__version__}\n"
        f"CUDA={use_cuda}\n"
    )
    if use_cuda:
        # `props` only exists when CUDA is available (set in the device section)
        f.write(
            f"GPU={torch.cuda.get_device_name(0)}\n"
            f"ComputeCap={props.major}.{props.minor}\n"
            f"VRAM_GB={props.total_memory/1024**3:.2f}\n"
        )
    f.write(
        f"SEED={SEED}\n"
        f"DEVICE={device}\n"
    )

print("RUN_DIR:", RUN_DIR)
print("CKPT_DIR:", CKPT_DIR)
print("LOG_DIR:", LOG_DIR)

CUDA available: True
GPU: Tesla T4
Compute capability: 7.5
VRAM GB: 14.74
RUN_DIR: /content/outputs/run_20251025-040551
CKPT_DIR: /content/outputs/run_20251025-040551/checkpoints
LOG_DIR: /content/outputs/run_20251025-040551/logs


In [38]:
import torchvision.transforms as transforms

# Folder that holds (or will receive) the CIFAR-10 download.
DATA_DIR = os.path.join(BASE_DIR, "data")

# DataLoader settings shared by every loader in this notebook.
BATCH_SIZE = 128   # raised from 4 to 128 to speed up each epoch
NUM_WORKERS = 2
# Pinned host memory is only requested when the compute device is CUDA.
PIN_MEMORY = getattr(device, "type", "cpu") == "cuda"

# No augmentation: convert to tensor, then normalise each channel with
# mean 0.5 / std 0.5. Train and test use the same pipeline.
train_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])
test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

# CIFAR-10 train / test splits, downloaded into DATA_DIR on first use.
train_dataset = torchvision.datasets.CIFAR10(
    root=DATA_DIR,
    train=True,
    download=True,
    transform=train_transform,
)
test_dataset = torchvision.datasets.CIFAR10(
    root=DATA_DIR,
    train=False,
    download=True,
    transform=test_transform,
)

# Test loader: fixed order (no shuffling), same batch size as training.
test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKERS,
    pin_memory=PIN_MEMORY,
)

# Confirm both datasets were created with the expected number of images.
print("Train images:", len(train_dataset), "Test images:", len(test_dataset))

# Fetch a single test batch and report tensor shapes / dtypes as a sanity check.
images, labels = next(iter(test_loader))
print("Test batch:", images.shape, images.dtype, "| labels:", labels.shape, labels.dtype)

Train images: 50000 Test images: 10000
Test batch: torch.Size([128, 3, 32, 32]) torch.float32 | labels: torch.Size([128]) torch.int64


In [39]:
from torch.utils.data import DataLoader, SubsetRandomSampler

# get total number of training examples
# Number of training examples available for the 3-fold split.
n_total = len(train_dataset)

# A private, seeded NumPy RNG keeps the split independent of the global RNG state.
rng = np.random.RandomState(SEED)

# Deterministically shuffle the index range [0, n_total).
all_indices = np.arange(n_total)
rng.shuffle(all_indices)

# Cut the shuffled indices into three nearly equal folds and store each as a
# plain Python list.
X_idx, Y_idx, Z_idx = (part.tolist() for part in np.array_split(all_indices, 3))

# Name -> index list lookup used by get_fold_loaders.
FOLDS = dict(X=X_idx, Y=Y_idx, Z=Z_idx)

print("Fold sizes — X:", len(FOLDS["X"]), "Y:", len(FOLDS["Y"]), "Z:", len(FOLDS["Z"]))

# Single seeded torch.Generator handed to every SubsetRandomSampler, so the
# per-epoch sampling order is reproducible.
loader_generator = torch.Generator().manual_seed(SEED)

#helper that builds a DataLoader over a given subset of indices
def _make_loader(dataset, indices, batch_size, num_workers, pin_memory):
    """Build a DataLoader that draws only the given `indices` of `dataset`.

    The indices are visited in random order via a SubsetRandomSampler that
    uses the module-level `loader_generator`. The final partial batch is kept
    (`drop_last=False`).
    """
    loader_options = dict(
        batch_size=batch_size,
        num_workers=num_workers,
        pin_memory=pin_memory,
        drop_last=False,
    )
    subset_sampler = SubsetRandomSampler(indices, generator=loader_generator)
    return DataLoader(dataset, sampler=subset_sampler, **loader_options)

#function that returns (train_loader, val_loader) for a chosen validation fold
def get_fold_loaders(val_fold: str, batch_size=BATCH_SIZE):
    """Return `(train_loader, val_loader)` for the chosen validation fold.

    `val_fold` must be a key of the module-level `FOLDS` dict. That fold is
    used for validation; the indices of every other fold are concatenated
    (in `FOLDS` order) to form the training subset. Both loaders read from
    `train_dataset` and are built with `_make_loader`.

    Raises:
        AssertionError: if `val_fold` is not a key of `FOLDS`.
    """
    assert val_fold in FOLDS, f"val_fold must be one of {list(FOLDS.keys())}"

    val_idx = FOLDS[val_fold]
    # flatten every non-validation fold into one training index list
    train_idx = [
        sample_index
        for fold_name, fold_indices in FOLDS.items()
        if fold_name != val_fold
        for sample_index in fold_indices
    ]

    train_loader, val_loader = (
        _make_loader(train_dataset, subset, batch_size, NUM_WORKERS, PIN_MEMORY)
        for subset in (train_idx, val_idx)
    )

    # one-line summary of how many samples ended up on each side
    print(f"[Fold with val={val_fold}] train samples: {len(train_idx)} | val samples: {len(val_idx)}")
    return train_loader, val_loader

# Smoke test: build the Fold-1 loaders (validation fold = X) to confirm the helper works.
train_loader_F1, val_loader_F1 = get_fold_loaders(val_fold="X", batch_size=BATCH_SIZE)


Fold sizes — X: 16667 Y: 16667 Z: 16666
[Fold with val=X] train samples: 33333 | val samples: 16667


In [40]:
#two architectures: Arch-A (Plain CNN) and Arch-B (ResNet-style)

import torch.nn as nn
import torch

#Arch-A: plain CNN for CIFAR-10 classification
class BasicCNN(nn.Module):
    """Arch-A: a plain two-block CNN classifier.

    Layout: [conv3x3 -> BN -> ReLU -> maxpool2] x 2, then
    dropout -> FC(4096 -> 256) -> ReLU -> FC(256 -> num_classes).
    The first linear layer is sized for 64 channels on an 8x8 map, i.e. a
    32x32 input after the two 2x poolings.

    Args:
        num_classes: width of the output (logit) layer.
    """

    def __init__(self, num_classes: int = 10):
        super().__init__()
        # Block 1: 3 -> 32 channels; padding=1 keeps H and W, pooling halves them.
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.relu1 = nn.ReLU(inplace=True)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)

        # Block 2: 32 -> 64 channels, same conv/pool geometry as block 1.
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.relu2 = nn.ReLU(inplace=True)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)

        # Classifier head: dropout is applied to the flattened features
        # before the hidden linear layer.
        self.drop = nn.Dropout(p=0.5)
        self.fc1 = nn.Linear(64 * 8 * 8, 256)
        self.relu3 = nn.ReLU(inplace=True)
        self.fc2 = nn.Linear(256, num_classes)

    def forward(self, x):
        """Map an image batch to logits of shape (batch, num_classes)."""
        # conv -> bn -> relu -> pool, twice
        feat = self.conv1(x)
        feat = self.bn1(feat)
        feat = self.relu1(feat)
        feat = self.pool1(feat)

        feat = self.conv2(feat)
        feat = self.bn2(feat)
        feat = self.relu2(feat)
        feat = self.pool2(feat)

        # collapse (C, H, W) into one feature axis per sample
        flat = feat.view(feat.size(0), -1)

        # dropout -> hidden linear -> relu -> output linear
        hidden = self.fc1(self.drop(flat))
        hidden = self.relu3(hidden)
        return self.fc2(hidden)

# Arch-B: ResNet-style network built from residual blocks (lightweight for CIFAR-10)
class BasicBlock(nn.Module):
    """Residual block: two 3x3 conv+BN layers plus a skip connection.

    Args:
        in_planes: channels of the incoming feature map.
        planes: channels produced by both convolutions.
        stride: stride of the first convolution (the second always uses 1).
        downsample: optional module applied to the input so the skip branch
            matches the main branch when channels or stride change; when
            None the input is added unchanged.
    """

    # output channels = planes * expansion
    expansion = 1

    def __init__(self, in_planes, planes, stride=1, downsample=None):
        super().__init__()
        # main branch, first half: 3x3 conv (carries the stride) + BN
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        # a single ReLU module is reused after bn1 and after the residual sum
        self.relu = nn.ReLU(inplace=True)
        # main branch, second half: 3x3 conv at stride 1 + BN
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        # optional projection for the skip branch
        self.downsample = downsample

    def forward(self, x):
        """Return relu(main_branch(x) + skip(x))."""
        # skip branch: raw input, or its projection when one was supplied
        shortcut = x if self.downsample is None else self.downsample(x)

        # main branch: conv1 -> bn1 -> relu -> conv2 -> bn2
        residual = self.conv1(x)
        residual = self.bn1(residual)
        residual = self.relu(residual)
        residual = self.conv2(residual)
        residual = self.bn2(residual)

        # merge the two branches in place, then apply the final activation
        residual += shortcut
        return self.relu(residual)

class ResNetLite(nn.Module):
    """Arch-B: a small ResNet-style classifier with three residual stages.

    Stem (3x3 conv, stride 1) -> stage 1 (stride 1, `base_width` channels)
    -> stage 2 (stride 2, 2x width) -> stage 3 (stride 2, 4x width)
    -> global average pool -> linear classifier.

    Args:
        block: residual block class; must expose an `expansion` attribute and
            accept `(in_planes, planes, stride=..., downsample=...)`.
        layers: number of blocks in stages 1, 2 and 3 (indices 0..2 are read).
        num_classes: width of the output (logit) layer.
        base_width: channel count of the stem and of stage 1.
    """

    def __init__(self, block=BasicBlock, layers=(2, 2, 2), num_classes: int = 10, base_width: int = 64):
        super().__init__()
        # running record of the channel count feeding the next block;
        # _make_layer updates it as stages are built
        self.in_planes = base_width

        # stem: RGB -> base_width channels, spatial size unchanged
        self.conv1 = nn.Conv2d(3, base_width, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(base_width)
        self.relu = nn.ReLU(inplace=True)

        # three stages; stages 2 and 3 halve the resolution and double the width
        self.layer1 = self._make_layer(block, planes=base_width, blocks=layers[0], stride=1)
        self.layer2 = self._make_layer(block, planes=base_width * 2, blocks=layers[1], stride=2)
        self.layer3 = self._make_layer(block, planes=base_width * 4, blocks=layers[2], stride=2)

        # head: collapse each channel to a single value, then classify
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(base_width * 4 * block.expansion, num_classes)

    def _make_layer(self, block, planes, blocks, stride):
        """Build one stage of `blocks` residual blocks as an nn.Sequential.

        Only the first block uses `stride`, and it receives a 1x1 conv + BN
        projection for the skip branch when the stride or channel count
        changes. The remaining blocks use stride 1 and no projection.
        """
        out_planes = planes * block.expansion
        needs_projection = stride != 1 or self.in_planes != out_planes

        projection = None
        if needs_projection:
            projection = nn.Sequential(
                nn.Conv2d(self.in_planes, out_planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_planes),
            )

        stage = [block(self.in_planes, planes, stride=stride, downsample=projection)]
        # every later block consumes the first block's output width
        self.in_planes = out_planes
        stage.extend(block(self.in_planes, planes, stride=1) for _ in range(1, blocks))
        return nn.Sequential(*stage)

    def forward(self, x):
        """Map an image batch to logits of shape (batch, num_classes)."""
        # stem
        out = self.relu(self.bn1(self.conv1(x)))
        # residual stages, in order
        for stage in (self.layer1, self.layer2, self.layer3):
            out = stage(out)
        # (batch, C, 1, 1) -> (batch, C)
        pooled = self.avgpool(out)
        pooled = pooled.view(pooled.size(0), -1)
        return self.fc(pooled)

#  helper to count parameters in a model
def count_parameters(model: nn.Module) -> int:
    """Return the total number of scalar weights in `model` that require gradients."""
    trainable = 0
    for param in model.parameters():
        # frozen parameters (requires_grad=False) are not counted
        if param.requires_grad:
            trainable += param.numel()
    return trainable

# One instance of each architecture on the selected device.
model_A = BasicCNN(num_classes=10).to(device)
model_B = ResNetLite(num_classes=10, base_width=64, layers=(2, 2, 2)).to(device)

# Trainable-parameter counts, for comparing model sizes.
print("Arch-A params:", count_parameters(model_A))
print("Arch-B params:", count_parameters(model_B))

# Shape check: push a random 2-image batch through both networks without
# building an autograd graph. Neither model is switched to eval mode first;
# only the output shapes are inspected.
dummy = torch.randn(2, 3, 32, 32, device=device)
with torch.no_grad():
    out_A = model_A(dummy)
    out_B = model_B(dummy)

print("Arch-A output:", out_A.shape)
print("Arch-B output:", out_B.shape)


Arch-A params: 1070986
Arch-B params: 2777674
Arch-A output: torch.Size([2, 10])
Arch-B output: torch.Size([2, 10])


In [41]:
import time
import os
import torch
import torch.nn as nn

# Cross-entropy loss for multi-class classification, applied to raw logits.
# Shared module-level object: train_one_epoch() and evaluate() both read it.
criterion = nn.CrossEntropyLoss()

# helper to build an SGD optimizer (you can switch to Adam later if needed)
def make_optimizer(model, lr=0.01, momentum=0.9, weight_decay=5e-4):
    """Create an SGD optimizer over all of `model`'s parameters.

    Args:
        model: module whose `parameters()` are optimized.
        lr: learning rate.
        momentum: SGD momentum factor.
        weight_decay: L2 penalty coefficient.

    Returns:
        A `torch.optim.SGD` instance.
    """
    sgd_settings = {"lr": lr, "momentum": momentum, "weight_decay": weight_decay}
    return torch.optim.SGD(model.parameters(), **sgd_settings)

# single training epoch (one full pass over train loader)
def train_one_epoch(model, loader, optimizer, device):
    """Train `model` for one full pass over `loader`.

    Uses the module-level `criterion` as the loss function.

    Args:
        model: network to train; it is switched to training mode.
        loader: iterable yielding `(images, labels)` mini-batches.
        optimizer: optimizer that updates `model`'s parameters.
        device: device the batches are moved to before the forward pass.

    Returns:
        `(avg_loss, avg_acc, elapsed)`: the sample-weighted mean loss, the
        fraction of correctly classified samples, and the epoch duration in
        seconds. An empty loader yields `(0.0, 0.0, elapsed)`.
    """
    model.train()
    running_loss, running_correct, running_total = 0.0, 0, 0
    # perf_counter is monotonic, so the measured duration cannot be skewed by
    # system clock adjustments
    start_time = time.perf_counter()
    for images, labels in loader:
        # non_blocking lets the copy overlap with compute when the loader
        # delivers pinned host memory; for other transfers it has no effect
        images = images.to(device, non_blocking=True)
        labels = labels.to(device, non_blocking=True)
        optimizer.zero_grad(set_to_none=True)
        # forward pass -> loss -> backward pass -> parameter update
        logits = model(images)
        loss = criterion(logits, labels)
        loss.backward()
        optimizer.step()
        batch_size = labels.size(0)
        # the loss is weighted by batch size (undone by the division below) so
        # a smaller final batch does not skew the epoch average
        running_loss += loss.item() * batch_size
        running_correct += (logits.argmax(1) == labels).sum().item()
        running_total += batch_size
    # max(1, ...) guards against division by zero on an empty loader
    denom = max(1, running_total)
    avg_loss = running_loss / denom
    avg_acc = running_correct / denom
    elapsed = time.perf_counter() - start_time
    return avg_loss, avg_acc, elapsed

# evaluation loop (no gradients) for val/test
def evaluate(model, loader, device):
    """Compute mean loss and accuracy of `model` over `loader` without gradients.

    Uses the module-level `criterion` as the loss function. The model is put
    in evaluation mode and is left in that mode when the function returns.

    Args:
        model: network to evaluate.
        loader: iterable yielding `(images, labels)` mini-batches.
        device: device the batches are moved to before the forward pass.

    Returns:
        `(avg_loss, avg_acc)`: the sample-weighted mean loss and the fraction
        of correctly classified samples. An empty loader yields `(0.0, 0.0)`.
    """
    model.eval()
    running_loss, running_correct, running_total = 0.0, 0, 0
    with torch.no_grad():
        for images, labels in loader:
            # non_blocking lets the copy overlap with compute when the loader
            # delivers pinned host memory; for other transfers it has no effect
            images = images.to(device, non_blocking=True)
            labels = labels.to(device, non_blocking=True)
            logits = model(images)
            loss = criterion(logits, labels)
            batch_size = labels.size(0)
            # weight by batch size so the final average is per sample
            running_loss += loss.item() * batch_size
            running_correct += (logits.argmax(1) == labels).sum().item()
            running_total += batch_size
    # max(1, ...) guards against division by zero on an empty loader
    denom = max(1, running_total)
    return running_loss / denom, running_correct / denom

# helper to save a checkpoint to disk
def save_checkpoint(path, model, optimizer, epoch, val_acc, meta: dict):
    """Write a training checkpoint to `path` with `torch.save`.

    The saved dict has the keys `epoch`, `val_acc`, `model_state`,
    `optimizer_state` and `meta`.

    Args:
        path: destination file; missing parent directories are created.
            A bare filename (no directory part) is written to the current
            working directory.
        model: module whose `state_dict()` is stored.
        optimizer: optimizer whose `state_dict()` is stored.
        epoch: epoch number recorded in the checkpoint.
        val_acc: validation accuracy recorded in the checkpoint.
        meta: free-form metadata stored alongside the states.
    """
    state = {
        "epoch": epoch,
        "val_acc": val_acc,
        "model_state": model.state_dict(),
        "optimizer_state": optimizer.state_dict(),
        "meta": meta,
    }
    # dirname is "" for a bare filename, and os.makedirs("") raises
    # FileNotFoundError, so directories are only created when there is one
    parent_dir = os.path.dirname(path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    torch.save(state, path)

# trains for multiple epochs on a chosen fold with checkpointing
def train_for_fold(model, arch_name: str, val_fold: str, num_epochs: int = 10, lr: float = 0.01):
    """Train `model` on one fold split and checkpoint it every epoch.

    The fold named `val_fold` is used for validation and the remaining folds
    for training (see `get_fold_loaders`). After each epoch a `*_last.pth`
    checkpoint is overwritten in `CKPT_DIR`; whenever the validation accuracy
    strictly improves, a `*_best-val.pth` checkpoint is written as well.

    Args:
        model: network to train (already on the target device).
        arch_name: architecture label used in log lines and file names.
        val_fold: name of the validation fold.
        num_epochs: number of epochs to run.
        lr: SGD learning rate (momentum 0.9 and weight decay 5e-4 are fixed).

    Returns:
        The best validation accuracy seen, or -1.0 if no epoch ran.
    """
    train_loader, val_loader = get_fold_loaders(val_fold, batch_size=BATCH_SIZE)
    optimizer = make_optimizer(model, lr=lr, momentum=0.9, weight_decay=5e-4)

    # checkpoint destinations for this (architecture, fold) pair
    best_ckpt = os.path.join(CKPT_DIR, f"{arch_name}_val{val_fold}_best-val.pth")
    last_ckpt = os.path.join(CKPT_DIR, f"{arch_name}_val{val_fold}_last.pth")

    # identical for every epoch, so it is built once
    meta = {"arch": arch_name, "val_fold": val_fold, "run_id": RUN_ID}

    best_val_acc = -1.0
    for epoch in range(1, num_epochs + 1):
        tr_loss, tr_acc, tr_time = train_one_epoch(model, train_loader, optimizer, device)
        va_loss, va_acc = evaluate(model, val_loader, device)

        progress = (f"[{arch_name} | val={val_fold}] epoch {epoch:03d} | "
                    f"train: loss {tr_loss:.4f} acc {tr_acc:.4f} | "
                    f"val: loss {va_loss:.4f} acc {va_acc:.4f} | "
                    f"time {tr_time:.1f}s")
        print(progress)

        # always refresh the "last" snapshot
        save_checkpoint(last_ckpt, model, optimizer, epoch, va_acc, meta)

        # refresh the "best" snapshot only on a strict improvement
        improved = va_acc > best_val_acc
        if improved:
            best_val_acc = va_acc
            save_checkpoint(best_ckpt, model, optimizer, epoch, va_acc, meta)

    return best_val_acc


In [42]:
import csv
import os
import torch
import time

#CSV logger that writes a header once and appends rows
class CSVLogger:
    """Minimal CSV logger: writes the header once, then appends one row per call.

    If the target file already exists when the logger is created, it is left
    untouched (no new header) and later rows are appended to it.

    Args:
        path: destination CSV file; missing parent directories are created.
            A bare filename (no directory part) is written to the current
            working directory.
        fieldnames: column names, used for the header and for every row.
    """

    def __init__(self, path: str, fieldnames: list[str]):
        self.path = path
        self.fieldnames = fieldnames
        # dirname is "" for a bare filename, and os.makedirs("") raises
        # FileNotFoundError, so directories are only created when there is one
        parent_dir = os.path.dirname(path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        # only a brand-new file gets a header row
        if not os.path.exists(self.path):
            with open(self.path, mode="w", newline="") as f:
                writer = csv.DictWriter(f, fieldnames=self.fieldnames)
                writer.writeheader()

    def log(self, row: dict):
        """Append one row; `row` is a dict keyed by the configured field names."""
        # the file is reopened in append mode on every call, so each row is
        # written out as soon as it is logged
        with open(self.path, mode="a", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=self.fieldnames)
            writer.writerow(row)

# function that trains one fold and logs per-epoch metrics to CSV
def train_for_fold_logged(model, arch_name: str, val_fold: str, num_epochs: int = 10, lr: float = 0.01):
    """Train `model` on one fold split with per-epoch CSV logging and checkpoints.

    The fold named `val_fold` is used for validation and the remaining folds
    for training (see `get_fold_loaders`). Every epoch appends one row to
    `<LOG_DIR>/<arch>_val<fold>_metrics.csv` and overwrites the `*_last.pth`
    checkpoint; the `*_best-val.pth` checkpoint is rewritten whenever the
    validation accuracy strictly improves.

    Args:
        model: network to train (already on the target device).
        arch_name: architecture label used in log lines and file names.
        val_fold: name of the validation fold.
        num_epochs: number of epochs to run.
        lr: SGD learning rate (momentum 0.9 and weight decay 5e-4 are fixed).

    Returns:
        A summary dict with keys `arch`, `val_fold`, `best_val_acc`,
        `best_epoch`, `best_ckpt`, `last_ckpt`, `log_csv` and `params`.
    """
    train_loader, val_loader = get_fold_loaders(val_fold, batch_size=BATCH_SIZE)
    optimizer = make_optimizer(model, lr=lr, momentum=0.9, weight_decay=5e-4)

    # output files for this (architecture, fold) pair
    best_ckpt = os.path.join(CKPT_DIR, f"{arch_name}_val{val_fold}_best-val.pth")
    last_ckpt = os.path.join(CKPT_DIR, f"{arch_name}_val{val_fold}_last.pth")
    log_csv = os.path.join(LOG_DIR, f"{arch_name}_val{val_fold}_metrics.csv")

    columns = ["epoch", "train_loss", "train_acc", "val_loss", "val_acc", "epoch_time_sec"]
    logger = CSVLogger(log_csv, fieldnames=columns)

    # best validation accuracy so far and the epoch that produced it
    best_val_acc, best_epoch = -1.0, -1
    # trainable-parameter count, reported in the returned summary
    param_count = count_parameters(model)
    # checkpoint metadata is the same for every epoch
    ckpt_meta = {"arch": arch_name, "val_fold": val_fold, "run_id": RUN_ID}

    for epoch in range(1, num_epochs + 1):
        tr_loss, tr_acc, tr_time = train_one_epoch(model, train_loader, optimizer, device)
        va_loss, va_acc = evaluate(model, val_loader, device)

        # compact console progress line
        progress = (f"[{arch_name} | val={val_fold}] epoch {epoch:03d} | "
                    f"train: loss {tr_loss:.4f} acc {tr_acc:.4f} | "
                    f"val: loss {va_loss:.4f} acc {va_acc:.4f} | "
                    f"time {tr_time:.1f}s")
        print(progress)

        # metrics are stored as fixed-precision strings
        csv_row = {
            "epoch": epoch,
            "train_loss": f"{tr_loss:.6f}",
            "train_acc": f"{tr_acc:.6f}",
            "val_loss": f"{va_loss:.6f}",
            "val_acc": f"{va_acc:.6f}",
            "epoch_time_sec": f"{tr_time:.3f}",
        }
        logger.log(csv_row)

        # "last" is refreshed every epoch; "best" only on a strict improvement
        save_checkpoint(last_ckpt, model, optimizer, epoch, va_acc, meta=ckpt_meta)
        if va_acc > best_val_acc:
            best_val_acc = va_acc
            best_epoch = epoch
            save_checkpoint(best_ckpt, model, optimizer, epoch, va_acc, meta=ckpt_meta)

    return {
        "arch": arch_name,
        "val_fold": val_fold,
        "best_val_acc": best_val_acc,
        "best_epoch": best_epoch,
        "best_ckpt": best_ckpt,
        "last_ckpt": last_ckpt,
        "log_csv": log_csv,
        "params": param_count,
    }

# Global registry of fold summaries, filled by record_fold_result() and read by
# pick_arch_winner(). Layout: RESULTS[arch_name][fold_name] -> summary dict.
# NOTE: re-running this cell resets the registry to empty.
RESULTS = {}

# helper to record a fold result into the RESULTS dictionary
def record_fold_result(summary: dict):
    """Store a fold summary in the module-level `RESULTS` registry.

    The summary is filed under `RESULTS[summary["arch"]][summary["val_fold"]]`;
    an existing entry for the same architecture and fold is overwritten.
    """
    # setdefault creates the per-architecture dict on first use
    per_arch = RESULTS.setdefault(summary["arch"], {})
    per_arch[summary["val_fold"]] = summary
# helper to pick the best fold (highest val acc) for a given architecture
def pick_arch_winner(arch_name: str):
    """Return the stored fold summary with the highest `best_val_acc` for `arch_name`.

    Reads the module-level `RESULTS` registry and prints a one-line report of
    the winning fold. When several folds tie, the one recorded first wins.

    Raises:
        AssertionError: if nothing has been recorded for `arch_name`.
    """
    assert arch_name in RESULTS and len(RESULTS[arch_name]) > 0, f"No results stored for architecture '{arch_name}'."

    # running champion as a (fold name, accuracy, summary) triple
    champion = (None, -1.0, None)
    for fold_name, summary in RESULTS[arch_name].items():
        # strict comparison: a later fold must beat, not merely match, the champion
        if summary["best_val_acc"] > champion[1]:
            champion = (fold_name, summary["best_val_acc"], summary)
    best_fold, best_acc, best_summary = champion

    print(f"[WINNER | {arch_name}] fold={best_fold} | best_val_acc={best_acc:.4f} | ckpt={best_summary['best_ckpt']}")
    return best_summary


In [46]:
# ==== RESET FOR STEP-7 (plain, fair, reproducible) ====

# 1) Hard reset determinism (same seed, deterministic kernels)
SEED = 42
import random, numpy as np, torch, torchvision, os

# 1) Re-seed every RNG (Python, NumPy, PyTorch CPU and CUDA) so Step-7 starts
#    from the same state regardless of what ran earlier in the kernel.
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)

# cuDNN: autotuner off, deterministic kernels on
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

# 2) Plain preprocessing only (no augmentation): tensor conversion followed by
#    per-channel normalisation with mean 0.5 / std 0.5.
_plain = torchvision.transforms.Compose([
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
])

# Datasets are reloaded from DATA_DIR; download=False, so the files must
# already be present there.
train_dataset = torchvision.datasets.CIFAR10(root=DATA_DIR, train=True, download=False, transform=_plain)
test_dataset = torchvision.datasets.CIFAR10(root=DATA_DIR, train=False, download=False, transform=_plain)

# 3) Test loader in fixed order (no shuffling).
test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKERS,
    pin_memory=PIN_MEMORY,
)

# 4) Recreate the three folds from a freshly seeded RNG.
n_total = len(train_dataset)
rng = np.random.RandomState(SEED)
all_indices = np.arange(n_total)
rng.shuffle(all_indices)
X_idx, Y_idx, Z_idx = np.array_split(all_indices, 3)
FOLDS = {name: part.tolist() for name, part in zip("XYZ", (X_idx, Y_idx, Z_idx))}

# 5) Fresh seeded generator for the subset samplers built by _make_loader.
from torch.utils.data import DataLoader, SubsetRandomSampler
loader_generator = torch.Generator().manual_seed(SEED)

def _make_loader(dataset, indices, batch_size, num_workers, pin_memory):
    """Return a DataLoader restricted to `indices` of `dataset`.

    Samples are drawn in random order by a SubsetRandomSampler driven by the
    module-level `loader_generator`; the last partial batch is kept.
    """
    index_sampler = SubsetRandomSampler(indices, generator=loader_generator)
    return DataLoader(
        dataset,
        batch_size=batch_size,
        sampler=index_sampler,
        num_workers=num_workers,
        pin_memory=pin_memory,
        drop_last=False,
    )

def get_fold_loaders(val_fold: str, batch_size=BATCH_SIZE):
    """Return `(train_loader, val_loader)` with `val_fold` held out for validation.

    `val_fold` must be a key of the module-level `FOLDS`. The training subset
    is the concatenation, in `FOLDS` order, of all other folds. Both loaders
    read from `train_dataset`.

    Raises:
        AssertionError: if `val_fold` is not a key of `FOLDS`.
    """
    assert val_fold in FOLDS, f"val_fold must be one of {list(FOLDS.keys())}"

    held_out = FOLDS[val_fold]
    # pool the indices of every fold except the held-out one
    pooled = []
    for fold_name in FOLDS:
        if fold_name == val_fold:
            continue
        pooled += FOLDS[fold_name]

    tr = _make_loader(train_dataset, pooled, batch_size, NUM_WORKERS, PIN_MEMORY)
    va = _make_loader(train_dataset, held_out, batch_size, NUM_WORKERS, PIN_MEMORY)
    print(f"[Fold with val={val_fold}] train samples: {len(pooled)} | val samples: {len(held_out)}")
    return tr, va

# 6) Sanity check: show the transform attached to each rebuilt dataset, so it
#    is visible in the cell output that no augmentation is applied.
print("H1/Step-7 train transform:", train_dataset.transform)
print("H1/Step-7 test  transform:", test_dataset.transform)


H1/Step-7 train transform: Compose(
    ToTensor()
    Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
)
H1/Step-7 test  transform: Compose(
    ToTensor()
    Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
)


In [47]:
# Fold-1 training (val = X; train = Y+Z) for both architectures

# Training budget for Fold-1; both architectures get the same epochs and learning rate.
EPOCHS_FOLD1, LR_FOLD1 = 10, 0.01

# ---- Arch-A on Fold-1 (validate on X) ----
# fresh, untrained model for this fold
model_A_F1 = BasicCNN(num_classes=10).to(device)
print("Arch-A (Fold-1) params:", count_parameters(model_A_F1))
# train with CSV logging and checkpointing, then file the summary in RESULTS
resA_X = train_for_fold_logged(model_A_F1, "ArchA", "X", EPOCHS_FOLD1, LR_FOLD1)
record_fold_result(resA_X)

# ---- Arch-B on Fold-1 (validate on X) ----
# fresh, untrained model for this fold
model_B_F1 = ResNetLite(num_classes=10, base_width=64, layers=(2, 2, 2)).to(device)
print("Arch-B (Fold-1) params:", count_parameters(model_B_F1))
# train with CSV logging and checkpointing, then file the summary in RESULTS
resB_X = train_for_fold_logged(model_B_F1, "ArchB", "X", EPOCHS_FOLD1, LR_FOLD1)
record_fold_result(resB_X)

# Show the best validation results and checkpoint paths for this fold.
print("\nFold-1 results:")
print("Arch-A:", resA_X)
print("Arch-B:", resB_X)

Arch-A (Fold-1) params: 1070986
[Fold with val=X] train samples: 33333 | val samples: 16667
[ArchA | val=X] epoch 001 | train: loss 1.4612 acc 0.4682 | val: loss 1.2190 acc 0.5630 | time 7.0s
[ArchA | val=X] epoch 002 | train: loss 1.1404 acc 0.5895 | val: loss 1.0141 acc 0.6356 | time 7.0s
[ArchA | val=X] epoch 003 | train: loss 0.9973 acc 0.6431 | val: loss 0.9426 acc 0.6673 | time 7.7s
[ArchA | val=X] epoch 004 | train: loss 0.9084 acc 0.6762 | val: loss 0.8753 acc 0.6890 | time 8.0s
[ArchA | val=X] epoch 005 | train: loss 0.8291 acc 0.7064 | val: loss 0.8748 acc 0.6900 | time 8.0s
[ArchA | val=X] epoch 006 | train: loss 0.7781 acc 0.7229 | val: loss 0.7998 acc 0.7217 | time 7.8s
[ArchA | val=X] epoch 007 | train: loss 0.7209 acc 0.7425 | val: loss 0.8247 acc 0.7110 | time 8.0s
[ArchA | val=X] epoch 008 | train: loss 0.6694 acc 0.7633 | val: loss 0.8466 acc 0.7053 | time 8.0s
[ArchA | val=X] epoch 009 | train: loss 0.6268 acc 0.7779 | val: loss 0.7666 acc 0.7371 | time 7.6s
[ArchA |

In [48]:
#  Fold-2 training (val = Y; train = X+Z) for both architectures

# Training budget for Fold-2; both architectures get the same epochs and learning rate.
EPOCHS_FOLD2, LR_FOLD2 = 10, 0.01

# ---- Arch-A on Fold-2 (validate on Y) ----
# fresh, untrained model for this fold
model_A_F2 = BasicCNN(num_classes=10).to(device)
print("Arch-A (Fold-2) params:", count_parameters(model_A_F2))
# train with CSV logging and checkpointing, then file the summary in RESULTS
resA_Y = train_for_fold_logged(model_A_F2, "ArchA", "Y", EPOCHS_FOLD2, LR_FOLD2)
record_fold_result(resA_Y)

# ---- Arch-B on Fold-2 (validate on Y) ----
# fresh, untrained model for this fold
model_B_F2 = ResNetLite(num_classes=10, base_width=64, layers=(2, 2, 2)).to(device)
print("Arch-B (Fold-2) params:", count_parameters(model_B_F2))
# train with CSV logging and checkpointing, then file the summary in RESULTS
resB_Y = train_for_fold_logged(model_B_F2, "ArchB", "Y", EPOCHS_FOLD2, LR_FOLD2)
record_fold_result(resB_Y)

# Show the best validation results and checkpoint paths for this fold.
print("\nFold-2 results:")
print("Arch-A:", resA_Y)
print("Arch-B:", resB_Y)


Arch-A (Fold-2) params: 1070986
[Fold with val=Y] train samples: 33333 | val samples: 16667
[ArchA | val=Y] epoch 001 | train: loss 1.4605 acc 0.4697 | val: loss 1.2225 acc 0.5600 | time 8.0s
[ArchA | val=Y] epoch 002 | train: loss 1.1555 acc 0.5856 | val: loss 1.1085 acc 0.5964 | time 8.1s
[ArchA | val=Y] epoch 003 | train: loss 1.0170 acc 0.6393 | val: loss 0.9685 acc 0.6590 | time 7.8s
[ArchA | val=Y] epoch 004 | train: loss 0.9162 acc 0.6751 | val: loss 0.8840 acc 0.6903 | time 7.1s
[ArchA | val=Y] epoch 005 | train: loss 0.8401 acc 0.7031 | val: loss 0.8366 acc 0.7077 | time 7.2s
[ArchA | val=Y] epoch 006 | train: loss 0.7760 acc 0.7249 | val: loss 0.8308 acc 0.7080 | time 7.2s
[ArchA | val=Y] epoch 007 | train: loss 0.7203 acc 0.7429 | val: loss 0.8491 acc 0.7080 | time 7.4s
[ArchA | val=Y] epoch 008 | train: loss 0.6799 acc 0.7597 | val: loss 0.8026 acc 0.7194 | time 8.4s
[ArchA | val=Y] epoch 009 | train: loss 0.6311 acc 0.7783 | val: loss 0.7822 acc 0.7317 | time 8.1s
[ArchA |

In [49]:
# Fold-3 (val = Z), pick winners per architecture, test both winners, pick final winner

# Training budget for Fold-3; both architectures get the same epochs and learning rate.
EPOCHS_FOLD3, LR_FOLD3 = 10, 0.01

# ---- Arch-A on Fold-3 (validate on Z) ----
# fresh, untrained model for this fold
model_A_F3 = BasicCNN(num_classes=10).to(device)
print("Arch-A (Fold-3) params:", count_parameters(model_A_F3))
# train with CSV logging and checkpointing, then file the summary in RESULTS
resA_Z = train_for_fold_logged(model_A_F3, "ArchA", "Z", EPOCHS_FOLD3, LR_FOLD3)
record_fold_result(resA_Z)

# ---- Arch-B on Fold-3 (validate on Z) ----
# fresh, untrained model for this fold
model_B_F3 = ResNetLite(num_classes=10, base_width=64, layers=(2, 2, 2)).to(device)
print("Arch-B (Fold-3) params:", count_parameters(model_B_F3))
# train with CSV logging and checkpointing, then file the summary in RESULTS
resB_Z = train_for_fold_logged(model_B_F3, "ArchB", "Z", EPOCHS_FOLD3, LR_FOLD3)
record_fold_result(resB_Z)

# Show the best validation results and checkpoint paths for this fold.
print("\nFold-3 results:")
print("Arch-A:", resA_Z)
print("Arch-B:", resB_Z)

# ---- Per-architecture winners ----
# For each architecture, keep the fold whose best validation accuracy is highest
# among the summaries currently stored in RESULTS.
winA = pick_arch_winner("ArchA")
winB = pick_arch_winner("ArchB")






Arch-A (Fold-3) params: 1070986
[Fold with val=Z] train samples: 33334 | val samples: 16666
[ArchA | val=Z] epoch 001 | train: loss 1.4822 acc 0.4613 | val: loss 1.1917 acc 0.5673 | time 8.0s
[ArchA | val=Z] epoch 002 | train: loss 1.1653 acc 0.5857 | val: loss 1.0562 acc 0.6261 | time 7.9s
[ArchA | val=Z] epoch 003 | train: loss 1.0222 acc 0.6366 | val: loss 1.0435 acc 0.6255 | time 8.1s
[ArchA | val=Z] epoch 004 | train: loss 0.9298 acc 0.6701 | val: loss 0.8933 acc 0.6835 | time 7.8s
[ArchA | val=Z] epoch 005 | train: loss 0.8500 acc 0.6995 | val: loss 0.8982 acc 0.6825 | time 7.0s
[ArchA | val=Z] epoch 006 | train: loss 0.7833 acc 0.7218 | val: loss 0.8174 acc 0.7156 | time 6.8s
[ArchA | val=Z] epoch 007 | train: loss 0.7388 acc 0.7371 | val: loss 0.7750 acc 0.7299 | time 6.9s
[ArchA | val=Z] epoch 008 | train: loss 0.6850 acc 0.7585 | val: loss 0.7891 acc 0.7249 | time 7.5s
[ArchA | val=Z] epoch 009 | train: loss 0.6304 acc 0.7780 | val: loss 0.7593 acc 0.7383 | time 8.0s
[ArchA |

In [50]:
# Test both architecture winners and choose overall winner

# factory: turn an architecture tag into a freshly constructed model on `device`
def build_model_for_arch(arch_name: str):
    """Return a new, untrained 10-class model for ``arch_name`` placed on ``device``.

    "ArchA" yields a BasicCNN; "ArchB" yields a ResNetLite with base_width=64
    and layers=(2, 2, 2).

    Raises:
        ValueError: if ``arch_name`` is neither "ArchA" nor "ArchB".
    """
    # reject unknown tags up front so the happy path below stays flat
    if arch_name not in ("ArchA", "ArchB"):
        raise ValueError(f"Unknown architecture name: {arch_name}")

    net = (
        BasicCNN(num_classes=10)
        if arch_name == "ArchA"
        else ResNetLite(num_classes=10, base_width=64, layers=(2, 2, 2))
    )
    return net.to(device)

# restore the best-validation checkpoint described by a winner summary dict
def load_best_model(winner_summary: dict):
    """Rebuild the winner's architecture and load its saved weights.

    Args:
        winner_summary: dict providing "arch" (a name accepted by
            build_model_for_arch) and "best_ckpt" (path to a checkpoint whose
            "model_state" entry holds the state dict).

    Returns:
        The model, switched to eval mode, with the checkpoint weights loaded.
    """
    # build first so an unknown architecture fails before any file is read
    net = build_model_for_arch(winner_summary["arch"])
    checkpoint_path = winner_summary["best_ckpt"]
    checkpoint = torch.load(checkpoint_path, map_location=device)
    net.load_state_dict(checkpoint["model_state"])
    # Module.eval() returns the module itself
    return net.eval()

# Arch-A: reload the winning checkpoint and score it on the test loader
model_A_win = load_best_model(winA)
test_loss_A, test_acc_A = evaluate(model_A_win, test_loader, device)
print(f"[TEST] Arch-A winner (fold={winA['val_fold']}) | loss {test_loss_A:.4f} | acc {test_acc_A:.4f}")

# Arch-B: same procedure
model_B_win = load_best_model(winB)
test_loss_B, test_acc_B = evaluate(model_B_win, test_loader, device)
print(f"[TEST] Arch-B winner (fold={winB['val_fold']}) | loss {test_loss_B:.4f} | acc {test_acc_B:.4f}")

# overall winner = higher test accuracy; an exact tie goes to Arch-A
# NOTE(review): choosing between architectures on test accuracy makes the reported test_acc
# an optimistic estimate — the test set has now been used for selection.
_win_arch, _win_summary, _win_acc = (
    ("ArchA", winA, test_acc_A) if test_acc_A >= test_acc_B else ("ArchB", winB, test_acc_B)
)
final_winner = {
    "arch": _win_arch,
    "fold": _win_summary["val_fold"],
    "test_acc": float(_win_acc),
    "ckpt": _win_summary["best_ckpt"],
}

print(f"[FINAL WINNER] {final_winner['arch']} | fold={final_winner['fold']} | test_acc={final_winner['test_acc']:.4f} | ckpt={final_winner['ckpt']}")


[TEST] Arch-A winner (fold=X) | loss 0.7800 | acc 0.7357
[TEST] Arch-B winner (fold=X) | loss 0.6813 | acc 0.7705
[FINAL WINNER] ArchB | fold=X | test_acc=0.7705 | ckpt=/content/outputs/run_20251025-040551/checkpoints/ArchB_valX_best-val.pth


In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
import numpy as np

def plot_clean_confusion_matrix(model, test_loader, class_names, title):
    """Plot and return the confusion matrix of ``model`` over ``test_loader``.

    Args:
        model: torch module that maps an image batch to per-class scores.
        test_loader: iterable of ``(images, labels)`` batches.
        class_names: display name per class; position ``i`` labels class index
            ``i`` on both axes, so labels are assumed to be ``0..len(class_names)-1``.
        title: text appended to the figure title.

    Returns:
        The ``len(class_names) x len(class_names)`` count matrix
        (rows = actual class, columns = predicted class).
    """
    model.eval()
    # Run on the device that holds the model's weights; fall back to the
    # notebook-level `device` only for a parameter-free model.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else device

    all_preds, all_labels = [], []
    with torch.no_grad():
        for images, labels in test_loader:
            outputs = model(images.to(run_device))
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Pin the label set: without `labels=`, a class that never occurs in either
    # list is dropped and the matrix no longer lines up with the tick labels.
    cm = confusion_matrix(all_labels, all_preds, labels=np.arange(len(class_names)))

    # explicit figure/axes instead of the implicit pyplot "current figure"
    fig, ax = plt.subplots(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names, yticklabels=class_names, ax=ax)
    ax.set_xlabel('Predicted')
    ax.set_ylabel('Actual')
    ax.set_title(f'Confusion Matrix - {title}')
    fig.tight_layout()
    plt.show()

    # Each row sum is the number of samples whose true class is that row;
    # the message assumes 1000 per class — TODO confirm for the dataset in use.
    row_sums = cm.sum(axis=1)
    print("Row sums (should all be 1000):", row_sums)

    return cm

# Draw the confusion matrix for the Arch-B winner on `test_loader` and keep the raw count matrix.
# NOTE(review): `class_names` is not defined in the cells visible here — confirm it is bound before running.
cm_archB = plot_clean_confusion_matrix(model_B_win, test_loader, class_names, "Arch-B Winner")

In [51]:
# --- Quick sanity reset before H1: plain datasets + loaders for a clean H1 run ---

import torchvision, torchvision.transforms as T
from torch.utils.data import DataLoader
import numpy as np

# 1) H1 inputs are un-augmented: tensor conversion, then (0.5, 0.5, 0.5) mean/std normalisation
_plain = T.Compose([
    T.ToTensor(),
    T.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

# 2) Fresh CIFAR-10 dataset objects bound to the plain transform (not the augmented H2 ones)
_cifar_plain_kwargs = dict(root=DATA_DIR, download=False, transform=_plain)
train_dataset = torchvision.datasets.CIFAR10(train=True, **_cifar_plain_kwargs)
test_dataset = torchvision.datasets.CIFAR10(train=False, **_cifar_plain_kwargs)

# 3) Test loader over the plain test set, fixed order
test_loader = DataLoader(
    test_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKERS,
    pin_memory=PIN_MEMORY,
)

# 4) Seeded shuffle of all training indices, cut into three near-equal folds X / Y / Z
rng = np.random.RandomState(SEED)
idx = np.arange(len(train_dataset))
rng.shuffle(idx)
X_idx, Y_idx, Z_idx = np.array_split(idx, 3)
FOLDS = dict(zip("XYZ", (X_idx.tolist(), Y_idx.tolist(), Z_idx.tolist())))
print("Fold sizes:", {name: len(members) for name, members in FOLDS.items()})

# 5) Echo the transforms actually attached to the datasets
print("H1 train transform:", train_dataset.transform)
print("H1 test  transform:", test_dataset.transform)


Fold sizes: {'X': 16667, 'Y': 16667, 'Z': 16666}
H1 train transform: Compose(
    ToTensor()
    Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
)
H1 test  transform: Compose(
    ToTensor()
    Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
)


In [34]:
# Drop the evaluation models so their GPU memory can be reclaimed.
# A bare `del name` raises NameError when the name is not bound (fresh kernel,
# or this cell run a second time), so remove each name only if it exists.
for _stale_name in ("final_winner_model", "model_A_win", "model_B_win"):
    globals().pop(_stale_name, None)
# release cached CUDA blocks that are no longer referenced
torch.cuda.empty_cache()


In [52]:
# Does changing model size (base_width) change performance? Fold = X

# build → train → reload best checkpoint → test, for one ResNetLite width on Fold-X
def _run_bw_X(bw: int, tag: str, num_epochs: int = 10, lr: float = 0.01):
    """Train a ResNetLite of width ``bw`` with fold "X" held out, then test its best checkpoint.

    Args:
        bw: ``base_width`` passed to ResNetLite (layers fixed at (2, 2, 2), 10 classes).
        tag: architecture name passed to the trainer as ``arch_name``.
        num_epochs: training epochs; defaults to the 10 used so far.
        lr: learning rate handed to the trainer; defaults to the 0.01 used so far.

    Returns:
        ``(summary, test_loss, test_acc)`` where ``summary`` is the dict returned by
        ``train_for_fold_logged`` and the two metrics are plain floats measured on
        ``test_loader`` with the weights stored at ``summary["best_ckpt"]``.
    """
    def _new_model():
        # one place for the architecture so the trained and reloaded models cannot drift apart
        return ResNetLite(num_classes=10, base_width=bw, layers=(2, 2, 2)).to(device)

    # train on Fold-X with the shared trainer (optimizer and loss are chosen inside it)
    summary = train_for_fold_logged(_new_model(), arch_name=tag, val_fold="X",
                                    num_epochs=num_epochs, lr=lr)
    # keep the result for bookkeeping
    record_fold_result(summary)

    # evaluate the best-validation weights, not the last-epoch ones, in a fresh model
    best_model = _new_model()
    state = torch.load(summary["best_ckpt"], map_location=device)
    best_model.load_state_dict(state["model_state"])
    best_model.eval()

    test_loss, test_acc = evaluate(best_model, test_loader, device)
    return summary, float(test_loss), float(test_acc)

# baseline (bw=64): reuse the recorded Arch-B / Fold-X summary when RESULTS has one, otherwise train it now
base = RESULTS.get("ArchB", {}).get("X")
if base is None:
    base = _run_bw_X(64, "H1_bw64")[0]

# narrower (bw=32) and wider (bw=96) variants
small, tl_s, ta_s = _run_bw_X(32, "H1_bw32")
large, tl_l, ta_l = _run_bw_X(96, "H1_bw96")

# score the baseline's best checkpoint on the test set
m64 = ResNetLite(num_classes=10, base_width=64, layers=(2, 2, 2)).to(device)
st64 = torch.load(base["best_ckpt"], map_location=device)
m64.load_state_dict(st64["model_state"])
m64.eval()
tl_b, ta_b = evaluate(m64, test_loader, device)

# one summary line per width, smallest to largest
print("\n[H1 COMPARISON]")
for _row_label, _row_summary, _row_test_acc in (
    ("SMALL (bw=32)", small, ta_s),
    ("BASE  (bw=64)", base, ta_b),
    ("LARGE (bw=96)", large, ta_l),
):
    print(f"{_row_label} | val_best={_row_summary['best_val_acc']:.4f} (ep{_row_summary['best_epoch']}) | test_acc={_row_test_acc:.4f}")
print("H1 complete.")


[Fold with val=X] train samples: 33333 | val samples: 16667
[H1_bw32 | val=X] epoch 001 | train: loss 1.4789 acc 0.4556 | val: loss 1.4024 acc 0.4928 | time 13.4s
[H1_bw32 | val=X] epoch 002 | train: loss 1.0204 acc 0.6353 | val: loss 0.9770 acc 0.6506 | time 13.5s
[H1_bw32 | val=X] epoch 003 | train: loss 0.8455 acc 0.6996 | val: loss 1.0686 acc 0.6207 | time 13.4s
[H1_bw32 | val=X] epoch 004 | train: loss 0.7071 acc 0.7495 | val: loss 1.0007 acc 0.6678 | time 13.3s
[H1_bw32 | val=X] epoch 005 | train: loss 0.6045 acc 0.7865 | val: loss 0.9236 acc 0.7031 | time 13.4s
[H1_bw32 | val=X] epoch 006 | train: loss 0.5202 acc 0.8183 | val: loss 1.2134 acc 0.6461 | time 13.3s
[H1_bw32 | val=X] epoch 007 | train: loss 0.4514 acc 0.8451 | val: loss 0.8808 acc 0.7059 | time 13.8s
[H1_bw32 | val=X] epoch 008 | train: loss 0.3765 acc 0.8704 | val: loss 0.8872 acc 0.7332 | time 13.4s
[H1_bw32 | val=X] epoch 009 | train: loss 0.3142 acc 0.8927 | val: loss 0.7771 acc 0.7557 | time 13.4s
[H1_bw32 | va

In [53]:
#Needs more regularization for better training. (justify) (test) (yes/no) (explain)
#regularization via Crop+Flip + Label Smoothing on Fold-X, with robust fallbacks

import os, glob, time
import torch, torch.nn as nn, torch.optim as optim
import torchvision, torchvision.transforms as T
from torch.utils.data import DataLoader, SubsetRandomSampler

# pick the compute device once and reuse the CUDA check for pin_memory
_has_cuda = torch.cuda.is_available()
device = torch.device("cuda" if _has_cuda else "cpu")
print("Device:", device)

# keep notebook-level settings when they already exist, otherwise fall back to safe defaults
globals().setdefault("NUM_WORKERS", 0)
PIN_MEMORY = bool(_has_cuda)
globals().setdefault("DATA_DIR", "./data")
globals().setdefault("BATCH_SIZE", 128)

# seeded generator shared by the samplers (created only if no earlier cell made one)
if "loader_generator" not in globals():
    loader_generator = torch.Generator().manual_seed(42)

# the model class must come from an earlier cell; fail early with a readable message
assert "ResNetLite" in globals(), "ResNetLite class is not defined in this session. Re-run the cell where you defined it."

# Rebuild a deterministic 3-way split only when FOLDS is absent.
# NOTE: this fallback permutes with torch.randperm (seed 42), whereas the H1 reset cell builds
# FOLDS from a NumPy RandomState shuffle — the two are not guaranteed to yield the same folds.
if "FOLDS" not in globals():
    g = torch.Generator()
    g.manual_seed(42)
    perm = torch.randperm(50000, generator=g).tolist()
    # indexing list(range(50000)) by the permutation returns the permutation itself
    all_idx = list(perm)
    X, Y, Z = all_idx[:16667], all_idx[16667:33334], all_idx[33334:]
    FOLDS = dict(X=X, Y=Y, Z=Z)
    print("FOLDS was missing — rebuilt deterministically.")

# transforms: both pipelines end with the same tensor conversion + 0.5/0.5 normalisation;
# the training one first applies a padded random crop and a random horizontal flip
_to_normalised_tensor = [T.ToTensor(), T.Normalize((0.5,)*3, (0.5,)*3)]
train_aug = T.Compose([T.RandomCrop(32, padding=4), T.RandomHorizontalFlip(), *_to_normalised_tensor])
eval_plain = T.Compose(list(_to_normalised_tensor))

# CIFAR-10 views: the augmented training view is created first because it is the one allowed to download
ds_train_aug = torchvision.datasets.CIFAR10(root=DATA_DIR, train=True, download=True, transform=train_aug)
ds_train_plain = torchvision.datasets.CIFAR10(root=DATA_DIR, train=True, download=False, transform=eval_plain)
ds_test_plain = torchvision.datasets.CIFAR10(root=DATA_DIR, train=False, download=False, transform=eval_plain)

# Fold-X is held out for validation; every other fold is concatenated into the training index list
val_idx = FOLDS["X"]
train_idx = []
for _fold_name, _fold_members in FOLDS.items():
    if _fold_name != "X":
        train_idx.extend(_fold_members)

# loaders share batch size / worker settings; train and val draw their subsets through seeded samplers
_loader_common = dict(batch_size=BATCH_SIZE, num_workers=NUM_WORKERS, pin_memory=PIN_MEMORY)
train_loader = DataLoader(
    ds_train_aug,
    sampler=SubsetRandomSampler(train_idx, generator=loader_generator),
    drop_last=False,
    **_loader_common,
)
val_loader = DataLoader(
    ds_train_plain,
    sampler=SubsetRandomSampler(val_idx, generator=loader_generator),
    drop_last=False,
    **_loader_common,
)
test_loader_ = DataLoader(ds_test_plain, shuffle=False, **_loader_common)

import copy  # needed for the best-weights snapshot; this cell's import line does not bring it in

# build the winner architecture fresh (ResNetLite, bw=64)
model = ResNetLite(num_classes=10, base_width=64, layers=(2,2,2)).to(device)

# define CrossEntropy with label smoothing (regularization)
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)

# SGD with momentum and weight decay
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9, weight_decay=5e-4)


def _h2_evaluate(net, loader, loss_fn):
    """Return (sample-weighted mean loss, accuracy) of ``net`` over ``loader``; leaves ``net`` in eval mode."""
    net.eval()
    seen, hits, loss_total = 0, 0, 0.0
    with torch.no_grad():
        for xb, yb in loader:
            xb, yb = xb.to(device), yb.to(device)
            out = net(xb)
            # weight each batch by its size so a short final batch does not skew the mean
            loss_total += loss_fn(out, yb).item() * yb.size(0)
            seen += yb.size(0)
            hits += (out.argmax(1) == yb).sum().item()
    # max(1, ...) keeps an empty loader from dividing by zero
    return loss_total / max(1, seen), hits / max(1, seen)


# train for 10 epochs, remembering the weights of the best-validation epoch
best_val, best_ep, best_state = -1.0, -1, None
for ep in range(1, 11):
    model.train()
    start = time.time()
    tot, correct, loss_sum = 0, 0, 0.0
    for x, y in train_loader:
        x, y = x.to(device), y.to(device)
        optimizer.zero_grad(set_to_none=True)
        logits = model(x)
        loss = criterion(logits, y)
        loss.backward()
        optimizer.step()
        # accumulate sample-weighted loss and running accuracy counts
        loss_sum += loss.item() * y.size(0)
        tot += y.size(0)
        correct += (logits.argmax(1) == y).sum().item()
    tr_loss = loss_sum / max(1, tot)
    tr_acc = correct / max(1, tot)

    va_loss, va_acc = _h2_evaluate(model, val_loader, criterion)
    # the reported time covers the training pass and the validation pass
    print(f"[H2|val=X] epoch {ep:02d} | train {tr_loss:.4f}/{tr_acc:.4f} | val {va_loss:.4f}/{va_acc:.4f} | time {time.time()-start:.1f}s")

    if va_acc > best_val:
        best_val, best_ep = va_acc, ep
        # snapshot the weights: state_dict() returns live references that later steps overwrite
        best_state = copy.deepcopy(model.state_dict())

# Test the weights from the best-validation epoch so the test metrics correspond to the
# best_val / best_ep printed next to them (previously the last-epoch weights were tested).
if best_state is not None:
    model.load_state_dict(best_state)
test_loss_reg, test_acc_reg = _h2_evaluate(model, test_loader_, criterion)

# print the regularized model test metrics
print(f"[TEST|H2] REG (aug+LS) | loss {test_loss_reg:.4f} | acc {test_acc_reg:.4f} | best_val={best_val:.4f} (ep{best_ep})")

# Locate the Arch-B / Fold-X baseline checkpoint: prefer the path recorded in RESULTS,
# otherwise search the checkpoint folders on disk.
ckpt_guess = None
if "RESULTS" in globals() and "ArchB" in RESULTS and "X" in RESULTS["ArchB"]:
    ckpt_guess = RESULTS["ArchB"]["X"]["best_ckpt"]
else:
    try:
        CKPT_DIR
    except NameError:
        CKPT_DIR = "./outputs/checkpoints"
    _ckpt_pattern = "*ArchB*valX*best-val.pth"
    matches = glob.glob(os.path.join(CKPT_DIR, _ckpt_pattern))
    # The setup cell writes checkpoints under outputs/run_<id>/checkpoints, so also look
    # through earlier runs relative to the working directory.
    matches += glob.glob(os.path.join("outputs", "run_*", "checkpoints", _ckpt_pattern))
    # glob returns paths in arbitrary order; take the most recently written file
    ckpt_guess = max(matches, key=os.path.getmtime) if matches else None

# if we found a baseline checkpoint, load and compare; otherwise warn and skip comparison
if ckpt_guess is not None:
    # build a fresh baseline model and load weights
    base_model = ResNetLite(num_classes=10, base_width=64, layers=(2,2,2)).to(device)
    state = torch.load(ckpt_guess, map_location=device)
    base_model.load_state_dict(state["model_state"])
    base_model.eval()
    # Evaluate on the same test loader and with the notebook-level `criterion`, so the
    # baseline loss is measured with the same loss function as the regularized model's.
    with torch.no_grad():
        bt, bc, bl = 0, 0, 0.0
        for xt, yt in test_loader_:
            xt, yt = xt.to(device), yt.to(device)
            out = base_model(xt)
            batch_loss = criterion(out, yt)
            bl += batch_loss.item() * yt.size(0)
            bt += yt.size(0)
            bc += (out.argmax(1) == yt).sum().item()
        # max(1, ...) guards against an empty loader
        test_loss_base = bl / max(1, bt)
        test_acc_base = bc / max(1, bt)
    # print side-by-side comparison
    print(f"[TEST|H2] BASE (no-aug) | loss {test_loss_base:.4f} | acc {test_acc_base:.4f}")
    print(f"\n[H2 RESULT] REG acc={test_acc_reg:.4f}  vs  BASE acc={test_acc_base:.4f}")
else:
    # notify that we could not find a baseline checkpoint
    print("\n[H2 NOTICE] Could not find Part-1 baseline checkpoint (ArchB Fold-X). Report REG numbers above, or re-run Part-1 Step-7 to regenerate.")


Device: cuda
[H2|val=X] epoch 01 | train 1.7051/0.4332 | val 1.7746/0.4667 | time 26.0s
[H2|val=X] epoch 02 | train 1.3617/0.6182 | val 1.5659/0.5597 | time 26.3s
[H2|val=X] epoch 03 | train 1.2122/0.6906 | val 1.3405/0.6810 | time 25.9s
[H2|val=X] epoch 04 | train 1.1031/0.7427 | val 1.1411/0.7511 | time 25.9s
[H2|val=X] epoch 05 | train 1.0306/0.7817 | val 1.1129/0.7497 | time 26.1s
[H2|val=X] epoch 06 | train 0.9826/0.8032 | val 1.0903/0.7632 | time 26.8s
[H2|val=X] epoch 07 | train 0.9363/0.8229 | val 1.0297/0.7921 | time 26.1s
[H2|val=X] epoch 08 | train 0.9096/0.8374 | val 0.9823/0.8128 | time 27.0s
[H2|val=X] epoch 09 | train 0.8819/0.8496 | val 1.0168/0.8001 | time 27.0s
[H2|val=X] epoch 10 | train 0.8514/0.8641 | val 0.9792/0.8134 | time 26.6s
[TEST|H2] REG (aug+LS) | loss 0.9922 | acc 0.8114 | best_val=0.8134 (ep10)
[TEST|H2] BASE (no-aug) | loss 1.4144 | acc 0.7705

[H2 RESULT] REG acc=0.8114  vs  BASE acc=0.7705


In [57]:
#Changing the optimizer (e.g: from SGD to Adam) improves performance (justify) (test) (yes/no) (explain)
import copy, torch.nn as nn, torch.optim as optim

# Plain cross-entropy (label_smoothing=0.0) so label smoothing does not confound the optimizer comparison;
# change the value to 0.1 to reuse the H2 regularizer.
# NOTE: this rebinds the notebook-level `criterion` that the H2 cell created with label_smoothing=0.1.
criterion = nn.CrossEntropyLoss(label_smoothing=0.0)

def run_opt_best(opt_name, lr, wd, num_epochs=10):
    """Train a fresh ResNetLite with one optimizer and test its best-validation weights.

    Args:
        opt_name: "sgd" (SGD with momentum 0.9) or "adamw".
        lr: learning rate for the optimizer.
        wd: weight decay for the optimizer.
        num_epochs: number of training epochs; defaults to the 10 used so far.

    Returns:
        dict with "opt", "val_best", "best_ep", "test_acc" and "test_loss", where the
        test metrics come from the epoch with the highest validation accuracy.

    Raises:
        ValueError: if ``opt_name`` is not "sgd" or "adamw".
    """
    # Validate before building anything: previously every name other than "adamw"
    # (including typos) silently trained with SGD.
    if opt_name not in ("sgd", "adamw"):
        raise ValueError(f"Unknown optimizer name: {opt_name!r} (expected 'sgd' or 'adamw')")

    # fresh model so both optimizers start from a newly initialised network
    model = ResNetLite(num_classes=10, base_width=64, layers=(2,2,2)).to(device)
    if opt_name == "adamw":
        optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=wd)
    else:
        optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=wd)

    best_val, best_ep, best_state = -1.0, -1, None
    for ep in range(1, num_epochs + 1):
        tr_loss, tr_acc = _train_epoch_adapt(model, train_loader, criterion, optimizer, device)
        # NOTE(review): _eval_adapt is not handed `criterion`; presumably it supplies its own loss — confirm.
        va_loss, va_acc = _eval_adapt(model, val_loader, device)
        print(f"[H3-best|{opt_name}] ep{ep:02d} | train {tr_loss:.4f}/{tr_acc:.4f} | val {va_loss:.4f}/{va_acc:.4f}")
        if va_acc > best_val:
            # deep copy: state_dict() hands back live tensors that later updates overwrite
            best_val, best_ep, best_state = va_acc, ep, copy.deepcopy(model.state_dict())

    # Restore the best weights; if no epoch was recorded (e.g. num_epochs=0) keep the
    # current weights instead of crashing on load_state_dict(None).
    if best_state is not None:
        model.load_state_dict(best_state)
    model.eval()
    tl, ta = _eval_adapt(model, TEST_LOADER, device)
    return {"opt": opt_name, "val_best": best_val, "best_ep": best_ep, "test_acc": float(ta), "test_loss": float(tl)}

# per-optimizer hyperparameters for the H3 comparison
_H3_SETTINGS = {
    "sgd": dict(lr=0.01, wd=5e-4),
    "adamw": dict(lr=3e-4, wd=1e-4),
}
res_sgd_best = run_opt_best("sgd", **_H3_SETTINGS["sgd"])
res_adamw_best = run_opt_best("adamw", **_H3_SETTINGS["adamw"])

# one line per optimizer: best validation accuracy, its epoch, and the matching test accuracy
print("\n[H3 COMPARISON — best epoch tested]")
for _opt_label, _opt_result in (("SGD  ", res_sgd_best), ("AdamW", res_adamw_best)):
    print(f"{_opt_label} | val_best={_opt_result['val_best']:.4f} (ep{_opt_result['best_ep']}) | test_acc={_opt_result['test_acc']:.4f}")


[H3-best|sgd] ep01 | train 1.5393/0.4276 | val 1.4600/0.4817
[H3-best|sgd] ep02 | train 1.0874/0.6109 | val 1.5242/0.5322
[H3-best|sgd] ep03 | train 0.8876/0.6883 | val 1.1908/0.6080
[H3-best|sgd] ep04 | train 0.7462/0.7355 | val 0.9324/0.7033
[H3-best|sgd] ep05 | train 0.6520/0.7702 | val 0.8995/0.7017
[H3-best|sgd] ep06 | train 0.5813/0.7991 | val 1.1136/0.6635
[H3-best|sgd] ep07 | train 0.5296/0.8151 | val 0.6797/0.7808
[H3-best|sgd] ep08 | train 0.4865/0.8301 | val 0.6735/0.7781
[H3-best|sgd] ep09 | train 0.4554/0.8400 | val 0.7568/0.7457
[H3-best|sgd] ep10 | train 0.4138/0.8548 | val 0.6927/0.7777
[H3-best|adamw] ep01 | train 1.4340/0.4742 | val 1.2242/0.5648
[H3-best|adamw] ep02 | train 1.0281/0.6335 | val 1.1431/0.6146
[H3-best|adamw] ep03 | train 0.8508/0.7001 | val 0.9486/0.6723
[H3-best|adamw] ep04 | train 0.7184/0.7487 | val 0.9780/0.6817
[H3-best|adamw] ep05 | train 0.6339/0.7804 | val 0.6397/0.7805
[H3-best|adamw] ep06 | train 0.5611/0.8056 | val 0.7824/0.7455
[H3-best|ada

Does changing to Adam/AdamW improve performance?

Yes
SGD → best val 0.7808 (ep7) → test 0.7834
AdamW → best val 0.8161 (ep8) → test 0.8096

Justification
Adaptive steps: Adam/AdamW scales each parameter's update using running estimates of the gradient's first and second moments → faster, more stable early progress, which matters most with a short training budget (10 epochs).

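Decoupled weight decay (AdamW): the decay is applied directly to the weights instead of being added to the gradient as an L2 penalty (as in the original Adam), so it is not rescaled by the adaptive step sizes; this often gives better generalization.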

Short budget effect: With only 10 epochs, AdamW often reaches a better solution sooner. With more epochs + tuned schedules, SGD can catch up or tie (so the conclusion is contingent on this training budget).

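Changing from SGD to AdamW improved performance under the same 10-epoch budget (SGD 78.34% → AdamW 80.96% test accuracy). Note that the two runs also used different hyperparameters (SGD: lr=0.01, weight decay=5e-4; AdamW: lr=3e-4, weight decay=1e-4) and each was run once, so the gap reflects these particular settings rather than the optimizers alone. Reason: AdamW's adaptive updates and decoupled weight decay converge faster and generalize better in short training; with longer training and tuned schedules, SGD may close the gap.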