<a href="https://colab.research.google.com/github/suhas-bvp/session7/blob/master/experiment1_CIFAR_CNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from tqdm import tqdm
import ssl
import numpy as np
import matplotlib.pyplot as plt

# Bypass SSL certificate verification for dataset download.
# NOTE(review): this swaps the ssl module's default HTTPS context factory for the
# unverified one at import time, so it applies process-wide, not just to the
# dataset download. Presumably a workaround for certificate errors on the
# dataset host -- confirm it is still needed before reusing this code elsewhere.
ssl._create_default_https_context = ssl._create_unverified_context

# Custom Bottleneck block for ResNet-50-like architecture
# Now includes comments for input channels, output channels, receptive field (RF), and effect
# Bottleneck: Implements the bottleneck residual block used in deep ResNet architectures.
# - Reduces/increases channel dimensions (1x1 conv), extracts spatial features (3x3 conv), expands channel dimensions (1x1 conv).
# - Supports downsampling for residual connections when input/output dimensions differ.
# - Used for efficient deep networks by reducing computation while maintaining representational power.
class Bottleneck(nn.Module):
    """Residual bottleneck block: 1x1 conv -> 3x3 conv -> 1x1 conv plus a shortcut.

    The block maps ``in_planes`` input channels to ``planes * expansion`` output
    channels. The 3x3 convolution carries the ``stride``, so spatial
    downsampling (if any) happens there. When the shortcut cannot be added
    directly (stride != 1 or a channel mismatch), a strided 1x1 conv +
    BatchNorm projection is used for it instead of the identity.
    """

    # Channel multiplier applied by the last 1x1 convolution.
    expansion = 4

    def __init__(self, in_planes, planes, stride=1):
        """Build the three conv/BN pairs and, if required, the shortcut projection.

        Args:
            in_planes: number of channels of the incoming feature map.
            planes: width of the two inner convolutions.
            stride: stride of the 3x3 convolution and of the shortcut projection.
        """
        super().__init__()
        out_planes = planes * self.expansion

        # 1x1: in_planes -> planes (channel mixing only, no spatial context).
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        # 3x3: planes -> planes (spatial feature extraction, carries the stride).
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        # 1x1: planes -> planes * expansion (channel expansion).
        self.conv3 = nn.Conv2d(planes, out_planes, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(out_planes)
        self.relu = nn.ReLU(inplace=True)

        # The identity shortcut only works when shapes already match; otherwise
        # project the input with a 1x1 conv so the residual addition is valid.
        shapes_differ = stride != 1 or in_planes != out_planes
        self.downsample = (
            nn.Sequential(
                nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_planes),
            )
            if shapes_differ
            else None
        )

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        shortcut = x if self.downsample is None else self.downsample(x)
        features = self.relu(self.bn1(self.conv1(x)))
        features = self.relu(self.bn2(self.conv2(features)))
        features = self.bn3(self.conv3(features))
        features += shortcut
        return self.relu(features)

