In [1]:
! python3 -m pip install torch torchvision matplotlib tqdm albumentations opencv-python-headless
from __future__ import annotations
import torch
import torchvision
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import numpy as np
import matplotlib.pyplot as plt
from collections import Counter
import albumentations as A
from albumentations.pytorch import ToTensorV2


Collecting torch
  Using cached torch-2.8.0-cp313-none-macosx_11_0_arm64.whl.metadata (30 kB)
Collecting torchvision
  Using cached torchvision-0.23.0-cp313-cp313-macosx_11_0_arm64.whl.metadata (6.1 kB)
Collecting matplotlib
  Using cached matplotlib-3.10.6-cp313-cp313-macosx_11_0_arm64.whl.metadata (11 kB)
Collecting tqdm
  Using cached tqdm-4.67.1-py3-none-any.whl.metadata (57 kB)
Collecting albumentations
  Using cached albumentations-2.0.8-py3-none-any.whl.metadata (43 kB)
Collecting opencv-python-headless
  Using cached opencv_python_headless-4.12.0.88-cp37-abi3-macosx_13_0_arm64.whl.metadata (19 kB)
Collecting filelock (from torch)
  Using cached filelock-3.19.1-py3-none-any.whl.metadata (2.1 kB)
Collecting typing-extensions>=4.10.0 (from torch)
  Using cached typing_extensions-4.15.0-py3-none-any.whl.metadata (3.3 kB)
Collecting setuptools (from torch)
  Using cached setuptools-80.9.0-py3-none-any.whl.metadata (6.6 kB)
Collecting sympy>=1.13.3 (from torch)
  Using cached sympy-1

In [None]:
# Load CIFAR-10 with basic transforms.
# Train and test currently share the same pipeline (ToTensor only); they are
# kept as two separate objects so either can be changed independently.
train_tfms = transforms.Compose([transforms.ToTensor()])
test_tfms = transforms.Compose([transforms.ToTensor()])

# download=True fetches the archive into ./data if it is not already there.
train_ds = datasets.CIFAR10(
    root='./data',
    train=True,
    download=True,
    transform=train_tfms,
)
test_ds = datasets.CIFAR10(
    root='./data',
    train=False,
    download=True,
    transform=test_tfms,
)

# Human-readable class names, indexed by integer label.
classes = train_ds.classes
print('Classes:', classes)

# Only the training loader shuffles; the test loader keeps a fixed order.
train_loader = DataLoader(train_ds, shuffle=True, batch_size=256, num_workers=2)
test_loader = DataLoader(test_ds, shuffle=False, batch_size=256, num_workers=2)


In [None]:
# Compute dataset mean/std per channel (RGB) on training set.
# Running per-channel sums of E[x] and E[x^2] are accumulated batch by batch
# (each batch weighted by its image count), then std = sqrt(E[x^2] - E[x]^2).
channel_sum = torch.zeros(3)
channel_sq_sum = torch.zeros(3)
n_images = 0

stats_loader = DataLoader(train_ds, batch_size=512, shuffle=False, num_workers=2)
for batch_imgs, _ in stats_loader:
    n_in_batch = batch_imgs.shape[0]
    # Collapse H and W into one axis: (batch, channel, pixels).
    flat = batch_imgs.flatten(start_dim=2)
    # Per-channel batch mean, re-weighted by batch size so the (possibly
    # smaller) last batch contributes proportionally.
    channel_sum += flat.mean(dim=(0, 2)) * n_in_batch
    channel_sq_sum += flat.pow(2).mean(dim=(0, 2)) * n_in_batch
    n_images += n_in_batch

# Plain Python lists, so they can be passed straight to Normalize(...) later.
mean = (channel_sum / n_images).tolist()
variance = channel_sq_sum / n_images - torch.tensor(mean) ** 2
std = variance.sqrt().tolist()
print('Train mean:', mean)
print('Train std:', std)


In [None]:
# Class distribution in training set: map each class name to the number of
# training samples carrying that label.
counts = Counter(train_ds.targets)
print({name: counts[label] for label, name in enumerate(classes)})


In [None]:
# Visualize a grid of training images: the first 32 samples of one batch drawn
# from train_loader, laid out 4 rows x 8 columns with the class name as title.
imgs, labels = next(iter(train_loader))
fig, axes = plt.subplots(nrows=4, ncols=8, figsize=(12, 6))
for ax, img, lbl in zip(axes.ravel(), imgs[:32], labels[:32]):
    # Tensors are channel-first (C, H, W); imshow wants channel-last (H, W, C).
    ax.imshow(img.permute(1, 2, 0).numpy())
    ax.set_title(classes[lbl])
    ax.axis('off')
plt.tight_layout()
plt.show()


In [None]:
# Preview common augmentations
# The PIL-level augmentations run first; ToTensor + Normalize (with the
# training-set mean/std computed above) run last.
aug_tfms = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.ToTensor(),
    transforms.Normalize(mean, std)
])

