# Installing Packages
The following packages need to be installed before running the code below
- torchsummary

In [1]:
!pip install torchsummary



# Imports
Importing necessary packages

In [2]:
from __future__ import print_function

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from tqdm import tqdm
from torch.optim.lr_scheduler import StepLR
from torchsummary import summary
from torchvision import datasets, transforms

ModuleNotFoundError: No module named 'torch'

# Dataset Preparation and Loading
The following steps are performed for preparing the dataset for the model.
- Downloading the MNIST dataset
- Visualizing the dataset and the data statistics
- Defining data transformations
- Splitting the dataset into train and validation set
- Creating data loader for train and validation set

## Data Visualization and Statistics
Let's see how our data looks like and what are the mean and standard deviation values of the dataset. This information will help us decide the transformations that can be used on the dataset.

In [None]:
# Download data
sample_data = datasets.MNIST('./data', train=True, download=True).data

# Setti32ng the values in the data to be within the range [0, 1]
sample_data = sample_data.numpy() / 255

# Display some data statistics
print('Data Statistics:')
print(' - Data Shape:', sample_data.shape)
print(' - min:', np.min(sample_data))
print(' - max:', np.max(sample_data))
print(' - mean: %.4f' % np.mean(sample_data))
print(' - std: %.4f' % np.std(sample_data))
print(' - var: %.4f\n' % np.var(sample_data))

# Visualize a sample from the data
plt.imshow(sample_data[0], cmap='gray_r')

Let's view some more images. This will help in getting any ideas for data augmentation later on

In [None]:
figure = plt.figure()
num_of_images = 60
for index in range(1, num_of_images + 1):
    plt.subplot(6, 10, index)
    plt.axis('off')
    plt.imshow(sample_data[index], cmap='gray_r')

## Data Transformations

The following transformations will be used
- ToTensor
- Normalize
- RandomRotation

In [None]:
# Train phase transformations
train_transforms = transforms.Compose([
    # Rotate image by 7 degrees
    transforms.RandomRotation((-6.0, 6.0), fill=(1,)),

    # convert the data to torch.FloatTensor with values within the range [0.0 ,1.0]
    transforms.ToTensor(),

    # normalize the data with mean and standard deviation
    # these values were obtained from the data statistics above
    transforms.Normalize((0.1307,), (0.3081,))
])

# Validation phase transformations
val_transforms = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])

## Train Data and Validation Data Split
The data is downloaded and split into two sets: train and validation

In [None]:
train = datasets.MNIST('./data', train=True, download=True, transform=train_transforms)
val = datasets.MNIST('./data', train=False, download=True, transform=val_transforms)

## Training and Validation Dataloaders
This is the final step in data preparation. It sets the dataloader arguments and then creates the dataloader

In [None]:
SEED = 1

cuda = torch.cuda.is_available()
print('CUDA Available?', cuda)

# For reproducibility of results
torch.manual_seed(SEED)
if cuda:
    torch.cuda.manual_seed(SEED)

# dataloader arguments
dataloader_args = dict(shuffle=True, batch_size=64, num_workers=4, pin_memory=True) if cuda else dict(shuffle=True, batch_size=32)

# train dataloader
train_loader = torch.utils.data.DataLoader(train, **dataloader_args)

# validation dataloader
val_loader = torch.utils.data.DataLoader(val, **dataloader_args)

# Model Architecture
Designing the model structure

