In [1]:
import torch
import torchvision
import torchvision.transforms as transforms
import numpy as np
import torch.nn as nn

In [2]:
# Switch for the self-check cells: every cell guarded by `if debug_mode:`
# only runs when this flag is True.
debug_mode = True

In [3]:
# Preprocessing: convert each image to a tensor, then subtract 0.5 and divide
# by 0.5 on each of the three channels.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
])

batch_size = 4

# Training split, served in shuffled mini-batches.
trainset = torchvision.datasets.CIFAR10(
    root='../data', train=True, download=True, transform=transform)
trainloader = torch.utils.data.DataLoader(
    trainset, batch_size=batch_size, shuffle=True, num_workers=2)

# Test split, served in its stored order.
testset = torchvision.datasets.CIFAR10(
    root='../data', train=False, download=True, transform=transform)
testloader = torch.utils.data.DataLoader(
    testset, batch_size=batch_size, shuffle=False, num_workers=2)

# Human-readable class names.
classes = (
    'plane', 'car', 'bird', 'cat', 'deer',
    'dog', 'frog', 'horse', 'ship', 'truck',
)

Files already downloaded and verified
Files already downloaded and verified


In [4]:
class Value:
    """A node of the computation graph: an array plus its gradient.

    Attributes set by ``__init__``:
        data: the payload passed in by the caller (stored as-is).
        _prev: set of the ``Value`` objects this node was computed from.
        _op: short name of the operation that produced this node.
        label: optional human-readable name.
        _backward: zero-argument callable that propagates ``self.grad`` to the
            nodes in ``_prev``; a no-op until the producing block replaces it.
        grad: float64 array of zeros with the same shape as ``data``.
    """

    def __init__(self, data, _children = (), _op='', label = ''):
        self.data = data
        self._prev = set(_children)
        self._op = _op
        self.label = label
        self._backward = lambda: None
        self.grad = np.zeros_like(self.data, dtype = np.float64)

    def zero_grad(self):
        """Reset the gradient of this node and of every node reachable via ``_prev``.

        The graph is walked iteratively with a visited set, so a node shared by
        several parents is reset exactly once and long chains of nodes cannot
        exhaust the interpreter's recursion limit.
        """
        visited = set()
        pending = [self]
        while pending:
            node = pending.pop()
            if node in visited:
                continue
            visited.add(node)
            # ``[...]`` (unlike ``[:]``) is also valid for the 0-d gradient
            # that is created when ``data`` is a plain scalar.
            node.grad[...] = 0.0
            pending.extend(node._prev)

In [5]:
class BuildingBlock:
    """Base class for the layers of the network.

    A concrete block overrides ``__call__`` (forward pass),
    ``getBackwardFunction`` (backward pass) and ``getLearnableParameters``.
    """

    # Must override
    def __init__(self):
        pass

    # Must override, return Value object
    def __call__(self, *args, **kwargs):
        """Forward pass; the base class has none and always raises.

        Subclasses are expected to compute their output array and then follow
        this pattern::

            out = Value(output_array, tuple(args), '<op name>')
            out._backward = self.getBackwardFunction(out, *args, **kwargs)
            return out

        Raises:
            NotImplementedError: always, because the base class has no
                forward computation of its own.
        """
        raise NotImplementedError(
            type(self).__name__ + " must override __call__ and return a Value object"
        )

    # Must override, return function for backpropagation
    def getBackwardFunction(self, out, *args, **kwargs):
        """Return the zero-argument callable stored in ``out._backward``.

        The default callable only prints a reminder that no backward function
        was defined; it does not touch any gradient.
        """
        def _backward():
            print("Default backward function called. Please define backward function for this neuron")
        return _backward

    # Must override,return learnable parameters
    def getLearnableParameters(self):
        """Return the list of learnable parameters (empty by default)."""
        return []

