# Convolutional Neural Networks (CNN)

## Setup

In [2]:
import torch
from torch import nn
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, TensorDataset
import torch.optim as optim
import torch.nn.functional as F

from src.data import DataLoaderScratch
from src.trainer import TrainerScratch
from src.optimizers import SGDScratch

## Load Data

In [3]:
# Transform handed to both datasets: convert to a tensor, then normalize with
# mean 0.5 and standard deviation 0.5.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,)),
])

# Build the train and test splits of MNIST with identical settings, storing
# the files under ./data.
mnist_trainset, mnist_testset = (
    datasets.MNIST(root="./data", train=is_train, download=True, transform=transform)
    for is_train in (True, False)
)

In [4]:
# Training split: cast the raw pixels to float, divide by 255 and flatten each
# image into a single row.
# NOTE(review): reading `.data` directly presumably bypasses `transform` - confirm.
X_train = (mnist_trainset.data.float() / 255.0).flatten(start_dim=1)
y_train = mnist_trainset.targets

# Test split (used for validation): identical preprocessing.
X_val = (mnist_testset.data.float() / 255.0).flatten(start_dim=1)
y_val = mnist_testset.targets

# Mini-batch loaders of 256 samples; only the training loader shuffles.
train_dataloader = DataLoaderScratch(X_train, y_train, shuffle=True, batch_size=256)
val_dataloader = DataLoaderScratch(X_val, y_val, shuffle=False, batch_size=256)

## Convolution Layer

### Defining a Simple Example

Let's start by creating a very simple 6x6 greyscale image with an edge down the middle.

In [5]:
# One image row holds three bright pixels (10) followed by three dark pixels
# (-10); stacking six copies gives a 6x6 image with a vertical edge in the middle.
input = torch.tensor([[10, 10, 10, -10, -10, -10]], dtype=torch.float).repeat(6, 1)

print(input)
print(input.shape)

tensor([[ 10.,  10.,  10., -10., -10., -10.],
        [ 10.,  10.,  10., -10., -10., -10.],
        [ 10.,  10.,  10., -10., -10., -10.],
        [ 10.,  10.,  10., -10., -10., -10.],
        [ 10.,  10.,  10., -10., -10., -10.],
        [ 10.,  10.,  10., -10., -10., -10.]])
torch.Size([6, 6])


Now let's create an edge detection filter.

In [6]:
# Vertical edge detector: every row is [1, 0, -1], so the filter responds to a
# left-to-right drop in intensity. `repeat` (not `expand`) keeps the tensor
# contiguous, which the later `.view` calls rely on.
filter = torch.tensor([[1, 0, -1]], dtype=torch.float).repeat(3, 1)

print(filter)
print(filter.shape)

tensor([[ 1.,  0., -1.],
        [ 1.,  0., -1.],
        [ 1.,  0., -1.]])
torch.Size([3, 3])


### Implementation using For Loops

In [7]:
padding = 0
stride = 1

# Surround the image with `padding` zeros on every side (nothing is added when
# padding is 0).
input_padded = F.pad(input, (padding,) * 4, mode="constant", value=0)

filter_height, filter_width = filter.shape
input_height, input_width = input_padded.shape

# Number of filter positions that fit along each axis of the padded image.
output_height = (input_height - filter_height) // stride + 1
output_width = (input_width - filter_width) // stride + 1
output = torch.zeros(output_height, output_width)

# Slide the filter over the image: each output pixel is the sum of the
# element-wise product between the filter and the patch underneath it.
for i in range(output_height):
    top = i * stride
    for j in range(output_width):
        left = j * stride
        patch = input_padded[top:top + filter_height, left:left + filter_width]
        output[i, j] = (patch * filter).sum()

print(output)
print(output.shape)

tensor([[ 0., 60., 60.,  0.],
        [ 0., 60., 60.,  0.],
        [ 0., 60., 60.,  0.],
        [ 0., 60., 60.,  0.]])
torch.Size([4, 4])


### Vectorized Implementation

We reshape the input and filter to simulate an example with added dimensions for the images, channels and filters.
For the inputs, we want our new dimensions to be $(batch\_size, in\_channels, input\_height, input\_width)$.
For the filters, we want our new dimensions to be $(out\_channels, in\_channels, filter\_height, filter\_width)$.

In [8]:
# Add leading batch and channel axes: (H, W) -> (1, 1, H, W).
# Reshaping from the two trailing dimensions (instead of calling unsqueeze
# twice) keeps this cell idempotent: re-running it on tensors that are already
# (1, 1, H, W) leaves them unchanged rather than stacking on more dimensions.
input = input.reshape(1, 1, *input.shape[-2:])
filter = filter.reshape(1, 1, *filter.shape[-2:])