# Custom ResNet class (ResNet-50-like)
# Comments added for input/output channels, RF, and effect for each layer
# CustomResNet: Builds a modular, efficient ResNet-50-like architecture for CIFAR-100.
# - Stacks Bottleneck blocks in four main layers with increasing depth and receptive field.
# - Handles initial feature extraction, low/mid/high/semantic feature learning, global pooling, and classification.
# - Includes a summary() method to print architecture details (channels, RF, effect).
# - Initialization ensures stable training.
class CustomResNet(nn.Module):
    """ResNet-style classifier: 3x3 stem, four residual stages, pooling, dropout, linear head.

    ``block`` is the residual block class; it must expose an ``expansion``
    attribute and be constructible as ``block(in_planes, planes, stride)``.
    ``layers`` gives the number of blocks in each of the four stages.
    """

    def __init__(self, block, layers, num_classes=100):
        """Assemble the network and initialise conv / BatchNorm parameters.

        Args:
            block: residual block class (see class docstring).
            layers: indexable with four entries, the block count per stage.
            num_classes: size of the final linear layer's output.
        """
        super().__init__()
        # Running channel count fed into the next block; updated by _make_layer.
        self.in_planes = 64

        # Stem: 3 -> 64 channels with a stride-1 3x3 conv, so spatial size is preserved.
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)

        # (inner width, stride of first block) for layer1..layer4. Every stage
        # after the first halves the spatial resolution.
        stage_specs = ((64, 1), (128, 2), (256, 2), (512, 2))
        for position, (width, stride) in enumerate(stage_specs):
            stage = self._make_layer(block, width, layers[position], stride=stride)
            setattr(self, f'layer{position + 1}', stage)

        # Head: global average pool -> dropout (regularisation) -> linear classifier.
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.dropout = nn.Dropout(p=0.3)
        self.fc = nn.Linear(512 * block.expansion, num_classes)

        # Kaiming-normal for every conv; BatchNorm starts as the identity (scale 1, shift 0).
        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight, mode='fan_out', nonlinearity='relu')
                continue
            if isinstance(module, nn.BatchNorm2d):
                nn.init.constant_(module.weight, 1)
                nn.init.constant_(module.bias, 0)

    def _make_layer(self, block, planes, blocks, stride=1):
        """Build one stage: a first block carrying ``stride``, then ``blocks - 1`` stride-1 blocks."""
        head = block(self.in_planes, planes, stride)
        # After the first block the stage works at the expanded width.
        self.in_planes = planes * block.expansion
        tail = [block(self.in_planes, planes) for _ in range(blocks - 1)]
        return nn.Sequential(head, *tail)

    def forward(self, x):
        """Run the stem, the four stages and the classification head; return the logits."""
        x = self.relu(self.bn1(self.conv1(x)))
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)
        pooled = torch.flatten(self.avgpool(x), 1)
        return self.fc(self.dropout(pooled))

    def summary(self):
        """Print a fixed table of channels, approximate receptive field and role per stage.

        All rows are static text except the classifier row, which reports
        ``self.fc.out_features``.
        """
        rule = "-------------------------------------------------------------"
        table = (
            "CustomResNet-50-like Model Summary:",
            "Layer         | In Channels | Out Channels | RF    | Effect",
            rule,
            "Conv1         | 3           | 64           | 3x3   | Initial feature extraction",
            "Layer1        | 64          | 256          | ~9x9  | Low-level features",
            "Layer2        | 256         | 512          | ~21x21| Mid-level features",
            "Layer3        | 512         | 1024         | ~45x45| High-level features",
            "Layer4        | 1024        | 2048         | ~93x93| Semantic features",
            f"AvgPool/FC    | 2048        | {self.fc.out_features}         | Global| Classification",
            rule,
            "Note: RF is approximate and depends on stride and kernel sizes.",
        )
        for row in table:
            print(row)

# Function to create a custom ResNet-50-like model for CIFAR-100
# - Uses Bottleneck blocks and 3x3 conv for CIFAR
# - Layer config: [3, 4, 6, 3] (ResNet-50)
def get_resnet(num_classes=100):
    """Return a CustomResNet built from Bottleneck blocks with stage depths [3, 4, 6, 3]."""
    stage_depths = [3, 4, 6, 3]
    return CustomResNet(Bottleneck, stage_depths, num_classes=num_classes)

# Function to create data loaders for CIFAR-100
# - Applies advanced data augmentation and normalization for training
# - Normalizes test data
# - Returns train and test data loaders
# - Downloads data if not present
# - Uses batch size 128 for training, 100 for testing
# - Uses 2 worker processes for loading data
def get_dataloaders(batch_size=128, test_batch_size=100, num_workers=2, root='./data'):
    """Create the CIFAR-100 training and test data loaders.

    The training pipeline applies random crop (with 4-pixel padding), horizontal
    flip, colour jitter and random erasing; the test pipeline only converts to a
    tensor. Both are normalised with the same per-channel mean/std. The dataset
    is downloaded into ``root`` if it is not already there.

    Args:
        batch_size: batch size of the (shuffled) training loader.
        test_batch_size: batch size of the (unshuffled) test loader.
        num_workers: worker count passed to both loaders; use 0 to load in the
            main process.
        root: directory the dataset is stored in / downloaded to.

    Returns:
        ``(trainloader, testloader)``.
    """
    # One Normalize instance shared by both pipelines so the statistics cannot drift apart.
    normalize = transforms.Normalize((0.5071, 0.4867, 0.4408), (0.2675, 0.2565, 0.2761))
    transform_train = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
        transforms.ToTensor(),
        # RandomErasing operates on tensors, so it must come after ToTensor.
        transforms.RandomErasing(p=0.5, scale=(0.02, 0.2)),
        normalize,
    ])
    transform_test = transforms.Compose([
        transforms.ToTensor(),
        normalize,
    ])
    trainset = torchvision.datasets.CIFAR100(root=root, train=True, download=True, transform=transform_train)
    trainloader = torch.utils.data.DataLoader(
        trainset, batch_size=batch_size, shuffle=True, num_workers=num_workers)
    testset = torchvision.datasets.CIFAR100(root=root, train=False, download=True, transform=transform_test)
    testloader = torch.utils.data.DataLoader(
        testset, batch_size=test_batch_size, shuffle=False, num_workers=num_workers)
    return trainloader, testloader

