<a href="https://colab.research.google.com/github/ninadaptegit/pytorchCustomLayers/blob/main/convolutionvsmlp.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
from torch import nn
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.datasets import fetch_openml

import pandas as pd


# Data Fetching and Preprocessing

In [None]:
# Fetch the 'mnist_784' dataset from OpenML as plain arrays (as_frame=False)
# and keep only the first 5000 samples.
mnist = fetch_openml('mnist_784', version=1, as_frame=False)
X = mnist.data[:5000]
y = mnist.target[:5000]


In [None]:
# Sanity check: display the shapes of the feature array and the label array.
X.shape,y.shape

In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
device

In [None]:
# Hold out 20% of the samples as the test split; shuffling uses a fixed seed
# (random_state=42) so the split is reproducible.
X_train,X_test,y_train,y_test = train_test_split(X,y,test_size=0.2,shuffle = True,random_state=42)

In [None]:
# Sanity check: display the shapes of all four splits.
X_train.shape, X_test.shape, y_train.shape, y_test.shape

In [None]:
from sklearn.preprocessing import OneHotEncoder

# OneHotEncoder expects a 2-D (n_samples, n_features) array, so cast the
# labels to int and turn each label vector into a single column.
y_train = y_train.astype(int).reshape(-1, 1)
y_test = y_test.astype(int).reshape(-1, 1)

onehot = OneHotEncoder(sparse_output=False)
# Learn the category -> column mapping from the training labels only ...
y_train = onehot.fit_transform(y_train)
# ... and reuse that mapping for the test labels. Re-fitting on the test split
# could produce a different column layout (e.g. when a class is missing there),
# which would silently misalign the targets with the model outputs.
y_test = onehot.transform(y_test)

In [None]:
# Convert every split into a torch tensor (torch.tensor copies the data).
X_train, X_test, y_train, y_test = (
    torch.tensor(split) for split in (X_train, X_test, y_train, y_test)
)

In [None]:
# y_train = torch.unsqueeze(y_train,dim=1)
# y_test = torch.unsqueeze(y_test,dim=1)

In [None]:
# Move all four splits onto the selected compute device.
X_train, X_test = X_train.to(device), X_test.to(device)
y_train, y_test = y_train.to(device), y_test.to(device)

In [None]:
# Cast features and one-hot targets to 32-bit floats.
X_train, X_test = X_train.float(), X_test.float()
y_train, y_test = y_train.float(), y_test.float()

In [None]:
# Global (all pixels, all samples) statistics of the training split only.
mean, std = X_train.mean(), X_train.std()

In [None]:
# Display the normalisation statistics computed from the training split.
mean , std

In [None]:
# Standardise both splits with the training-split statistics.
X_train, X_test = ((split - mean) / std for split in (X_train, X_test))

In [None]:
# Wrap the raw label array in a DataFrame so pandas can count the labels.
y_pandas = pd.DataFrame(y)

In [None]:
# Bar chart of how often each label occurs in the loaded subset.
spread = y_pandas.value_counts().sort_index()
fig, ax = plt.subplots(figsize=(8, 4))
spread.plot(kind='bar', ax=ax, color='skyblue', edgecolor='black')

ax.set_title("MNIST Label Distribution")
ax.set_xlabel("Digit Label")
ax.set_ylabel("Frequency")
# Keep the tick labels horizontal.
plt.setp(ax.get_xticklabels(), rotation=0)
ax.grid(axis='y', linestyle='--', alpha=0.7)
fig.tight_layout()
plt.show()

In [None]:
class CustomFullyConnectedLayer(nn.Module):
    """Dense layer computing ``x @ W.T + b`` with an optional activation.

    Args:
        input_size: number of input features per sample.
        output_size: number of output features per sample.
        activation: optional activation. Either a ``torch.autograd.Function``
            subclass (invoked through its ``apply``) or any other callable,
            e.g. an ``nn.Module`` instance. ``None`` leaves the output linear.
    """

    def __init__(self, input_size, output_size,  activation = None):
        super().__init__()
        # Weights and bias are both drawn from a standard normal distribution.
        self.weights = nn.Parameter(torch.randn(output_size, input_size))
        self.bias = nn.Parameter(torch.randn(output_size,))
        self.activation = activation

    def forward(self, x):
        """Apply the layer to ``x`` of shape (batch_size, input_size)."""
        z = x @ self.weights.T + self.bias

        act = self.activation
        if act is None:
            return z

        # Custom autograd Functions are passed as classes and must be called
        # via ``apply``; everything else is called directly.
        is_autograd_function = isinstance(act, type) and issubclass(act, torch.autograd.Function)
        if is_autograd_function:
            return act.apply(z)
        return act(z)






