In [None]:
# 🚀 **Importing Helper Modules**

import torch  # 🧠 Core PyTorch library for tensor operations and neural networks
import torch.nn as nn  # 🏗️ Neural network components (layers, loss functions)
import torch.optim as optim  # ⚙️ Optimization algorithms (SGD, Adam, etc.)
import torchvision  # 🎨 Computer vision utilities and datasets
import torchvision.transforms as transforms  # 🖼️ Data transformations (normalization, augmentation)
from torch.utils.data import DataLoader  # 🚚 For loading and batching data
import matplotlib.pyplot as plt  # 📊 Visualization for losses and accuracies
import torch.nn.functional as F
from tqdm import tqdm

In [None]:
import os
# Configure PyTorch's CUDA allocator through its environment variable.
# NOTE(review): `expandable_segments:True` is presumably meant to reduce
# fragmentation-related out-of-memory errors on the GPU — TODO confirm.
# NOTE(review): torch is imported in an earlier cell; confirm the setting is
# still picked up when it is assigned at this point.
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True'

In [None]:
# 🚀 **Part 1: Data Loading and Preprocessing**

# 🛠️ TODO: Complete the data loading code 🧩
def load_mnist_data(batch_size=64, root='./data', num_workers=0):
    """
    📦 **Load and preprocess the MNIST dataset.**

    Every image is resized to 224x224 (the spatial size the VGG16 classifier
    in this notebook is sized for), converted to a tensor and normalized with
    the MNIST mean/std. Resizing does not change the channel count: the images
    stay single-channel. The training split additionally gets a random affine
    augmentation; the test split is left deterministic.

    Args:
        batch_size: Number of samples per batch for both loaders.
        root: Directory where the dataset is stored / downloaded to.
        num_workers: Worker processes used by each DataLoader (0 = load in
            the main process).

    📜 **Returns:** train_loader and test_loader 🎯
    """
    image_size = (224, 224)
    # Mean and std used to normalize MNIST; shared by both pipelines.
    normalize = transforms.Normalize(mean=(0.1307,), std=(0.3081,))

    train_transform = transforms.Compose([
        transforms.Resize(image_size),
        # Random rotation of +-15 degrees plus up to 10% shift on each axis.
        transforms.RandomAffine(degrees=15, translate=(0.1, 0.1)),
        transforms.ToTensor(),
        normalize,
    ])

    # No augmentation at test time so that evaluation is repeatable.
    test_transform = transforms.Compose([
        transforms.Resize(image_size),
        transforms.ToTensor(),
        normalize,
    ])

    train_dataset = torchvision.datasets.MNIST(
        root=root, train=True, transform=train_transform, download=True
    )
    test_dataset = torchvision.datasets.MNIST(
        root=root, train=False, transform=test_transform, download=True
    )

    # Only the training data is shuffled.
    train_loader = DataLoader(
        train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers
    )
    test_loader = DataLoader(
        test_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers
    )

    return train_loader, test_loader  # 🚚 Return the loaders 📦


In [None]:
# 🚀 **Part 2: Custom Dropout Implementation**

class CustomDropout(nn.Module):
    """
    🎲 Inverted dropout layer.

    During training each element is zeroed independently with probability
    `p` and the survivors are scaled by `1 / (1 - p)` so the expected value
    of the activations is unchanged. In evaluation mode the input is returned
    untouched.

    Args:
        p: Probability of dropping an element; must lie in [0, 1].

    Raises:
        ValueError: If `p` is outside [0, 1].
    """

    def __init__(self, p=0.5):
        super(CustomDropout, self).__init__()
        if not 0.0 <= p <= 1.0:
            raise ValueError(f"dropout probability has to be between 0 and 1, but got {p}")
        # 🎲 Store dropout probability (p between 0 and 1)
        self.p = p

    def forward(self, x):
        # 🏋️‍♂️ Units are only dropped in training mode.
        if not self.training:
            return x

        keep_prob = 1 - self.p
        if keep_prob == 0:
            # p == 1 drops everything. The general formula would divide by
            # zero and produce NaNs, so multiply by zero instead (this keeps
            # the result attached to the autograd graph).
            return x * 0.0

        # Bernoulli(keep_prob) mask: 1 keeps the unit, 0 drops it.
        mask = torch.bernoulli(torch.ones_like(x) * keep_prob)
        # Dividing by keep_prob keeps the expected activation unchanged.
        return x * mask / keep_prob


