In [None]:
# Importing necessary libraries and modules

import matplotlib.pyplot as plt
from matplotlib.ticker import MultipleLocator
import pandas as pd
import numpy as np
from scipy.spatial import distance

import torch
from torchvision import datasets
import torchvision.transforms as transforms
from torchvision.models import (
    resnet34,
    ResNet34_Weights,
    resnet18,
    ResNet18_Weights,
    vgg11,
    VGG11_Weights,
)

from sklearn.metrics import confusion_matrix, f1_score, accuracy_score
from tqdm.notebook import tqdm
import seaborn as sb

from copy import deepcopy

# Apply seaborn's default theme (called with no arguments) for the plots in this notebook
sb.set_theme()

In [None]:
# Pick the compute device once: the GPU when CUDA is usable, the CPU otherwise.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(device)

### Load Data

In [None]:
# Preprocessing pipeline applied to every image when it is read from the dataset.

transform = transforms.Compose(
    [
        # Resize so the shorter side is 256 pixels (a square image becomes 256x256).
        transforms.Resize(256),
        # Keep only the central 224x224 region.
        transforms.CenterCrop(224),
        # Convert the image to a tensor.
        transforms.ToTensor(),
        # Normalize each channel with the given mean and standard deviation.
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

In [None]:
# Both CIFAR10 splits are stored in (and downloaded to, when requested) the local
# "data" directory and share the same preprocessing pipeline.

# Training split
train_data = datasets.CIFAR10(
    root="data", train=True, transform=transform, download=True
)

# Test split
test_data = datasets.CIFAR10(
    root="data", train=False, transform=transform, download=True
)

In [None]:
# Carve a validation subset (15%) out of the training data (85% remains for training).
# The generator is seeded so the split is the same on every run.
train_subset, val_subset = torch.utils.data.random_split(
    dataset=train_data,
    lengths=[0.85, 0.15],
    generator=torch.Generator().manual_seed(1),
)

### Dense (Fully Connected) Layer

In [None]:
class Dense:
    """Fully connected layer with hand-written forward and backward passes.

    Attributes set by the methods:
        weights, biases: layer parameters (created in ``__init__``).
        inputs, output: cached by ``forward``.
        weights_gradient, biases_gradient, inputs_gradient, b_output: set by
            ``backward``; ``b_output`` is the gradient handed to the previous layer.
    """

    def __init__(self, n_inputs, n_neurons):
        """Create the parameters for a layer mapping ``n_inputs`` to ``n_neurons``."""
        self.n_inputs = n_inputs
        self.n_neurons = n_neurons

        # He heuristic: scale standard-normal samples by sqrt(2 / n_inputs)
        he_scale = torch.sqrt(torch.tensor(2.0 / n_inputs))
        initial_weights = torch.randn(n_inputs, n_neurons) * he_scale
        self.weights = initial_weights.to(device)

        # Biases start at zero, stored as a single row
        self.biases = torch.zeros(1, n_neurons).to(device)

    def forward(self, inputs):
        """Cache ``inputs`` and store ``inputs @ weights + biases`` in ``self.output``."""
        self.inputs = inputs
        self.output = self.inputs @ self.weights + self.biases

    def backward(self, b_input):
        """Compute parameter and input gradients from the upstream gradient ``b_input``."""
        # Gradient with respect to the parameters
        self.weights_gradient = self.inputs.T @ b_input
        self.biases_gradient = b_input.sum(dim=0, keepdim=True)

        # Gradient with respect to the layer inputs, passed on to the previous layer
        self.inputs_gradient = b_input @ self.weights.T
        self.b_output = self.inputs_gradient

### Activation Functions

In [None]:
class ReLU:
    """Rectified linear unit activation with a hand-written backward pass."""

    def __repr__(self):
        return "ReLU"

    def forward(self, inputs):
        """Store ``max(0, inputs)`` element-wise in ``self.output``.

        The zero tensor is created with ``torch.zeros_like`` so it already has the
        shape, dtype and device of ``inputs``; no host-to-device copy is made per
        call and the method does not depend on a module-level device.
        """
        self.output = torch.maximum(torch.zeros_like(inputs), inputs)

    def backward(self, b_input):
        """Store the gradient with respect to the inputs in ``self.b_output``.

        ``b_input`` is the upstream gradient; it is left unmodified. The gradient is
        zeroed wherever the cached forward output is less than or equal to zero.
        """
        # masked_fill is out-of-place, so the caller's tensor is not altered
        self.b_output = b_input.masked_fill(self.output <= 0, 0)

In [None]:
class Sigmoid:
    """Logistic sigmoid activation with a hand-written backward pass."""

    def __repr__(self):
        return "Sigmoid"

    def forward(self, inputs):
        """Store ``1 / (1 + exp(-inputs))`` element-wise in ``self.output``."""
        denominator = 1 + torch.exp(-inputs)
        self.output = 1 / denominator

    def backward(self, b_input):
        """Store ``b_input * (1 - output) * output`` in ``self.b_output``.

        ``output`` is the value cached by the most recent ``forward`` call.
        """
        activated = self.output
        # Chain rule: upstream gradient times the local derivative of the sigmoid
        self.b_output = b_input * (1 - activated) * activated

### Loss Function

In [None]:
class Categorical_Cross_Entropy_loss_Softmax:
    """Softmax output activation fused with a categorical cross-entropy loss."""

    def forward(self, inputs, batch_size, class_label=None):
        """Compute softmax probabilities and, when labels are given, the loss.

        Args:
            inputs: scores, one row per sample (softmax is taken over dim 1).
            batch_size: divisor stored for ``backward``; only recorded when
                ``class_label`` is provided.
            class_label: optional targets multiplied element-wise with the log
                probabilities (the training code passes one-hot rows).

        Sets ``self.softmax`` always; sets ``self.class_label``, ``self.batch_size``
        and ``self.loss`` (summed over all entries) only when labels are given.
        """
        # Subtract the per-row maximum before exponentiating
        row_max = inputs.max(dim=1, keepdim=True).values
        shifted_exp = torch.exp(inputs - row_max)

        # Normalize each row; the small offset keeps log() away from zero
        probabilities = shifted_exp / shifted_exp.sum(dim=1, keepdim=True)
        self.softmax = probabilities + 1e-7

        # Without labels only the probabilities are needed
        if class_label is None:
            return

        self.class_label = class_label
        self.batch_size = batch_size
        self.loss = -(self.class_label * self.softmax.log()).sum()

    def backward(self):
        """Store ``(softmax - class_label) / batch_size`` in ``self.b_output``."""
        difference = self.softmax - self.class_label
        self.b_output = difference / self.batch_size

### Optimizer

In [None]:
class SGD:
    """Plain stochastic gradient descent: parameter -= learning_rate * gradient."""

    def __init__(self, learning_rate=0.001):
        self.learning_rate = learning_rate

    def update(self, layer):
        """Apply one in-place update step to ``layer.weights`` and ``layer.biases``.

        The gradients are read from ``layer.weights_gradient`` and
        ``layer.biases_gradient``.
        """
        step_size = self.learning_rate
        parameter_gradient_pairs = (
            (layer.weights, layer.weights_gradient),
            (layer.biases, layer.biases_gradient),
        )
        for parameter, gradient in parameter_gradient_pairs:
            # In-place subtraction: the layer keeps the same tensor objects
            parameter -= step_size * gradient

### Architecture

#### Features Extractor

In [None]:
class ResNet34:
    def __init__(self):
        # Load the ResNet34 model with default weights
        self.resnet34 = resnet34(weights=ResNet34_Weights.DEFAULT)
        modules = list(self.resnet34.children())[:-1]

        # Create a new model with all the layers except the last one
        self.resnet34 = torch.nn.Sequential(*modules)

        self.resnet34.eval()  # Set the model to evaluation mode

        self.resnet34 = self.resnet34.to(device)

    def __repr__(self):
        return "ResNet34"

    def get_features(self, images):
        with torch.no_grad():
            # Get the features from the features extractor model
            features = self.resnet34(images)
        return features

    def get_size(self):
        with torch.no_grad():
            features = torch.flatten(
                self.resnet34(torch.zeros(1, 3, 224, 224).to(device)), start_dim=1
            )
        # Return the size of the features
        return features.shape

In [None]:
class ResNet18:
    def __init__(self):
        # Load the ResNet18 model with default weights
        self.resnet18 = resnet18(weights=ResNet18_Weights.DEFAULT)
        modules = list(self.resnet18.children())[:-1]

        # Create a new model with all the layers except the last one
        self.resnet18 = torch.nn.Sequential(*modules)

        self.resnet18.eval()  # Set the model to evaluation mode

        self.resnet18 = self.resnet18.to(device)

    def __repr__(self):
        return "ResNet18"

    def get_features(self, images):
        with torch.no_grad():
            # Get the features from the features extractor model
            features = self.resnet18(images)
        return features

    def get_size(self):
        with torch.no_grad():
            features = torch.flatten(
                self.resnet18(torch.zeros(1, 3, 224, 224).to(device)), start_dim=1
            )
        # Return the size of the features
        return features.shape

In [None]:
class VGG11:
    def __init__(self):
        # Load the VGG11 model with default weights
        self.vgg11 = vgg11(weights=VGG11_Weights.DEFAULT)
        modules = list(self.vgg11.children())[:-1]

        # Create a new model with all the layers except the last one
        self.vgg11 = torch.nn.Sequential(*modules)

        self.vgg11.eval()  # Set the model to evaluation mode

        self.vgg11 = self.vgg11.to(device)

    def __repr__(self):
        return "VGG11"

    def get_features(self, images):
        with torch.no_grad():
            # Get the features from the features extractor model
            features = self.vgg11(images)
        return features

    def get_size(self):
        with torch.no_grad():
            features = torch.flatten(
                self.vgg11(torch.zeros(1, 3, 224, 224).to(device)), start_dim=1
            )
        # Return the size of the features
        return features.shape

In [None]:
# Build each feature extractor once so the models below can share them.
fe_resnet18, fe_resnet34, fe_vgg11 = ResNet18(), ResNet34(), VGG11()

#### Neural Network

In [None]:
class AKModel:
    """Dense classifier head trained with hand-written backpropagation.

    Images are passed through a feature extractor (which computes its features
    without tracking gradients); only the ``Dense`` layers held in ``self.layers``
    are updated, using the ``SGD`` optimizer.

    Args:
        features_extractor: object exposing ``get_features(images)`` and
            ``get_size()``; index 1 of the returned size is used as the input width.
        number_of_classes: number of output classes.
        batch_size: batch size for all three data loaders; also the divisor passed
            to the loss activation.
        tune: when False, a default two-layer head is built. When True,
            ``self.layers`` and ``self.activations`` are NOT created here and
            presumably have to be assigned by the caller before use - TODO confirm.

    The data loaders are built from the module-level ``train_subset``,
    ``val_subset`` and ``test_data``.
    """

    def __init__(
        self,
        features_extractor=fe_resnet34,
        number_of_classes=10,
        batch_size=32,
        tune=False,
    ):
        self.batch_size = batch_size
        self.number_of_classes = number_of_classes

        self.optimizer = SGD(learning_rate=0.001)
        self.features_extractor = features_extractor
        self.n_inputs = self.features_extractor.get_size()[1]

        if not tune:
            self.layers = [Dense(self.n_inputs, 20), Dense(20, self.number_of_classes)]
            self.activations = [ReLU(), Categorical_Cross_Entropy_loss_Softmax()]

        # Create a data loader for the training data
        self.train_loader = torch.utils.data.DataLoader(
            dataset=train_subset, shuffle=True, batch_size=self.batch_size
        )

        # Create a data loader for the validation data
        self.val_loader = torch.utils.data.DataLoader(
            dataset=val_subset, shuffle=False, batch_size=self.batch_size
        )

        # Create a data loader for the test data
        self.test_loader = torch.utils.data.DataLoader(
            dataset=test_data, shuffle=False, batch_size=self.batch_size
        )

    def _forward_propagation(self, data, y_1hot=None):
        """
        Perform forward propagation through the feature extractor and all layers.

        Takes a data tensor and, optionally, a one-hot encoded label tensor. The
        last activation receives the labels and ``self.batch_size`` so it can
        compute the loss; results are left on the layer/activation objects.

        Raises:
            ValueError: if the number of activations differs from the number of layers.
        """
        if len(self.activations) != len(self.layers):
            raise ValueError(
                "The number of activations should be equal to the number of layers"
            )

        inputs = torch.flatten(self.features_extractor.get_features(data), start_dim=1)
        loss_activation = self.activations[-1]
        for layer, activation in zip(self.layers, self.activations):
            layer.forward(inputs)
            # Identity check: the final activation has a different forward signature
            if activation is loss_activation:
                activation.forward(layer.output, self.batch_size, y_1hot)
                break
            activation.forward(layer.output)
            inputs = activation.output

    def _backward_propagation(self):
        """Propagate gradients from the loss activation back to the first layer."""
        loss_activation = self.activations[-1]
        loss_activation.backward()
        b_input = loss_activation.b_output
        # Pairs each layer (last to second) with the activation that precedes it
        for layer, activation in zip(
            reversed(self.layers), reversed(self.activations[:-1])
        ):
            layer.backward(b_input)
            activation.backward(layer.b_output)
            b_input = activation.b_output
        # The first layer has no preceding activation
        self.layers[0].backward(b_input)

    def _run_batch(self, data, label):
        """Forward one labelled batch.

        Moves the batch to the device, one-hot encodes the labels and runs forward
        propagation (so the loss activation holds the batch loss afterwards).

        Returns:
            (label, y_predict): the labels on the device and the predicted class
            indices (argmax of the softmax output).
        """
        data = data.to(device)
        label = label.to(device)
        y_1hot = torch.nn.functional.one_hot(label, num_classes=self.number_of_classes)

        self._forward_propagation(data, y_1hot)

        y_predict = torch.argmax(self.activations[-1].softmax, dim=1)
        return label, y_predict

    def _test_val(self):
        """Return (mean batch loss, mean batch accuracy) over the validation loader."""
        acc_hist = []
        loss_hist = []
        for data, label in self.val_loader:
            label, y_predict = self._run_batch(data, label)

            # Per-batch accuracy and loss
            acc_hist.append(torch.mean((label == y_predict).float()))
            loss_hist.append(self.activations[-1].loss)

        return torch.mean(torch.Tensor(loss_hist)), torch.mean(torch.Tensor(acc_hist))

    def evaluate(self, train=False):
        """Return (accuracy, weighted F1) on the training loader or the test loader.

        Args:
            train: evaluate on the training loader when True, otherwise on the
                test loader.
        """
        loader = self.train_loader if train else self.test_loader

        y_true = []
        y_predict = []
        for data, label in loader:
            label, y_pred = self._run_batch(data, label)
            # One transfer per batch instead of one .item() call per sample
            y_true.extend(label.tolist())
            y_predict.extend(y_pred.tolist())

        return (
            accuracy_score(y_true, y_predict),
            f1_score(y_true, y_predict, average="weighted"),
        )

    def predict(self, data):
        """Return the predicted class index for every sample in ``data``."""
        data = data.to(device)

        self._forward_propagation(data)

        y_predict = torch.argmax(self.activations[-1].softmax, dim=1)
        return y_predict

    def train(self, epochs=20, disable_tqdm=False):
        """Train the dense layers and record per-epoch loss/accuracy histories.

        Args:
            epochs: number of passes over the training loader.
            disable_tqdm: when True, hides the progress bar and the per-epoch printout.

        Populates ``train_loss_history``, ``val_loss_history``,
        ``train_acc_history`` and ``val_acc_history`` (accuracies in percent).
        """
        self.epochs = epochs

        self.train_loss_history = []
        self.val_loss_history = []

        self.train_acc_history = []
        self.val_acc_history = []

        for epoch in range(epochs):
            with tqdm(self.train_loader, unit="batch", disable=disable_tqdm) as tepoch:
                tepoch.set_description(f"Epoch {epoch + 1}")

                # Initialize the history lists for each epoch
                acc_hist = []
                loss_hist = []
                for data, label in tepoch:
                    label, y_predict = self._run_batch(data, label)

                    # Calculate the accuracy
                    accuracy = torch.mean((label == y_predict).float())
                    acc_hist.append(accuracy)

                    # Calculate the loss
                    loss = self.activations[-1].loss
                    loss_hist.append(loss)

                    # Backward propagation
                    self._backward_propagation()

                    # Update the weights and biases
                    for layer in self.layers:
                        self.optimizer.update(layer)

                    tepoch.set_postfix(
                        loss=loss.item(), accuracy=f"{accuracy.item()*100:.2f}%"
                    )

            # Calculate the validation loss and accuracy
            val_loss, val_acc = self._test_val()

            # Append the history lists
            self.train_loss_history.append(torch.mean(torch.Tensor(loss_hist)))
            self.val_loss_history.append(val_loss)

            self.train_acc_history.append(torch.mean(torch.Tensor(acc_hist)) * 100)
            self.val_acc_history.append(val_acc * 100)

            # Print the results
            if not disable_tqdm:
                print(f"Epoch:{epoch+1}")
                print(f"Training Loss: {self.train_loss_history[-1]}")
                print(f"Validation Loss: {self.val_loss_history[-1]}")
                print(f"Training Accuracy: {self.train_acc_history[-1]:.2f}%")
                print(f"Validation Accuracy: {self.val_acc_history[-1]:.2f}%")
                print("--------------------------------------------------")
