# CNNs from scratch using numpy

In [None]:
# import the necessary libraries
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.datasets import fetch_openml
from torchvision import datasets, transforms
import torch

In [138]:
transform = transforms.Compose([
    # Convert image (PIL/Numpy) to PyTorch Tensor
    transforms.ToTensor(),
    # Normalize the pixel values using MNIST's standard mean (0.1307) and std dev (0.3081)
    transforms.Normalize((0.1307,), (0.3081,))
])

# 2. Download and Load DataSets

# Training Dataset (60,000 samples)
mnist_train_dataset = datasets.MNIST(
    root='./data',        # Directory where the data will be downloaded
    train=True,           # Specify training data
    download=True,        # Download if data is not already present
    transform=transform
)

# Testing Dataset (10,000 samples)
mnist_test_dataset = datasets.MNIST(
    root='./data',
    train=False,          # Specify test data
    download=True,
    transform=transform
)

X_train_tensor = torch.cat([data[0].unsqueeze(0) for data in mnist_train_dataset], dim=0)
y_train_tensor = mnist_train_dataset.targets
X_test_tensor = torch.cat([data[0].unsqueeze(0) for data in mnist_test_dataset], dim=0)
y_test_tensor = mnist_test_dataset.targets

# --------------------------------------------------------------------
# 1. Convert Feature Tensors (X) to NumPy
# We also flatten the images from (N, 1, 28, 28) to (N, 784) for ML libraries

limit_img = 1000

X_train_numpy = X_train_tensor.numpy().reshape(X_train_tensor.shape[0], 1, 28, 28)[0:limit_img]
X_test_numpy = X_test_tensor.numpy().reshape(X_test_tensor.shape[0], 1, 28, 28)[0:limit_img]

# 2. Convert Label Tensors (y) to NumPy

y_train_numpy = y_train_tensor.numpy()[0:limit_img]
y_test_numpy = y_test_tensor.numpy()[0:limit_img]

print("--- NumPy Array Shapes ---")
print(f"X_train (Features): {X_train_numpy.shape}")
print(f"y_train (Labels):   {y_train_numpy.shape}")
print(f"X_test (Features):  {X_test_numpy.shape}")
print(f"y_test (Labels):    {y_test_numpy.shape}")

--- NumPy Array Shapes ---
X_train (Features): (1000, 1, 28, 28)
y_train (Labels):   (1000,)
X_test (Features):  (1000, 1, 28, 28)
y_test (Labels):    (1000,)


In [139]:
def he_init(shape):
    """He initialization for ReLU networks."""
    std = np.sqrt(2.0 / np.prod(shape[1:]))
    return np.random.randn(*shape) * std

In [140]:
class Conv2D:

    def __init__(self, C_in, C_out, kernel_size, stride=1, padding=0):
        self.C_in = C_in
        self.C_out = C_out
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding

        self.W = he_init((C_out, C_in, kernel_size, kernel_size))
        self.b = np.zeros((C_out,))

        self.dW = np.zeros_like(self.W)
        self.db = np.zeros_like(self.b)

    def forward(self, X):
        # X shape: (N, C_in, H_in, W_in)
        self.X = X
        N, C_in, H_in, W_in = X.shape
        kH, kW = self.kernel_size, self.kernel_size
        S = self.stride
        P = self.padding

        if P > 0:
            X_padded = np.pad(X, ((0, 0), (0, 0), (P, P), (P, P)), mode='constant')
        else:
            X_padded = X
    
        H_pad, W_pad = X_padded.shape[2], X_padded.shape[3]
        H_out = (H_pad - kH) // S + 1
        W_out = (W_pad - kW) // S + 1

        out = np.zeros((N, self.C_out, H_out, W_out))

        for n in range(N):
            for c_out in range(self.C_out):
                for h in range(H_out):
                    for w in range(W_out):
                        h_start = h * S
                        h_end = h_start + kH
                        w_start = w * S
                        w_end = w_start + kW

                        region = X_padded[n, :, h_start:h_end, w_start:w_end] # C_in, kH, kW
                        out[n, c_out, h, w] = np.sum(region * self.W[c_out]) + self.b[c_out]
                        
        self.x_padded = X_padded
        return out
    
    def backward(self, dout):
        X = self.X
        N, C_in, H_in, W_in = X.shape
        kH, kW = self.kernel_size, self.kernel_size
        S = self.stride
        P = self.padding

        N, C_out, H_out, W_out = dout.shape

        self.dW.fill(0)
        self.db.fill(0)
        dX_padded = np.zeros_like(self.x_padded)

        for n in range(N):
            for c_out in range(C_out):
                for h in range(H_out):
                    for w in range(W_out):
                        
                        h_start = h * S
                        h_end = h_start + kH
                        w_start = w * S
                        w_end = w_start + kW

                        region = self.x_padded[n, :, h_start:h_end, w_start:w_end] # C_in, kH, kW

                        self.dW[c_out] += region * dout[n, c_out, h, w]
                        self.db[c_out] += dout[n, c_out, h, w]
                        dX_padded[n, :, h_start:h_end, w_start:w_end] += self.W[c_out] * dout[n, c_out, h, w]

        if P > 0:
            dX = dX_padded[:, :, P:-P, P:-P]
        else:
            dX = dX_padded
        