In [None]:
# 🚀 **Part 3: Custom BatchNorm2d Implementation**

class CustomBatchNorm2d(nn.Module):
    """
    ⚖️ Batch normalization over the channel dimension of a 4D input.

    The input is indexed as (batch, channels, height, width). In training
    mode each channel is normalized with the statistics of the current batch
    and the running statistics are updated; in evaluation mode the stored
    running statistics are used instead. A learnable per-channel scale
    (`gamma`) and shift (`beta`) are applied after normalization.

    Args:
        num_features: Number of channels of the expected input.
        eps: Constant added to the variance for numerical stability.
        momentum: Weight of the current batch in the running-statistics
            update (`running = (1 - momentum) * running + momentum * batch`).
    """

    def __init__(self, num_features, eps=1e-5, momentum=0.1):
        super(CustomBatchNorm2d, self).__init__()
        self.num_features = num_features  # number of channels
        self.eps = eps
        self.momentum = momentum

        self.gamma = nn.Parameter(torch.ones(num_features))  # scaling factor
        self.beta = nn.Parameter(torch.zeros(num_features))  # shifting factor
        # Buffers: saved with the module and moved with .to(), but not trained.
        self.register_buffer('running_mean', torch.zeros(num_features))
        self.register_buffer('running_var', torch.ones(num_features))

    def forward(self, x):
        if self.training:
            # Per-channel statistics: reduce over batch, height and width.
            batch_mean = x.mean([0, 2, 3])
            batch_var = x.var([0, 2, 3], unbiased=False)

            # The running statistics are bookkeeping, not part of the loss.
            # Updating them outside autograd keeps the buffers as plain
            # tensors; otherwise the in-place update would attach them to the
            # graph of every training step.
            with torch.no_grad():
                samples_per_channel = x.size(0) * x.size(2) * x.size(3)
                if samples_per_channel > 1:
                    # nn.BatchNorm2d normalizes with the biased variance but
                    # stores the unbiased estimate in running_var.
                    correction = samples_per_channel / (samples_per_channel - 1)
                    running_update_var = batch_var * correction
                else:
                    running_update_var = batch_var
                self.running_mean.mul_(1 - self.momentum).add_(self.momentum * batch_mean)
                self.running_var.mul_(1 - self.momentum).add_(self.momentum * running_update_var)

            mean = batch_mean.view(1, -1, 1, 1)
            var = batch_var.view(1, -1, 1, 1)
        else:
            mean = self.running_mean.view(1, -1, 1, 1)
            var = self.running_var.view(1, -1, 1, 1)

        # Reshape to (1, C, 1, 1) so everything broadcasts across N, H and W.
        x_normalized = (x - mean) / torch.sqrt(var + self.eps)
        gamma = self.gamma.view(1, -1, 1, 1)
        beta = self.beta.view(1, -1, 1, 1)
        return gamma * x_normalized + beta


In [None]:
class CustomReLU(nn.Module):
    """
    ⚡ Hand-written ReLU activation.

    Computes the element-wise maximum of the input and zero using plain
    tensor operations (no `F.relu`), so every negative entry becomes 0 and
    every other entry passes through.
    """

    def forward(self, x):
        # A zero tensor shaped like the input acts as the floor.
        floor = torch.zeros_like(x)
        return torch.maximum(floor, x)