aug_ds = datasets.CIFAR10(root='./data', train=True, download=False, transform=aug_tfms)
aug_loader = DataLoader(aug_ds, batch_size=32, shuffle=True)

aug_imgs, aug_labels = next(iter(aug_loader))

# De-normalize for display: x * std + mean, broadcast over (N, C, H, W).
# Float rounding can push values marginally outside [0, 1], which makes
# imshow warn about clipping, so clamp explicitly.
mean_t = torch.tensor(mean).view(1, 3, 1, 1)
std_t = torch.tensor(std).view(1, 3, 1, 1)
aug_imgs_disp = (aug_imgs * std_t + mean_t).clamp(0.0, 1.0)

fig, axes = plt.subplots(4, 8, figsize=(12, 6))
for ax, img, lbl in zip(axes.flatten(), aug_imgs_disp[:32], aug_labels[:32]):
    # CHW -> HWC for matplotlib.
    ax.imshow(np.transpose(img.numpy(), (1, 2, 0)))
    ax.set_title(classes[lbl])
    ax.axis('off')
plt.tight_layout()
plt.show()


In [None]:
# Albumentations transforms using dataset mean
# NOTE: written against the Albumentations 2.x API (the install cell resolved
# albumentations 2.0.8). CoarseDropout's 1.x keywords (max_holes, max_height,
# max_width, min_*, fill_value, mask_fill_value) were replaced by the *_range /
# fill keywords used below.
# NOTE(review): ShiftScaleRotate is reported as deprecated in favour of
# A.Affine in 2.x; left unchanged here to keep the augmentation identical.
alb_train_tfms = A.Compose([
    A.HorizontalFlip(p=0.5),
    A.ShiftScaleRotate(shift_limit=0.0625, scale_limit=0.1, rotate_limit=15, p=0.5),
    A.CoarseDropout(
        num_holes_range=(1, 1),        # exactly one hole
        hole_height_range=(16, 16),    # ints => absolute pixels: fixed 16x16 hole
        hole_width_range=(16, 16),
        # The hole is cut before Normalize, while the image is still on the
        # 0-255 scale, so the [0, 1] dataset mean is rescaled to pixel values.
        fill=tuple(int(channel_mean * 255) for channel_mean in mean),
        p=0.5,
    ),
    A.Normalize(mean=mean, std=std),
    ToTensorV2(),
])

# Evaluation pipeline: no augmentation, only normalization + tensor conversion.
alb_test_tfms = A.Compose([
    A.Normalize(mean=mean, std=std),
    ToTensorV2(),
])

# Wrapper to apply Albumentations on torchvision dataset
class AlbumentationsCIFAR(torchvision.datasets.CIFAR10):
    """CIFAR10 dataset whose samples go through an Albumentations pipeline.

    The ``transform`` keyword is intercepted and stored as ``alb_transform``;
    the parent class is always constructed with ``transform=None``.
    """

    def __init__(self, *args, transform=None, **kwargs):
        """Build the parent dataset without a transform and keep ours aside.

        Args:
            *args: Positional arguments forwarded to ``CIFAR10``.
            transform: Callable invoked as ``transform(image=...)`` whose result
                exposes an ``'image'`` entry, or ``None`` for no transform.
            **kwargs: Keyword arguments forwarded to ``CIFAR10``.
        """
        super().__init__(*args, transform=None, **kwargs)
        self.alb_transform = transform

    def __getitem__(self, idx):
        """Return ``(image, target)`` for sample ``idx``.

        The image is read directly from ``self.data`` (HWC RGB uint8, per the
        original author's note) and passed through ``alb_transform`` when one
        is set; otherwise the raw array is returned unchanged.
        """
        image = self.data[idx]
        label = self.targets[idx]
        if self.alb_transform is None:
            return image, label
        return self.alb_transform(image=image)['image'], label

# Datasets backed by the Albumentations pipelines (data already on disk, so
# no download is attempted).
train_ds_alb = AlbumentationsCIFAR(
    root='./data',
    train=True,
    download=False,
    transform=alb_train_tfms,
)
test_ds_alb = AlbumentationsCIFAR(
    root='./data',
    train=False,
    download=False,
    transform=alb_test_tfms,
)

# These rebind train_loader / test_loader: from here on the loaders serve the
# Albumentations datasets, loaded in the main process (num_workers=0).
train_loader = DataLoader(train_ds_alb, shuffle=True, batch_size=128, num_workers=0)
test_loader = DataLoader(test_ds_alb, shuffle=False, batch_size=128, num_workers=0)


In [None]:
# C1-C2-C3-C4 CNN with last stride 2, then GAP and classifier
# (architecture as described by the author; Net itself is defined in model.py)
from model import Net
import torch

# Place the model on the GPU when one is available, otherwise on the CPU.
if torch.cuda.is_available():
    model = Net().to(torch.device('cuda'))
else:
    model = Net().to(torch.device('cpu'))

# Total number of parameter elements in the model.
print(sum(param.numel() for param in model.parameters()))
    