In [141]:
class Relu:
    def forward(self, X):
        self.mask = (X>0)
        return X * self.mask
    def backward(self, dout):
        return dout * self.mask

In [142]:
class MaxPool2D:

    def __init__(self, pool_size, stride):
        self.pool_size = pool_size
        self.stride = stride

    def forward(self, X):
        # X shape: (N, C, H, W)
        self.X = X  # Store input for backward pass
        N, C, H, W = X.shape
        pH, pW = self.pool_size, self.pool_size
        S = self.stride

        H_out = (H - pH) // S + 1
        W_out = (W - pW) // S + 1

        self.max_indices = {}
        self.out_shape = (N, C, H_out, W_out)

        out = np.zeros((N, C, H_out, W_out))
        for n in range(N):
            for c in range(C):
                for h in range(H_out):
                    for w in range(W_out):
                        h_start = h * S
                        h_end  = h_start + pH
                        w_start = w * S
                        w_end  = w_start + pW

                        region = X[n, c, h_start:h_end, w_start:w_end]
                        out[n, c, h, w] = np.max(region)
                        idx = np.argmax(region)
                        self.max_indices[(n, c, h, w)] = (int(h_start + idx // pW), int(w_start + idx % pW))
        return out
    
    def backward(self, dout):
        N, C, H, W = self.X.shape
        pH, pW = self.pool_size, self.pool_size
        S = self.stride
        
        dx = np.zeros_like(self.X)

        _, _, H_out, W_out = dout.shape
        for n in range(N):
            for c in range(C):
                for h in range(H_out):
                    for w in range(W_out):
                        grad = dout[n, c, h, w]
                        h_idx, w_idx = self.max_indices[(n, c, h, w)]
                        dx[n, c, h_idx, w_idx] += grad

        return dx


In [143]:
class Flatten:
    def forward(self, X):
        self.input_shape = X.shape
        N = X.shape[0]
        return X.reshape(N, -1)
    
    def _get_flattened_size(self):
        return np.prod(self.input_shape[1:])
    
    def backward(self, dout):
        return dout.reshape(self.input_shape)

In [144]:
class Linear:
    def __init__(self, input_dim, output_dim):
        self.W = he_init((input_dim, output_dim))
        self.b = np.zeros((output_dim,))
    
    def forward(self, X):
        self.X = X
        return np.dot(X, self.W) + self.b
    
    def backward(self, dout):
        self.dW = np.dot(self.X.T, dout)
        self.db = np.sum(dout, axis=0)
        dx = np.dot(dout, self.W.T)
        return dx

In [145]:
class SoftMaxCrossEntropy:
    def forward(self, X, y):
        exps = np.exp(X - np.max(X, axis=1, keepdims=True))
        probs = exps / np.sum(exps, axis=1, keepdims=True)
        N = X.shape[0]
        correct_logprobs = -np.log(probs[range(N), y] + 1e-10)
        loss = np.sum(correct_logprobs) / N
        self.probs = probs
        self.y = y
        return loss, probs
    
    def backward(self, y=None):
        if y is None:
            y = self.y
        probs = self.probs.copy()
        N = probs.shape[0]
        probs[range(N), y] -= 1
        probs /= N
        return probs

In [146]:
class SimpleCNN:

    def __init__(self, C_in, C_out, kernel_size, pool_size, stride, padding, num_classes):
        self.C_in = C_in
        self.C_out = C_out
        self.kernel_size = kernel_size
        self.pool_size = pool_size
        self.stride = stride
        self.padding = padding
        self.num_classes = num_classes

        self.conv = Conv2D(C_in, C_out, kernel_size, stride, padding)
        self.relu = Relu()
        self.pool = MaxPool2D(pool_size, stride)
        self.flatten = Flatten()
        
        # Calculate flattened size: (28, 28) -> conv -> pool -> flatten
        # After conv: H_out = (28 + 2*1 - 3) / 1 + 1 = 28
        # After pool with stride=1: H_out = (28 - 2) / 1 + 1 = 27
        # Flattened: C_out * 27 * 27 = 16 * 729 = 11664
        H_after_conv = (28 + 2 * padding - kernel_size) // stride + 1
        H_after_pool = (H_after_conv - pool_size) // stride + 1
        flattened_size = C_out * H_after_pool * H_after_pool
        
        self.fc = Linear(flattened_size, num_classes)
        self.criterion = SoftMaxCrossEntropy()

        self.params = [self.conv, self.fc]

    def forward(self, X, y):
        out = self.conv.forward(X)
        out = self.relu.forward(out)
        out = self.pool.forward(out)
        out = self.flatten.forward(out)
        logits = self.fc.forward(out)

        if y is None:
            return logits
        
        loss, probs = self.criterion.forward(logits, y)
        return loss, probs

    def backward(self):
        dlogits = self.criterion.backward(self.y)
        dout = self.fc.backward(dlogits)
        dout = self.flatten.backward(dout)
        dout = self.pool.backward(dout)
        dout = self.relu.backward(dout)
        dx = self.conv.backward(dout)

        return dx

    def step(self, learning_rate=1e-3):
        for layer in self.params:
            layer.W -= learning_rate * layer.dW
            layer.b -= learning_rate * layer.db

In [147]:
def predict(model, X):
    logits = model.forward(X, None)
    return np.argmax(logits, axis=1)

In [148]:
def train(model, X_train, y_train, epochs = 5, batch_size = 32, learning_rate = 1e-3):
    num_samples = X_train.shape[0]
    num_batches = int(np.ceil(num_samples / batch_size))

    for epoch in range(epochs):
        epoch_loss = 0.0
        correct_predictions = 0
        total_predictions = 0
        
        for batch_idx in range(num_batches):
            start = batch_idx * batch_size
            end = min(start + batch_size, num_samples)
            X_batch = X_train[start:end]
            y_batch = y_train[start:end]

            loss, _ = model.forward(X_batch, y_batch)
            model.y = y_batch  # Store y for backward pass
            epoch_loss += loss

            # Calculate accuracy for this batch
            predictions = predict(model, X_batch)
            correct_predictions += np.sum(predictions == y_batch)
            total_predictions += y_batch.shape[0]

            model.backward()
            model.step(learning_rate)
        
        avg_loss = epoch_loss / num_batches
        accuracy = correct_predictions / total_predictions
        print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}, Accuracy: {accuracy:.4f}")

In [149]:
C_in = 1
C_out = 16
kernel_size = 3
pool_size = 2
stride = 1
padding = 1
num_classes = 10
model = SimpleCNN(C_in, C_out, kernel_size, pool_size, stride, padding, num_classes)

In [150]:
train(model, X_train_numpy, y_train_numpy, epochs = 15, batch_size = 20, learning_rate = 1e-3)


Epoch 1/15, Loss: 18.0136, Accuracy: 0.1210
Epoch 2/15, Loss: 13.2340, Accuracy: 0.2110
Epoch 2/15, Loss: 13.2340, Accuracy: 0.2110
Epoch 3/15, Loss: 9.8150, Accuracy: 0.2960
Epoch 3/15, Loss: 9.8150, Accuracy: 0.2960
Epoch 4/15, Loss: 7.9511, Accuracy: 0.3580
Epoch 4/15, Loss: 7.9511, Accuracy: 0.3580
Epoch 5/15, Loss: 6.7290, Accuracy: 0.3740
Epoch 5/15, Loss: 6.7290, Accuracy: 0.3740
Epoch 6/15, Loss: 5.8548, Accuracy: 0.3910
Epoch 6/15, Loss: 5.8548, Accuracy: 0.3910
Epoch 7/15, Loss: 5.2021, Accuracy: 0.4120
Epoch 7/15, Loss: 5.2021, Accuracy: 0.4120
Epoch 8/15, Loss: 4.6610, Accuracy: 0.4180
Epoch 8/15, Loss: 4.6610, Accuracy: 0.4180
Epoch 9/15, Loss: 4.2015, Accuracy: 0.4250
Epoch 9/15, Loss: 4.2015, Accuracy: 0.4250
Epoch 10/15, Loss: 3.8296, Accuracy: 0.4240
Epoch 10/15, Loss: 3.8296, Accuracy: 0.4240
Epoch 11/15, Loss: 3.5268, Accuracy: 0.4240
Epoch 11/15, Loss: 3.5268, Accuracy: 0.4240
Epoch 12/15, Loss: 3.2570, Accuracy: 0.4340
Epoch 12/15, Loss: 3.2570, Accuracy: 0.4340
Ep