# Neural Networks and Deep Learning coursework

---

# Student ID: 210899247


**Google Drive Setup**

---

Following the lecture explanations, I mounted my Google Drive so that I could use the external file "my_utils.py", provided on qmplus, inside the Colab environment. So that Python can find the file, I also added the directory where it is stored to the module search path using sys.path.append.

In [1]:
# === Google Drive Setup ===
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)
# Mounting Google Drive so I can access the file stored there.


import sys
sys.path.append('/content/gdrive/MyDrive/Colab Notebooks')
# Adding the path where my my_utils.py is saved so I can import it like a normal Python module.

Mounted at /content/gdrive


**Importing necessary packages**

---

Here I set up all the imports I need for this coursework, from PyTorch and torchvision to helper libraries such as tqdm for progress bars and matplotlib for plotting the accuracy results. I also import my_utils so I can use it within the assignment.

In [2]:
# === Imports ===

# The custom utility file from qmplus with helper functions I used in different parts of the training loop
import my_utils as mu

# Core PyTorch library
import torch

# Needed for building neural networks
from torch import nn

# Useful PyTorch functions for things like activations and loss computations
import torch.nn.functional as F

# Torchvision gives access to standard datasets like CIFAR-10 and includes transforms
import torchvision

# Contains predefined transforms for image preprocessing and augmentation
from torchvision import transforms

# For loading data in batches and shuffling
from torch.utils.data import DataLoader

# For plotting accuracy/loss graphs at the end
import matplotlib.pyplot as plt

# NumPy is helpful for generating random values (used in mixup, for instance)
import numpy as np

# Adds a progress bar to loops — super handy to monitor training
from tqdm import tqdm

**Mixup functions**

---

In this part I implement mixup, a data augmentation technique that blends two images and their labels. I apply it only during the first 25 of the 35 training epochs to help the model generalise better and reduce overfitting. I also use a helper function, mixup_criterion, which computes the loss against the mixed targets.

In [3]:
def mixup_data(x, y, alpha=1.0):
    '''Returns mixed inputs, pairs of targets, and lambda'''

    # If alpha is positive, we sample lambda from the beta distribution. This controls how much mixing happens.
    # The closer lambda is to 1 or 0, the more like one sample it will be.
    if alpha > 0:
        lam = np.random.beta(alpha, alpha)
    else:
        lam = 1  # No mixing at all if alpha is zero

    batch_size = x.size()[0]  # Number of samples in the batch
    index = torch.randperm(batch_size).to(x.device)  # Random permutation of the batch indices, used to shuffle

    # Mix the images: a weighted sum of the original batch and a randomly shuffled batch
    mixed_x = lam * x + (1 - lam) * x[index, :]

    # This allows us to keep track of the original labels and the shuffled labels
    y_a, y_b = y, y[index]

    return mixed_x, y_a, y_b, lam  # Returns everything needed for computing the mixed-up loss

def mixup_criterion(criterion, pred, y_a, y_b, lam):
    # Loss is also blended: proportionally apply the loss to both original and shuffled labels
    return lam * criterion(pred, y_a) + (1 - lam) * criterion(pred, y_b)
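To make the blending concrete, here is a minimal pure-Python sketch of the same arithmetic on toy data. The vectors and loss values are hypothetical, and `mix` is an illustrative helper, not part of the notebook. The last part samples lambda from Beta(0.1, 0.1), the alpha value the training loop uses, to show that most draws land close to 0 or 1, so most mixed images stay close to one of the two originals.

```python
import random


def mix(a, b, lam):
    """Blend two equal-length vectors: lam * a + (1 - lam) * b."""
    return [lam * ai + (1 - lam) * bi for ai, bi in zip(a, b)]


# Two toy "images" of four pixels each (hypothetical values).
image_a = [1.0, 1.0, 0.0, 0.0]
image_b = [0.0, 0.0, 1.0, 1.0]
mixed = mix(image_a, image_b, lam=0.75)
print(mixed)  # [0.75, 0.75, 0.25, 0.25]

# The loss is blended with the same coefficient as the inputs.
loss_a, loss_b = 0.4, 2.0  # hypothetical losses against each label set
mixed_loss = 0.75 * loss_a + 0.25 * loss_b
print(f"mixed loss: {mixed_loss:.2f}")

# Beta(alpha, alpha) with a small alpha is strongly U-shaped.
rng = random.Random(0)
samples = [rng.betavariate(0.1, 0.1) for _ in range(10_000)]
near_edge = sum(s < 0.1 or s > 0.9 for s in samples) / len(samples)
print(f"share of lambda draws within 0.1 of 0 or 1: {near_edge:.2f}")
```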


**Device Setup**

---

This part selects the computation device, using the GPU when one is available, because training on CIFAR-10 is quite slow on a CPU.

In [4]:
# === Device Setup ===
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Running on: {device}")  # This helps verify whether we're using the GPU or CPU


Running on: cpu


**Data Transformations**

---

Here I set up data augmentation and normalisation for the CIFAR-10 dataset. I apply a few common augmentation techniques: random horizontal flipping, random cropping with padding, and colour jitter, which adds slight variations to brightness, contrast, and saturation. This helps the model generalise better by not overfitting to fixed patterns in the training data.
Lastly, these transformations are composed into a single pipeline that is applied to both the training and validation (testing) datasets. Because the random augmentations therefore also run on the validation images, the validation accuracy can vary slightly from one evaluation to the next.

In [5]:
# === Data Transformations ===
# Normalize using CIFAR-10 statistics: mean/std per channel
cifar_mean = (0.4915, 0.4823, 0.4466)  # channel-wise means for RGB
cifar_std = (0.2024, 0.1995, 0.2011)   # channel-wise std devs for RGB

transform_config = transforms.Compose([
    transforms.RandomHorizontalFlip(),  # randomly flip the image horizontally
    transforms.RandomCrop(32, padding=4),  # randomly crop image with padding to preserve structure
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),  # add color variation
    transforms.ToTensor(),  # convert image from PIL to tensor
    transforms.Normalize(cifar_mean, cifar_std)  # normalize with CIFAR-10 stats
])
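As a small worked example of the normalisation step, the sketch below applies the per-channel formula (value - mean) / std to single pixels in plain Python. It assumes pixel values are already scaled to [0, 1], which is what ToTensor does for 8-bit images. `normalize_pixel` is an illustrative helper, not a torchvision function.

```python
cifar_mean = (0.4915, 0.4823, 0.4466)
cifar_std = (0.2024, 0.1995, 0.2011)


def normalize_pixel(rgb, mean, std):
    """Apply (value - mean) / std to each channel of one RGB pixel."""
    return tuple((v - m) / s for v, m, s in zip(rgb, mean, std))


black = normalize_pixel((0.0, 0.0, 0.0), cifar_mean, cifar_std)
white = normalize_pixel((1.0, 1.0, 1.0), cifar_mean, cifar_std)
average = normalize_pixel(cifar_mean, cifar_mean, cifar_std)

print("black  :", [round(v, 2) for v in black])    # all negative
print("white  :", [round(v, 2) for v in white])    # all positive
print("average:", [round(v, 2) for v in average])  # [0.0, 0.0, 0.0]
```

A pixel equal to the dataset mean maps to zero, so the normalised inputs are roughly centred around zero in every channel.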


**Dataset Download and Preprocessing**

---

Here I load the CIFAR-10 dataset. When I load the datasets, I pass in the transformation pipeline I defined earlier so that all the augmentation and normalisation is applied automatically to each image as it is loaded. In addition, I explicitly set download=True to download the dataset, and set train=True for the training set and train=False for the test (validation) set.

In [6]:
# === Dataset Download and Preprocessing ===
train_data = torchvision.datasets.CIFAR10(root='./data', train=True, transform=transform_config, download=True)
val_data = torchvision.datasets.CIFAR10(root='./data', train=False, transform=transform_config, download=True)


100%|██████████| 170M/170M [00:10<00:00, 16.1MB/s]


**Data Loader Setup**

---

In this part I set up the data loaders for both the training and validation datasets. I used a batch size of 256, which works well with my GPU and provides a good balance between performance and memory usage.

In [7]:
# === Data Loaders ===

# Batch size used for training and validation
loader_batch_size = 256

# DataLoader for training data
# - shuffle=True randomizes the data order each epoch for better generalization
# - num_workers=2 uses two worker subprocesses to load data in parallel
train_loader = DataLoader(train_data, batch_size=loader_batch_size, shuffle=True, num_workers=2)

# DataLoader for validation data
# - shuffle=False keeps order consistent for accurate validation
val_loader = DataLoader(val_data, batch_size=loader_batch_size, shuffle=False, num_workers=2)
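As a quick sanity check on the batch size, the sketch below computes how many batches each loader yields per epoch. It assumes the standard CIFAR-10 split of 50,000 training and 10,000 test images and the DataLoader default of keeping the final, smaller batch. `num_batches` is an illustrative helper.

```python
import math


def num_batches(num_samples, batch_size, drop_last=False):
    """Number of batches per epoch for a given dataset size."""
    if drop_last:
        return num_samples // batch_size
    return math.ceil(num_samples / batch_size)


train_batches = num_batches(50_000, 256)
val_batches = num_batches(10_000, 256)
last_train_batch = 50_000 - (train_batches - 1) * 256

print(train_batches)     # 196
print(val_batches)       # 40
print(last_train_batch)  # 80 samples in the final training batch
```

The 196 training batches match the total shown by the tqdm progress bar during training.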


**Meta Info**

---

I print the class labels here, and I also call torch.set_num_threads to increase the number of CPU threads PyTorch can use for tensor operations. This speeds up CPU-side computation; the parallelism of data loading is controlled separately by num_workers in the data loaders.

In [8]:
# === Meta Info ===
class_labels = train_data.classes  # Store CIFAR-10 class names for later reference
print("CIFAR-10 Classes:", class_labels)  # Print them to make sure they're correct

torch.set_num_threads(8)  # Allow up to 8 CPU threads for PyTorch operations (does not change the DataLoader workers)


CIFAR-10 Classes: ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']


**Model architecture - Stem Block**

---

Here I build a StemBlock, which is the initial layer of my CNN model. It receives the raw input image and extracts an initial feature map of low-level features such as edges and textures. This puts the input in the right form for the deeper layers of the network to process.

In [9]:
class StemBlock(nn.Module):
    # This is the very first part of the model — the "stem".
    # It takes the raw image input and applies one convolution to start feature extraction.
    def __init__(self, in_channels=3, out_channels=48):
        # I’m using 3 input channels because CIFAR-10 images are RGB.
        # 48 output channels is only the default; RoutingNet passes hidden_channels here instead.
        super(StemBlock, self).__init__()

        # This is the convolution layer.
        # I use a 3x3 kernel, stride of 1, and padding of 1 so that the output has the same spatial size as the input.
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1)

        # After convolution, I normalize the output using BatchNorm.
        # This helps the training converge faster and be more stable.
        self.bn = nn.BatchNorm2d(out_channels)

        # Finally, I add a ReLU activation to introduce non-linearity.
        # It’s standard practice and helps the network learn more complex patterns.
        self.act = nn.ReLU()

    def forward(self, x):
        # In the forward pass, first I apply the convolution layer
        x = self.conv(x)

        # Then I apply BatchNorm to the conv output
        x = self.bn(x)

        # Lastly, I activate the result with ReLU
        return self.act(x)
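The claim that a 3x3 kernel with stride 1 and padding 1 preserves the spatial size follows from the standard convolution output-size formula, sketched here in plain Python. The helper name is illustrative and dilation is assumed to be 1.

```python
def conv_output_size(size, kernel_size, stride=1, padding=0):
    """Spatial output size of a convolution (dilation assumed to be 1)."""
    return (size + 2 * padding - kernel_size) // stride + 1


same = conv_output_size(32, kernel_size=3, stride=1, padding=1)
no_padding = conv_output_size(32, kernel_size=3, stride=1, padding=0)

print(same)        # 32: a CIFAR-10 image keeps its 32x32 size
print(no_padding)  # 30: without padding each side shrinks by 2
```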


**Model Architecture - Routing Block - Backbone (B1 to BN)**

---

This is the backbone of my model, where most of the learning happens. Each block has an attention mechanism that decides how to combine the convolutional experts. I use global average pooling followed by two linear layers and a softmax to generate the attention weights. Then I apply each expert separately and blend their outputs using those weights. Finally, I normalise the result, apply dropout, and pass it through the activation.

In [10]:
class ExpertRoutingBlock(nn.Module):
    def __init__(self, features, num_experts=2, squeeze_ratio=4):
        # This block allows the model to pick between multiple convolution paths.
        # 'features' is the number of channels coming in and out of the block.
        # 'num_experts' is the number of parallel paths (like 2 specialists).
        # 'squeeze_ratio' controls the bottleneck size in the attention MLP.
        super(ExpertRoutingBlock, self).__init__()
        self.num_experts = num_experts

        # Here I create multiple parallel convolution layers (the experts).
        # Each one sees the same input, but the model learns to weigh them differently per input.
        self.expert_convs = nn.ModuleList([
            nn.Conv2d(features, features, kernel_size=3, stride=1, padding=1)
            for _ in range(num_experts)
        ])

        # This is a global average pooling that reduces each feature map to a single value.
        # It's used for computing attention weights later.
        self.attention_pool = nn.AdaptiveAvgPool2d(output_size=1)

        # First FC layer of the attention MLP — reduces dimension (bottleneck).
        self.attention_fc1 = nn.Linear(features, features // squeeze_ratio)

        # Second FC layer — outputs logits for each expert (routing weights).
        self.attention_fc2 = nn.Linear(features // squeeze_ratio, num_experts)

        # BatchNorm after combining the expert outputs
        self.norm = nn.BatchNorm2d(features)

        # ReLU activation after normalization and dropout
        self.activate = nn.ReLU()

    def forward(self, x):
        # Get batch size (B), channels (C), height (H), width (W)
        B, C, H, W = x.shape

        # Perform global average pooling, then flatten to (B, C)
        pooled = self.attention_pool(x).view(B, C)

        # Pass through first FC layer of the MLP and apply ReLU
        bottleneck = F.relu(self.attention_fc1(pooled))

        # Get the attention weights by softmax over the logits from the second FC
        weights = F.softmax(self.attention_fc2(bottleneck), dim=1)  # shape: (B, num_experts)

        # Run the input through all expert convs in parallel and stack results
        # Output shape: (B, num_experts, C, H, W)
        conv_outputs = torch.stack([conv(x) for conv in self.expert_convs], dim=1)

        # Reshape attention weights to broadcast across feature maps
        weights = weights.view(B, self.num_experts, 1, 1, 1)

        # Compute the weighted sum of the expert outputs
        blended = (weights * conv_outputs).sum(dim=1)  # shape: (B, C, H, W)

        # Normalize the combined result, apply dropout and ReLU
        blended = self.norm(blended)
        blended = F.dropout(blended, p=0.25, training=self.training)
        return self.activate(blended)
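The routing step can be illustrated for a single sample without PyTorch. The sketch below turns expert logits into weights with a softmax and then forms the weighted sum of the expert outputs. The logits and the two-element "feature maps" are hypothetical values chosen to keep the arithmetic easy to follow.

```python
import math


def softmax(logits):
    """Numerically stable softmax over a list of logits."""
    peak = max(logits)
    exps = [math.exp(v - peak) for v in logits]
    total = sum(exps)
    return [e / total for e in exps]


def blend(expert_outputs, weights):
    """Weighted sum of per-expert outputs, position by position."""
    length = len(expert_outputs[0])
    return [
        sum(w * out[i] for w, out in zip(weights, expert_outputs))
        for i in range(length)
    ]


expert_outputs = [[2.0, 4.0], [0.0, 8.0]]  # one flattened output per expert

equal_weights = softmax([0.0, 0.0])
print(equal_weights)                         # [0.5, 0.5]
print(blend(expert_outputs, equal_weights))  # [1.0, 6.0]

skewed_weights = softmax([2.0, 0.0])  # the first expert dominates
print([round(w, 3) for w in skewed_weights])
print([round(v, 3) for v in blend(expert_outputs, skewed_weights)])
```

Because the weights come from a softmax, they are positive and sum to 1, so the blended output is always a convex combination of the expert outputs.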


**Model Architecture - RoutingNet - Classifier(C)**

---

This is the classification head of my model. After the feature maps have gone through the backbone, I apply global average pooling to collapse the spatial dimensions. Then I apply dropout for regularisation and a fully connected layer to map to the 10 output classes of CIFAR-10.

In [11]:
class RoutingNet(nn.Module):
    def __init__(self, num_layers=6, num_paths=2, hidden_channels=256, num_outputs=10):
        super(RoutingNet, self).__init__()

        # === Stem Block ===
        # This is the very first convolutional layer.
        # It takes the raw image and extracts some initial low-level features.
        self.stem = StemBlock(in_channels=3, out_channels=hidden_channels)

        # === Backbone ===
        # This is the main part of the network — it's a sequence of ExpertRoutingBlocks.
        # Each block contains multiple expert conv layers and attention-based routing.
        # I'm stacking `num_layers` of them here.
        self.blocks = nn.Sequential(*[
            ExpertRoutingBlock(features=hidden_channels, num_experts=num_paths)
            for _ in range(num_layers)
        ])

        # === Classifier Head ===
        # This part summarizes the feature map and produces the final class logits.

        # This is global average pooling: it collapses the HxW spatial dimensions into a single number per channel.
        # That way, I get a fixed-size feature vector of shape (batch_size, channels).
        self.avg_pool = nn.AdaptiveAvgPool2d(output_size=1)

        # This dropout layer is used to prevent overfitting before the final layer.
        # It's applied after pooling but before classification.
        self.drop = nn.Dropout(0.2)

        # This is the final linear layer.
        # It maps the pooled feature vector (of size `hidden_channels`) to 10 output classes for CIFAR-10.
        self.head = nn.Linear(hidden_channels, num_outputs)

        # I call a custom weight initialization method to initialize convs and linears properly.
        self._initialize_weights()

    def forward(self, x):
        # First, I pass the image through the stem to get low-level features.
        x = self.stem(x)

        # Then the features go through the full sequence of routing blocks.
        x = self.blocks(x)

        # After the backbone, I pool the features to remove spatial dimensions (global avg pool).
        x = self.avg_pool(x).flatten(1)

        # I apply dropout for regularization.
        x = self.drop(x)

        # Finally, I pass the feature vector to the linear layer to get the class scores.
        return self.head(x)

    def _initialize_weights(self):
        # This helper function initializes all conv and linear layers.
        # I use Kaiming initialization for ReLU-based models, which helps training stability.
        for layer in self.modules():
            if isinstance(layer, nn.Conv2d) or isinstance(layer, nn.Linear):
                nn.init.kaiming_normal_(layer.weight, nonlinearity='relu')
                if layer.bias is not None:
                    nn.init.constant_(layer.bias, 0)
# Creating an instance of my custom RoutingNet model
# I'm setting:
# - num_layers=5: the number of ExpertRoutingBlocks (this forms the backbone of the network)
# - num_paths=2: each block has 2 expert convolution branches (like 2 parallel specialists)
# - hidden_channels=164: the number of channels for the internal feature maps (kind of like the width of the network)
# I send the whole model to the GPU if available (for speed) using `.to(device)`
net = RoutingNet(num_layers=5, num_paths=2, hidden_channels=164).to(device)
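To get a feel for the size of this configuration, the sketch below estimates the number of learnable parameters by hand from the layer definitions above. It is a rough check under stated assumptions: every convolution and linear layer has a bias, each BatchNorm contributes a learnable weight and bias per channel, and the default squeeze_ratio of 4 is used. The helper names are illustrative.

```python
def conv_params(c_in, c_out, k):
    """Weights plus biases of a k x k convolution."""
    return c_in * c_out * k * k + c_out


def linear_params(f_in, f_out):
    """Weights plus biases of a fully connected layer."""
    return f_in * f_out + f_out


def bn_params(channels):
    """Learnable scale and shift of a BatchNorm layer."""
    return 2 * channels


def routing_net_params(num_layers=5, num_paths=2, hidden=164,
                       num_outputs=10, squeeze_ratio=4):
    stem = conv_params(3, hidden, 3) + bn_params(hidden)
    block = (
        num_paths * conv_params(hidden, hidden, 3)         # expert convs
        + linear_params(hidden, hidden // squeeze_ratio)   # attention_fc1
        + linear_params(hidden // squeeze_ratio, num_paths)  # attention_fc2
        + bn_params(hidden)
    )
    head = linear_params(hidden, num_outputs)
    return stem + num_layers * block + head


total = routing_net_params()
print(f"{total:,} learnable parameters")
```

If PyTorch is available, the figure can be compared against `sum(p.numel() for p in net.parameters())`. Almost all of the parameters sit in the expert convolutions, so the count grows roughly linearly with num_layers and num_paths and quadratically with hidden_channels.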



**Training Configuration: Loss, Optimiser and Scheduler**

---

Here I define all the core training hyperparameters and components needed to optimise the network: the loss function, optimiser, and the learning rate scheduler. These are all necessary to train the model effectively.

In [12]:
# === Loss Function ===
# I’m using CrossEntropyLoss since it’s the standard loss function for classification tasks.
# I added a small amount of label smoothing (0.005) to improve generalization by preventing the model from being too confident.
loss_fn = nn.CrossEntropyLoss(label_smoothing=0.005)

# === Optimizer ===
# I’m using SGD (Stochastic Gradient Descent), which is commonly used in computer vision tasks.
# I chose a high initial learning rate of 0.19 and added momentum (0.9) for faster convergence.
# Weight decay (5e-4) is used for L2 regularization to help prevent overfitting.
optimizer = torch.optim.SGD(net.parameters(), lr=0.19, momentum=0.9, weight_decay=5e-4)

# This dropout variable is not used anywhere: the model hard-codes its dropout rates
# (0.25 in the routing blocks, 0.2 in the classifier). I left it in case I needed to set the rate dynamically.
dropout = 0.3

# === Learning Rate Scheduler ===
# I’m using cosine annealing for learning rate scheduling over 35 epochs.
# This gradually reduces the learning rate following a cosine curve down to a minimum of 1e-4.
# I do it to improve final convergence and avoid getting stuck in bad minima.
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=35, eta_min=1e-4)
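The shape of the schedule can be previewed without PyTorch using the closed-form cosine annealing formula, eta_min + 0.5 * (base_lr - eta_min) * (1 + cos(pi * epoch / T_max)). This is a sketch under the assumption that the scheduler is stepped once per epoch and nothing else changes the learning rate. `cosine_lr` is an illustrative helper, not a PyTorch function.

```python
import math


def cosine_lr(epoch, base_lr=0.19, eta_min=1e-4, t_max=35):
    """Closed-form cosine annealing learning rate after `epoch` steps."""
    cosine = math.cos(math.pi * epoch / t_max)
    return eta_min + 0.5 * (base_lr - eta_min) * (1 + cosine)


schedule = [cosine_lr(epoch) for epoch in range(36)]
for epoch in (0, 10, 25, 35):
    print(f"epoch {epoch:2d}: lr = {schedule[epoch]:.6f}")
```

The rate starts at 0.19, falls slowly at first, fastest around the midpoint, and flattens out as it approaches the 1e-4 minimum at epoch 35.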


**Training Loop and Evaluation**

---

In this section I have implemented the full training pipeline. This includes training the model over multiple epochs, applying mixup data augmentation for the first 25 epochs, computing training and testing metrics, and stepping the cosine annealing scheduler. I also record the accuracy and loss values for every epoch.

In [None]:
# I decided to train for 35 epochs to give the model enough time to converge.
total_epochs = 35

# I’ll track the best validation accuracy (useful if I wanted to save best model).
best_val_acc = 0.0

# These lists will keep track of losses and accuracies for plotting and analysis later.
train_losses, val_losses = [], []
train_scores, val_scores = [], []

# Now starting the main training loop
for epoch in range(total_epochs):
    # Set the model in training mode so things like dropout work properly
    net.train()

    # These help track the cumulative loss and correct predictions during the epoch
    epoch_loss = 0.0
    correct_preds, seen_samples = 0, 0

    # Loop through batches in the training set
    for inputs, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{total_epochs}"):
        # Move data to GPU if available
        inputs, labels = inputs.to(device), labels.to(device)

        # Clear any gradients from the last step
        optimizer.zero_grad()

        # For the first 25 epochs, I apply Mixup regularization
        if epoch < 25:
            inputs, targets_a, targets_b, lam = mixup_data(inputs, labels, alpha=0.1)
            outputs = net(inputs)
            loss = mixup_criterion(loss_fn, outputs, targets_a, targets_b, lam)
        else:
            # After epoch 25, I go back to using standard training
            outputs = net(inputs)
            loss = loss_fn(outputs, labels)

        # Backpropagation to compute gradients
        loss.backward()

        # Update model weights based on computed gradients
        optimizer.step()

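        # Accumulate loss (scaled by batch size) and count correct predictions.
        # During mixup epochs the predictions are compared with the original (unmixed) labels,
        # so the training accuracy is only approximate.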
        epoch_loss += loss.item() * inputs.size(0)
        preds = outputs.argmax(dim=1)  # Get predicted class
        correct_preds += (preds == labels).sum().item()
        seen_samples += labels.size(0)

    # After the training loop, I step the scheduler to update the learning rate
    scheduler.step()

    # Calculate and store average training loss and accuracy for the epoch
    avg_train_loss = epoch_loss / seen_samples
    train_acc = 100. * correct_preds / seen_samples
    train_losses.append(avg_train_loss)
    train_scores.append(train_acc)

    # === VALIDATION ===
    # Switch model to evaluation mode — turns off things like dropout
    net.eval()
    val_loss = 0.0
    val_correct, val_total = 0, 0

    # I don't need gradients when evaluating, so I wrap in torch.no_grad
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = net(inputs)
            loss = loss_fn(outputs, labels)

            # Accumulate validation loss and accuracy
            val_loss += loss.item() * inputs.size(0)
            preds = outputs.argmax(dim=1)
            val_correct += (preds == labels).sum().item()
            val_total += labels.size(0)

    # Calculate average validation loss and accuracy
    avg_val_loss = val_loss / val_total
    val_acc = 100. * val_correct / val_total
    val_losses.append(avg_val_loss)
    val_scores.append(val_acc)


    # Update best validation accuracy if this epoch's is higher
    if val_acc > best_val_acc:
        best_val_acc = val_acc


    # I print results every epoch so I can track training progress
    print(f"[Epoch {epoch+1}] LR={scheduler.get_last_lr()[0]:.6f} | Training Accuracy={train_acc:.2f}% | Testing Accuracy={val_acc:.2f}%")

print(f"\nBest Validation Accuracy Achieved: {best_val_acc:.2f}%")


Epoch 1/35:   1%|          | 1/196 [01:33<5:04:17, 93.63s/it]
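The loop multiplies each batch loss by the batch size before dividing by the number of samples seen. The sketch below shows why that matters when the last batch is smaller than the rest. The loss values are hypothetical; the batch sizes mirror two full batches of 256 followed by a final batch of 80.

```python
def epoch_average(batch_losses, batch_sizes):
    """Per-sample average loss from per-batch mean losses."""
    total = sum(loss * n for loss, n in zip(batch_losses, batch_sizes))
    return total / sum(batch_sizes)


batch_losses = [1.0, 1.0, 3.0]  # hypothetical mean loss of each batch
batch_sizes = [256, 256, 80]    # the final batch is smaller

weighted = epoch_average(batch_losses, batch_sizes)
naive = sum(batch_losses) / len(batch_losses)

print(f"sample-weighted average: {weighted:.4f}")
print(f"plain mean of batches:   {naive:.4f}")
```

The plain mean over-counts the small final batch, whereas the sample-weighted version gives every image the same influence on the reported loss.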

**Visualisation**


---


Here I plot the training and testing accuracy and loss curves. This helps to visualise how well the model is learning and generalising, and I use it to spot overfitting.

In [None]:
# Plotting the accuracy curves for both training and validation
plt.plot(train_scores, label='Training Accuracy')  # Training accuracy per epoch
plt.plot(val_scores, label='Validation Accuracy')  # Validation accuracy per epoch
plt.title("Accuracy over Epochs")  # Title of the plot
plt.xlabel("Epoch")  # X-axis label
plt.ylabel("Accuracy (%)")  # Y-axis label
plt.legend()  # Add a legend to differentiate lines
plt.grid(True)  # Add a grid for better readability
plt.show()  # Display the plot

# Plotting the loss curves for both training and validation
plt.plot(train_losses, label='Training Loss')  # Training loss per epoch
plt.plot(val_losses, label='Validation Loss')  # Validation loss per epoch
plt.title("Loss over Epochs")  # Title of the plot
plt.xlabel("Epoch")  # X-axis label
plt.ylabel("Loss")  # Y-axis label
plt.legend()  # Add a legend to differentiate lines
plt.grid(True)  # Add a grid for better readability
plt.show()  # Display the plot

# Accuracy gap = Train Acc - Val Acc
accuracy_gap = [train - val for train, val in zip(train_scores, val_scores)]

plt.figure(figsize=(8, 4))
plt.plot(accuracy_gap, label='Accuracy Gap (Train - Val)')
plt.axhline(0, color='gray', linestyle='--')
plt.title("Accuracy Gap Over Epochs")
plt.xlabel("Epoch")
plt.ylabel("Gap (%)")
plt.legend()
plt.grid(True)
plt.show()

plt.figure(figsize=(8, 4))
plt.plot(val_scores[-20:], marker='o', label='Val Acc (Last 20 Epochs)')
plt.title("Validation Accuracy - Last 20 Epochs")
plt.xlabel("Epoch")
plt.ylabel("Accuracy (%)")
plt.xticks(ticks=range(20), labels=range(len(val_scores)-19, len(val_scores)+1))
plt.legend()
plt.grid(True)
plt.show()

def moving_average(data, window=5):
    return np.convolve(data, np.ones(window)/window, mode='valid')

plt.plot(moving_average(train_scores), label='Train Accuracy (Smoothed)')
plt.plot(moving_average(val_scores), label='Val Accuracy (Smoothed)')
plt.title("Smoothed Accuracy Curves")
plt.xlabel("Epoch")
plt.ylabel("Accuracy (%)")
plt.legend()
plt.grid(True)
plt.show()
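For reference, the smoothing performed by np.convolve with mode='valid' is equivalent to averaging every full window of consecutive values. The sketch below is a plain-Python version on toy data; `moving_average_py` is an illustrative name and the accuracy values are hypothetical.

```python
def moving_average_py(data, window=5):
    """Mean of every full window of `window` consecutive values."""
    return [
        sum(data[i:i + window]) / window
        for i in range(len(data) - window + 1)
    ]


toy_scores = [10, 20, 30, 40, 50, 60]  # hypothetical accuracies
smoothed = moving_average_py(toy_scores)
print(smoothed)  # [30.0, 40.0]

# A 35-epoch curve loses window - 1 = 4 points after smoothing.
print(len(moving_average_py(list(range(35)))))  # 31
```

Because only full windows are kept, the smoothed curves are four points shorter than the raw ones, so their x-axis positions no longer line up exactly with epoch numbers.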
