### Importing the packages

In [1]:
import numpy as np

import torch
import torch.nn as nn
from torchvision import datasets
import torchvision.transforms as transforms
from torchvision.models import resnet34
from torch.utils.data import DataLoader

from sklearn.metrics import confusion_matrix, f1_score
from tqdm import tqdm

from numpy.ma.core import ceil
from scipy.spatial import distance #distance calculation
from sklearn.cluster import MiniBatchKMeans
from sklearn.metrics import accuracy_score #scoring
import matplotlib.pyplot as plt
from matplotlib import animation, colors

import math
import random


### Loading Data

In [2]:
transform = transforms.Compose([
    transforms.Resize((64, 64)), # Resize to 224x224 (height x width)
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                          std=[0.229, 0.224, 0.225])
])

In [3]:
batch_size = 32
#drop_last=True
train_data = datasets.CIFAR10('data', train=True,
                              download=True, transform=transform)
train_dataloader = DataLoader(train_data, batch_size=batch_size,shuffle=True )

#loading the test data
test_data = datasets.CIFAR10('data', train=False,
                             download=True, transform=transform)
test_dataloader = DataLoader(test_data,batch_size=batch_size, shuffle=True)

Files already downloaded and verified
Files already downloaded and verified


In [4]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


### Setting up the Feature Extractor

In [5]:
feature_extractor = resnet34(weights="DEFAULT")
num_features = feature_extractor.fc.in_features

for param in feature_extractor.parameters():
  param.requires_grad = False

feature_extractor.fc = nn.Identity()
feature_extractor = feature_extractor.to(device)

### Finding Centers using Minibatch K-Means

In [6]:
kmeans =  MiniBatchKMeans(n_clusters=20, max_iter=100, random_state=0, batch_size=32)

for x_train, y_train in tqdm(train_dataloader, desc=f"Training", colour="blue"):
    x_train, y_train = x_train.to(device), y_train.to(device)
    x = feature_extractor(x_train)
    x = x.cpu()
    kmeans = kmeans.partial_fit(x)