In [None]:
class Net(nn.Module):
    def __init__(self):
        """ This function instantiates all the model layers """
        super(Net, self).__init__()

        dropout_rate = 0.01

        self.convblock1 = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=8, kernel_size=3),
            nn.ReLU(),
            nn.BatchNorm2d(8),
            nn.Dropout(dropout_rate)
        )  # Input: 28x28x1 | Output: 26x26x8 | RF: 3x3

        self.convblock2 = nn.Sequential(
            nn.Conv2d(in_channels=8, out_channels=8, kernel_size=3),
            nn.ReLU(),
            nn.BatchNorm2d(8),
            nn.Dropout(dropout_rate)
        )  # Input: 26x26x8 | Output: 24x24x8 | RF: 5x5

        self.convblock3 = nn.Sequential(
            nn.Conv2d(in_channels=8, out_channels=16, kernel_size=3),
            nn.ReLU(),
            nn.BatchNorm2d(16),
            nn.Dropout(dropout_rate)
        )  # Input: 24x24x8 | Output: 22x22x16 | RF: 7x7

        self.convblock4 = nn.Sequential(
            nn.Conv2d(in_channels=16, out_channels=16, kernel_size=3),
            nn.ReLU(),
            nn.BatchNorm2d(16),
            nn.Dropout(dropout_rate)
        )  # Input: 22x22x16 | Output: 20x20x16 | RF: 9x9

        self.pool = nn.MaxPool2d(2, 2)  # Input: 20x20x16 | Output: 10x10x16 | RF: 10x10

        self.convblock5 = nn.Sequential(
            nn.Conv2d(in_channels=16, out_channels=16, kernel_size=3),
            nn.ReLU(),
            nn.BatchNorm2d(16),
            nn.Dropout(dropout_rate)
        )  # Input: 10x10x16 | Output: 8x8x16 | RF: 14x14

        self.convblock6 = nn.Sequential(
            nn.Conv2d(in_channels=16, out_channels=16, kernel_size=3),
            nn.ReLU(),
            nn.BatchNorm2d(16),
            nn.Dropout(dropout_rate)
        )  # Input: 8x8x16 | Output: 6x6x16 | RF: 18x18

        self.convblock7 = nn.Sequential(
            nn.Conv2d(in_channels=16, out_channels=10, kernel_size=1),
            nn.ReLU(),
            nn.BatchNorm2d(10),
            nn.Dropout(dropout_rate)
        )  # Input: 6x6x16 | Output: 6x6x10 | RF: 18x18

        self.gap = nn.Sequential(
            nn.AdaptiveAvgPool2d(1)
        )  # Input: 6x6x10 | Output: 1x1x10 | RF: 28x28
    
    def forward(self, x):
        """ This function defines the network structure """
        x = self.convblock1(x)
        x = self.convblock2(x)
        x = self.convblock3(x)
        x = self.convblock4(x)
        x = self.pool(x)
        x = self.convblock5(x)
        x = self.convblock6(x)
        x = self.convblock7(x)
        x = self.gap(x)
        x = x.view(-1, 10)
        return F.log_softmax(x, dim=-1)

## Model Parameters
Let's see the model summary

In [None]:
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
print(device)
model = Net().to(device)
summary(model, input_size=(1, 28, 28))

# Model Training and Validation

Function for model training

In [None]:
def train(model, device, train_loader, optimizer, epoch, l1_factor):
    model.train()
    pbar = tqdm(train_loader)
    correct = 0
    processed = 0
    for batch_idx, (data, target) in enumerate(pbar):
        # Get samples
        data, target = data.to(device), target.to(device)

        # Set gradients to zero before starting backpropagation
        optimizer.zero_grad()

        # Predict output
        y_pred = model(data)

        # Calculate loss
        loss = F.nll_loss(y_pred, target)
        if l1_factor > 0:  # Apply L1 regularization
            l1_criteria = nn.L1Loss(size_average=False)
            regularizer_loss = 0
            for parameter in model.parameters():
                regularizer_loss += l1_criteria(parameter, torch.zeros_like(parameter))
            loss += l1_factor * regularizer_loss

        # Perform backpropagation
        loss.backward()
        optimizer.step()

        # Update Progress Bar
        pred = y_pred.argmax(dim=1, keepdim=True)
        correct += pred.eq(target.view_as(pred)).sum().item()
        processed += len(data)
        pbar.set_description(desc=f'Loss={loss.item():0.2f} Batch_ID={batch_idx} Accuracy={(100 * correct / processed):.2f}')

Function for model validation

In [None]:
def val(model, device, val_loader, losses, accuracies, incorrect_samples):
    model.eval()
    val_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in val_loader:
            img_batch = data  # This is done to keep data in CPU
            data, target = data.to(device), target.to(device)  # Get samples
            output = model(data)  # Get trained model output
            val_loss += F.nll_loss(output, target, reduction='sum').item()  # Sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # Get the index of the max log-probability
            result = pred.eq(target.view_as(pred))

            # Save incorrect samples
            if len(incorrect_samples) < 25:
                for i in range(val_loader.batch_size):
                    if not list(result)[i]:
                        incorrect_samples.append({
                            'prediction': list(pred)[i],
                            'label': list(target.view_as(pred))[i],
                            'image': list(img_batch)[i]
                        })

            correct += result.sum().item()

    val_loss /= len(val_loader.dataset)
    losses.append(val_loss)
    accuracies.append(100. * correct / len(val_loader.dataset))

    print(f'\nValidation set: Average loss: {val_loss:.4f}, Accuracy: {correct}/{len(val_loader.dataset)} ({accuracies[-1]:.2f}%)\n')