# CutMix implementation for regularization
def cutmix_data(x, y, alpha=1.0):
    '''
    Apply CutMix to a batch: paste one random box from a shuffled copy of the batch.

    Args:
        x: input batch; dimensions 2 and 3 are the ones the box is cut from.
        y: targets, indexable by a permutation of the batch dimension.
        alpha: Beta(alpha, alpha) parameter for the mixing ratio. alpha <= 0
            disables mixing (lam = 1, empty box) instead of raising.

    Returns:
        (mixed_x, y_a, y_b, lam) where mixed_x is a new tensor (x itself is left
        untouched), y_a are the original targets, y_b the targets of the pasted
        samples, and lam is the fraction of each image that still comes from
        the original sample.
    '''
    lam = np.random.beta(alpha, alpha) if alpha > 0 else 1.0
    batch_size = x.size()[0]
    index = torch.randperm(batch_size).to(x.device)
    bbx1, bby1, bbx2, bby2 = rand_bbox(x.size(), lam)
    # Work on a copy so the caller's batch is not silently overwritten.
    mixed_x = x.clone()
    mixed_x[:, :, bbx1:bbx2, bby1:bby2] = x[index, :, bbx1:bbx2, bby1:bby2]
    # The box is clipped at the image border, so the pasted area is usually
    # smaller than (1 - lam). Recompute lam from the real box so the loss
    # weights match the pixels that were actually mixed.
    box_area = int(bbx2 - bbx1) * int(bby2 - bby1)
    lam = 1.0 - box_area / float(x.size()[-1] * x.size()[-2])
    y_a, y_b = y, y[index]
    return mixed_x, y_a, y_b, lam

def rand_bbox(size, lam):
    """Sample a random box inside the extents given by size[2] and size[3].

    The box sides are the extents scaled by sqrt(1 - lam), centred on a
    uniformly drawn point and then clipped to the extents, so the returned box
    can be smaller than requested near the borders.

    Returns:
        (start2, start3, stop2, stop3): clipped bounds along dimension 2 and
        dimension 3 respectively.
    """
    extent_a, extent_b = size[2], size[3]
    side_scale = np.sqrt(1. - lam)
    half_a = int(extent_a * side_scale) // 2
    half_b = int(extent_b * side_scale) // 2

    # Box centre, drawn uniformly over each extent.
    centre_a = np.random.randint(extent_a)
    centre_b = np.random.randint(extent_b)

    return (
        np.clip(centre_a - half_a, 0, extent_a),
        np.clip(centre_b - half_b, 0, extent_b),
        np.clip(centre_a + half_a, 0, extent_a),
        np.clip(centre_b + half_b, 0, extent_b),
    )

# MixUp implementation for regularization
# MixUp blends two images and their labels for better generalization

def mixup_data(x, y, alpha=1.0):
    '''Blend each sample with another sample from the same batch (MixUp).

    Args:
        x: input batch; dimension 0 is the batch dimension.
        y: targets, indexable by a permutation of the batch dimension.
        alpha: Beta(alpha, alpha) parameter for the mixing ratio. alpha <= 0
            disables mixing (lam = 1) instead of raising inside np.random.beta.

    Returns:
        (mixed_x, y_a, y_b, lam) with mixed_x = lam * x + (1 - lam) * x[perm],
        y_a the original targets and y_b the targets of the blended-in samples.
    '''
    lam = np.random.beta(alpha, alpha) if alpha > 0 else 1.0
    batch_size = x.size()[0]
    index = torch.randperm(batch_size).to(x.device)
    mixed_x = lam * x + (1 - lam) * x[index, :]
    y_a, y_b = y, y[index]
    return mixed_x, y_a, y_b, lam

# Function to train and evaluate the model
# - Performs training and validation (testing) for a specified number of epochs
# - Uses label smoothing, CutMix/MixUp, cosine annealing, and mixed precision (when CUDA is available)
# - Logs training and test loss/accuracy to a file and prints to console
# - Saves the best model based on test accuracy
def _plot_training_curves(train_losses, train_accs, test_losses, test_accs, plot_path):
    """Draw loss and accuracy curves side by side, save them to plot_path, then show them."""
    # Derive the x-axis from the recorded history so it always matches the data length.
    epoch_axis = range(1, len(train_losses) + 1)
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(epoch_axis, train_losses, label='Train Loss')
    plt.plot(epoch_axis, test_losses, label='Test Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Loss vs Epoch')
    plt.legend()
    plt.subplot(1, 2, 2)
    plt.plot(epoch_axis, train_accs, label='Train Accuracy')
    plt.plot(epoch_axis, test_accs, label='Test Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy (%)')
    plt.title('Accuracy vs Epoch')
    plt.legend()
    plt.tight_layout()
    plt.savefig(plot_path)
    plt.show()