Training: 100%|[34m██████████[0m| 1563/1563 [01:05<00:00, 23.95it/s]


In [7]:
centers = torch.from_numpy(kmeans.cluster_centers_.copy())

In [8]:
centers.shape

torch.Size([20, 512])

### calculating dmax for rbf radius

In [9]:
def max_distance_between_vectors(centers):
    """
    Calculates the maximum distance between any two vectors in a given set of centers.

    Args:
        centers (torch.Tensor): Tensor representing the centers/vectors.

    Returns:
        float: Maximum distance between any two vectors.
    """
    num_vectors = centers.shape[0]
    max_distance = 0.0

    for i in range(num_vectors):
        for j in range(i + 1, num_vectors):
            distance = torch.norm(centers[i] - centers[j])
            max_distance = max(max_distance, distance)

    return max_distance

dmax = max_distance_between_vectors(centers)

print(f"The maximum distance between any two vectors is: {dmax}")

The maximum distance between any two vectors is: 57.16002137279293


In [19]:
sigma = (dmax / math.sqrt(2 * centers.shape[0]) * torch.ones((1, 20))) ** 2
print(sigma)
sigma.shape

tensor([[81.6817, 81.6817, 81.6817, 81.6817, 81.6817, 81.6817, 81.6817, 81.6817,
         81.6817, 81.6817, 81.6817, 81.6817, 81.6817, 81.6817, 81.6817, 81.6817,
         81.6817, 81.6817, 81.6817, 81.6817]])


torch.Size([1, 20])

### RBF Layer

In [None]:
class RBF_Layer:
    """
    Radial Basis Function (RBF) Layer for neural networks.
    """
  
    def __init__(self, n_inputs, n_neurons, centers, dmax):
      """
        Initializes the RBF layer.

        Args:
            n_inputs (int): Number of input features.
            n_neurons (int): Number of RBF neurons.
            centers (torch.Tensor): Centers of the RBF neurons with shape (n_neurons, n_inputs).
            dmax (float): Maximum distance between any two vectors.

        """
      self.centers = centers # shape: (n_neurons, n_features)
      self.centers = self.centers.to(device)
      self.sigma = sigma # shape: (1, n_neurons)
      self.sigma = ((dmax / math.sqrt(2 * centers.shape[0]))) * torch.ones((1, n_neurons)) ** 2
      self.sigma = self.sigma.to(device)

    def forward(self, x):
      """
        Performs the forward pass of the RBF layer.

        Args:
            x (torch.Tensor): Input tensor with shape (batch_size, n_inputs).

        Returns:
            torch.Tensor: Output tensor with shape (batch_size, n_neurons).
        """
      self.inputs = x
      self.inputs = self.inputs.to(device)
      self.diff = self.diff.to(device)
      self.l2_norm = torch.cdist(self.inputs, self.centers, p=2)
      self.l2_norm  = self.l2_norm.to(device)
      # calculate the exponential term
      self.output = torch.exp(-self.l2_norm ** 2 / (2*self.sigma)) # shape: (batch_size, n_neurons)
      self.output = self.output.to(device)
      return self.output

    def backward(self, output_error):
        """
        Performs the backward pass of the RBF layer.

        Args:
            output_error (torch.Tensor): Output error tensor with shape (batch_size, n_neurons).

        Returns:
            torch.Tensor: Input error tensor with shape (batch_size, n_inputs).
        """
        self.centers_grad = torch.sum(output_error * (-torch.sum(self.diff, dim=2) / self.sigma) * self.output,dim=0).reshape(512,20)
        self.centers_grad =  self.centers_grad.to(device)
        self.sigma_grad = torch.sum(output_error * (self.l2_norm / (2*self.sigma**2)) * self.output, dim=0)
        self.sigma_grad = self.sigma_grad.to(device)

### Linear Output Layer

In [None]:
class Dense:
    """
    A class representing a dense layer in a neural network.

    Args:
        n_inputs (int): The number of input features.
        n_neurons (int): The number of neurons in the layer.

    Attributes:
        weights (torch.Tensor): The weight matrix of shape (n_inputs, n_neurons)
            initialized using the He weight initialization method.
        biases (torch.Tensor): The bias vector of shape (1, n_neurons) initialized
            with zeros.
        prev_wchange (torch.Tensor): The previous weight change matrix of shape
            (n_inputs, n_neurons) initialized with zeros.
        prev_bchange (torch.Tensor): The previous bias change matrix of shape
            (1, n_neurons) initialized with zeros.

    Methods:
        forward(inputs): Performs forward propagation and returns the output
            of the layer.
        backward(output_error): Performs backward propagation and returns
            the input error.
    """
    def __init__(self, n_inputs, n_neurons):
        self.weights = torch.randn(n_inputs, n_neurons)
        self.biases = torch.zeros((1, n_neurons))
        self.weights = self.weights.to(device)
        self.biases = self.biases.to(device)

    def forward(self, inputs):
        """
        Performs forward propagation for the dense layer.

        Args:
            inputs (torch.Tensor): The input tensor of shape (batch_size, n_inputs).

        Returns:
            torch.Tensor: The output tensor of shape (batch_size, n_neurons).
        """
        self.inputs = inputs.float()
        self.inputs = self.inputs.to(device)
        return torch.matmul(self.inputs, self.weights) + self.biases

    def backward(self, output_error):
        """
        Performs backward propagation for the dense layer.

        Args:
            output_error (torch.Tensor): The error tensor from the subsequent layer
                of shape (batch_size, n_neurons).

        Returns:
            torch.Tensor: The input error tensor of shape (batch_size, n_inputs).
        """
        # calculating errors
        self.inputs_error = torch.matmul(output_error, self.weights.T)
        self.inputs_error = self.inputs_error.to(device)
        self.weights_grad = torch.matmul(self.inputs.T, output_error)
        self.weights_grad = self.weights_grad.to(device)
        self.biases_grad = torch.sum(output_error, axis=0, keepdims=True)
        self.biases_grad =  self.biases_grad.to(device)
        return self.inputs_error


### Softmax

In [None]:
class Softmax:
    """
    A class representing the Softmax activation function.

    Methods:
        forward(inputs):
            Applies the Softmax activation function to the inputs and returns the result.
    """
    def forward(self, inputs):
        """
        Applies the Softmax activation function to the inputs.

        Args:
            inputs (torch.Tensor): The input tensor.

        Returns:
            torch.Tensor: The tensor after applying the Softmax activation function.
        """
        self.inputs = inputs.clone()
        self.inputs = self.inputs.to(device)
        exp_inputs = torch.exp(self.inputs - torch.max(self.inputs, dim=1, keepdim=True).values)
        exp_inputs = exp_inputs.to(device)
        self.outputs = exp_inputs / torch.sum(exp_inputs, dim=1, keepdim=True)
        self.outputs = self.outputs.to(device)
        return self.outputs



### Categorical Cross Entropy Loss

In [None]:
class Categorical_Cross_Entropy_loss:
    """
    A class representing the Categorical Cross-Entropy loss function.

    Methods:
        forward(softmax_output, class_label):
            Calculates the Categorical Cross-Entropy loss given the softmax output and class labels.

        backward(class_label):
            Calculates the input error for the Categorical Cross-Entropy loss function given the class labels.
    """
    def forward(self, softmax_output, class_label):
        """
        Calculates the Categorical Cross-Entropy loss given the softmax output and class labels.

        Args:
            softmax_output (torch.Tensor): The output tensor after applying the softmax activation function.
            class_label (torch.Tensor): The class labels tensor.

        Returns:
            torch.Tensor: The calculated Categorical Cross-Entropy loss.
        """
        # Apply softmax function to the output
        self.softmax_output = torch.clamp(softmax_output, 1e-12, 1. - 1e-12)
        self.softmax_output = self.softmax_output.to(device)
        N = softmax_output.shape[0]
        # Convert the class label to one-hot encoding
        self.class_label = torch.zeros_like(softmax_output)
        self.class_label = self.class_label.to(device)
        self.class_label[torch.arange(N), class_label] = 1
        # Calculate the cross-entropy loss
        self.loss = -torch.sum(self.class_label * torch.log(self.softmax_output)) / N
        self.loss = self.loss.to(device)
        return self.loss
    
    def backward(self, class_label):
        """
        Calculates the input error for the Categorical Cross-Entropy loss function given the class labels.

        Args:
            class_label (torch.Tensor): The class labels tensor.

        Returns:
            torch.Tensor: The input error tensor for the Categorical Cross-Entropy loss function.
        """
        # Calculate the derivative of the loss with respect to the softmax output
        N = self.softmax_output.shape[0]
        self.inputs_error = self.softmax_output.clone()
        self.inputs_error = self.inputs_error.to(device)
        self.inputs_error[torch.arange(N), class_label] -= 1
        self.inputs_error = self.inputs_error / N
        return self.inputs_error


### Optimizer

In [None]:
class SGD:
    """
    A class representing the Stochastic Gradient Descent (SGD) optimizer.

    Args:
        learning_rate (float): The learning rate for the optimizer.
        momentum (float): The momentum factor for SGD.

    Methods:
        __call__(layer, num_epoch):
            Updates the weights and biases of the layer using SGD optimization.
    """
    def __init__(self, learning_rate=0.01):
        self.learning_rate=learning_rate
    
    def __call__(self, layer):
        """
        Updates the weights and biases of the layer using SGD optimization.

        Args:
            layer (Dense): The layer to be updated.
            num_epoch (int): The current epoch.

        Returns:
            None
        """

        if type(layer).__name__ == 'Dense':

          layer.weights = layer.weights.to(device)
          layer.biases = layer.biases.to(device)

          layer.weights -=  self.learning_rate*layer.weights_grad.to(device)
          layer.biases -= self.learning_rate * layer.biases_grad.to(device)

        elif type(layer).__name__ == 'RBF_Layer':

          layer.centers = layer.centers.to(device)
          layer.sigma = layer.sigma.to(device)

          layer.centers -=  self.learning_rate*layer.centers_grad.to(device)
          layer.sigma -= self.learning_rate * layer.sigma_grad.to(device)
        



### Architecture

In [None]:
#model
Layer1 = RBF_Layer(num_features, 20, centers, dmax)
Layer2 = Dense(20,10)
Act2 = Softmax()
Loss = Categorical_Cross_Entropy_loss()
Optimizer = SGD(learning_rate=0.001)

In [None]:
train_acc = []
train_loss =[]
test_loss = []
test_acc = []

In [None]:

epochs = 20
y_predtr= torch.zeros(0,dtype=torch.long, device=device)
y_true_train = torch.zeros(0,dtype=torch.long, device=device)
y_predts= torch.zeros(0,dtype=torch.long, device=device)
y_true_test = torch.zeros(0,dtype=torch.long, device=device)
for epoch in range(epochs):
    epoch_train_loss = 0
    epoch_train_accuracy = 0
    epoch_test_loss = 0
    epoch_test_accuracy = 0
    for x_train, y_train in tqdm(train_dataloader, desc=f"Epoch {epoch+1}", colour="blue"):
        # Forward pass
        x_train, y_train = x_train.to(device), y_train.to(device)
        x = feature_extractor(x_train)
        x = Layer1.forward(x)
        x = Layer2.forward(x)
        x = Act2.forward(x)
        loss = Loss.forward(x, y_train)

        # Report batch metrics

        y_predict_train = torch.argmax(x, dim=1)
        accuracy = torch.mean((y_train == y_predict_train).float())
        epoch_train_loss += loss.item()
        epoch_train_accuracy += accuracy.item()
        if epoch == epochs-1:
            y_predtr=torch.cat([y_predtr,y_predict_train.view(-1)])
            y_true_train=torch.cat([y_true_train,y_train.view(-1)])
        # Backward pass
        x = Loss.backward(y_train)
        x = Layer2.backward(x)
        Layer1.backward(x)

        # Update parameters
        Optimizer(Layer1)
        Optimizer(Layer2)

    with torch.no_grad():
        for x_test, y_test in tqdm(test_dataloader, desc=f"Testing", colour="green"):
                # Forward pass
                x_test, y_test = x_test.to(device), y_test.to(device)
                x = feature_extractor(x_test)
                x = Layer1.forward(x)
                x = Layer2.forward(x)
                x = Act2.forward(x)
                loss = Loss.forward(x, y_test)

                # Report batch metrics
                y_predict_test = torch.argmax(x, dim=1)
                accuracy = torch.mean((y_test == y_predict_test).float())
                epoch_test_loss += loss.item()
                epoch_test_accuracy += accuracy.item()
                if epoch == epochs-1:
                    y_predts=torch.cat([y_predts,y_predict_test.view(-1)])
                    y_true_test=torch.cat([y_true_test,y_test.view(-1)])
                
    # Report epoch metrics
    epoch_train_loss /= len(train_dataloader)
    epoch_train_accuracy /= len(train_dataloader)
    epoch_test_loss /= len(test_dataloader)
    epoch_test_accuracy /= len(test_dataloader)
    train_loss.append(epoch_train_loss)
    train_acc.append(epoch_train_accuracy)
    test_loss.append(epoch_test_loss)
    test_acc.append(epoch_test_accuracy)
    print(f'Epoch: {epoch+1}')
    print(f'Train Loss: {epoch_train_loss:.7f}')
    print(f'Train Accuracy: {epoch_train_accuracy:.7f}')
    print(f'Test Loss: {epoch_test_loss:.7f}')
    print(f'Test Accuracy: {epoch_test_accuracy:.7f}')
    print('--------------------------')


Epoch 1:   0%|[34m          [0m| 0/1563 [00:00<?, ?it/s]

torch.Size([32, 20])
torch.Size([32, 20, 512])
torch.Size([32, 20])
torch.Size([1, 20])
torch.Size([32, 20])





RuntimeError: ignored

In [24]:
import torch
X = torch.rand(32, 512) # a tensor with shape 32*512
A = torch.rand(20, 512) # a tensor with shape 512*20
D = torch.cdist(X, A, p=2) # a tensor with shape 32*20
print(D.shape)
#print(D)
print((D/sigma).shape)

torch.Size([32, 20])
torch.Size([32, 20])


In [30]:
import torch
B = torch.rand(32, 512) # a tensor with shape 32*512
C = torch.rand(20, 512) # a tensor with shape 20*512
B_expanded = B.unsqueeze(1) # a tensor with shape 32*1*512
C_expanded = C.unsqueeze(0) # a tensor with shape 1*20*512
G = B_expanded - C_expanded # a tensor with shape 32*20*512

D_expanded = D.unsqueeze(2) # a tensor with shape 32*20*1
H = G / D_expanded # a tensor with shape 32*20*512
print(H.shape)


torch.Size([32, 20, 512])


In [32]:
U = torch.rand(32,20)
K = torch.exp(U)*H/(sigma)

RuntimeError: The size of tensor a (20) must match the size of tensor b (512) at non-singleton dimension 2