In [None]:
class CustomActivationFunction(torch.autograd.Function):
    """Element-wise square activation, f(x) = x**2, with a hand-written gradient."""

    @staticmethod
    def forward(ctx, input):
        # The derivative 2*x needs the input again during the backward pass.
        ctx.save_for_backward(input)
        return input.pow(2)

    @staticmethod
    def backward(ctx, grad_output):
        (saved_input,) = ctx.saved_tensors
        # Chain rule: dL/dx = (df/dx) * dL/df = 2x * grad_output.
        return (2 * saved_input) * grad_output

In [None]:
class CustomPooling2D(nn.Module):
    """2-D max / mean pooling implemented with ``unfold``.

    Args:
        kernel_size: window size; an int (square window) or a (kH, kW) tuple.
        stride: step between neighbouring windows (same in both directions).
        padding: number of pixels added to every side of the input.
        mode: ``'max'`` for max pooling; any other value selects mean pooling.
    """

    def __init__(self, kernel_size , stride = 1 , padding = 0 , mode = 'max'):
        super().__init__()

        if isinstance(kernel_size, int):
            kernel_size = (kernel_size, kernel_size)
        self.kernel_size = tuple(kernel_size)
        self.stride = stride
        self.padding = padding
        self.mode = mode

    def forward(self, x):
        """Pool ``x`` of shape (B, C, H, W) and return (B, C, H_out, W_out)."""
        B, C, H, W = x.shape
        kH, kW = self.kernel_size
        pad = self.padding

        # Padding must never win a max: pad with -inf for max pooling so that
        # all-negative windows keep their true maximum. Mean pooling keeps the
        # zero padding (padded cells count towards the average).
        fill_value = float('-inf') if self.mode == 'max' else 0.0
        x = torch.nn.functional.pad(x, (pad, pad, pad, pad), value=fill_value)

        # unfold -> (B, C*kH*kW, L); channels are the slowest-varying part of
        # dim 1, so it can be split back into (C, kH*kW).
        patches = torch.nn.functional.unfold(x, kernel_size=self.kernel_size, stride=self.stride)
        patches = patches.view(B, C, kH * kW, -1)

        if self.mode == 'max':
            pooled = patches.max(dim=2).values
        else:
            pooled = patches.mean(dim=2)

        H_out = (H + 2 * pad - kH) // self.stride + 1
        W_out = (W + 2 * pad - kW) // self.stride + 1

        return pooled.view(B, C, H_out, W_out)




In [None]:
class CustomFlattenLayer(nn.Module):
    """Flatten every dimension except the first (batch) one."""

    def __init__(self):
        super().__init__()

    def forward(self , x):
        """Return ``x`` reshaped to (x.shape[0], -1).

        ``reshape`` is used instead of ``view`` so that non-contiguous inputs
        (e.g. the result of a ``permute``) are flattened instead of raising.
        """
        return x.reshape(x.shape[0], -1)