print("inputs shape:", input.shape)
print("filters shape:", filter.shape)

inputs shape: torch.Size([1, 1, 6, 6])
filters shape: torch.Size([1, 1, 3, 3])


The resulting shapes indicate that we have 
- 1 image with 1 color channel of size 6x6
- 1 filter with 1 color channel of size 3x3.

We can use **unfold** to extract the patch under each filter position, flatten it, and stack the flattened patches into a single tensor. After transposing, each row contains the flattened patch for one filter position.

In [9]:
padding = 0
stride = 1

# Unpack the 4D shapes; the filter's own in_channels entry is not needed here.
out_channels, _, filter_height, filter_width = filter.shape
batch_size, in_channels, input_height, input_width = input.shape

# Number of filter positions along each spatial axis once padding is applied.
output_height = (input_height + 2 * padding - filter_height) // stride + 1
output_width = (input_width + 2 * padding - filter_width) // stride + 1

# Zero-pad the two spatial dimensions by `padding` on every side.
input_padded = F.pad(input, (padding,) * 4)

# Extract every sliding window with F.unfold, then swap the last two axes so
# that each row holds one flattened filter position.
input_unfolded = F.unfold(
    input_padded, kernel_size=(filter_height, filter_width), stride=stride
).permute(0, 2, 1)

print("original input: ")
print(input)
print(input.shape)
print("unfolded input:")
print(input_unfolded)
print(input_unfolded.shape)

original input: 
tensor([[[[ 10.,  10.,  10., -10., -10., -10.],
          [ 10.,  10.,  10., -10., -10., -10.],
          [ 10.,  10.,  10., -10., -10., -10.],
          [ 10.,  10.,  10., -10., -10., -10.],
          [ 10.,  10.,  10., -10., -10., -10.],
          [ 10.,  10.,  10., -10., -10., -10.]]]])