In [None]:
class CustomMaxPooling2d(nn.Module):
    """
    🏊 Hand-written 2D max pooling (no padding).

    The input is cut into `kernel_size` x `kernel_size` windows taken every
    `stride` pixels, and the largest value of each window is returned. The
    layer has no state, so it behaves the same in training and evaluation.

    Args:
        kernel_size: Side length of the square pooling window.
        stride: Step between consecutive windows.
    """

    def __init__(self, kernel_size=2, stride=2):
        super().__init__()
        self.kernel_size = kernel_size
        self.stride = stride

    def forward(self, x):
        batch, channels, height, width = x.shape
        k, s = self.kernel_size, self.stride

        # Output size without padding: floor((I - K) / S) + 1
        pooled_h = (height - k) // s + 1
        pooled_w = (width - k) // s + 1

        # unfold gives (N, C*k*k, L); split out each channel's k*k window values.
        windows = F.unfold(x, kernel_size=k, stride=s).view(batch, channels, k * k, -1)

        # Max over the window axis, then lay the L windows back on a 2D grid.
        return windows.max(dim=2).values.view(batch, channels, pooled_h, pooled_w)


In [None]:
# 🚀 **Part 4: Custom VGG16 Model Implementation**
class CustomVGG16(nn.Module):
    """
    📜 VGG16-style classifier assembled from the custom layers of this notebook.

    * `features`: five convolutional blocks. Every 3x3 convolution
      (padding 1) is followed by CustomBatchNorm2d and CustomReLU, and every
      block ends with a CustomMaxPooling2d that halves height and width.
    * `classifier`: flatten, then two 4096-unit linear layers (each followed
      by CustomReLU and CustomDropout) and a final linear layer producing
      `num_classes` scores.

    The network takes single-channel images. The first linear layer expects
    512 * 7 * 7 features, which is what five halvings of a 224x224 input give.

    Args:
        num_classes: Number of output classes (10 for MNIST).
    """

    def __init__(self, num_classes=10):  # num_classes = 10 for MNIST
        super(CustomVGG16, self).__init__()

        # (number of conv layers, output channels) for each of the five blocks.
        block_plan = [(2, 64), (2, 128), (3, 256), (3, 512), (3, 512)]

        feature_layers = []
        channels_in = 1  # grayscale input
        for conv_count, channels_out in block_plan:
            for _ in range(conv_count):
                feature_layers.append(
                    nn.Conv2d(channels_in, channels_out, kernel_size=3, padding=1)
                )
                feature_layers.append(CustomBatchNorm2d(channels_out))
                feature_layers.append(CustomReLU())
                channels_in = channels_out
            # Each block closes with a pooling layer.
            feature_layers.append(CustomMaxPooling2d())
        self.features = nn.Sequential(*feature_layers)

        # Fully connected layers
        hidden_units = 4096
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(512 * 7 * 7, hidden_units),
            CustomReLU(),
            CustomDropout(),
            nn.Linear(hidden_units, hidden_units),
            CustomReLU(),
            CustomDropout(),
            nn.Linear(hidden_units, num_classes)
        )

    def forward(self, x):
        # Convolutional feature extractor followed by the dense classifier.
        return self.classifier(self.features(x))


In [None]:
# 🚀 **Part 5: Training Functions**

def train_epoch(model, train_loader, criterion, optimizer, device):
    """
    🏋️‍♂️ Train `model` for one pass over `train_loader`, with a progress bar.

    Args:
        model: Network to train; it is switched to training mode.
        train_loader: Iterable of (data, target) batches that supports `len()`.
        criterion: Loss function called as `criterion(output, target)`.
        optimizer: Optimizer that updates the model's parameters.
        device: Device each batch is moved to before the forward pass.

    Returns:
        (avg_loss, accuracy): the mean of the per-batch losses and the
        accuracy in percent. Both are 0 when the loader yields no data.
    """
    model.train()  # 📈 Switch to training mode
    running_loss = 0.0  # 💰 Track the cumulative loss
    correct = 0  # ✅ Correct predictions counter
    total = 0  # 📊 Total samples counter
    num_batches = len(train_loader)
    pbar = tqdm(train_loader, total=num_batches, desc="Training")

    for data, target in pbar:  # 🔄 Loop through batches
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()  # clear gradients from the previous batch
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

        # Read the scalar once and reuse it for the sum and the progress bar.
        batch_loss = loss.item()
        running_loss += batch_loss

        predicted = output.argmax(dim=1)
        total += target.size(0)
        correct += predicted.eq(target).sum().item()

        pbar.set_postfix({
            'loss': batch_loss,
            'acc': 100. * correct / total if total > 0 else 0
        })

    # 📊 Average loss and accuracy for the epoch; guard against an empty loader.
    avg_loss = running_loss / num_batches if num_batches > 0 else 0.0
    accuracy = 100. * correct / total if total > 0 else 0
    return avg_loss, accuracy