Function for model execution

In [None]:
def run_model(l1=0.0, l2=0.0):
    losses = []
    accuracies = []
    incorrect_samples = []
    
    model = Net().to(device)
    optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9, weight_decay=l2)
    scheduler = StepLR(optimizer, step_size=5, gamma=0.15)
    epochs = 40

    for epoch in range(1, epochs + 1):
        print(f'Epoch {epoch}:')
        train(model, device, train_loader, optimizer, epoch, l1)
        scheduler.step()
        val(model, device, val_loader, losses, accuracies, incorrect_samples)
    
    return losses, accuracies, incorrect_samples

### Without L1 and L2 Regularization

In [None]:
loss, accuracy, incorrect_pred = run_model()

### With L1 Regularization

In [None]:
l1_loss, l1_accuracy, incorrect_pred_l1 = run_model(l1=0.001)

### With L2 Regularization

In [None]:
l2_loss, l2_accuracy, incorrect_pred_l2 = run_model(l2=0.0001)

### With L1 and L2 Regularization

In [None]:
l1_l2_loss, l1_l2_accuracy, incorrect_pred_l1_l2 = run_model(l1=0.001, l2=0.0001)

## Plotting Results
Plotting changes in validation loss and accuracy obtained during model training in the following scenarios:
1. Without L1 and L2 regularization
2. With L1 regularization
3. With L2 regularization
4. With L1 and L2 regularization

In [None]:
def plot_metric(plain, l1, l2, l1_l2, metric):
    # Initialize a figure
    fig = plt.figure(figsize=(13, 11))

    # Plot values
    plain_plt, = plt.plot(plain)
    l1_plt, = plt.plot(l1)
    l2_plt, = plt.plot(l2)
    l1_l2_plt, = plt.plot(l1_l2)

    # Set plot title
    plt.title(f'Validation {metric}')

    # Label axes
    plt.xlabel('Epoch')
    plt.ylabel(metric)

    # Set legend
    location = 'upper' if metric == 'Loss' else 'lower'
    plt.legend(
        (plain_plt, l1_plt, l2_plt, l1_l2_plt),
        ('Plain', 'L1', 'L2', 'L1 + L2'),
        loc=f'{location} right',
        shadow=True,
        prop={'size': 20}
    )

    # Save plot
    fig.savefig(f'{metric.lower()}_change.png')

Plot changes in validation loss

In [None]:
plot_metric(loss, l1_loss, l2_loss, l1_l2_loss, 'Loss')

Plot changes in validation accuracy

In [None]:
plot_metric(accuracy, l1_accuracy, l2_accuracy, l1_l2_accuracy, 'Accuracy')

## Display Results

Display 25 misclassified images for the following trained models:
1. Without L1 and L2 regularization
2. With L1 regularization
3. With L2 regularization
4. With L1 and L2 regularization

In [None]:
def save_and_show_result(data, metric):
    # Initialize plot
    row_count = -1
    fig, axs = plt.subplots(5, 5, figsize=(10, 10))
    fig.tight_layout()

    for idx, result in enumerate(data):

        # If 25 samples have been stored, break out of loop
        if idx > 24:
            break
        
        label = result['label'].item()
        prediction = result['prediction'].item()

        # Plot image
        if idx % 5 == 0:
            row_count += 1
        axs[row_count][idx % 5].axis('off')
        axs[row_count][idx % 5].set_title(f'Label: {label}\nPrediction: {prediction}')
        axs[row_count][idx % 5].imshow(result['image'][0], cmap='gray_r')

        # Save each image individually in labelled format
        extent = axs[row_count][idx % 5].get_window_extent().transformed(fig.dpi_scale_trans.inverted())
        fig.savefig(f'{metric}/labelled/{metric}_{idx + 1}.png', bbox_inches=extent.expanded(1.1, 1.5))
    
    # Save image
    fig.savefig(f'{metric}/{metric}_incorrect_predictions.png', bbox_inches='tight')

### With L1 Regularization

In [None]:
# Making directories
!mkdir l1
!mkdir l1/labelled

save_and_show_result(incorrect_pred_l1, 'l1')

# For downloading the results
!zip -r l1.zip l1

### With L2 Regularization

In [None]:
# Making directories
!mkdir l2
!mkdir l2/labelled

save_and_show_result(incorrect_pred_l2, 'l2')

# For downloading the results
!zip -r l2.zip l2