def train(model, trainloader, testloader, device, epochs=100, lr=0.1, log_file='training_logs.md',
          checkpoint_path='best_resnet50_cifar100.pth', plot_path='training_curves.png'):
    """Train ``model`` and evaluate it on ``testloader`` after every epoch.

    Uses SGD (momentum 0.9, weight decay 5e-4) with cosine-annealed learning
    rate and label-smoothed cross entropy. Each training batch is randomly
    given CutMix (~33%), MixUp (~33%) or no mixing. Mixed precision is enabled
    only when ``device`` is a CUDA device.

    Args:
        model: network to train; expected to already live on ``device``.
        trainloader: iterable of ``(inputs, targets)`` training batches with a
            ``dataset`` attribute supporting ``len()``.
        testloader: same, for evaluation.
        device: ``torch.device`` or device string the batches are moved to.
        epochs: number of epochs (also the cosine schedule's ``T_max``).
        lr: initial learning rate.
        log_file: path of the markdown table written with one row per epoch.
        checkpoint_path: where the best-test-accuracy ``state_dict`` is saved.
        plot_path: where the loss/accuracy figure is saved after training.
    """
    criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=5e-4)
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)
    # Gate AMP on the device actually used for training, not on whether CUDA
    # merely exists on the machine: a CUDA GradScaler must not see CPU tensors.
    use_amp = torch.device(device).type == 'cuda'
    scaler = torch.amp.GradScaler() if use_amp else None
    best_acc = 0.0
    train_losses, train_accs, test_losses, test_accs = [], [], [], []
    with open(log_file, 'w') as f:
        f.write('| Epoch | Train Loss | Train Acc (%) | Test Loss | Test Acc (%) |\n')
        f.write('|-------|------------|----------------|-----------|--------------|\n')
        for epoch in range(1, epochs+1):
            model.train()
            running_loss = 0.0
            correct_train = 0
            total_train = 0
            for inputs, targets in tqdm(trainloader, desc=f'Epoch {epoch}/{epochs}'):
                inputs, targets = inputs.to(device), targets.to(device)
                # Randomly pick CutMix, MixUp or no mixing for this batch.
                r = np.random.rand()
                if r < 0.33:
                    inputs, targets_a, targets_b, lam = cutmix_data(inputs, targets)
                    mixed = True
                elif r < 0.66:
                    inputs, targets_a, targets_b, lam = mixup_data(inputs, targets)
                    mixed = True
                else:
                    mixed = False
                with torch.amp.autocast('cuda', enabled=use_amp):
                    outputs = model(inputs)
                    if mixed:
                        # Both label sets contribute, weighted by the mixing ratio.
                        loss = lam * criterion(outputs, targets_a) + (1 - lam) * criterion(outputs, targets_b)
                    else:
                        loss = criterion(outputs, targets)
                optimizer.zero_grad()
                if scaler is not None:
                    scaler.scale(loss).backward()
                    scaler.step(optimizer)
                    scaler.update()
                else:
                    loss.backward()
                    optimizer.step()
                running_loss += loss.item() * inputs.size(0)
                # NOTE: accuracy is measured against the original (unmixed) labels,
                # so it under-reports on CutMix/MixUp batches.
                _, predicted = outputs.max(1)
                total_train += targets.size(0)
                correct_train += predicted.eq(targets).sum().item()
            train_loss = running_loss / len(trainloader.dataset)
            train_acc = 100. * correct_train / total_train
            train_losses.append(train_loss)
            train_accs.append(train_acc)
            # Validation (test) phase
            model.eval()
            test_loss = 0.0
            correct = 0
            total = 0
            with torch.no_grad():
                for inputs, targets in testloader:
                    inputs, targets = inputs.to(device), targets.to(device)
                    with torch.amp.autocast('cuda', enabled=use_amp):
                        outputs = model(inputs)
                        loss = criterion(outputs, targets)
                    test_loss += loss.item() * inputs.size(0)
                    _, predicted = outputs.max(1)
                    total += targets.size(0)
                    correct += predicted.eq(targets).sum().item()
            test_loss = test_loss / len(testloader.dataset)
            test_acc = 100. * correct / total
            test_losses.append(test_loss)
            test_accs.append(test_acc)
            # Log results to file and print to console
            f.write(f'| {epoch} | {train_loss:.4f} | {train_acc:.2f} | {test_loss:.4f} | {test_acc:.2f} |\n')
            # Flush every epoch so the log can be followed during a long run and
            # is already on disk if the process is killed.
            f.flush()
            print(f'Epoch {epoch}: Train Loss={train_loss:.4f}, Train Acc={train_acc:.2f}%, Test Loss={test_loss:.4f}, Test Acc={test_acc:.2f}%')
            # Save best model
            if test_acc > best_acc:
                best_acc = test_acc
                torch.save(model.state_dict(), checkpoint_path)
            scheduler.step()
    # Plot graphs after training
    _plot_training_curves(train_losses, train_accs, test_losses, test_accs, plot_path)