In [None]:
class CustomConv2D(nn.Module):
    """2-D convolution (cross-correlation) implemented with ``unfold`` + matmul.

    Args:
        in_channels: number of channels of the input images.
        out_channels: number of filters / output channels.
        kernel_size: (kH, kW) tuple, or a single int for a square kernel.
        stride: step between neighbouring kernel positions.
        padding: number of zero pixels added to every side of the input.
        activation: optional activation. Either a ``torch.autograd.Function``
            subclass (invoked through its ``apply``) or any other callable.
    """

    def __init__(self,  in_channels , out_channels , kernel_size , stride = 1  , padding = 0 , activation = None):
        super().__init__()
        if isinstance(kernel_size, int):
            kernel_size = (kernel_size, kernel_size)
        self.stride = stride
        self.padding = padding
        self.kH , self.kW = kernel_size
        self.out_channels = out_channels
        self.activation = activation
        self.weight = nn.Parameter(torch.randn(out_channels,in_channels,self.kH,self.kW))
        self.bias = nn.Parameter(torch.randn(out_channels))

    def forward(self , x):
        """Convolve ``x`` of shape (B, C_in, H, W); returns (B, C_out, H_out, W_out)."""
        batch_size, _, H, W = x.shape
        pad = self.padding

        # Zero-pad all four sides of the spatial dimensions.
        x = torch.nn.functional.pad(x, (pad, pad, pad, pad))

        # One column per kernel position: (B, C_in*kH*kW, L).
        patches = torch.nn.functional.unfold(x, kernel_size=(self.kH, self.kW), stride=self.stride)

        # One row per output channel: (C_out, C_in*kH*kW).
        w_flat = self.weight.view(self.weight.shape[0], -1)

        # matmul broadcasts the 2-D weight over the batch dimension, so the
        # result is already (B, C_out, L). No permute is needed here; swapping
        # the first two axes before the reshape below would mix samples and
        # channels together.
        out = w_flat.matmul(patches)

        # Per-channel bias, broadcast over batch and positions.
        out = out + self.bias[:, None]

        # H_out = (H + 2*P - kH) // stride + 1 (same for the width).
        H_out = (H + 2 * pad - self.kH) // self.stride + 1
        W_out = (W + 2 * pad - self.kW) // self.stride + 1

        # Split the position axis L back into the 2-D output grid.
        out = out.reshape(batch_size, self.out_channels, H_out, W_out)

        act = self.activation
        if act is not None:
            # Same dispatch rule as CustomFullyConnectedLayer: autograd
            # Function classes go through ``apply``, other callables are
            # called directly.
            if isinstance(act, type) and issubclass(act, torch.autograd.Function):
                out = act.apply(out)
            else:
                out = act(out)
        return out



In [None]:
# Display the shape of the (still flat) training features.
X_train.shape

In [None]:
class ConvModel(nn.Module):
    """Small CNN: two conv + max-pool stages followed by one dense layer.

    Spatial sizes for a 1x28x28 input:
        conv2d_fir  (3x3, pad 2) ->  8 x 30 x 30
        maxpool_fir (2x2, s 2)   ->  8 x 15 x 15
        conv2d_sec  (5x5, pad 2) -> 16 x 15 x 15
        maxpool_sec (2x2, s 2)   -> 16 x  7 x  7
        flatten                  -> 784
        fully_con                -> 10 raw class scores
    """

    def __init__(self):
        super().__init__()
        self.conv2d_fir = CustomConv2D(
            in_channels=1, out_channels=8, kernel_size=(3, 3),
            stride=1, padding=2, activation=nn.ReLU(),
        )
        self.maxpool_fir = CustomPooling2D(kernel_size=2, stride=2, mode='max')
        self.conv2d_sec = CustomConv2D(
            in_channels=8, out_channels=16, kernel_size=(5, 5),
            stride=1, padding=2, activation=nn.ReLU(),
        )
        self.maxpool_sec = CustomPooling2D(kernel_size=2, stride=2, mode='max')
        self.flatten = CustomFlattenLayer()
        # 16 channels * 7 * 7 positions = 784 features.
        self.fully_con = CustomFullyConnectedLayer(input_size=16 * 7 * 7, output_size=10)

    def forward(self, x):
        """Return the (batch, 10) class scores for image batch ``x``."""
        features = self.maxpool_fir(self.conv2d_fir(x))
        features = self.maxpool_sec(self.conv2d_sec(features))
        return self.fully_con(self.flatten(features))


In [None]:
# Instantiate the CNN and move its parameters to the selected device.
convModel = ConvModel().to(device)

In [None]:
# Turn each flat 784-value row into a single-channel 28x28 image, giving
# batches of shape (N, 1, 28, 28).
X_test_hold = X_test.reshape(X_test.shape[0], 1, 28, 28)
X_train_hold = X_train.reshape(X_train.shape[0], 1, 28, 28)


In [None]:
# Logged epoch numbers plus two independent "run slots" for the CNN losses.
epoch_ls = []
test_ls_conv = [[] for _ in range(2)]
train_ls_conv = [[] for _ in range(2)]


In [None]:
# Run slot (index into train_ls_conv / test_ls_conv) that the CNN training
# loop appends its losses to.
conv_num = 0

In [None]:
from tqdm.notebook import tqdm

epochs = 5000
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(convModel.parameters(), lr=0.01, weight_decay=1e-5)

