In [2]:
import torch
import torchvision
import torch.nn as nn

# Data

In [3]:
# Preprocessing pipeline handed to the dataset below: Resize(30) first, then
# ToTensor() to turn each image into a tensor.
# NOTE(review): CIFAR-10 images are presumably 32x32, so this would yield 30x30 inputs - confirm.
transforms = torchvision.transforms.Compose([
    torchvision.transforms.Resize(30),
    torchvision.transforms.ToTensor()
])

In [4]:
# CIFAR-10 stored under ./datasets (downloaded if missing), with the preprocessing
# pipeline `transforms` applied to every image.
train_data = torchvision.datasets.CIFAR10("datasets", download=True, transform=transforms)
# Mini-batches of 32 samples, reshuffled on each pass over the data.
train_loader = torch.utils.data.DataLoader(train_data, batch_size=32, shuffle=True)

Files already downloaded and verified


# Cross-correlation from scratch

In [5]:
# corr2d handles only a single channel of the input.
# We can already build padding and stride into our function.

# For simplicity (laziness), padding and stride are ints and are applied symmetrically.

def corr2d(X, K, padding=1, stride=1):
    """Compute the 2-D cross-correlation of one input channel with one kernel.

    Args:
        X: 2-D tensor (height x width) holding a single channel.
        K: 2-D kernel tensor (kernel height x kernel width).
        padding: Number of zeros added on every side of X (int, symmetric).
        stride: Step between consecutive window positions, used for both
            directions (int).

    Returns:
        2-D tensor of shape
        ((H + 2*padding - k_h) // stride + 1, (W + 2*padding - k_w) // stride + 1),
        allocated with the dtype and device of X.
    """
    # Pad order for the last two dims is (left, right, top, bottom).
    X = torch.nn.functional.pad(X, (padding, padding, padding, padding))

    k_h, k_w = K.shape

    out_h = (X.shape[0] - k_h) // stride + 1
    # The width depends on the kernel *width*; deriving it from the kernel
    # height gives a wrong output size for non-square kernels.
    out_w = (X.shape[1] - k_w) // stride + 1

    # new_zeros keeps the result on the same device / in the same dtype as X.
    Y = X.new_zeros((out_h, out_w))
    for i in range(out_h):
        top = i * stride
        for j in range(out_w):
            left = j * stride
            Y[i, j] = (X[top:top + k_h, left:left + k_w] * K).sum()
    return Y

In [6]:
# Now for multiple channels this can be expanded 

def corr2d_multi_channels(X, K, padding=1, stride=1):
    """Cross-correlate a multi-channel input with a multi-channel kernel.

    X and K are iterated in lockstep along their first dimension; each
    (channel, kernel slice) pair goes through corr2d and the per-channel
    results are added together.

    Args:
        X: Input whose first dimension indexes channels.
        K: Kernel whose first dimension indexes the matching input channels.
        padding: Forwarded to corr2d.
        stride: Forwarded to corr2d.

    Returns:
        The sum of the per-channel cross-correlations.
    """
    total = 0
    for channel, kernel in zip(X, K):
        total = total + corr2d(channel, kernel, padding, stride)
    return total

In [7]:
# Lastly, the output can also have multiple channels, hence K has 4 dimensions:
# Output Channel x Input channel x height x width

def corr2d_multi_in_out(X, K, padding=1, stride=1):
    """Cross-correlate one multi-channel input with a bank of kernels.

    Args:
        X: Multi-channel input, passed unchanged to corr2d_multi_channels.
        K: Kernel bank; iterating over its first dimension yields one
            multi-channel kernel per output channel.
        padding: Forwarded to corr2d_multi_channels.
        stride: Forwarded to corr2d_multi_channels.

    Returns:
        The per-output-channel results stacked along a new leading dimension.
    """
    feature_maps = []
    for kernel in K:
        feature_maps.append(corr2d_multi_channels(X, kernel, padding, stride))
    return torch.stack(feature_maps, 0)

In [8]:
def conv_batch(X, K, padding=1, stride=1):
    """Apply corr2d_multi_in_out to every sample of a batch.

    Args:
        X: Batch of inputs; the first dimension indexes samples.
        K: Kernel bank shared by all samples.
        padding: Forwarded to corr2d_multi_in_out.
        stride: Forwarded to corr2d_multi_in_out.

    Returns:
        The per-sample outputs stacked along a new leading (batch) dimension.
    """
    outputs = []
    for sample in X:
        outputs.append(corr2d_multi_in_out(sample, K, padding, stride))
    return torch.stack(outputs, 0)

