<a href="https://colab.research.google.com/github/theboredman/CSE468/blob/main/Quiz_1/CNN/Using_CNN_CIFAR100_PyTorch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Setup

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR100

import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import os
from tqdm import tqdm

# Seed the random number generators used by this notebook so that weight
# initialisation and data shuffling start from the same state after a
# fresh "Restart & Run All".
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

# Set device: use the GPU when PyTorch reports one, otherwise fall back to CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

Using device: cuda


## Prepare the data

In [2]:
num_classes = 100
input_shape = (3, 32, 32)  # PyTorch uses (C, H, W) format

# Values shared by the training and evaluation pipelines.
_RESIZE_TO = (72, 72)
_NORM_MEAN = [0.5071, 0.4867, 0.4408]
_NORM_STD = [0.2675, 0.2565, 0.2761]

# Training pipeline: resize, apply random augmentation on the PIL image,
# then convert to a tensor and normalise per channel.
_train_steps = [
    transforms.Resize(_RESIZE_TO),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=18),
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),  # translation only
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
    transforms.ToTensor(),
    transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
]
train_transform = transforms.Compose(_train_steps)

# Evaluation pipeline: same resize and normalisation, no random augmentation.
_test_steps = [
    transforms.Resize(_RESIZE_TO),
    transforms.ToTensor(),
    transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
]
test_transform = transforms.Compose(_test_steps)

# Load CIFAR-100 (downloaded into ./data on first use).
train_dataset = CIFAR100(root='./data', train=True, download=True, transform=train_transform)
test_dataset = CIFAR100(root='./data', train=False, download=True, transform=test_transform)

print(f"Training samples: {len(train_dataset)}")
print(f"Test samples: {len(test_dataset)}")

100%|██████████| 169M/169M [00:03<00:00, 43.4MB/s]


Training samples: 50000
Test samples: 10000


## Configure the hyperparameters

In [3]:
# --- Optimisation -----------------------------------------------------------
learning_rate = 1e-3
initial_learning_rate = 1e-3   # value read by train_model when building AdamW
weight_decay = 1e-4            # read by train_model
label_smoothing = 0.1          # read by train_model

# --- Training loop ----------------------------------------------------------
batch_size = 128
num_epochs = 10

# --- Model / input ----------------------------------------------------------
image_size = 72                  # We'll resize input images to this size
mlp_head_units = [2048, 1024]    # Size of the dense layers of the final classifier
dropout_rate = 0.3

## Implement multilayer perceptron (MLP)

In [4]:
class MLP(nn.Module):
    """Feed-forward stack of ``Linear -> GELU -> Dropout`` groups.

    Args:
        input_dim: Number of features of the incoming tensor.
        hidden_units: Iterable of layer widths; one group is created per entry.
        dropout_rate: Drop probability used by every ``Dropout`` layer.
    """

    def __init__(self, input_dim, hidden_units, dropout_rate):
        super().__init__()
        # Pair each width with the width that precedes it.
        widths = [input_dim, *hidden_units]
        stack = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stack += [nn.Linear(fan_in, fan_out), nn.GELU(), nn.Dropout(dropout_rate)]
        self.layers = nn.Sequential(*stack)

    def forward(self, x):
        """Run ``x`` through every group in order and return the result."""
        return self.layers(x)

## CNN Architecture

We'll build a CNN with convolutional layers for feature extraction followed by dense layers for classification.