# Main function to set up device, data loaders, model, and start training
# - Uses GPU if available, otherwise CPU
def main():
    """Pick the device, build the data loaders and model, then run training with defaults."""
    device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
    device = torch.device(device_name)
    loaders = get_dataloaders()
    trainloader, testloader = loaders
    model = get_resnet()
    model = model.to(device)
    train(model, trainloader, testloader, device)

# Start training only when this file is executed directly, not when it is imported.
if __name__ == '__main__':
    main()


Epoch 1/100: 100%|██████████| 391/391 [01:03<00:00,  6.13it/s]


Epoch 1: Train Loss=4.8258, Train Acc=1.10%, Test Loss=4.5535, Test Acc=1.71%


Epoch 2/100: 100%|██████████| 391/391 [01:03<00:00,  6.20it/s]


Epoch 2: Train Loss=4.4702, Train Acc=2.69%, Test Loss=4.2821, Test Acc=4.59%


Epoch 3/100: 100%|██████████| 391/391 [01:02<00:00,  6.22it/s]


Epoch 3: Train Loss=4.3185, Train Acc=4.59%, Test Loss=4.0798, Test Acc=7.31%


Epoch 4/100: 100%|██████████| 391/391 [01:02<00:00,  6.21it/s]


Epoch 4: Train Loss=4.2308, Train Acc=6.55%, Test Loss=3.9738, Test Acc=10.25%


Epoch 5/100: 100%|██████████| 391/391 [01:03<00:00,  6.17it/s]


Epoch 5: Train Loss=4.1457, Train Acc=8.64%, Test Loss=3.8205, Test Acc=14.10%


Epoch 6/100: 100%|██████████| 391/391 [01:03<00:00,  6.13it/s]


Epoch 6: Train Loss=4.0424, Train Acc=10.93%, Test Loss=3.6318, Test Acc=16.69%


Epoch 7/100: 100%|██████████| 391/391 [01:03<00:00,  6.13it/s]


Epoch 7: Train Loss=3.9368, Train Acc=13.16%, Test Loss=3.5397, Test Acc=18.85%


Epoch 8/100: 100%|██████████| 391/391 [01:03<00:00,  6.18it/s]


Epoch 8: Train Loss=3.8617, Train Acc=15.64%, Test Loss=3.4714, Test Acc=21.03%


Epoch 9/100: 100%|██████████| 391/391 [01:03<00:00,  6.17it/s]


Epoch 9: Train Loss=3.7306, Train Acc=18.72%, Test Loss=3.4340, Test Acc=22.96%


Epoch 10/100: 100%|██████████| 391/391 [01:03<00:00,  6.20it/s]


Epoch 10: Train Loss=3.6052, Train Acc=21.04%, Test Loss=3.3143, Test Acc=25.84%


Epoch 11/100: 100%|██████████| 391/391 [01:02<00:00,  6.22it/s]


Epoch 11: Train Loss=3.5350, Train Acc=23.70%, Test Loss=3.0423, Test Acc=31.59%


Epoch 12/100: 100%|██████████| 391/391 [01:02<00:00,  6.23it/s]


Epoch 12: Train Loss=3.4720, Train Acc=25.51%, Test Loss=2.8574, Test Acc=36.31%


Epoch 13/100: 100%|██████████| 391/391 [01:02<00:00,  6.22it/s]


Epoch 13: Train Loss=3.4003, Train Acc=26.93%, Test Loss=3.0723, Test Acc=30.86%


Epoch 14/100:  83%|████████▎ | 326/391 [00:52<00:10,  6.17it/s]


KeyboardInterrupt: 