def evaluate(model, test_loader, criterion, device):
    """
    🧪 Evaluate `model` on `test_loader` without computing gradients.

    Args:
        model: Network to evaluate; it is switched to evaluation mode.
        test_loader: Iterable of (data, target) batches that supports `len()`.
        criterion: Loss function called as `criterion(output, target)`.
        device: Device each batch is moved to before the forward pass.

    Returns:
        (avg_loss, accuracy): the mean of the per-batch losses and the
        accuracy in percent. Both are 0 when the loader yields no data.
    """
    model.eval()  # 🔕 Switch to evaluation mode
    test_loss = 0  # 💰 Track cumulative test loss
    correct = 0  # ✅ Correct predictions counter
    total = 0  # 📊 Total samples counter
    num_batches = len(test_loader)
    pbar = tqdm(test_loader, total=num_batches, desc="Evaluating")

    with torch.no_grad():  # 🚫 No gradient calculation for evaluation
        for data, target in pbar:
            data, target = data.to(device), target.to(device)
            output = model(data)

            # Read the scalar once and reuse it for the sum and the progress bar.
            batch_loss = criterion(output, target).item()
            test_loss += batch_loss

            predicted = output.argmax(dim=1)
            total += target.size(0)
            correct += predicted.eq(target).sum().item()

            pbar.set_postfix({
                'loss': batch_loss,
                'acc': 100. * correct / total if total > 0 else 0
            })

    # 📊 Average test loss and accuracy; guard against an empty loader.
    avg_loss = test_loss / num_batches if num_batches > 0 else 0.0
    accuracy = 100. * correct / total if total > 0 else 0
    return avg_loss, accuracy
    

In [None]:
# 🚀 **Part 6: Main Training Loop**