In [5]:
class SqueezeExcitationBlock(nn.Module):
    """Squeeze-and-Excitation block: per-channel gating of a feature map.

    The input is globally average-pooled to one value per channel, passed
    through a two-layer bottleneck (Linear -> SiLU -> Linear -> Sigmoid) and
    the resulting gate in (0, 1) rescales every channel of the input.

    Args:
        channels: Number of channels of the incoming feature map.
        ratio: Reduction factor of the bottleneck. The bottleneck width is
            ``channels // ratio`` but never less than 1, so the block stays
            usable when ``channels < ratio``.
    """
    def __init__(self, channels, ratio=16):
        super(SqueezeExcitationBlock, self).__init__()
        # Guard against a zero-width bottleneck for narrow feature maps.
        reduced = max(1, channels // ratio)
        self.global_pool = nn.AdaptiveAvgPool2d(1)
        self.fc1 = nn.Linear(channels, reduced)
        self.fc2 = nn.Linear(reduced, channels)
        self.swish = nn.SiLU()  # Swish activation
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return ``x`` scaled channel-wise by the learned gate.

        ``x`` is unpacked as a 4-D tensor (batch, channels, height, width).
        """
        b, c, _, _ = x.size()
        se = self.global_pool(x).view(b, c)               # squeeze: (b, c)
        se = self.swish(self.fc1(se))                     # bottleneck
        se = self.sigmoid(self.fc2(se)).view(b, c, 1, 1)  # gate, broadcastable
        return x * se

class ImprovedResidualBlock(nn.Module):
    """Two-convolution residual block using GroupNorm (8 groups) and SiLU.

    Main path: conv -> GroupNorm -> SiLU -> conv -> GroupNorm, optionally
    followed by a Squeeze-and-Excitation gate. The shortcut is the identity
    unless the stride or the channel count changes, in which case a strided
    1x1 convolution plus GroupNorm projects the input to the output shape.

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels produced by the block.
        kernel_size: Kernel size of both main-path convolutions.
        stride: Stride of the first convolution (and of the projection).
        use_se: Whether to gate the main path with ``SqueezeExcitationBlock``.
    """

    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1, use_se=True):
        super().__init__()
        self.use_se = use_se
        pad = kernel_size // 2  # keeps spatial size at stride 1 for odd kernels

        # Main path, first half (carries the stride).
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size,
                               stride=stride, padding=pad, bias=False)
        self.gn1 = nn.GroupNorm(8, out_channels)

        # Main path, second half (always stride 1).
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size,
                               stride=1, padding=pad, bias=False)
        self.gn2 = nn.GroupNorm(8, out_channels)

        # Optional channel gate.
        if use_se:
            self.se = SqueezeExcitationBlock(out_channels)

        # Shortcut: project only when the shapes would otherwise not match.
        shape_preserved = stride == 1 and in_channels == out_channels
        self.shortcut = nn.Identity() if shape_preserved else nn.Sequential(
            nn.Conv2d(in_channels, out_channels, 1, stride, bias=False),
            nn.GroupNorm(8, out_channels),
        )

        self.swish = nn.SiLU()

    def forward(self, x):
        """Return ``SiLU(main_path(x) + shortcut(x))``."""
        skip = self.shortcut(x)

        y = self.conv1(x)
        y = self.gn1(y)
        y = self.swish(y)
        y = self.conv2(y)
        y = self.gn2(y)

        if self.use_se:
            y = self.se(y)

        return self.swish(y + skip)

class EfficientConvBlock(nn.Module):
    """EfficientNet-style expand -> depthwise -> SE -> project block.

    Note that, unlike a full inverted *residual* block, ``forward`` does not
    add the input back to the output; the block is a plain sequential stack.

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels produced by the final 1x1 projection.
        kernel_size: Kernel size of the depthwise convolution.
        stride: Stride of the depthwise convolution.
        expansion_factor: The hidden width is ``out_channels * expansion_factor``.
            With a factor of 1 the expansion stage is skipped and the
            depthwise convolution operates directly on ``in_channels``.
    """

    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1, expansion_factor=4):
        super().__init__()

        # Expansion phase: 1x1 conv widening the feature map (skipped at factor 1).
        if expansion_factor == 1:
            hidden = in_channels
            self.expand = nn.Identity()
        else:
            hidden = out_channels * expansion_factor
            self.expand = nn.Sequential(
                nn.Conv2d(in_channels, hidden, 1, bias=False),
                nn.GroupNorm(8, hidden),
                nn.SiLU(),
            )

        # Depthwise convolution: one filter per channel (groups == channels).
        self.depthwise = nn.Sequential(
            nn.Conv2d(hidden, hidden, kernel_size, stride,
                      padding=kernel_size // 2, groups=hidden, bias=False),
            nn.GroupNorm(8, hidden),
            nn.SiLU(),
        )

        # Channel gate applied on the widened representation.
        self.se = SqueezeExcitationBlock(hidden)

        # Projection phase: 1x1 conv down to out_channels, no activation.
        self.project = nn.Sequential(
            nn.Conv2d(hidden, out_channels, 1, bias=False),
            nn.GroupNorm(8, out_channels),
        )

    def forward(self, x):
        """Apply expand, depthwise, SE and project in that order."""
        widened = self.expand(x)
        filtered = self.depthwise(widened)
        gated = self.se(filtered)
        return self.project(gated)

class CBAMAttentionBlock(nn.Module):
    """Convolutional Block Attention Module (CBAM).

    Applies channel attention followed by spatial attention:

    * Channel attention: the average-pooled and max-pooled channel
      descriptors go through one shared two-layer MLP; the two MLP outputs
      are summed and a single sigmoid turns the sum into a gate in (0, 1).
    * Spatial attention: the channel-wise mean and max maps are concatenated
      and a 7x7 convolution plus sigmoid yields a per-pixel gate in (0, 1).

    Args:
        channels: Number of channels of the incoming feature map.
        ratio: Reduction factor of the shared MLP. The hidden width is
            ``channels // ratio`` but never less than 1.
    """
    def __init__(self, channels, ratio=16):
        super(CBAMAttentionBlock, self).__init__()
        # Guard against a zero-width bottleneck for narrow feature maps.
        reduced = max(1, channels // ratio)

        # Channel Attention
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.max_pool = nn.AdaptiveMaxPool2d(1)
        self.fc1 = nn.Linear(channels, reduced)
        self.fc2 = nn.Linear(reduced, channels)
        self.swish = nn.SiLU()
        self.sigmoid = nn.Sigmoid()

        # Spatial Attention
        self.spatial_conv = nn.Conv2d(2, 1, 7, padding=3, bias=False)

    def forward(self, x):
        """Return ``x`` rescaled by the channel gate and then the spatial gate.

        ``x`` is unpacked as a 4-D tensor (batch, channels, height, width).
        """
        b, c, h, w = x.size()

        # Channel Attention
        avg_pool = self.avg_pool(x).view(b, c)
        max_pool = self.max_pool(x).view(b, c)

        avg_out = self.fc2(self.swish(self.fc1(avg_pool)))
        max_out = self.fc2(self.swish(self.fc1(max_pool)))

        # Sum the two MLP outputs BEFORE the sigmoid so the gate stays within
        # (0, 1). Summing two separate sigmoids would give a range of (0, 2)
        # and let the block amplify activations.
        channel_attention = self.sigmoid(avg_out + max_out).view(b, c, 1, 1)
        x = x * channel_attention

        # Spatial Attention
        avg_pool_spatial = torch.mean(x, dim=1, keepdim=True)
        max_pool_spatial, _ = torch.max(x, dim=1, keepdim=True)
        spatial_concat = torch.cat([avg_pool_spatial, max_pool_spatial], dim=1)

        spatial_attention = self.sigmoid(self.spatial_conv(spatial_concat))
        x = x * spatial_attention

        return x

class CNNClassifier(nn.Module):
    """Convolutional image classifier with attention blocks and an MLP head.

    Pipeline: stem conv -> CBAM -> five convolutional stages -> concatenated
    global average/max pooling -> two-layer normalised dense head -> ``MLP``
    -> final linear layer producing ``num_classes`` logits.

    Spatial resolution is halved three times: by the stride-2 block closing
    stage 1, the stride-2 block closing stage 2, and the ``AvgPool2d(2)`` in
    stage 3.

    Args:
        num_classes: Number of output logits.
        mlp_head_units: Sequence of hidden widths for the ``MLP`` head; the
            last entry is the input width of the final classifier, so the
            sequence must not be empty. The default is an immutable tuple so
            that it cannot be shared and mutated between instances.
    """
    def __init__(self, num_classes=100, mlp_head_units=(2048, 1024)):
        super(CNNClassifier, self).__init__()

        # Channel count produced by the last convolutional stage.
        feature_channels = 256

        # Stem: Initial feature extraction
        self.stem = nn.Sequential(
            nn.Conv2d(3, 48, 3, padding=1, bias=False),
            nn.GroupNorm(8, 48),
            nn.SiLU()
        )

        # Initial attention
        self.initial_attention = CBAMAttentionBlock(48)

        # Stage 1: Enhanced residual blocks with attention
        self.stage1 = nn.Sequential(
            ImprovedResidualBlock(48, 64, stride=1, use_se=True),
            ImprovedResidualBlock(64, 64, stride=1, use_se=True),
            ImprovedResidualBlock(64, 64, stride=2, use_se=True),
            nn.Dropout2d(0.15)
        )

        # Stage 2: EfficientNet-style blocks
        self.stage2 = nn.Sequential(
            EfficientConvBlock(64, 96, stride=1, expansion_factor=4),
            EfficientConvBlock(96, 96, stride=1, expansion_factor=4),
            CBAMAttentionBlock(96),
            EfficientConvBlock(96, 96, stride=2, expansion_factor=4),
            nn.Dropout2d(0.2)
        )

        # Stage 3: Mixed convolution types
        self.stage3 = nn.Sequential(
            ImprovedResidualBlock(96, 144, stride=1, use_se=True),
            EfficientConvBlock(144, 144, stride=1, expansion_factor=6),
            ImprovedResidualBlock(144, 144, stride=1, use_se=True),
            CBAMAttentionBlock(144),
            nn.AvgPool2d(2),
            nn.Dropout2d(0.25)
        )

        # Stage 4: High-level feature extraction
        self.stage4 = nn.Sequential(
            EfficientConvBlock(144, 192, stride=1, expansion_factor=6),
            ImprovedResidualBlock(192, 192, stride=1, use_se=True),
            EfficientConvBlock(192, 192, stride=1, expansion_factor=6),
            CBAMAttentionBlock(192),
            nn.Dropout2d(0.3)
        )

        # Stage 5: Final feature maps
        self.stage5 = nn.Sequential(
            ImprovedResidualBlock(192, feature_channels, stride=1, use_se=True),
            EfficientConvBlock(feature_channels, feature_channels, stride=1, expansion_factor=8)
        )

        # Global pooling: average and max descriptors are concatenated in forward()
        self.global_avg_pool = nn.AdaptiveAvgPool2d(1)
        self.global_max_pool = nn.AdaptiveMaxPool2d(1)

        # Dense head: two Linear -> GroupNorm(1 group) -> SiLU -> Dropout groups.
        # The Linear layers are bias-free because the following norm has its own bias.
        self.classifier_head = nn.Sequential(
            nn.Linear(2 * feature_channels, 768, bias=False),  # avg + max descriptors
            nn.GroupNorm(1, 768),  # LayerNorm equivalent
            nn.SiLU(),
            nn.Dropout(0.5),

            # Bottleneck down to the MLP input width
            nn.Linear(768, 384, bias=False),
            nn.GroupNorm(1, 384),
            nn.SiLU(),
            nn.Dropout(0.4)
        )

        # MLP classification head
        self.mlp = MLP(384, mlp_head_units, 0.5)

        # Final classification layer
        self.final_classifier = nn.Linear(mlp_head_units[-1], num_classes)

        # Initialize weights
        self._initialize_weights()

    def _initialize_weights(self):
        """Re-initialise every conv, linear and normalisation layer in the model.

        Conv and Linear weights use Kaiming-normal (fan-out) initialisation,
        Linear biases are zeroed, and normalisation layers start as identity
        (weight 1, bias 0).
        """
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, nn.Linear):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, (nn.GroupNorm, nn.BatchNorm2d)):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        """Return class logits for a batch of 3-channel images."""
        # Stem
        x = self.stem(x)
        x = self.initial_attention(x)

        # Stages
        x = self.stage1(x)
        x = self.stage2(x)
        x = self.stage3(x)
        x = self.stage4(x)
        x = self.stage5(x)

        # Global pooling: concatenate average- and max-pooled descriptors
        gap = self.global_avg_pool(x).flatten(1)
        gmp = self.global_max_pool(x).flatten(1)
        x = torch.cat([gap, gmp], dim=1)

        # Classifier
        x = self.classifier_head(x)
        x = self.mlp(x)
        x = self.final_classifier(x)

        return x

## Training and Evaluation Functions

In [6]:
class LabelSmoothingCrossEntropy(nn.Module):
    """Cross entropy against a smoothed one-hot target distribution.

    The true class receives probability ``1 - smoothing`` and the remaining
    ``smoothing`` mass is divided equally among the other classes.
    """

    def __init__(self, smoothing=0.1):
        super().__init__()
        self.smoothing = smoothing

    def forward(self, pred, target):
        """Return the batch-mean loss.

        ``pred`` holds logits with classes on the last dimension; ``target``
        holds integer class indices, one per row of ``pred``.
        """
        n_classes = pred.size(-1)
        log_prob = F.log_softmax(pred, dim=-1)

        # Start from the off-target probability everywhere ...
        soft_target = pred.new_ones(pred.size()) * self.smoothing / (n_classes - 1.)
        # ... then overwrite the true-class entries.
        true_index = target.unsqueeze(-1)
        soft_target.scatter_(-1, true_index, (1. - self.smoothing))

        per_sample = (-soft_target * log_prob).sum(dim=-1)
        return per_sample.mean()

def calculate_accuracy(outputs, targets, topk=(1, 5, 10)):
    """Compute top-k accuracy percentages for one batch.

    Args:
        outputs: Score tensor with one row per sample and one column per class.
        targets: Integer class indices, one per sample.
        topk: The k values to evaluate; the largest must not exceed the
            number of classes.

    Returns:
        A list with one single-element tensor per entry of ``topk``, holding
        the percentage of samples whose target is among the k best scores.
    """
    with torch.no_grad():
        n_samples = targets.size(0)
        widest_k = max(topk)

        # Rank classes per sample, then transpose to (widest_k, n_samples) so
        # that row i holds every sample's (i+1)-th best guess.
        _, ranked = outputs.topk(widest_k, 1, True, True)
        ranked = ranked.t()
        hits = ranked.eq(targets.view(1, -1).expand_as(ranked))

        return [
            hits[:k].reshape(-1).float().sum(0, keepdim=True).mul_(100.0 / n_samples)
            for k in topk
        ]

def train_epoch(model, train_loader, criterion, optimizer, device):
    """Run one optimisation pass over ``train_loader``.

    Puts the model in training mode, performs one optimizer step per batch
    and shows the running loss / accuracy on a tqdm progress bar.

    Returns:
        ``(mean_batch_loss, accuracy_percent)`` where the loss is averaged
        over batches and the accuracy over all samples seen.
    """
    model.train()
    loss_sum, n_correct, n_seen = 0.0, 0, 0

    progress = tqdm(train_loader, desc='Training')
    for step, (inputs, labels) in enumerate(progress, start=1):
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Standard step: clear gradients, forward, backward, update.
        optimizer.zero_grad()
        logits = model(inputs)
        batch_loss = criterion(logits, labels)
        batch_loss.backward()
        optimizer.step()

        # Running statistics for the progress bar and the return value.
        loss_sum += batch_loss.item()
        predicted = logits.max(1)[1]
        n_seen += labels.size(0)
        n_correct += predicted.eq(labels).sum().item()

        progress.set_postfix({
            'Loss': f'{loss_sum/step:.3f}',
            'Acc': f'{100.*n_correct/n_seen:.2f}%'
        })

    mean_loss = loss_sum / len(train_loader)
    accuracy = 100. * n_correct / n_seen
    return mean_loss, accuracy

def validate(model, val_loader, criterion, device):
    """Evaluate ``model`` on ``val_loader`` without computing gradients.

    Args:
        model: Network to evaluate; switched to eval mode.
        val_loader: Iterable of ``(data, target)`` batches that supports ``len()``.
        criterion: Loss callable taking ``(output, target)``.
        device: Device the batches are moved to.

    Returns:
        ``(mean_batch_loss, top1, top5, top10)``: the loss averaged over
        batches and the top-1/5/10 accuracies as percentages over all samples.
    """
    model.eval()
    val_loss = 0.0
    top1_correct = 0
    top5_correct = 0
    top10_correct = 0
    total = 0

    with torch.no_grad():
        pbar = tqdm(val_loader, desc='Validation')
        for batch_idx, (data, target) in enumerate(pbar):
            data, target = data.to(device), target.to(device)
            output = model(data)
            val_loss += criterion(output, target).item()

            # calculate_accuracy returns per-batch percentages; convert them
            # back to sample counts so differently sized batches are weighted
            # correctly in the final figures.
            acc1, acc5, acc10 = calculate_accuracy(output, target, topk=(1, 5, 10))
            top1_correct += acc1.item() * target.size(0) / 100
            top5_correct += acc5.item() * target.size(0) / 100
            top10_correct += acc10.item() * target.size(0) / 100
            total += target.size(0)

            # Running mean over the batches processed so far. The tqdm bar
            # itself exposes no `dataset` attribute, so the divisor must come
            # from the loop counter.
            pbar.set_postfix({
                'Loss': f'{val_loss/(batch_idx+1):.3f}',
                'Top1': f'{100.*top1_correct/total:.2f}%',
                'Top5': f'{100.*top5_correct/total:.2f}%'
            })

    return (val_loss / len(val_loader),
            100. * top1_correct / total,
            100. * top5_correct / total,
            100. * top10_correct / total)

def cosine_annealing_lr(epoch, total_epochs, initial_lr):
    """Half-cosine decay from ``initial_lr`` at epoch 0 to 0 at ``total_epochs``."""
    phase = np.pi * epoch / total_epochs
    cosine_term = np.cos(phase)
    return initial_lr * 0.5 * (1 + cosine_term)

def train_model(model, train_loader, val_loader, num_epochs, device):
    """Train ``model`` with AdamW, cosine LR decay, checkpointing and early stopping.

    Reads the module-level hyperparameters ``label_smoothing``,
    ``initial_learning_rate`` and ``weight_decay``. After every epoch the
    model is validated; whenever the top-1 validation accuracy improves, the
    weights are written to ``best_model.pth``. Training stops early after 10
    consecutive epochs without improvement.

    Args:
        model: Network to train (already on ``device``).
        train_loader: Batches used for optimisation.
        val_loader: Batches used for validation and model selection.
        num_epochs: Maximum number of epochs; also the cosine schedule length.
        device: Device the batches are moved to.

    Returns:
        ``(model, history)`` where ``model`` carries the best checkpoint's
        weights (if at least one epoch ran) and ``history`` maps metric names
        to per-epoch lists.
    """
    # Loss function with label smoothing
    criterion = LabelSmoothingCrossEntropy(smoothing=label_smoothing)

    # Optimizer
    optimizer = optim.AdamW(model.parameters(),
                           lr=initial_learning_rate,
                           weight_decay=weight_decay,
                           betas=(0.9, 0.999),
                           eps=1e-7)

    # Learning rate scheduler: LambdaLR multiplies the base LR by the returned
    # factor, hence the unit "initial_lr" passed to the cosine helper.
    scheduler = optim.lr_scheduler.LambdaLR(optimizer,
                                           lambda epoch: cosine_annealing_lr(epoch, num_epochs, 1.0))

    # Training history
    history = {
        'train_loss': [], 'train_acc': [],
        'val_loss': [], 'val_acc': [], 'val_top5_acc': [], 'val_top10_acc': [],
        'lr': []
    }

    checkpoint_path = 'best_model.pth'
    # Start below any reachable accuracy so the first epoch always writes a
    # checkpoint, even if its validation accuracy is exactly 0%.
    best_val_acc = float('-inf')
    checkpoint_written = False
    patience_counter = 0
    patience = 10

    for epoch in range(num_epochs):
        print(f'\nEpoch {epoch+1}/{num_epochs}')
        print('-' * 50)

        # Capture the learning rate that is actually used for this epoch,
        # i.e. before the scheduler advances to the next epoch's value.
        current_lr = optimizer.param_groups[0]['lr']

        # Training
        train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)

        # Validation
        val_loss, val_acc, val_top5_acc, val_top10_acc = validate(model, val_loader, criterion, device)

        # Update learning rate for the next epoch
        scheduler.step()

        # Save history
        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)
        history['val_top5_acc'].append(val_top5_acc)
        history['val_top10_acc'].append(val_top10_acc)
        history['lr'].append(current_lr)

        print(f'Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%')
        print(f'Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%, Val Top5: {val_top5_acc:.2f}%, Val Top10: {val_top10_acc:.2f}%')
        print(f'Learning Rate: {current_lr:.6f}')

        # Early stopping and model checkpoint
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            patience_counter = 0
            torch.save(model.state_dict(), checkpoint_path)
            checkpoint_written = True
            print(f'New best validation accuracy: {best_val_acc:.2f}%')
        else:
            patience_counter += 1

        if patience_counter >= patience:
            print(f'Early stopping at epoch {epoch+1}')
            break

    # Load best model. Only reload a checkpoint produced by THIS run, so a
    # stale file left over from an earlier run is never picked up silently.
    if checkpoint_written:
        model.load_state_dict(torch.load(checkpoint_path, map_location=device))

    return model, history

## Build, train, and evaluate the model

In [7]:
# Create data loaders
# Split training data for validation. A seeded generator makes the split
# identical on every run.
train_size = int(0.8 * len(train_dataset))
val_size = len(train_dataset) - train_size
split_generator = torch.Generator().manual_seed(42)
train_subset, val_split = torch.utils.data.random_split(
    train_dataset, [train_size, val_size], generator=split_generator)

# random_split only stores indices into train_dataset, which applies the
# random training augmentation. Validation must see un-augmented images, so
# the same indices are applied to a second view of the training data that
# uses the deterministic evaluation transform.
val_dataset = CIFAR100(root='./data', train=True, download=False, transform=test_transform)
val_subset = torch.utils.data.Subset(val_dataset, val_split.indices)

train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True, num_workers=2, pin_memory=True)
val_loader = DataLoader(val_subset, batch_size=batch_size, shuffle=False, num_workers=2, pin_memory=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=2, pin_memory=True)

# Create model
model = CNNClassifier(num_classes=num_classes, mlp_head_units=mlp_head_units).to(device)

# Print model summary
def count_parameters(model):
    """Return the number of trainable (requires_grad) parameters of ``model``."""
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print("Model Architecture:")
print(model)
print(f"\nTotal trainable parameters: {count_parameters(model):,}")

# Train the model
print("\nStarting training...")
model, history = train_model(model, train_loader, val_loader, num_epochs, device)

# Evaluate on test set
print("\nEvaluating on test set...")
criterion = LabelSmoothingCrossEntropy(smoothing=0.0)  # No smoothing for evaluation
test_loss, test_acc, test_top5_acc, test_top10_acc = validate(model, test_loader, criterion, device)

print(f"\nFinal Test Results:")
print(f"Test accuracy: {test_acc:.2f}%")
print(f"Test top 5 accuracy: {test_top5_acc:.2f}%")
print(f"Test top 10 accuracy: {test_top10_acc:.2f}%")
print(f"Test loss: {test_loss:.4f}")

Model Architecture:
CNNClassifier(
  (stem): Sequential(
    (0): Conv2d(3, 48, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (1): GroupNorm(8, 48, eps=1e-05, affine=True)
    (2): SiLU()
  )
  (initial_attention): CBAMAttentionBlock(
    (avg_pool): AdaptiveAvgPool2d(output_size=1)
    (max_pool): AdaptiveMaxPool2d(output_size=1)
    (fc1): Linear(in_features=48, out_features=3, bias=True)
    (fc2): Linear(in_features=3, out_features=48, bias=True)
    (swish): SiLU()
    (sigmoid): Sigmoid()
    (spatial_conv): Conv2d(2, 1, kernel_size=(7, 7), stride=(1, 1), padding=(3, 3), bias=False)
  )
  (stage1): Sequential(
    (0): ImprovedResidualBlock(
      (conv1): Conv2d(48, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (gn1): GroupNorm(8, 64, eps=1e-05, affine=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (gn2): GroupNorm(8, 64, eps=1e-05, affine=True)
      (se): SqueezeExcitation

Training:  95%|█████████▍| 296/313 [02:23<00:08,  2.06it/s, Loss=4.798, Acc=1.05%]


KeyboardInterrupt: 

## Plot Training History

In [None]:
def plot_training_history(history):
    """Draw a 2x2 figure summarising a training run.

    Panels: loss (train/val), top-1 accuracy (train/val), validation
    top-5/top-10 accuracy, and the learning rate on a log scale. ``history``
    is a dict of per-epoch lists keyed by ``train_loss``, ``val_loss``,
    ``train_acc``, ``val_acc``, ``val_top5_acc``, ``val_top10_acc`` and ``lr``.
    """
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))

    # (axis, title, y-label, [(history key, legend label), ...])
    curve_panels = [
        (axes[0, 0], 'Model Loss', 'Loss',
         [('train_loss', 'Training Loss'), ('val_loss', 'Validation Loss')]),
        (axes[0, 1], 'Model Accuracy', 'Accuracy (%)',
         [('train_acc', 'Training Accuracy'), ('val_acc', 'Validation Accuracy')]),
        (axes[1, 0], 'Model Top-K Accuracy', 'Accuracy (%)',
         [('val_top5_acc', 'Validation Top-5 Accuracy'),
          ('val_top10_acc', 'Validation Top-10 Accuracy')]),
    ]
    for ax, title, y_label, curves in curve_panels:
        for key, legend_label in curves:
            ax.plot(history[key], label=legend_label)
        ax.set_title(title)
        ax.set_xlabel('Epoch')
        ax.set_ylabel(y_label)
        ax.legend()
        ax.grid(True)

    # Learning-rate panel: single unlabeled curve, logarithmic y-axis.
    lr_ax = axes[1, 1]
    lr_ax.plot(history['lr'])
    lr_ax.set_title('Learning Rate Schedule')
    lr_ax.set_xlabel('Epoch')
    lr_ax.set_ylabel('Learning Rate')
    lr_ax.set_yscale('log')
    lr_ax.grid(True)

    plt.tight_layout()
    plt.show()

# Plot the training history returned by train_model in the training cell
# (loss, accuracy, top-k accuracy and learning rate per epoch).
plot_training_history(history)