torch.Size([1, 1, 6, 6])
unfolded input:
tensor([[[ 10.,  10.,  10.,  10.,  10.,  10.,  10.,  10.,  10.],
         [ 10.,  10., -10.,  10.,  10., -10.,  10.,  10., -10.],
         [ 10., -10., -10.,  10., -10., -10.,  10., -10., -10.],
         [-10., -10., -10., -10., -10., -10., -10., -10., -10.],
         [ 10.,  10.,  10.,  10.,  10.,  10.,  10.,  10.,  10.],
         [ 10.,  10., -10.,  10.,  10., -10.,  10.,  10., -10.],
         [ 10., -10., -10.,  10., -10., -10.,  10., -10., -10.],
         [-10., -10., -10., -10., -10., -10., -10., -10., -10.],
         [ 10.,  10.,  10.,  10.,  10.,  10.,  10.,  10.,  10.],
         [ 10.,  10., -10.,  10.,  10., -10.,  10.,  10., -10.],
 

As you can see in the result, we have each 3x3 filter position stacked row-wise in the output. Now we want to multiply each row by the filter and we should have an efficient implementation of convolution. 

For each image we have a matrix for the input and for the filters. So, to do the matrix multiplication we will use torch.bmm which performs a batch matrix-matrix product of matrices. 

For each individual image, our input shape is $(filter \_ positions, filter \_ height * filter \_ width)$. We want to matrix multiply each input image with the filter that is $(filter \_ height, filter \_ width)$. Therefore, we want to reshape the filter to make it shape $(filter \_ height * filter \_ width, 1)$. In this way, we get a valid matrix multiplication.

In [10]:
# Flatten every filter into one row, transpose so that each filter becomes a
# column, and prepend a batch axis for the batched matrix product.
filter_reshaped = filter.view(out_channels, -1).t()[None]
print(filter_reshaped)
print(filter_reshaped.shape)

tensor([[[ 1.],
         [ 0.],
         [-1.],
         [ 1.],
         [ 0.],
         [-1.],
         [ 1.],
         [ 0.],
         [-1.]]])
torch.Size([1, 9, 1])


In [11]:
# torch.bmm takes two 3D tensors, so the filter matrix is copied once per image
# in the batch before multiplying.
filter_batched = filter_reshaped.repeat(batch_size, 1, 1)
output = input_unfolded.bmm(filter_batched)

# Move the filter axis in front of the positions axis and fold the positions
# back into a (height, width) grid.
output = output.permute(0, 2, 1).view(batch_size, out_channels, output_height, output_width)

print(output)
print(output.shape)

tensor([[[[ 0., 60., 60.,  0.],
          [ 0., 60., 60.,  0.],
          [ 0., 60., 60.,  0.],
          [ 0., 60., 60.,  0.]]]])
torch.Size([1, 1, 4, 4])


The result is a correct convolution of the image.

### Checking the Implementation

Let's put all the code into a function.

In [126]:
def conv2d(input, filter, padding=0, stride=1):
    """Apply a bank of filters to a batch of images via unfold + matmul.

    Every sliding window of the (zero-padded) input is flattened into a row,
    and all windows are multiplied with the flattened filters in one matrix
    product. As in `nn.Conv2d`, the filter is not flipped and no bias is added.

    Args:
        input: Tensor of shape (batch_size, in_channels, height, width).
        filter: Tensor of shape
            (out_channels, in_channels, filter_height, filter_width).
        padding: Number of zeros added to every side of both spatial dims.
        stride: Step of the sliding window along both spatial dims.

    Returns:
        Tensor of shape (batch_size, out_channels, output_height, output_width).
    """
    batch_size, in_channels, input_height, input_width = input.shape
    out_channels, _, filter_height, filter_width = filter.shape

    # Zero-pad the two spatial dimensions
    input_padded = F.pad(input, (padding, padding, padding, padding))

    # Number of filter positions along each spatial axis
    output_height = (input_height + 2 * padding - filter_height) // stride + 1
    output_width = (input_width + 2 * padding - filter_width) // stride + 1

    # (batch, positions, in_channels * filter_height * filter_width):
    # one flattened sliding window per row
    input_unfolded = F.unfold(
        input_padded, (filter_height, filter_width), stride=stride
    ).transpose(1, 2)

    # (in_channels * filter_height * filter_width, out_channels).
    # `reshape` rather than `view` so non-contiguous filters (e.g. permuted
    # weights) are accepted as well.
    weight_matrix = filter.reshape(out_channels, -1).t()

    # The 2D weight matrix broadcasts over the batch dimension, so there is no
    # need to materialise one copy of the weights per image.
    output = torch.matmul(input_unfolded, weight_matrix)

    # (batch, positions, out_channels) -> (batch, out_channels, height, width)
    return output.transpose(1, 2).reshape(
        batch_size, out_channels, output_height, output_width
    )


We test the function and check whether its result is the same as that of PyTorch's built-in convolution layer.

In [127]:
# Random batch of images and random filter bank shared by both implementations
input = torch.randn(2, 3, 8, 8)  # batch_size=2, in_channels=3, height=8, width=8
filter = torch.randn(4, 3, 3, 3)  # out_channels=4, in_channels=3, height=3, width=3

# Our implementation
output_custom = conv2d(input, filter, padding=0, stride=1)

# PyTorch's built-in layer with the same hyper-parameters and no bias
conv_layer = nn.Conv2d(in_channels=3, out_channels=4, kernel_size=3, stride=1, padding=0, bias=False)
# Copy our filters into the layer's weights. Copying under no_grad (instead of
# assigning to `.weight.data`) keeps the parameter's own storage, so the layer
# does not end up aliasing `filter`.
with torch.no_grad():
    conv_layer.weight.copy_(filter)

# Compute the output using the built-in Conv2d
output_builtin = conv_layer(input)

# Both results should agree up to floating-point rounding
are_close = torch.allclose(output_custom, output_builtin, atol=1e-6)
print(f"Are the custom and built-in conv2d outputs close? {are_close}")

Are the custom and built-in conv2d outputs close? True


## Data Loading and Preprocessing

In [14]:
# Transform handed to both datasets: convert to a tensor, then normalize with
# mean 0.5 and standard deviation 0.5.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,)),
])

# Train and test splits of MNIST, stored under ./data.
mnist_trainset, mnist_testset = (
    datasets.MNIST(root="./data", train=is_train, download=True, transform=transform)
    for is_train in (True, False)
)

# Training split: cast to float, divide by 255, and insert a channel axis at
# position 1 so the images have the 4D layout expected by the convolution.
X_train = (mnist_trainset.data.float() / 255.0)[:, None]
y_train = mnist_trainset.targets

# Test split (used for validation): identical preprocessing.
X_val = (mnist_testset.data.float() / 255.0)[:, None]
y_val = mnist_testset.targets

# Mini-batch loaders of 256 samples; only the training loader shuffles.
train_dataloader = DataLoaderScratch(X_train, y_train, shuffle=True, batch_size=256)
val_dataloader = DataLoaderScratch(X_val, y_val, shuffle=False, batch_size=256)

### Single Batch Iteration

In [128]:
# Draw one random mini-batch. The permutation is sliced *before* indexing so
# only `batch_size` samples are gathered, instead of copying the whole shuffled
# dataset and then throwing most of it away.
batch_size = 128
perm = torch.randperm(len(X_train))
batch_idx = perm[:batch_size]
X_batch = X_train[batch_idx]
y_batch = y_train[batch_idx]

batch_size, in_channels, input_height, input_width = X_batch.shape
filter_size = 3
out_channels = 10  # Number of filters
num_classes = y_train.unique().shape[0]

# Initialize the conv layer weights; the bias has one entry per filter and
# broadcasts over the batch and both spatial dimensions
W1 = nn.Parameter(torch.randn(out_channels, in_channels, filter_size, filter_size) * 0.01)
b1 = nn.Parameter(torch.zeros(size=(1, out_channels, 1, 1)))
# Padding that keeps the spatial size unchanged for an odd filter size at stride 1
same_padding = (filter_size - 1) // 2

# Initialize the fc layer weights; the input size assumes the conv output keeps
# the input's height and width (i.e. the conv is applied with `same_padding`)
W2 = nn.Parameter(torch.randn(size=(out_channels * input_height * input_width, num_classes)) * 0.01)
b2 = nn.Parameter(torch.zeros(num_classes))

parameters = [W1, b1, W2, b2]
optimizer = SGDScratch(parameters, lr=0.1)

def relu(x):
    """Element-wise rectified linear unit, max(x, 0).

    The zero operand is created with `x.new_zeros`, so it has the same dtype
    and device as `x`. A bare `torch.zeros(1)` is always a CPU float32 tensor,
    which fails for inputs on another device and forces dtype promotion.

    Args:
        x: Tensor of any shape.

    Returns:
        Tensor with the shape and dtype of `x` and negative entries set to 0.
    """
    return torch.maximum(x, x.new_zeros(1))

def softmax(X):
    """Row-wise softmax of a 2D tensor of logits.

    The row maximum is subtracted before exponentiating. Softmax is invariant
    to shifting a row by a constant, so the result is unchanged, but the
    largest exponent becomes exp(0) = 1 and large logits no longer overflow to
    inf (which would turn the output into NaN).

    Args:
        X: Tensor of shape (n_samples, n_classes).

    Returns:
        Tensor of the same shape whose rows are non-negative and sum to 1.
    """
    # The shift is a constant as far as the result is concerned, so it is
    # detached from the autograd graph.
    row_max = X.max(dim=1, keepdim=True).values.detach()
    X_exp = torch.exp(X - row_max)
    X_softmax = X_exp / X_exp.sum(dim=1, keepdim=True)
    return X_softmax

# Define the log-loss
def log_loss(y_pred, y):
    """Mean negative log-likelihood of the true classes (cross-entropy).

    The probability of each sample's true class is picked out with `gather`
    rather than by multiplying with a one-hot matrix. This avoids two
    problems of the one-hot form:
      * `one_hot(y)` infers the number of classes from the largest label in
        the batch, so a batch without the highest class no longer matches
        the shape of `y_pred`;
      * a predicted probability of exactly 0 for a *wrong* class gives
        0 * log(0) = NaN even though that term should not contribute.

    Args:
        y_pred: Tensor of shape (n_samples, n_classes) with class probabilities.
        y: Integer (int64) tensor of shape (n_samples,) with true class indices.

    Returns:
        Scalar tensor with the loss averaged over the samples.
    """
    true_class_prob = y_pred.gather(1, y.unsqueeze(1)).squeeze(1)
    loss = -torch.log(true_class_prob).mean()
    return loss

In [129]:
# Zero gradients
optimizer.zero_grad()

# Forward pass: conv -> ReLU -> flatten -> fully connected -> softmax.
# The convolution must use `same_padding`: W2 is sized for
# out_channels * input_height * input_width features, i.e. a conv output with
# the same height and width as the input. The global `padding` left over from
# the earlier convolution walkthrough is 0, which would shrink the feature map
# and make the matrix product with W2 fail.
Z1 = conv2d(X_batch, W1, padding=same_padding) + b1
A1 = relu(Z1)
Z2 = A1.flatten(1) @ W2 + b2
y_pred = softmax(Z2)

# Calculate Loss
loss = log_loss(y_pred, y_batch)

# Compute gradients
loss.backward()

# Update parameters
optimizer.step()

In [130]:
# Display the loss of this first step. The weights are initialised with a 0.01
# scale, so the predictions should be almost uniform; assuming 10 classes, a
# value near ln(10) ~= 2.303 is what to expect here.
loss

tensor(2.3040, grad_fn=<NegBackward0>)