In [6]:
class Convolve(BuildingBlock):
    """2-D convolution layer (stride 1, no padding, no bias).

    The kernel is applied without flipping, i.e. as a cross-correlation, and is
    stored in ``self.kernel`` as a ``Value`` of shape
    ``(out_channels, in_channels, Kx, Ky)``.
    """

    def __init__(self, in_channels, out_channels, kernel_size, *args, **kwargs):
        """Create the layer with a randomly initialised kernel.

        Args:
            in_channels: number of channels of the input (int).
            out_channels: number of channels of the output (int).
            kernel_size: an int (square kernel) or a tuple of two ints.
            *args, **kwargs: forwarded unchanged to ``BuildingBlock.__init__``.

        Raises:
            AssertionError: if an argument has the wrong type or the kernel
                size tuple does not have exactly two entries.
        """
        if isinstance(kernel_size, int):
            kernel_size = (kernel_size, kernel_size)

        assert isinstance(kernel_size, tuple), "Kernel size must be either integer or tuple"

        assert len(kernel_size) == 2, "Kernel size must be 2"

        assert isinstance(out_channels, int), "out_channels must be of type int"

        # The message previously named out_channels here, pointing at the wrong argument.
        assert isinstance(in_channels, int), "in_channels must be of type int"

        self.kernel = Value(np.random.random(size = (out_channels, in_channels, *kernel_size)))

        super().__init__(*args, **kwargs)

    def getLearnableParameters(self):
        """Return the learnable parameters: a list holding only the kernel."""
        return [self.kernel]

    def __call__(self, imageToBeConvoluted):
        """Convolve a batch of images with the kernel.

        Args:
            imageToBeConvoluted: ``Value`` whose ``data`` is a 4-D array of
                shape ``(N, C_in, W, H)``.

        Returns:
            A ``Value`` of shape ``(N, C_out, W - Kx + 1, H - Ky + 1)`` whose
            ``_backward`` accumulates gradients into the kernel and the input.

        Raises:
            AssertionError: if the input is not 4-D or its channel count does
                not match the kernel's.
        """
        kernel = self.kernel

        assert len(imageToBeConvoluted.data.shape) == 4, "Convolving tensors with shape 4 ONLY"

        N, C_in, W, H = imageToBeConvoluted.data.shape
        C_out, C_in_expected, Kx, Ky = kernel.data.shape

        assert C_in == C_in_expected, "Input channels must be equal to ones declared at object initialization"

        # Spatial extent of the output, computed once instead of per loop.
        out_w = W - Kx + 1
        out_h = H - Ky + 1

        output_array = np.zeros(shape = (N, C_out, out_w, out_h))

        for c in range(C_out):
            for x in range(out_w):
                for y in range(out_h):
                    # The (N, C_in, Kx, Ky) window times the (C_in, Kx, Ky)
                    # filter, reduced over everything but the batch axis.
                    output_array[:, c, x, y] = np.sum(imageToBeConvoluted.data[:, :, x:x + Kx, y:y + Ky] * kernel.data[c], axis = (1, 2, 3))

        out = Value(output_array, (imageToBeConvoluted, kernel), 'convolve')
        out._backward = self.getBackwardFunction(out, imageToBeConvoluted)
        return out

    def getBackwardFunction(self, out, imageToBeConvoluted):
        """Build the backward function for one forward call.

        Args:
            out: the ``Value`` produced by the forward pass; its ``grad`` is
                read when the returned function runs.
            imageToBeConvoluted: the ``Value`` that was convolved.

        Returns:
            A zero-argument function that adds (``+=``) the gradient
            contributions to ``self.kernel.grad`` and to
            ``imageToBeConvoluted.grad``.
        """
        kernel = self.kernel

        N, C_in, W, H = imageToBeConvoluted.data.shape
        C_out, C_in_expected, Kx, Ky = kernel.data.shape

        out_w = W - Kx + 1
        out_h = H - Ky + 1

        def _backward():
            # Gradient with respect to the kernel.
            for c in range(C_out):
                for i in range(Kx):
                    for j in range(Ky):
                        # val1 = out.grad[:, c:c+1] is of shape (N, 1, out_w, out_h)
                        # val2 is the input window of shape (N, C_in, out_w, out_h)
                        #
                        # val1 * val2 broadcasts to (N, C_in, out_w, out_h); summing
                        # over axes (0, 2, 3) gives the length-C_in gradient for
                        # kernel.grad[c, :, i, j].
                        val1 = out.grad[:, c:c + 1]
                        val2 = imageToBeConvoluted.data[:, :, i:i + out_w, j:j + out_h]
                        kernel.grad[c, :, i, j] += np.sum(val1 * val2, axis = (0, 2, 3))

            # Gradient with respect to the input image.
            for c in range(C_out):
                for i in range(out_w):
                    for j in range(out_h):
                        # val1 = out.grad[:, c:c+1, i:i+1, j:j+1] is of shape (N, 1, 1, 1)
                        # val2 = kernel.data[c:c+1] is of shape (1, C_in, Kx, Ky)
                        # val1 * val2 is of shape (N, C_in, Kx, Ky) and is added to
                        # the input window that produced output position (i, j).
                        val1 = out.grad[:, c:c + 1, i:i + 1, j:j + 1]
                        val2 = kernel.data[c:c + 1]
                        imageToBeConvoluted.grad[:, :, i:i + Kx, j:j + Ky] += val1 * val2
        return _backward

