# Torch code
Used to learn the weights and biases

## Training the weights and biases

In [None]:
# CNN quantized to INT8

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader

torch.manual_seed(42)

class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=5, stride=1, padding=2)
        self.conv2 = nn.Conv2d(32, 32, kernel_size=3, stride=2, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=3, stride=2)
        self.fc1 = nn.Linear(1152, 128)
        self.fc2 = nn.Linear(128, 10)
        self.relu = nn.ReLU()
        self.softmax = nn.Softmax(dim=1)
        self.quant = torch.quantization.QuantStub()
        self.dequant = torch.quantization.DeQuantStub()


    def forward(self, x):
        x = self.quant(x)
        x = self.relu(self.conv1(x))
        x = self.pool(self.relu(self.conv2(x)))
        x = torch.flatten(x, 1)
        x = self.relu(self.fc1(x))
        x = self.fc2(x)
        x = self.dequant(x)
        x = self.softmax(x)

        return x

# Load the MNIST dataset
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])

train_dataset = torchvision.datasets.MNIST(root='./data', train=True, download=True, transform=transform)
test_dataset = torchvision.datasets.MNIST(root='./data', train=False, download=True, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# Model, Loss, and Optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CNN().to(device)
model.qconfig = torch.ao.quantization.default_qconfig
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

model_quantized = torch.ao.quantization.prepare_qat(model,inplace=False)

# Training loop
epochs = 5
for epoch in range(epochs):
    model.train()
    running_loss = 0.0
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
    print(f"Epoch {epoch+1}, Loss: {running_loss/len(train_loader):.4f}")

model_quantized = torch.ao.quantization.convert(model_quantized)
model_quantized

# Testing the model
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(f"Test Accuracy: {100 * correct / total:.2f}%")


100%|██████████| 9.91M/9.91M [00:00<00:00, 14.7MB/s]
100%|██████████| 28.9k/28.9k [00:00<00:00, 482kB/s]
100%|██████████| 1.65M/1.65M [00:00<00:00, 4.47MB/s]
100%|██████████| 4.54k/4.54k [00:00<00:00, 5.94MB/s]


Epoch 1, Loss: 1.5589
Epoch 2, Loss: 1.4825
Epoch 3, Loss: 1.4793
Epoch 4, Loss: 1.4769
Epoch 5, Loss: 1.4756




Test Accuracy: 98.85%


In [None]:
fcl_weights=[torch.int_repr(model_quantized.fc1.weight().cpu()).numpy(),torch.int_repr(model_quantized.fc2.weight().cpu()).numpy()]
fcl_weights

[array([[ -95,   28,   -5, ...,  -94,   62, -117],
        [  11, -104,  -63, ...,   87,   15,  114],
        [ -30,  118,  126, ...,  -89,   -9,  -39],
        ...,
        [ -82,   11,   36, ...,  -93,  -26,  -69],
        [  54,   74,   46, ..., -123,   82,  125],
        [  71,  126,   -1, ...,   27,  -60,  -52]], dtype=int8),
 array([[-120,   -5,  -15, ...,   36,   23, -124],
        [ -37,   23,  -15, ...,  -87,  -27,   15],
        [ -58,   33,   42, ...,   24,   36, -124],
        ...,
        [ 106,  -77,   58, ..., -104,  -96,  -38],
        [-123,  -49, -122, ...,   72,  -85,   34],
        [ -82,  -85,   16, ...,   62,  -71,  105]], dtype=int8)]

In [None]:
import numpy as np
biases=[model_quantized.fc1.bias().detach().cpu().numpy(),model_quantized.fc2.bias().detach().cpu().numpy()]
fcl_biases=[np.int8(biases[0]*100),np.int8(biases[1]*100)]
fcl_biases

[array([ 2,  0,  0,  0,  0,  0,  2, -1,  1,  1,  0,  2,  2,  1, -2, -1, -1,
        -1,  0,  0,  1,  0,  2,  2,  0, -1, -2,  0, -1, -2, -2,  0,  0,  0,
         0,  1, -2,  1, -2, -2,  1,  0,  1, -2,  0, -1,  1,  1,  0, -1,  1,
         0,  2,  2, -2, -1,  2,  0,  0,  2,  2,  1,  2,  0,  1,  1,  0, -1,
         1, -2,  0, -2,  2, -2, -1,  1,  2,  2,  2,  0,  0,  0,  1,  2, -1,
         0, -2,  1,  0,  0,  1,  2,  2, -1,  0,  2, -2,  1,  2,  1, -1, -1,
         2,  1,  1, -2,  2, -2,  1,  0, -2, -1,  0, -2,  1, -2,  0, -1,  0,
        -2,  0, -1,  1, -2, -2,  1,  0,  2], dtype=int8),
 array([ 3,  1, -1,  4, -1,  2, -8, -2, -1,  8], dtype=int8)]

In [None]:
conv_weights=[torch.int_repr(model_quantized.conv1.weight().cpu()).numpy(),torch.int_repr(model_quantized.conv2.weight().cpu()).numpy()]
conv_weights

[array([[[[  98,  106,  -30,  117,  -28],
          [  26,  -62,   75,  112,  -94],
          [ 111,   24,   94,   17,   62],
          [ -18,   98,   19,  -60,   33],
          [ -59,  -15,  -52,   85, -101]],
 
         [[   0,    0,    0,    0,    0],
          [   0,    0,    0,    0,    0],
          [   0,    0,    0,    0,    0],
          [   0,    0,    0,    0,    0],
          [   0,    0,    0,    0,    0]],
 
         [[   0,    0,    0,    0,    0],
          [   0,    0,    0,    0,    0],
          [   0,    0,    0,    0,    0],
          [   0,    0,    0,    0,    0],
          [   0,    0,    0,    0,    0]],
 
         [[   0,    0,    0,    0,    0],
          [   0,    0,    0,    0,    0],
          [   0,    0,    0,    0,    0],
          [   0,    0,    0,    0,    0],
          [   0,    0,    0,    0,    0]]],
 
 
        [[[ -59,  -36,  -77,   12, -126],
          [ 115, -108,   98,   21,  -41],
          [  79,   20,  103,   14,  -40],
          [  34,  -

In [None]:
biases=[model_quantized.conv1.bias().detach().cpu().numpy(),model_quantized.conv2.bias().detach().cpu().numpy()]
conv_biases=[np.int8(biases[0]*100),np.int8(biases[1]*100)]
conv_biases

[array([ -4,   7, -17,  11,  -6, -15,   2,   9,  17, -12,  -9,   3,   5,
          7,  10,  -4,  11,  14,  -5,  14,   9,  17, -11,  -9,  -5,   1,
         12, -11,  10,  -7,  12,   0], dtype=int8),
 array([-4, -1, -5,  2, -2, -2,  0, -1,  4,  5, -5,  2,  0,  3, -2,  0,  0,
         2,  2,  2,  0, -4, -3, -3, -5,  5, -2, -3, -2,  3, -3, -4],
       dtype=int8)]

In [None]:
# non quantized cnn

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader

torch.manual_seed(42)

# Define the CNN model
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=5, stride=1, padding=2)
        self.conv2 = nn.Conv2d(32, 32, kernel_size=3, stride=2, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=3, stride=2)
        self.fc1 = nn.Linear(1152, 128)
        self.fc2 = nn.Linear(128, 10)
        self.relu = nn.ReLU()
        self.softmax = nn.Softmax(dim=1)


    def forward(self, x):
        x = self.relu(self.conv1(x))
        x = self.pool(self.relu(self.conv2(x)))
        x = torch.flatten(x, 1)
        x = self.relu(self.fc1(x))
        x = self.softmax(self.fc2(x))

        return x

# Load the MNIST dataset
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])

train_dataset = torchvision.datasets.MNIST(root='./data', train=True, download=True, transform=transform)
test_dataset = torchvision.datasets.MNIST(root='./data', train=False, download=True, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# Model, Loss, and Optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CNN().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
epochs = 5
for epoch in range(epochs):
    model.train()
    running_loss = 0.0
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
    print(f"Epoch {epoch+1}, Loss: {running_loss/len(train_loader):.4f}")

# Testing the model
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(f"Test Accuracy: {100 * correct / total:.2f}%")

Epoch 1, Loss: 1.6354
Epoch 2, Loss: 1.5186
Epoch 3, Loss: 1.4794
Epoch 4, Loss: 1.4768
Epoch 5, Loss: 1.4739
Test Accuracy: 98.30%


# Implementation in vanilla Python

## Sequential inference

In [None]:
import math

In [None]:
def mat_vec_mul(matrix, vector):
    result = []
    for row in matrix:
        dot = 0
        for i in range(len(vector)):
            dot += row[i] * vector[i]
        result.append(dot)
    return result

In [None]:
def vec_add(vec1, vec2):
    if len(vec1) != len(vec2):
        return 0

    sum_ = [0]*len(vec1)
    for i in range(len(vec1)):
        sum_[i] = vec1[i] + vec2[i]
    return sum_

In [None]:
def relu(vector):
    return [x if x > 0 else 0 for x in vector]

In [None]:
def softmax(vector):
    vector = [float(x) for x in vector]
    max_val = max(vector)
    exps = [math.exp(x - max_val) for x in vector] # numerical stability
    sum_exps = sum(exps)
    return [e / sum_exps for e in exps]

In [None]:
def forward(x, weights, biases):
    # fc1 = ReLU(W1 * x + b1)
    fc1_linear = vec_add(mat_vec_mul(weights[0], x), biases[0])
    fc1_activated = relu(fc1_linear)

    # fc2 = softmax(W2 * fc1_activated + b2)
    fc2_linear = vec_add(mat_vec_mul(weights[1], fc1_activated), biases[1])
    output = softmax(fc2_linear)

    return output # list of size 10 containing probabilities

In [None]:
fcl_weights, type(fcl_weights)

([array([[ -95,   28,   -5, ...,  -94,   62, -117],
         [  11, -104,  -63, ...,   87,   15,  114],
         [ -30,  118,  126, ...,  -89,   -9,  -39],
         ...,
         [ -82,   11,   36, ...,  -93,  -26,  -69],
         [  54,   74,   46, ..., -123,   82,  125],
         [  71,  126,   -1, ...,   27,  -60,  -52]], dtype=int8),
  array([[-120,   -5,  -15, ...,   36,   23, -124],
         [ -37,   23,  -15, ...,  -87,  -27,   15],
         [ -58,   33,   42, ...,   24,   36, -124],
         ...,
         [ 106,  -77,   58, ..., -104,  -96,  -38],
         [-123,  -49, -122, ...,   72,  -85,   34],
         [ -82,  -85,   16, ...,   62,  -71,  105]], dtype=int8)],
 list)

In [None]:
py_fcl_weights = [w.tolist() for w in fcl_weights]
py_fcl_biases = [b.tolist() for b in fcl_biases]

In [None]:
len(py_fcl_biases), len(py_fcl_biases[0]), len(py_fcl_biases[1])

(2, 128, 10)

In [None]:
# Sample run
import random
dummy_input = [random.randint(-128, 127) for _ in range(1152)] # cnn outputs a vector of size 1152

probabilities = forward(dummy_input, py_fcl_weights, py_fcl_biases)

print("Output probabilities:")
for i, prob in enumerate(probabilities):
    print(f"Class {i}: {prob:.4f}")

print("-----------------------")

class_ = 0
for i in range(len(probabilities)):
    if probabilities[i] > probabilities[class_]:
        class_ = i

print(f"Predicted class: {class_}")

Output probabilities:
Class 0: 0.0000
Class 1: 1.0000
Class 2: 0.0000
Class 3: 0.0000
Class 4: 0.0000
Class 5: 0.0000
Class 6: 0.0000
Class 7: 0.0000
Class 8: 0.0000
Class 9: 0.0000
-----------------------
Predicted class: 1


## Single input parallelized inference

In [None]:
# we can parallelize the inference by using multiprocessing for matrix multiplication

from multiprocessing import Pool

def dot_product(vector1, vector2):
    dot = 0
    for i in range(len(vector1)):
        dot += vector1[i] * vector2[i]
    return dot

In [None]:
def mat_vec_mul_2(matrix, vector, num_workers = None):
    args = [(row, vector) for row in matrix]
    with Pool(processes=num_workers) as pool:
        result = pool.map(dot_product, args)
    return result

## Batch of inputs parallelization

In [None]:
# we can parallelize inference in case of a batch of inputs