<a href="https://colab.research.google.com/github/sandeep-kumar-singh/emasters_ee954/blob/main/EE954_Assignment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Problem Statement
1. Download the Fashion_MNIST dataset. You can find it on the official Fashion-MNIST website or by using PyTorch's torchvision.datasets module. Split the dataset into training, validation and testing sets. A common split is 80% of the data to train, 10% to validate, and 10% to test scenarios, but you can adjust this as needed. Normalize the images. This involves scaling the pixel values to a range between 0 and 1.

2. Implement a MLP for classification. (total 40 marks)
    <ol type="a">
    <li>Flatten the images into a single-dimensional vector before feeding them to the model. (1 mark)</li>
    <li>Write a pre-processing module for all the images. (3 marks)</li>
    <li>Write the Forward pass from scratch. Use of the inbuilt forward pass function will result in 0 marks for this sub-question. (8 marks)</li>
    <li>Write the Backward pass from scratch. Use of the inbuilt backpropagation function will result in 0 marks for this sub-question. (12 marks)</li>
    <li>Write the module for cross entropy loss. (1 mark)</li>
    <li>Experiment with different hyperparameters like number of layers, dropout, objective function, etc. and settle with a combination which performs the best for the given problem. (15 Marks)</li>
    </ol>

3. Implement a [CNN backbone model](https://www.baeldung.com/cs/neural-network-backbone) using pytorch. (total 40 marks)
    <ol type="a">
    <li>Build a small CNN model consisting of 5 convolution layers. Each convolution layer would be followed by a ReLU activation and a max pooling layer. (10 Marks)</li>
    <li>Experiment with different kernel sizes and numbers of kernels in each layer (keep the number of filters the same in each layer, double it in each layer, etc.) and settle with a combination which performs the best for the given problem. (10 Marks)</li>
    <li>Try different weight initialization methods (random, Xavier, He). (5 Marks)</li>
    <li>After extracting features from the CNN model, use the MLP for classification (use code from question 2). (15 Marks)</li>
    </ol>

4. Submit a report clearly explaining how you have built the models, the architecture of the models, learning rate, epochs used for training, evaluation metrics and the instructions for running the models. Compare the performance of the models on the different hyperparameters you tried and justify the observed behavior. (20 Marks)

# Part 1 - Data Preparation

In [None]:
# ==========================================================
# Import Dependencies
# ==========================================================

# for MLP
import numpy as np

# for Pytorch based backbone CNN
import os
import torch
from torch import nn
from torch.utils.data import DataLoader, random_split
from torchvision import datasets, transforms
from torchvision.transforms import ToTensor

In [None]:
# ==========================================================
# Data Preparation (using torch)
# ==========================================================

# PyTorch based flow to prepare and train a backbone CNN
########################################################
# Download training data from open datasets.
training_data = datasets.FashionMNIST(
    root="data",
    train=True,
    download=True,
    transform=ToTensor(),                                                       # this transform scales pixel values from [0, 255] to [0.0, 1.0]
)
# Download test data from open datasets.
test_data = datasets.FashionMNIST(
    root="data",
    train=False,
    download=True,
    transform=ToTensor(),                                                       # this transform scales pixel values from [0, 255] to [0.0, 1.0]
)

# Split the training data into Training and Validation datasets
training_data_subset_size = int(0.8 * len(training_data))
validate_data_subset_size = len(training_data) - training_data_subset_size
training_data_subset, validation_data_subset = random_split(training_data, [training_data_subset_size, validate_data_subset_size])

# define batch-size to load data
batch_size = 64
# define num of epochs to be run for training
epochs = 10

# Create data loaders.
train_dataloader = DataLoader(training_data_subset, batch_size=batch_size)
validate_dataloader = DataLoader(validation_data_subset, batch_size=batch_size)
test_dataloader = DataLoader(test_data, batch_size=batch_size)

In [None]:
# ==========================================================
# Data Preparation (Alternate using numpy)
# ==========================================================

import os
import gzip
import urllib.request
import numpy as np

# URL and data filename for the Fashion MNIST dataset
DATASET_BASE_URL = 'http://fashion-mnist.s3-website.eu-central-1.amazonaws.com'
DATASET_BASE_FOLDER = './data/FashionMNIST/raw'
DATA_FILENAME = {
    'train_images': 'train-images-idx3-ubyte.gz',
    'train_labels': 'train-labels-idx1-ubyte.gz',
    'test_images': 't10k-images-idx3-ubyte.gz',
    'test_labels': 't10k-labels-idx1-ubyte.gz'
}

# Helper function to download and extract the dataset
def download_and_extract(filename, is_image=False):
    if not os.path.exists('/'.join([DATASET_BASE_FOLDER, filename])):
        os.makedirs(DATASET_BASE_FOLDER, exist_ok=True)
        urllib.request.urlretrieve('/'.join([DATASET_BASE_URL, filename]),
                                   '/'.join([DATASET_BASE_FOLDER, filename]))
    filename = os.path.join(DATASET_BASE_FOLDER, filename)
    with gzip.open(filename, 'rb') as f:
        if (is_image):
            return np.frombuffer(f.read(), np.uint8, offset=16).reshape(-1, 28 * 28) / 255.0
        else:
            return np.frombuffer(f.read(), np.uint8, offset=8)

# Download and extract all files
train_images__np = download_and_extract(DATA_FILENAME['train_images'], is_image=True)
train_labels__np = download_and_extract(DATA_FILENAME['train_labels'])
test_images__np = download_and_extract(DATA_FILENAME['test_images'], is_image=True)
test_labels__np = download_and_extract(DATA_FILENAME['test_labels'])

# Split the training set into training and validation sets
num_train = int(0.8 * len(train_images__np))
train_data__np, val_data__np = train_images__np[:num_train], train_images__np[num_train:]
train_labels__np, val_labels__np = train_labels__np[:num_train], train_labels__np[num_train:]

print(f'Training data shape   : Images - {train_data__np.shape} | Labels - {train_labels__np.shape}')
print(f'Validation data shape : Images - {val_data__np.shape} | Labels - {val_labels__np.shape}')
print(f'Test data shape       : Images - {test_images__np.shape} | Labels - {test_labels__np.shape}')

Training data shape   : Images - (48000, 784) | Labels - (48000,)
Validation data shape : Images - (12000, 784) | Labels - (12000,)
Test data shape       : Images - (10000, 784) | Labels - (10000,)
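
The numpy split above takes the first 80% of the training file in its stored order. As a minimal sketch, assuming a shuffled split is preferred (comparable to `random_split` in the torch flow), the indices can be permuted once before slicing. The helper name `shuffled_split`, the fixed seed, and the small synthetic arrays are illustrative assumptions, not part of the notebook.

```python
import numpy as np

def shuffled_split(images, labels, train_fraction=0.8, seed=0):
    """Hypothetical helper: shuffle once, then split into train/validation parts."""
    rng = np.random.default_rng(seed)
    order = rng.permutation(len(images))          # random ordering of sample indices
    num_train = int(train_fraction * len(images))
    train_idx, val_idx = order[:num_train], order[num_train:]
    return images[train_idx], labels[train_idx], images[val_idx], labels[val_idx]

# small synthetic stand-in for the (60000, 784) image array and its labels
demo_images = np.arange(10 * 4, dtype=np.float64).reshape(10, 4)
demo_labels = np.arange(10)
tr_x, tr_y, va_x, va_y = shuffled_split(demo_images, demo_labels)
print(tr_x.shape, va_x.shape)
```

Images and labels are indexed with the same permutation, so each image keeps its own label after shuffling.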


In [None]:
# ==========================================================
# Select Device for Execution
# ==========================================================

# Get cpu, gpu or mps device for training.
device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)
print(f"Using {device} device")



Using cuda device


In [None]:
# ==========================================================
# Define MLP model for classification
# ==========================================================

# Define a class to represent dense layer
class DenseLayer:
    def __init__(self, input_dim, output_dim, activation, lambda_reg=0.1, reg_type=None):
        self.input_dim = input_dim
        self.output_dim = output_dim

        self.weights = np.random.randn(input_dim, output_dim) * 0.01
        self.biases =  np.zeros((1, output_dim))

        self.activation_name = activation
        self.lambda_reg = lambda_reg

        self.output = None
        self.input = None

        self.reg_type = reg_type

        if self.activation_name == 'relu':
            self.activation = self.relu
            self.activation_prime = self.relu_prime
        elif self.activation_name == 'sigmoid':
            self.activation = self.sigmoid
            self.activation_prime = self.sigmoid_prime
        elif self.activation_name == 'softmax':
            self.activation = self.softmax
            self.activation_prime = self.softmax_prime
        else:
            raise ValueError('activation function is not defined')

    def __str__(self):
        return f"""DenseLayer(input_dim:{self.input_dim}, output_dim:{self.output_dim}, activation:{self.activation_name})"""

    def forward(self, input_data):
        self.input = input_data
        #print(f"self.input: {self.input.shape} \n self.weights {self.weights.shape}")
        Z = np.dot(self.input, self.weights) + self.biases
        #print("Z ", Z.shape)
        self.output = self.activation(Z)
        #print(f"set..... self.output {self.output.shape}")

        return self.output

    def backward(self, dA, learning_rate, y=None):
        """
        Backward propagate through this layer.
        dA is the derivative of the loss with respect to the output of this layer.
        y is the true labels, which is only needed if this is an output layer with softmax activation.
        """
        #print(f"self.output {self.output.shape}")
        if self.activation_name == 'softmax':
            y_one_hot = np.zeros_like(self.output)
            y_one_hot[np.arange(len(y)), y] = 1
            # Calculate the derivative of the loss with respect to the softmax inputs
            dZ = (self.output - y_one_hot) / len(y)
        else:
            dZ = dA * self.activation_prime(self.output)

        dA_prev = np.dot(dZ, self.weights.T)
        dW = np.dot(self.input.T, dZ)
        db = np.sum(dZ, axis=0, keepdims=True)

        if self.reg_type:
            if self.reg_type.upper() == "L1":
                 #print("Using L1 regularization..")
                 weights_reg = self.lambda_reg * np.sign(self.weights)
                 biases_reg = self.lambda_reg * np.sign(self.biases)
            else:
                 #print("Using L2 regularization....")
                 weights_reg = self.lambda_reg * self.weights
                 biases_reg = self.lambda_reg * self.biases
            self.weights -= learning_rate * (dW + weights_reg)
            self.biases -= learning_rate * (db + biases_reg)
        else:
            #print("No regularization....")
            self.weights -= learning_rate * dW
            self.biases -= learning_rate * db

        return dA_prev

    # ==== Activation functions and their derivatives ====

    def relu(self, x):
        return np.maximum(0, x)

    def relu_prime(self, x):
        return np.where(x > 0, 1, 0)

    def sigmoid(self, x):
        return 1 / (1 + np.exp(-x))

    def sigmoid_prime(self, a):
        # `a` is the sigmoid output (backward() passes self.output), so the derivative is a * (1 - a)
        return a * (1 - a)

    # Ref https://stackoverflow.com/questions/40575841/numpy-calculate-the-derivative-of-the-softmax-function
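    def softmax(self, Z):
        # subtract the row-wise max before exponentiating to avoid overflow; the result is unchanged
        exp_scores = np.exp(Z - np.max(Z, axis=1, keepdims=True))
        return exp_scores / np.sum(exp_scores, axis=1, keepdims=True)  # Softmax activation

    # The derivative of the cross-entropy loss with respect to the input to the softmax is simply predictions - true_labels.
    # backward() computes that expression directly for softmax layers, so this placeholder is never used in the update.
    def softmax_prime(self, x):
        return 1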


# Define a class to represent MLP
class MLP:
    def __init__(self):
        self.layers = []
        self.history = {'train_loss': [], 'val_loss': [], 'train_acc':[], 'val_acc':[]}

    def add_layer(self, layer):
        self.layers.append(layer)

    def forward(self, X):
        for layer in self.layers:
            X = layer.forward(X)
        return X

    def predict(self, X):
        output = self.forward(X)
        return np.argmax(output, axis=1)

    def cross_entropy_loss(self,y, output):
        m = y.shape[0]
        log_likelihood = -np.log(output[range(m), y] + 1e-9)
        loss = np.sum(log_likelihood) / m
        return loss

    def train(self, train_data, train_labels, val_data, val_labels, epochs=10, batch_size=64, learning_rate=0.01):
        for epoch in range(epochs):
            permutation = np.random.permutation(train_data.shape[0])
            train_data = train_data[permutation]
            train_labels = train_labels[permutation]
            for i in range(0, train_data.shape[0], batch_size):
                X_batch = train_data[i:i+batch_size]
                y_batch = train_labels[i:i+batch_size]
                output = self.forward(X_batch)
                self.backward(output, learning_rate, y_batch)
            train_loss = self.cross_entropy_loss(train_labels, self.forward(train_data))
            self.history['train_loss'].append(train_loss)

            val_output = self.forward(val_data)
            val_loss = self.cross_entropy_loss(val_labels, val_output)  # Use val_labels directly
            self.history['val_loss'].append(val_loss)

            val_accuracy = np.mean(self.predict(val_data) == val_labels)
            train_acc = np.mean(self.predict(train_data) == train_labels)
            self.history['train_acc'].append(train_acc)
            self.history['val_acc'].append(val_accuracy)
            print(f'Epoch {epoch+1}, Training Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')

    def backward(self,output, learning_rate, y_train_batch):
        for layer in reversed(self.layers):
            #print(layer)
            output = layer.backward(output, learning_rate,y_train_batch)
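
The softmax branch of `DenseLayer.backward` relies on the identity that the gradient of the mean cross-entropy loss with respect to the softmax inputs is `(predictions - one_hot_labels) / batch_size`. The following is a minimal, self-contained sketch that checks this identity against a central finite difference. The function names, the random logits, and the step size `eps` are assumptions chosen for illustration and are independent of the classes above.

```python
import numpy as np

def softmax(z):
    shifted = z - z.max(axis=1, keepdims=True)   # stabilise the exponentials
    e = np.exp(shifted)
    return e / e.sum(axis=1, keepdims=True)

def cross_entropy(z, y):
    # mean negative log-likelihood of the true classes
    p = softmax(z)
    return -np.mean(np.log(p[np.arange(len(y)), y]))

def analytic_grad(z, y):
    # (softmax output - one-hot labels) / batch size
    p = softmax(z)
    one_hot = np.zeros_like(p)
    one_hot[np.arange(len(y)), y] = 1
    return (p - one_hot) / len(y)

def numerical_grad(z, y, eps=1e-6):
    # central difference, perturbing one logit at a time
    grad = np.zeros_like(z)
    for idx in np.ndindex(*z.shape):
        z_plus, z_minus = z.copy(), z.copy()
        z_plus[idx] += eps
        z_minus[idx] -= eps
        grad[idx] = (cross_entropy(z_plus, y) - cross_entropy(z_minus, y)) / (2 * eps)
    return grad

rng = np.random.default_rng(0)
logits = rng.normal(size=(4, 3))
targets = np.array([0, 2, 1, 2])
max_diff = np.abs(analytic_grad(logits, targets) - numerical_grad(logits, targets)).max()
print(f"max |analytic - numerical| = {max_diff:.2e}")
```

A small difference between the two gradients suggests the closed-form expression is implemented correctly; the same style of check could be applied to the weight gradients of a full layer.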

In [None]:
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# For Debugging - CNN Layer Output Size
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

# input = torch.randn(784, 3, 3)

# # m = nn.Conv2d(392, 784, (2, 2), stride=(1, 1), padding=(1, 1))
# m = nn.MaxPool2d((2, 2))
# output = m(input)

# print(input.size(), output.size())

torch.Size([784, 3, 3]) torch.Size([784, 1, 1])
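
The spatial sizes that the debugging cell inspects can also be worked out by hand. As a sketch, assuming dilation 1 and square kernels, one spatial dimension of a `Conv2d` or `MaxPool2d` output is `floor((size + 2 * padding - kernel) / stride) + 1`. The helper `conv_out` is a hypothetical name; the kernel and padding values are the ones used by the five conv blocks of the backbone CNN in this notebook.

```python
def conv_out(size, kernel, stride=1, padding=0):
    """Output length along one spatial dimension for Conv2d / MaxPool2d (dilation 1)."""
    return (size + 2 * padding - kernel) // stride + 1

size = 28
sizes = [size]
# (conv kernel, conv padding) for each of the five conv blocks
for kernel, padding in [(3, 1), (2, 1), (2, 1), (2, 1), (2, 1)]:
    size = conv_out(size, kernel, stride=1, padding=padding)   # convolution
    sizes.append(size)
    size = conv_out(size, 2, stride=2)                         # 2x2 max pooling (stride defaults to the kernel size)
    sizes.append(size)
print(sizes)
```

The final entry is 1, so with 784 output channels the flattened feature vector has 784 elements.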


In [None]:
# ==========================================================
# Define backbone CNN model for feature extraction
# ==========================================================

class BackboneNeuralNetwork(nn.Module):
    """
    Build a small CNN model consisting of 5 convolution layers. Each convolution
    layer would be followed by a ReLU activation and a max pooling layer.

    Dense network with 2 layers, with a ReLU activation after first layer. This
    layer can be used ONLY when testing CNN model in isolation. After extracting
    feature from CNN model, use MLP for classification.

    NOTE: Dense network is not used, if `backbone_only` is `True`.

    REFERENCES:
        1. https://pytorch.org/docs/stable/generated/torch.nn.Sequential.html
        nn.Sequential()

        2. https://pytorch.org/docs/stable/generated/torch.nn.Conv2d.html#torch.nn.Conv2d
        nn.Conv2d(
            in_channels = number of channels in the input images. Grayscale or monochrome images have 1 in_channels
            out_channels = number of channels in the output produced. This is a hyperparameter, which signifies the number of kernels
            kernel_size = `(m,n)` for a kernel/filter dimension, or simply n for a square (n,n) kernel/filter dimension
            stride=1, padding=0, dilation=1, groups=1, bias=True, padding_mode='zeros', device=None, dtype=None are other properties with default values
        )

        3. https://pytorch.org/docs/stable/generated/torch.nn.ReLU.html#torch.nn.ReLU
        nn.ReLU()

        4. https://pytorch.org/docs/stable/generated/torch.nn.MaxPool2d.html#torch.nn.MaxPool2d
        nn.MaxPool2d(
            kernel_size = `(m,n)` for a kernel/filter dimension, or simply n for a square (n,n) kernel/filter dimension
            stride=None, padding=0, dilation=1, return_indices=False, ceil_mode=False are other properties with default values
        )

        5. https://pytorch.org/docs/stable/generated/torch.nn.Linear.html#torch.nn.Linear
        nn.Linear(
            in_features = size of each input sample
            out_features = size of each output sample
            bias=True, device=None, dtype=None are other properties with default values
        )

        6. https://pytorch.org/docs/stable/generated/torch.nn.Softmax.html#torch.nn.Softmax
        nn.Softmax()

        7. https://pytorch.org/tutorials/beginner/blitz/cifar10_tutorial.html
        Training a Classifier
    """

    # Constructor for the CNN Model.
    #
    # NOTE: If `backbone_only` is `True`, dense network is not used.
    def __init__(self, backbone_only=False):
        super().__init__()

        # this property helps the CNN transition from a full-fledged network to a backbone CNN
        # the default value is False - meaning an object of this class can be used to predict the labels for Fashion-MNIST dataset
        # if the value is set to True - an object of this class will return the flattened output from conv layers - thus acting as a backbone
        self.backbone_only = backbone_only

        self.flatten = nn.Flatten()

        # CNN model consisting of 5 convolution layers with each convolution
        # layer followed by a ReLU activation and a max pooling layer.
        self.convolutional_relu_stack = nn.Sequential(
            nn.Conv2d(1, 49, (3, 3), stride=(1, 1), padding=(1, 1)),    # input = (1,28,28), output = (49, 28, 28)
            nn.ReLU(),
            nn.MaxPool2d((2, 2)),                                       # input = (49, 28, 28), output = (49, 14, 14)
            nn.Conv2d(49, 98, (2, 2), stride=(1, 1), padding=(1,1)),    # input = (49, 14, 14), output = (98, 15, 15)
            nn.ReLU(),
            nn.MaxPool2d((2, 2)),                                       # input = (98, 15, 15), output = (98, 7, 7)
            nn.Conv2d(98, 196, (2, 2), stride=(1, 1), padding=(1,1)),   # input = (98, 7, 7), output = (196, 8, 8)
            nn.ReLU(),
            nn.MaxPool2d((2, 2)),                                       # input = (196, 8, 8), output = (196, 4, 4)
            nn.Conv2d(196, 392, (2, 2), stride=(1, 1), padding=(1, 1)), # input = (196, 4, 4), output = (392, 5, 5)
            nn.ReLU(),
            nn.MaxPool2d((2, 2)),                                       # input = (392, 5, 5), output = (392, 2, 2)
            nn.Conv2d(392, 784, (2, 2), stride=(1, 1), padding=(1, 1)), # input = (392, 2, 2), output = (784, 3, 3)
            nn.ReLU(),
            nn.MaxPool2d((2, 2))                                        # input = (784, 3, 3), output = (784,1,1)
        )

        # Dense network with 2 layers with a ReLU activation after first layer.
        # This layer can be used ONLY when testing CNN model in isolation.
        #
        # NOTE: Dense network is not used, if `backbone_only` is `True`.
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(784, 128),
            nn.ReLU(),
            nn.Linear(128, 10)
        )

    # Forward pass for CNN Model
    def forward(self, x):
        # size printing removed from the forward pass: it ran on every batch and flooded the training log
        x1 = self.convolutional_relu_stack(x)   # (N, 784, 1, 1) for (N, 1, 28, 28) inputs
        x2 = self.flatten(x1)                   # (N, 784)

        if self.backbone_only:  # return the flattened tensor containing feature extraction data
            return x2

        # default behaviour is to return the predicted labels
        x3 = self.linear_relu_stack(x2)
        return x3

In [None]:
# ==========================================================
# Data Preparation, Backbone CNN Training and MLP Integration
# ==========================================================

# PyTorch based flow to prepare and train a backbone CNN
########################################################
# Download training data from open datasets.
training_data = datasets.FashionMNIST(
    root="data",
    train=True,
    download=True,
    transform=ToTensor(),
)
# Download test data from open datasets.
test_data = datasets.FashionMNIST(
    root="data",
    train=False,
    download=True,
    transform=ToTensor(),
)

# Split the training data into Training and Validation datasets
training_data_subset_size = int(0.8 * len(training_data))
validate_data_subset_size = len(training_data) - training_data_subset_size
training_data_subset, validation_data_subset = random_split(training_data, [training_data_subset_size, validate_data_subset_size])

print(f'Complete Training Data: {len(training_data)}')

# define batch-size to load data
batch_size = 64
# define num of epochs to be run for training
epochs = 10

# Create data loaders.
train_dataloader = DataLoader(training_data_subset, batch_size=batch_size)
validate_dataloader = DataLoader(validation_data_subset, batch_size=batch_size)
test_dataloader = DataLoader(test_data, batch_size=batch_size)

# for X, y in test_dataloader:
#     print(f"Shape of X [N, C, H, W]: {X.shape}")
#     print(f"Shape of y: {y.shape} {y.dtype}")
#     break

# create model instance
model = BackboneNeuralNetwork().to(device)
# print(model)

# define loss function and optimizer
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-1)

# training implementation: forward pass, loss, backpropagation and optimizer step over one epoch
def train(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        optimizer.zero_grad()
        X, y = X.to(device), y.to(device)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            loss, current = loss.item(), (batch + 1) * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")


# validation implementation
def validate(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    validation_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            validation_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    validation_loss /= num_batches
    correct /= size
    print(f"Validation Phase: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {validation_loss:>8f} \n")

# testing implementation
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Phase: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

# Train and Validate the CNN
print("Training the backbone CNN model")
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(train_dataloader, model, loss_fn, optimizer)
    validate(validate_dataloader, model, loss_fn)
print("Done!")

# Save the model
torch.save(model.state_dict(), "model.pth")
print("Saved PyTorch CNN backbone model state to model.pth")

# Load the model to perform testing on the trained variables
model = BackboneNeuralNetwork().to(device)
model.load_state_dict(torch.load("model.pth"))

# Test with the model
test(test_dataloader, model, loss_fn)

#####################################
# Once the backbone model is prepared, trained, and tested,
# start the integration of backbone model with custom MLP

# define the MLP architecture
input_size = 28 * 28    # the backbone flattens to 784 features (784 channels x 1 x 1), which happens to equal 28 * 28
hidden_size = 128
output_size = 10

# create the instance of MLP
mlp = MLP()
mlp.add_layer(DenseLayer(input_size, hidden_size, 'relu'))      #reg_type="L2" does not help
mlp.add_layer(DenseLayer(hidden_size, output_size, 'softmax'))  #reg_type="L2" does not help

# extract features and labels from a dataloader using the pre-trained backbone
def feature_extraction(dataloader, model):
    size = len(dataloader.dataset)
    features = np.empty((size, 28*28), dtype=np.float64)
    labels = np.empty((size), dtype=np.int64)
    start = 0
    model.eval()
    with torch.no_grad():
        for X, y in dataloader:
            X = X.to(device)
            pred = model(X)
            end = start + len(X)                        # the last batch may be smaller than batch_size
            features[start:end] = pred.cpu().numpy()    # move to the CPU before converting to numpy
            labels[start:end] = y.numpy()               # labels are still on the CPU
            start = end
    return features, labels

# create the instance of `backbone CNN model`
backbone_model = BackboneNeuralNetwork(backbone_only=True).to(device)
backbone_model.load_state_dict(torch.load("model.pth"))

# extract the features using backbone CNN
classifier_train_data, classifier_train_labels = feature_extraction(train_dataloader, backbone_model)
classifier_validation_data, classifier_validation_labels = feature_extraction(validate_dataloader, backbone_model)

# let the MLP classify the data now based on feature-extracted dataset
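# NOTE: mlp.train() below is called with its defaults (epochs=10, learning_rate=0.01),
# so each iteration of this outer loop runs 10 MLP epochs
epochs = 1
learning_rate = 0.01
print("Training the classification MLP model")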
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    mlp.train(classifier_train_data, classifier_train_labels, classifier_validation_data, classifier_validation_labels)
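
The training log reports overall validation accuracy only. As a minimal sketch, assuming a per-class breakdown is wanted for the evaluation metrics in the report, the accuracy of each class can be computed from true and predicted labels. The helper `per_class_accuracy` and the toy label arrays are hypothetical; in the notebook the predictions would come from `mlp.predict(...)` on extracted features.

```python
import numpy as np

def per_class_accuracy(y_true, y_pred, num_classes=10):
    """Hypothetical helper: fraction of correctly predicted samples for each class."""
    acc = np.full(num_classes, np.nan)   # NaN marks classes with no samples
    for c in range(num_classes):
        mask = y_true == c
        if mask.any():
            acc[c] = np.mean(y_pred[mask] == c)
    return acc

# toy labels standing in for real labels and model predictions
y_true = np.array([0, 0, 1, 1, 2, 2])
y_pred = np.array([0, 1, 1, 1, 2, 0])
acc = per_class_accuracy(y_true, y_pred, num_classes=3)
print(acc)
```

Per-class values make it easier to see which clothing categories are confused with each other, which a single overall accuracy figure hides.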