def main():
    """
    🚀 Train and evaluate CustomVGG16 on MNIST, then plot the learning curves.

    Loads the data, trains for EPOCHS epochs with Adam, evaluates on the test
    split after every epoch, lets a ReduceLROnPlateau scheduler react to the
    test loss, and finally shows loss and accuracy curves. Returns nothing.
    """
    # ⚙️ **Hyperparameters**
    BATCH_SIZE = 16  # 📦 Batch size for data loading
    EPOCHS = 2  # 🔄 Number of training epochs
    LEARNING_RATE = 0.001  # 🚀 Learning rate for optimizer
    DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')  # ⚡ Use GPU if available

    # 📊 **Load data**
    train_loader, test_loader = load_mnist_data(BATCH_SIZE)

    # 🛠️ **Initialize model, criterion, optimizer**
    model = CustomVGG16().to(DEVICE)  # 🖥️ Move model to the selected device
    criterion = nn.CrossEntropyLoss()  # 🎯 Loss function for classification
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

    # Halve the learning rate when the monitored loss stops improving.
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=1)

    # 🔄 **Per-epoch history used for the plots**
    train_losses = []  # 📉 Track training losses
    test_losses = []  # 📉 Track test losses
    train_accs = []  # 📊 Track training accuracy
    test_accs = []  # 📊 Track test accuracy

    for epoch in range(EPOCHS):
        print(f"🌟 Epoch {epoch+1}/{EPOCHS}")  # 🕒 Display current epoch
        train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, DEVICE)
        test_loss, test_acc = evaluate(model, test_loader, criterion, DEVICE)

        # NOTE(review): the scheduler is driven by the test loss; a separate
        # validation split would avoid tuning on test data — confirm intent.
        lr_before = optimizer.param_groups[0]['lr']
        scheduler.step(test_loss)
        lr_after = optimizer.param_groups[0]['lr']
        # Only announce a change when the scheduler actually made one.
        if lr_after != lr_before:
            print(f"🔄 Learning rate changed to: {lr_after:.6f}")
        else:
            print(f"Learning rate unchanged: {lr_after:.6f}")

        train_losses.append(train_loss)
        test_losses.append(test_loss)
        train_accs.append(train_acc)
        test_accs.append(test_acc)

        print(f'Epoch {epoch+1}/{EPOCHS} finished:')
        print(f'Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%')
        print(f'Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.2f}%')

    # 📈 **Plot results**: loss on the left, accuracy on the right.
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Train Loss')
    plt.plot(test_losses, label='Test Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(train_accs, label='Train Accuracy')
    plt.plot(test_accs, label='Test Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy (%)')
    plt.legend()

    plt.tight_layout()
    plt.show()

# Entry point: start the training run defined by main() above.
# NOTE(review): inside a notebook kernel `__name__` is presumably '__main__',
# so executing this cell would launch training immediately — confirm.
if __name__ == '__main__':
    main()

In [None]:
# Baseline for comparison: the same VGG16 architecture built from PyTorch's built-in layers
class PyTorchVGG16(nn.Module):
    """
    VGG16 with batch normalization, built only from PyTorch's own layers.

    `features` holds five blocks of 3x3 convolutions (padding 1), each
    followed by BatchNorm2d and an in-place ReLU, with a 2x2 max pooling at
    the end of every block. `classifier` flattens the result and applies two
    4096-unit linear layers (ReLU + Dropout(0.5) after each) and a final
    linear layer with `num_classes` outputs. Input images have one channel.
    """
    def __init__(self, num_classes=10):
        super(PyTorchVGG16, self).__init__()

        # Output channels of every convolution, grouped by block.
        stage_widths = (
            (64, 64),
            (128, 128),
            (256, 256, 256),
            (512, 512, 512),
            (512, 512, 512),
        )

        stack = []
        prev_width = 1  # single-channel input
        for widths in stage_widths:
            for width in widths:
                stack.append(nn.Conv2d(prev_width, width, kernel_size=3, padding=1))
                stack.append(nn.BatchNorm2d(width))
                stack.append(nn.ReLU(inplace=True))
                prev_width = width
            # A pooling layer ends each block and halves height and width.
            stack.append(nn.MaxPool2d(kernel_size=2, stride=2))
        self.features = nn.Sequential(*stack)

        hidden_units = 4096
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(512 * 7 * 7, hidden_units),
            nn.ReLU(True),
            nn.Dropout(0.5),
            nn.Linear(hidden_units, hidden_units),
            nn.ReLU(True),
            nn.Dropout(0.5),
            nn.Linear(hidden_units, num_classes)
        )

    def forward(self, x):
        # Feature extractor first, dense classifier second.
        return self.classifier(self.features(x))

def train_epoch(model, train_loader, criterion, optimizer, device):
    """
    Train `model` for one pass over `train_loader`, logging every 100 batches.

    This definition replaces the `train_epoch` defined earlier in the
    notebook once this cell has been executed.

    Args:
        model: Network to train; it is switched to training mode.
        train_loader: Iterable of (data, target) batches that supports `len()`.
        criterion: Loss function called as `criterion(output, target)`.
        optimizer: Optimizer that updates the model's parameters.
        device: Device each batch is moved to before the forward pass.

    Returns:
        (avg_loss, accuracy): the mean of the per-batch losses and the
        accuracy in percent. Both are 0.0 when the loader yields no data.
    """
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    num_batches = len(train_loader)

    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()

        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        # Index of the largest score per sample is the predicted class.
        _, predicted = output.max(1)
        total += target.size(0)
        correct += predicted.eq(target).sum().item()

        if (batch_idx + 1) % 100 == 0:
            print(f"Batch {batch_idx + 1}/{num_batches} "
                  f"Loss: {loss.item()} "
                  f"Accuracy: {100. * correct / total}")

    # Guard against an empty loader instead of dividing by zero.
    avg_loss = running_loss / num_batches if num_batches > 0 else 0.0
    accuracy = 100. * correct / total if total > 0 else 0.0
    return avg_loss, accuracy