In [7]:
if debug_mode:
    # Problem size shared by the self-check cells.
    N, C_in, C_out = 4, 3, 4
    W, H = 32, 32
    K, stride = 3, 1

    # Random input batch and a bias-free PyTorch convolution used as reference.
    test_input = torch.randn(N, C_in, W, H)
    m = nn.Conv2d(C_in, C_out, K, stride = stride, bias = False)
    output = m(test_input)

In [8]:
if debug_mode:
    # Forward-pass check: load the reference layer's weights into Convolve and
    # compare both outputs on the same input.
    c = Convolve(C_in, C_out, K)
    # Use the weight tensor as-is. Squeezing it would drop any singleton
    # dimension (C_in, C_out or K equal to 1) and break the 4-D kernel shape
    # that Convolve expects.
    weights = m.weight.detach().numpy()
    c.kernel.data = weights
    image = Value(test_input.detach().numpy())
    actual = c(image)
    print(actual.grad.shape)
    # Norm of the difference between our output and PyTorch's.
    print(np.linalg.norm(actual.data - output.detach().numpy()))
    # Seed the output gradient with ones and run the backward pass once.
    actual.grad[:] = 1.0
    actual._backward()

(4, 4, 30, 30)
7.44420207734223e-06


### Checking whether the gradients are correct

In [9]:
if debug_mode:
    import torch.nn as nn
    import torch.nn.functional as F

    # Same problem size as the forward-pass check.
    N, C_in, C_out = 4, 3, 4
    W, H = 32, 32
    K, stride = 3, 1

    class Net(nn.Module):
        """Reference model: one Conv2d layer whose output is summed to a scalar."""

        def __init__(self):
            super().__init__()
            self.conv1 = nn.Conv2d(C_in, C_out, K, stride = stride)

        def forward(self, x):
            # Reducing to a scalar makes every output element receive gradient 1.
            return torch.sum(self.conv1(x))

    net = Net()

In [10]:
if debug_mode:
    # Give our layer the same weights as the reference convolution.
    c = Convolve(C_in, C_out, K)
    weight_value = net.conv1.weight.data.numpy()
    c.kernel.data = weight_value

    # Run both implementations on the same input.
    expected_output = net(test_input)
    actual_output = c(Value(test_input.numpy()))

In [11]:
if debug_mode:
    import torch.optim as optim

    # Reference gradients: clear any stale ones, then backpropagate the scalar.
    optimizer = optim.SGD(params=net.parameters(), lr=0.001, momentum=0.9)
    optimizer.zero_grad()
    expected_output.backward()

    # Our gradients: an upstream gradient of ones mirrors the sum in Net.forward.
    actual_output.grad.fill(1.0)
    actual_output._backward()

In [12]:
if debug_mode:
    # Kernel gradient from PyTorch (conv1.weight is the first parameter of net).
    expected_gradient = net.conv1.weight.grad.numpy()
    # Kernel gradient from Convolve (the kernel is its only learnable parameter).
    actual_gradient = c.kernel.grad

In [14]:
if debug_mode:
    # Norm of the gap between the reference and the hand-written kernel gradient.
    print(np.linalg.norm(np.subtract(expected_gradient, actual_gradient)))

0.00017543987665445308