In [9]:
class Conv2d(nn.Module):
    """Convolution layer backed by the hand-written conv_batch routine.

    Note the argument order: output channels come first, then input channels.

    Args:
        out_channel: Number of output channels.
        in_channel: Number of input channels.
        kernel_size: Indexable pair (kernel height, kernel width).
        padding: Padding forwarded to conv_batch.
        stride: Stride forwarded to conv_batch.
    """

    def __init__(self, out_channel, in_channel, kernel_size=(3,3), padding=1, stride=1):
        super().__init__()
        self.stride = stride
        self.padding = padding
        # Learnable weights, drawn uniformly from [0, 1), laid out as
        # (out_channel, in_channel, kernel height, kernel width).
        weight_shape = (out_channel, in_channel, kernel_size[0], kernel_size[1])
        self.kernel = nn.Parameter(torch.rand(weight_shape))

    def forward(self, X):
        """Run conv_batch on X with this layer's kernel, padding and stride."""
        return conv_batch(X, self.kernel, self.padding, self.stride)

In [10]:
# We now do pooling
# Max or Ave pooling
class Pool2d(nn.Module):
    """Sliding-window pooling with a step of 1 and no padding.

    Args:
        pool_size: Pair (window height, window width).
        mode: "max" selects max pooling; any other value selects average
            pooling.
    """

    def __init__(self, pool_size, mode="max"):
        super().__init__()
        self.mode = mode
        self.p_h, self.p_w = pool_size

    def forward(self, X):
        """Pool a 4-D input indexed as (batch, channel, height, width).

        Returns:
            Tensor of shape (batch, channel, H - p_h + 1, W - p_w + 1) with
            the dtype and device of X.
        """
        out_h = X.shape[2] - self.p_h + 1
        out_w = X.shape[3] - self.p_w + 1
        # new_zeros keeps the output on X's device and in X's dtype instead of
        # silently falling back to a default float32 CPU tensor.
        Y = X.new_zeros((X.shape[0], X.shape[1], out_h, out_w))
        # Only the window position is looped over; every batch entry and
        # channel is reduced at once for that position.
        for i in range(out_h):
            for j in range(out_w):
                window = X[:, :, i:i + self.p_h, j:j + self.p_w]
                if self.mode == "max":
                    Y[:, :, i, j] = window.amax(dim=(2, 3))
                else:
                    Y[:, :, i, j] = window.mean(dim=(2, 3))

        return Y

In [11]:
class SimpleCNN(nn.Module):
    """Small CNN classifier built from the hand-written Conv2d and Pool2d layers.

    Two conv -> ReLU -> pool stages feed a three-layer fully connected head.
    The network returns raw class scores (logits). nn.CrossEntropyLoss applies
    log-softmax itself, so a Softmax layer at the end of the network would
    normalize twice and flatten the gradients. Apply softmax(dim=1) to the
    output when probabilities are needed.

    Args:
        num_classes: Number of output classes.
    """

    def __init__(self, num_classes):
        super().__init__()
        self.net = nn.Sequential(
            # Conv2d takes (out_channel, in_channel, kernel_size).
            Conv2d(2, 3, (2, 2)), nn.ReLU(),
            Pool2d((2, 2)),
            Conv2d(2, 2, (2, 2)), nn.ReLU(),
            Pool2d((2, 2)),
            nn.Flatten(),
            # LazyLinear infers its input size on the first forward pass.
            nn.LazyLinear(120), nn.Sigmoid(),
            nn.LazyLinear(40), nn.Sigmoid(),
            # Final layer emits logits; no Softmax on purpose (see class docstring).
            nn.LazyLinear(num_classes),
        )

    def forward(self, x):
        """Return the class logits produced by the layer stack for input x."""
        return self.net(x)

In [12]:
# Build the network with one output per class name listed by the dataset.
simplecnn = SimpleCNN(len(train_data.classes))



In [13]:
# Classification loss. nn.CrossEntropyLoss applies log-softmax internally, so it
# is meant to receive unnormalized class scores (logits) from the model.
loss_fn = nn.CrossEntropyLoss()

In [14]:
# Plain SGD over all model parameters with a learning rate of 5e-3.
# NOTE(review): the model contains LazyLinear layers whose weights are only
# materialized on the first forward pass; the PyTorch docs suggest a dry-run
# forward before constructing the optimizer - confirm this ordering is intended.
optim = torch.optim.SGD(simplecnn.parameters(), lr=5e-3)

In [15]:
from tqdm import tqdm

In [17]:
# One pass over the training set: forward, loss, backward and one SGD step per mini-batch.
for X, Y in tqdm(train_loader):
    # Gradients accumulate across backward() calls, so clear them first.
    optim.zero_grad()

    y_hat = simplecnn(X)
    loss = loss_fn(y_hat, Y)

    loss.backward()
    optim.step()

    # tqdm.write prints the message without garbling the progress bar,
    # which a plain print() inside a tqdm loop does.
    tqdm.write(f"Batch Loss: {loss.item()}")

  0%|          | 0/1563 [00:00<?, ?it/s]

def corr2d(X, K):