def evaluate(model, test_loader, criterion, device):
    """
    Evaluate `model` on `test_loader` without computing gradients.

    This definition replaces the `evaluate` defined earlier in the notebook
    once this cell has been executed.

    Args:
        model: Network to evaluate; it is switched to evaluation mode.
        test_loader: Iterable of (data, target) batches that supports `len()`.
        criterion: Loss function called as `criterion(output, target)`.
        device: Device each batch is moved to before the forward pass.

    Returns:
        (avg_loss, accuracy): the mean of the per-batch losses and the
        accuracy in percent. Both are 0.0 when the loader yields no data.
    """
    model.eval()
    test_loss = 0
    correct = 0
    total = 0
    num_batches = len(test_loader)

    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss = criterion(output, target)
            test_loss += loss.item()
            # Index of the largest score per sample is the predicted class.
            _, predicted = output.max(1)
            total += target.size(0)
            correct += predicted.eq(target).sum().item()

    # Guard against an empty loader instead of dividing by zero.
    avg_loss = test_loss / num_batches if num_batches > 0 else 0.0
    accuracy = 100. * correct / total if total > 0 else 0.0
    return avg_loss, accuracy

def main():
    """
    Train and evaluate the built-in-layer PyTorchVGG16 on MNIST and plot the curves.

    Same procedure as the custom-model run: Adam optimizer, a
    ReduceLROnPlateau scheduler stepped on the test loss, per-epoch
    evaluation, and a two-panel loss/accuracy figure at the end. This
    definition replaces the `main` defined earlier in the notebook once this
    cell has been executed. Returns nothing.
    """
    # Hyperparameters
    BATCH_SIZE = 16
    EPOCHS = 2
    LEARNING_RATE = 0.001
    DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Data
    train_loader, test_loader = load_mnist_data(BATCH_SIZE)

    # Model, loss, optimizer and learning-rate schedule
    model = PyTorchVGG16().to(DEVICE)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='min', factor=0.5, patience=1
    )

    # Per-epoch history, one list per plotted curve.
    history = {'train_loss': [], 'test_loss': [], 'train_acc': [], 'test_acc': []}

    for epoch in range(EPOCHS):
        print(f"Epoch {epoch+1}/{EPOCHS}")

        train_loss, train_acc = train_epoch(
            model, train_loader, criterion, optimizer, DEVICE
        )
        test_loss, test_acc = evaluate(model, test_loader, criterion, DEVICE)

        # The scheduler watches the test loss to decide when to lower the rate.
        scheduler.step(test_loss)
        print(f"Learning rate: {optimizer.param_groups[0]['lr']:.6f}")

        history['train_loss'].append(train_loss)
        history['test_loss'].append(test_loss)
        history['train_acc'].append(train_acc)
        history['test_acc'].append(test_acc)

        print(f'Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%')
        print(f'Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.2f}%')

    # Two panels: (train curve, test curve, x label, y label).
    panels = [
        (('train_loss', 'Train Loss'), ('test_loss', 'Test Loss'), 'Epochs', 'Loss'),
        (('train_acc', 'Train Accuracy'), ('test_acc', 'Test Accuracy'), 'Epoch', 'Accuracy (%)'),
    ]
    plt.figure(figsize=(12, 4))
    for position, (train_curve, test_curve, x_label, y_label) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        for key, label in (train_curve, test_curve):
            plt.plot(history[key], label=label)
        plt.xlabel(x_label)
        plt.ylabel(y_label)
        plt.legend()

    plt.tight_layout()
    plt.show()

# Entry point for the built-in-layer baseline: calls the main() defined in this
# cell, which shadows the earlier main() that trains CustomVGG16.
# NOTE(review): inside a notebook kernel `__name__` is presumably '__main__',
# so executing this cell would launch training immediately — confirm.
if __name__ == '__main__':
    main()