In [1]:
%pip install torchvision

import os, math, random, numpy as np, torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Subset
from torchvision import datasets, transforms


Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [2]:
import torch, random, numpy as np
from torch.utils.data import DataLoader, Subset
from torchvision import datasets, transforms

def set_seed(seed=42):
    """Seed every random number generator this notebook relies on.

    Applies the same `seed` to Python's `random` module, NumPy's global
    RNG, and PyTorch (the default generator plus all CUDA generators).
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

def get_device():
    """Return the CUDA device when CUDA is available, otherwise the CPU device."""
    if torch.cuda.is_available():
        return torch.device("cuda")
    return torch.device("cpu")

def _compute_mean_std(torchvision_dataset):
    loader = DataLoader(torchvision_dataset, batch_size=512, shuffle=False, num_workers=2)
    mean = 0.0; std = 0.0; n = 0
    for x, _ in loader:
        b = x.size(0)
        x = x.view(b, x.size(1), -1)         # [B, C, H*W]
        mean += x.mean(2).sum(0)              # sum of per-sample means
        std  += x.std(2, unbiased=False).sum(0)
        n += b
    mean /= n; std /= n
    return mean.tolist(), std.tolist()

def get_dataloaders(dataset="MNIST", batch_size=128, max_train=None, max_test=None,
                    norm_mode="standardize", root="./data", num_workers=2):
    """
    Build `(train_loader, test_loader)` for MNIST, FashionMNIST or CIFAR10.

    dataset:
      case-insensitive name; "-" and "_" are ignored, so "MNIST",
      "FASHIONMNIST", "fashion-mnist", "Fashion_MNIST", "CIFAR10" and
      "cifar-10" are all accepted.
    batch_size:
      batch size used by both loaders.
    max_train / max_test:
      if truthy, keep only the first N samples of the respective split
      (capped at the split length); None or 0 keeps the full split.
    norm_mode:
      - "standardize": compute train-set mean/std (StandardScaler-style);
        the statistics come from the full training split, even when
        `max_train` is set
      - "minus1to1": map [0,1] -> [-1,1] using mean=(0.5,)*C, std=(0.5,)*C
      - "none": only ToTensor()
    root:
      directory the datasets are downloaded to / loaded from.
    num_workers:
      worker processes per DataLoader.

    Returns a tuple `(train_loader, test_loader)`; only the train loader
    shuffles.

    Raises ValueError for an unsupported dataset or an unknown norm_mode.
    Both are checked up front, before anything is downloaded.
    """
    # Normalised name -> (torchvision dataset class, number of channels).
    specs = {
        "MNIST": (datasets.MNIST, 1),
        "FASHIONMNIST": (datasets.FashionMNIST, 1),
        "CIFAR10": (datasets.CIFAR10, 3),
    }
    key = dataset.upper().replace("-", "").replace("_", "")
    if key not in specs:
        raise ValueError("Unsupported dataset.")
    if norm_mode not in ("standardize", "minus1to1", "none"):
        raise ValueError("Unknown norm_mode")
    ds_cls, channels = specs[key]

    steps = [transforms.ToTensor()]
    if norm_mode == "standardize":
        # Statistics must be measured on un-normalised [0, 1] tensors, so a
        # separate ToTensor-only view of the training split is needed here.
        base_train = ds_cls(root=root, train=True, download=True,
                            transform=transforms.ToTensor())
        mean, std = _compute_mean_std(base_train)
        steps.append(transforms.Normalize(mean, std))
    elif norm_mode == "minus1to1":
        steps.append(transforms.Normalize((0.5,) * channels, (0.5,) * channels))
    tform = transforms.Compose(steps)

    train_set = ds_cls(root=root, train=True,  download=True, transform=tform)
    test_set  = ds_cls(root=root, train=False, download=True, transform=tform)

    if max_train:
        train_set = Subset(train_set, list(range(min(max_train, len(train_set)))))
    if max_test:
        test_set = Subset(test_set, list(range(min(max_test, len(test_set)))))

    # Pinned host memory only helps host-to-GPU copies; requesting it on a
    # CPU-only machine just triggers a warning.
    pin = torch.cuda.is_available()
    return (
        DataLoader(train_set, batch_size=batch_size, shuffle=True,
                   num_workers=num_workers, pin_memory=pin),
        DataLoader(test_set, batch_size=batch_size, shuffle=False,
                   num_workers=num_workers, pin_memory=pin),
    )


In [14]:
import torch
import torch.nn as nn

class SmallCNN(nn.Module):
    """Three conv blocks followed by a small fully connected classifier.

    Each conv block is Conv2d(3x3, padding=1) -> activation -> MaxPool2d(2),
    with 32, 64 and 128 output channels. The classifier head has two hidden
    layers (128 and 64 units) before the `num_classes` output layer.

    Args:
        in_channels: channels of the input image.
        num_classes: size of the output logits vector.
        activation: one of "relu", "tanh", "gelu", "lrelu" (LeakyReLU with
            slope 0.1); any other key raises KeyError.
        p_drop: dropout probability applied to the flattened conv features.
        input_size: side length of the (square) input, used to size the
            first linear layer.
    """

    def __init__(self, in_channels=1, num_classes=10, activation="relu", p_drop=0.0, input_size=28):
        super().__init__()

        act_factories = {
            "relu": nn.ReLU,
            "tanh": nn.Tanh,
            "gelu": nn.GELU,
            "lrelu": lambda: nn.LeakyReLU(0.1),
        }

        def make_act(name):
            # Call the factory so every position gets its own module instance.
            return act_factories[name]()

        # Convolutional feature extractor.
        conv_stack = []
        fan_in = in_channels
        for width in (32, 64, 128):
            conv_stack.append(nn.Conv2d(fan_in, width, 3, padding=1))
            conv_stack.append(make_act(activation))
            conv_stack.append(nn.MaxPool2d(2))
            fan_in = width
        self.features = nn.Sequential(*conv_stack)
        self.dropout = nn.Dropout(p_drop)

        # Probe with a single dummy image to learn the flattened feature size
        # (so 28x28, 32x32, ... inputs all work without hard-coding it).
        with torch.no_grad():
            probe = torch.zeros(1, in_channels, input_size, input_size)
            flat = self.features(probe).numel()

        # Classifier head: two hidden fully connected layers, then the logits.
        head = []
        fan_in = flat
        for width in (128, 64):
            head.append(nn.Linear(fan_in, width))
            head.append(make_act(activation))
            fan_in = width
        head.append(nn.Linear(fan_in, num_classes))
        self.fc = nn.Sequential(*head)

    def forward(self, x):
        """Return class logits of shape [batch, num_classes] for image batch `x`."""
        feats = self.features(x)
        feats = feats.view(feats.size(0), -1)
        return self.fc(self.dropout(feats))

In [15]:
def train_one_epoch(model, loader, opt, device, criterion, scaler=None, scheduler=None, grad_clip=None):
    """Run one training pass over `loader` and return `(mean_loss, accuracy)`.

    Args:
        model: module to train; it is put into training mode.
        loader: iterable of `(inputs, targets)` batches.
        opt: optimizer updating `model`'s parameters.
        device: `torch.device` the batches are moved to.
        criterion: loss called as `criterion(logits, targets)`.
            NOTE(review): the per-sample average below assumes the criterion
            returns a batch mean (e.g. the default CrossEntropyLoss) - confirm
            if a different reduction is used.
        scaler: optional gradient scaler; when given, the loss is scaled
            before backward, and on a CUDA device the forward pass runs
            under autocast.
        scheduler: optional LR scheduler; it is stepped once per batch.
        grad_clip: optional max gradient norm; None disables clipping.

    Returns:
        `(loss_sum / total, correct / total)` where each batch loss is
        weighted by its batch size.
    """
    # Imported here so the function does not depend on an import that appears
    # later in the notebook than this definition.
    from contextlib import nullcontext

    model.train()
    total, correct, loss_sum = 0, 0, 0.0

    use_autocast = scaler is not None and device.type == "cuda"
    autocast_ctx = torch.cuda.amp.autocast if use_autocast else nullcontext
    for x, y in loader:
        x = x.to(device, non_blocking=True)
        y = y.to(device, non_blocking=True)

        opt.zero_grad(set_to_none=True)
        with autocast_ctx():
            logits = model(x)
            loss = criterion(logits, y)

        if scaler is None:
            loss.backward()
            if grad_clip is not None:
                torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)
            opt.step()
        else:
            scaler.scale(loss).backward()
            if grad_clip is not None:
                # Gradients must be unscaled before their norm is measured.
                scaler.unscale_(opt)
                torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)
            scaler.step(opt)
            scaler.update()

        if scheduler is not None:
            scheduler.step()

        # Metrics: weight the batch loss by batch size so a smaller final
        # batch does not skew the epoch average.
        batch_size = y.size(0)
        loss_sum += float(loss) * batch_size
        pred = logits.argmax(dim=1)
        correct += (pred == y).sum().item()
        total += batch_size

    return loss_sum / total, correct / total


from contextlib import nullcontext

@torch.inference_mode()
def evaluate(model, loader, device, criterion):
    """Score `model` on `loader` without tracking gradients.

    Puts the model into eval mode, moves each batch to `device`, and returns
    `(mean_loss, accuracy)`, where each batch loss is weighted by its batch
    size before averaging over all samples seen.
    """
    model.eval()
    n_seen = 0
    n_hits = 0
    weighted_loss = 0.0
    for inputs, targets in loader:
        inputs = inputs.to(device, non_blocking=True)
        targets = targets.to(device, non_blocking=True)
        batch_n = targets.size(0)

        outputs = model(inputs)
        weighted_loss += criterion(outputs, targets).item() * batch_n
        n_hits += int(outputs.argmax(dim=1).eq(targets).sum())
        n_seen += batch_n
    return weighted_loss / n_seen, n_hits / n_seen

In [18]:
if __name__ == "__main__":
    # Seed the RNGs and pick the compute device before anything is built.
    set_seed(7)
    device = get_device()

    # Active experiment: CIFAR10 (3 input channels, 32x32 images).
    # To run MNIST or FASHIONMNIST instead, pass that dataset name with
    # max_train=10000, max_test=2000 and build SmallCNN with
    # in_channels=1, input_size=28.
    train_loader, test_loader = get_dataloaders(
        dataset="CIFAR10",
        batch_size=128,
        max_train=20000,
        max_test=5000,
    )
    model = SmallCNN(
        in_channels=3,
        num_classes=10,
        activation="relu",
        p_drop=0.0,
        input_size=32,
    ).to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(
        model.parameters(),
        lr=0.05,
        momentum=0.9,
        weight_decay=0.0,
    )

    # Each epoch: one training pass, then an evaluation on the test subset.
    for epoch in range(20):
        tr_loss, tr_acc = train_one_epoch(model, train_loader, optimizer, device, criterion)
        te_loss, te_acc = evaluate(model, test_loader, device, criterion)
        print(f"Epoch {epoch:02d} | train loss {tr_loss:.3f} acc {tr_acc:.3f} | test loss {te_loss:.3f} acc {te_acc:.3f}")


Epoch 00 | train loss 1.902 acc 0.296 | test loss 1.653 acc 0.399
Epoch 01 | train loss 1.419 acc 0.479 | test loss 1.375 acc 0.503
Epoch 02 | train loss 1.188 acc 0.578 | test loss 1.153 acc 0.596
Epoch 03 | train loss 1.014 acc 0.641 | test loss 1.083 acc 0.629
Epoch 04 | train loss 0.891 acc 0.686 | test loss 1.053 acc 0.643
Epoch 05 | train loss 0.746 acc 0.736 | test loss 1.011 acc 0.663
Epoch 06 | train loss 0.678 acc 0.763 | test loss 1.053 acc 0.675
Epoch 07 | train loss 0.590 acc 0.794 | test loss 1.042 acc 0.670
Epoch 08 | train loss 0.509 acc 0.822 | test loss 1.245 acc 0.624
Epoch 09 | train loss 0.456 acc 0.844 | test loss 1.308 acc 0.650
Epoch 10 | train loss 0.417 acc 0.854 | test loss 1.268 acc 0.660
Epoch 11 | train loss 0.378 acc 0.872 | test loss 1.254 acc 0.670
Epoch 12 | train loss 0.371 acc 0.874 | test loss 1.382 acc 0.644
Epoch 13 | train loss 0.352 acc 0.882 | test loss 1.486 acc 0.669
Epoch 14 | train loss 0.337 acc 0.889 | test loss 1.479 acc 0.658
Epoch 15 |