# Full-batch training: each epoch is a single optimizer step on the whole
# training set.
for epoch in tqdm(range(epochs)):
    convModel.train()
    y_pred = convModel(X_train_hold)
    loss = loss_fn(y_pred, y_train)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # Losses are only reported and recorded every 10th epoch, so the test-set
    # forward pass is only needed on those epochs.
    if epoch % 10 == 0:
        convModel.eval()
        with torch.inference_mode():
            y_test_pred = convModel(X_test_hold)
            loss_test = loss_fn(y_test_pred, y_test)

        print(f"Epoch = {epoch} Loss = {loss}")
        print(f"Test Loss = {loss_test}")
        epoch_ls.append(epoch)
        test_ls_conv[conv_num].append(loss_test)
        # Record a detached tensor: keeping `loss` itself would keep a
        # reference to its autograd graph alive for every logged epoch.
        train_ls_conv[conv_num].append(loss.detach())

# Leave the model in evaluation mode once training is finished.
convModel.eval()



In [None]:
# Convert the recorded loss tensors of every run slot into plain Python
# numbers (via .item()) so they can be plotted.
train_ls_num = [[recorded.item() for recorded in run] for run in train_ls_conv]
test_ls_num = [[recorded.item() for recorded in run] for run in test_ls_conv]

In [None]:
# Number of losses recorded in run slot 1 (only filled when the CNN training
# loop ran with conv_num = 1).
len(train_ls_conv[1])

In [None]:

# Plot the most recent CNN run slot that actually holds data. The training
# loop only fills slot `conv_num`, so a hard-coded slot may be empty and
# plt.plot would then fail on mismatched x/y lengths.
cnn_run = max((i for i, run in enumerate(train_ls_num) if run), default=0)
cnn_train_losses = train_ls_num[cnn_run]
cnn_test_losses = test_ls_num[cnn_run]

plt.figure(figsize=(10,6))
# Every run logs the same epochs (0, 10, 20, ...), so the first len(run)
# entries of epoch_ls are the matching x values even if epoch_ls was extended
# by more than one run.
plt.plot(epoch_ls[:len(cnn_train_losses)], cnn_train_losses,
         label=f"Train Loss Model CNN : {cnn_run + 1}", c='lightgreen')
plt.plot(epoch_ls[:len(cnn_test_losses)], cnn_test_losses,
         label=f"Test Loss Model CNN : {cnn_run + 1}", c='lightcoral')

plt.xlabel("Epochs")
plt.ylabel("Loss")

plt.legend()
plt.show()


In [None]:
# with open("loss_data_CN.py", "w") as f:
#     f.write(f"train_losses = {train_ls_num}\n")
#     f.write(f"test_losses = {test_ls_num}\n")

## Fully Connected layer

In [None]:
class FullyConnected(nn.Module):
    """Two-layer perceptron: 784 -> 16 (ReLU) -> 10 raw class scores."""

    def __init__(self):
        super().__init__()
        self.fcl1 = CustomFullyConnectedLayer(input_size=784, output_size=16, activation=nn.ReLU())
        self.fcl2 = CustomFullyConnectedLayer(input_size=16, output_size=10, activation=None)

    def forward(self,  x):
        """Flatten each sample of ``x`` and return the (batch, 10) scores."""
        flat = x.view(x.size(0), -1)
        hidden = self.fcl1(flat)
        return self.fcl2(hidden)




In [None]:
# Instantiate the MLP and move its parameters to the selected device.
model_full = FullyConnected().to(device)

In [None]:
# Reset the epoch axis and create two independent run slots for the MLP losses.
epoch_ls = []
test_ls_full = [[] for _ in range(2)]
train_ls_full = [[] for _ in range(2)]

In [None]:
# Run slot (index into train_ls_full / test_ls_full) that the MLP training
# loop appends its losses to.
conv_num = 1

In [None]:
from tqdm.notebook import tqdm

epochs = 5000
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model_full.parameters(), lr=0.001)