In [None]:
! pip install torchsummary
from torchsummary import summary
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
print(device)
model = Net().to(device)
summary(model, input_size=(3, 32, 32))

In [None]:
# Receptive field quick check
# Stack: C1(3x3,3x3), C2(3x3,3x3), C3(3x3,3x3), C4(3x3, 3x3 s=2), then 7x7, 5x5, 5x5
# Each entry is (kernel_size, stride).
layers = (
    [(3, 1)] * 2                    # C1
    + [(3, 1)] * 2                  # C2
    + [(3, 1)] * 2                  # C3
    + [(3, 1), (3, 2)]              # C4 start (downsample)
    + [(7, 1), (5, 1), (5, 1)]      # extra large kernels in C4
)

# A layer widens the receptive field by (kernel - 1) times the cumulative
# stride ("jump") of all layers before it; its own stride then scales the jump.
rf, jump = 1, 1
for kernel, stride in layers:
    rf += (kernel - 1) * jump
    jump = jump * stride
print('Approx RF:', rf)  # expect >= 45


In [None]:
from tqdm import tqdm
import torch.nn.functional as F

# Module-level metric histories that train() and test() append to.
# train_* grow once per batch; test_* grow once per evaluation pass.
train_losses, train_acc = [], []
test_losses, test_acc = [], []

def train(model, device, train_loader, optimizer, epoch):
    """Run one training pass over ``train_loader``.

    Appends one entry per batch to the module-level ``train_losses`` (the batch
    loss as a Python float) and ``train_acc`` (running accuracy in percent over
    the samples processed so far in this pass).

    Args:
        model: Network to optimise; switched to training mode.
        device: Device each batch is moved to before the forward pass.
        train_loader: Iterable yielding ``(data, target)`` batches.
        optimizer: Optimizer stepped once per batch.
        epoch: Unused; kept so existing callers keep working.
    """
    model.train()
    pbar = tqdm(train_loader)
    correct = 0
    processed = 0
    for batch_idx, (data, target) in enumerate(pbar):
        # Move the batch to the target device.
        data, target = data.to(device), target.to(device)

        # PyTorch accumulates gradients across backward passes, so clear them
        # before backpropagation to make each update use only this batch.
        optimizer.zero_grad()

        # Forward pass.
        y_pred = model(data)

        # nll_loss is applied to the raw model output.
        # NOTE(review): this presumes Net ends in log_softmax -- confirm in model.py.
        loss = F.nll_loss(y_pred, target)

        # Record a plain float. Appending the tensor itself would keep its
        # autograd history (and any device memory) alive for every batch.
        batch_loss = loss.item()
        train_losses.append(batch_loss)

        # Backpropagation and parameter update.
        loss.backward()
        optimizer.step()

        # Running accuracy: predicted class is the index of the largest output.
        pred = y_pred.argmax(dim=1, keepdim=True)
        correct += pred.eq(target.view_as(pred)).sum().item()
        processed += len(data)

        pbar.set_description(desc= f'Loss={batch_loss} Batch_id={batch_idx} Accuracy={100*correct/processed:0.2f}')
        train_acc.append(100*correct/processed)

def test(model, device, test_loader):
    """Evaluate ``model`` on ``test_loader`` and log loss/accuracy.

    Appends the per-sample average loss to the module-level ``test_losses`` and
    the accuracy (percent) to ``test_acc``, and prints a one-line summary.

    Args:
        model: Network to evaluate; switched to eval mode.
        device: Device each batch is moved to before the forward pass.
        test_loader: Loader yielding ``(data, target)`` batches; its
            ``.dataset`` length is used as the sample count.
    """
    model.eval()
    n_samples = len(test_loader.dataset)
    total_loss = 0
    n_correct = 0

    # No gradients are needed for evaluation.
    with torch.no_grad():
        for data, target in test_loader:
            data = data.to(device)
            target = target.to(device)
            output = model(data)
            # Sum (not mean) per batch so the final division gives a true
            # per-sample average regardless of batch sizes.
            total_loss += F.nll_loss(output, target, reduction='sum').item()
            # Predicted class = index of the largest output.
            pred = output.argmax(dim=1, keepdim=True)
            n_correct += pred.eq(target.view_as(pred)).sum().item()

    avg_loss = total_loss / n_samples
    test_losses.append(avg_loss)
    accuracy = 100. * n_correct / n_samples

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format(
        avg_loss, n_correct, n_samples, accuracy))

    test_acc.append(accuracy)

In [None]:
import torch.optim as optim

# Start from a freshly initialised network on the device chosen earlier.
model = Net().to(device)

# Plain SGD with momentum over all model parameters.
optimizer = optim.SGD(
    model.parameters(),
    lr=0.01,
    momentum=0.9,
)

# One training pass followed by one evaluation pass per epoch; the metric
# lists defined alongside train()/test() collect the history.
EPOCHS = 20
for epoch in range(EPOCHS):
    print("EPOCH:", epoch)
    train(model, device, train_loader, optimizer, epoch)
    test(model, device, test_loader)