# Full-batch training: each epoch is a single optimizer step on the whole
# training set.
for epoch in tqdm(range(epochs)):
    model_full.train()
    y_pred = model_full(X_train_hold)
    loss = loss_fn(y_pred, y_train)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # Losses are only reported and recorded every 10th epoch, so the test-set
    # forward pass is only needed on those epochs.
    if epoch % 10 == 0:
        model_full.eval()
        with torch.inference_mode():
            y_test_pred = model_full(X_test_hold)
            loss_test = loss_fn(y_test_pred, y_test)

        print(f"Epoch = {epoch} Loss = {loss}")
        print(f"Test Loss = {loss_test}")
        # epoch_ls is not extended here; it is rebuilt from a range after
        # training.
        test_ls_full[conv_num].append(loss_test)
        # Record a detached tensor: keeping `loss` itself would keep a
        # reference to its autograd graph alive for every logged epoch.
        train_ls_full[conv_num].append(loss.detach())

# Leave the model in evaluation mode once training is finished.
model_full.eval()



In [None]:
# Convert the recorded MLP loss tensors of every run slot into plain Python
# numbers (via .item()) so they can be plotted.
test_ls_full1 = [[recorded.item() for recorded in run] for run in test_ls_full]
train_ls_full1 = [[recorded.item() for recorded in run] for run in train_ls_full]

In [None]:
# x-axis for the loss curves: one entry per logged epoch (every 10th of 5000).
epoch_ls = list(range(0, 5000, 10))

In [None]:

# Train vs. test loss of the MLP run stored in slot 1 (slot 0 is not drawn);
# the y-axis is clipped to [0, 10].
fig, ax = plt.subplots(figsize=(10, 6))
ax.set_ylim(0, 10)
ax.plot(epoch_ls, train_ls_full1[1], label="Train Loss Model MLP : 2", c='darkgreen')
ax.plot(epoch_ls, test_ls_full1[1], label="Test Loss Model MLP : 2", c='red')
ax.set_xlabel("Epochs")
ax.set_ylabel("Loss")

ax.legend()
plt.show()

In [None]:
# Number of losses recorded in MLP run slot 1.
len(train_ls_full[1])

In [None]:
# Compare the test loss of the CNN and the MLP. For each model, use the most
# recent run slot that actually holds data: a hard-coded slot can be empty
# (training only fills slot `conv_num`), which makes plt.plot fail on
# mismatched x/y lengths.
cnn_slot = max((i for i, run in enumerate(test_ls_num) if run), default=0)
mlp_slot = max((i for i, run in enumerate(test_ls_full1) if run), default=0)
cnn_test_curve = test_ls_num[cnn_slot]
mlp_test_curve = test_ls_full1[mlp_slot]

plt.figure(figsize=(10,6))
plt.ylim([0,10])
plt.plot(epoch_ls[:len(cnn_test_curve)], cnn_test_curve,
         label=f"Test Loss Model CNN : {cnn_slot + 1}", c='lightgreen')
plt.plot(epoch_ls[:len(mlp_test_curve)], mlp_test_curve,
         label=f"Test Loss Model MLP : {mlp_slot + 1}", c='red')
plt.xlabel("Epochs")
plt.ylabel("Loss")

plt.legend()
plt.show()

In [None]:
def count_parameters(model):
    """Return the total number of trainable scalar parameters of ``model``."""
    total = 0
    for param in model.parameters():
        # Frozen parameters (requires_grad=False) are not counted.
        if param.requires_grad:
            total += param.numel()
    return total

In [None]:
# Trainable parameter count of the MLP.
count_parameters(model_full)

In [None]:
# Trainable parameter count of the CNN.
count_parameters(convModel)

In [None]:
# Final test-set predictions. No gradients are needed here, so run both
# models in evaluation mode with autograd disabled instead of building a
# graph for the whole test set.
convModel.eval()
model_full.eval()
with torch.inference_mode():
    y_pred_cnn = convModel(X_test_hold)
    y_pred_full = model_full(X_test_hold)


In [None]:
# Collapse class scores / one-hot targets to class indices along dim 1.
y_pred_cnn = y_pred_cnn.argmax(dim=1)
y_pred_full = y_pred_full.argmax(dim=1)
y_test_hold = y_test.argmax(dim=1)

In [None]:
# Detach from autograd and copy to host memory for use with scikit-learn.
y_pred_cnn, y_pred_full, y_test_hold = (
    labels.detach().cpu() for labels in (y_pred_cnn, y_pred_full, y_test_hold)
)

In [None]:
from sklearn.metrics import accuracy_score

# Test accuracy of the CNN. accuracy_score takes (y_true, y_pred), so the
# ground-truth labels go first.
accuracy_score(y_test_hold, y_pred_cnn)