In [1]:
import numpy as np

In [2]:
class Conv2DPyt:
    """ Computes convolution given the input parameters """
    
    """ * The class implementation will be along the lines of torch.nn.Conv2D in order to 
          enable comparison of this NumPy only implementation and seamless testing
        * Can expect extensive refactoring of the existing code in the days to come
        * As part of refactoring, some code will be de-modularized
        * Old code will be retained at the end of the notebook for reference
    """
    """
        TODO:
        * Parameter error checking and assertion
        * Implementing other features and caveats offered by nn.torch.Conv2D 
          (e.g., `groups` flag to enable depthwise convolution, uniform sampling of kernel weights etc.)
        * Optimizing code
    """
    
    def __init__(
        self, 
        in_channels, 
        out_channels, 
        kernel_size, 
        padding = 0, 
        stride = 1, 
        dilation = 1, 
        groups = 1, 
        bias = True, 
        padding_mode = 'zeros', 
        device = None, 
        dtype = None, 
        verbose = True, 
        debug = False
        ):
        super(Conv2DPyt, self).__init__()
        
        ''' mandatory parameters '''
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size
        
        ''' optional parameters '''
        self.padding = padding
        self.stride = stride
        self.dilation = dilation
        
        ''' optional parameters (dummy, yet to be implemented)'''
        self.groups = groups
        self.bias = bias
        self.padding_mode = padding_mode
        self.device = device
        self.dtype = dtype
        
        ''' additional parameters (different from torch.nn.Conv2D)'''
        self.verbose = verbose
        self.verboseprint = print if self.verbose else lambda *a, **k: None
        self.debug = debug
        self.debugprint = print if self.debug else lambda *a, **k: None
        self.print_params()
    
    def print_params(self):
        self.verboseprint('*** parameters ***')
        self.verboseprint('in_channels: {}, out_channels: {}, kernel_size: {}'.format(self.in_channels, self.out_channels, self.kernel_size))
        self.verboseprint('padding: {}, stride: {}, dilation factor: {}'.format(self.padding, self.stride, self.dilation))
        self.verboseprint('groups: {}, bias: {}, padding_mode: {}, device: {}, dtype: {}'.format(self.groups, self.bias, self.padding_mode, self.device, self.dtype))
        self.verboseprint('\n')
    
    def add_padding(self, _input):
        # add zero padding based on the input parameters
        if self.padding != 0:
            _input = [[np.pad(channel,self.padding, 'constant', constant_values=0) for channel in batch] for batch in _input]    
            self.verboseprint('*** padded input image ***')
            self.verboseprint('input batches: {}, input channels: {}, input height: {}, input weight: {}'.format(_input.shape[0], _input.shape[1], _input.shape[2], _input.shape[3]))
            self.verboseprint(_input)
            self.verboseprint('\n')
        return _input
    
    def create_kernels(self):
        # create random kernels based on the input kernel parameters
        kernels = []
        self.verboseprint('*** kernels ***')
        self.verboseprint('kernels: {}, kernel channels: {}, kernel height: {}, kernel weight: {}'.format(self.out_channels, self.in_channels, self.kernel_size[0], self.kernel_size[1]))
        for k in range(self.out_channels):
            kernel = np.random.rand(self.in_channels, self.kernel_size[0], self.kernel_size[1]) # define a random kernel based on the kernel parameters
            if self.debug:
                kernel = k * np.ones_like(kernel)
            kernels.append(kernel)
            self.verboseprint('kernel {}'.format(k))
            self.verboseprint(kernel)
        self.verboseprint('\n')
        kernels = self.dilate_kernels(kernels)
        return kernels 
    
    def dilate_kernels(self, kernels):
        # dilate a kernel
        dil_ker_h = self.dilation * (self.kernel_size[0] - 1) + 1
        dil_ker_w = self.dilation * (self.kernel_size[1] - 1) + 1
        dil_kernels = []
        for kernel in kernels:
            dil_kernel = []
            for channel in kernel:
                dil_channel = np.zeros((dil_ker_h, dil_ker_w))
                for row in range(len(channel)):
                    for col in range(len(channel[0])):
                        dil_channel[self.dilation*row][self.dilation*col] = channel[row][col]
                dil_kernel.append(dil_channel.tolist())
            dil_kernels.append(dil_kernel)
        kernels, self.kernel_size = dil_kernels, (dil_ker_h, dil_ker_w)
        self.verboseprint('*** dilated kernels ***')
        self.verboseprint('kernels: {}, dilation factor: {}, kernel channels: {}, kernel height: {}, kernel weight: {}'.format(self.out_channels, self.dilation, self.in_channels, self.kernel_size[0], self.kernel_size[1]))
        for k in range(self.out_channels):
            self.verboseprint('kernel {}'.format(k))
            self.verboseprint(kernels[k])
        self.verboseprint('\n')
        return kernels
    
    def compute_out_vol(self, _input):
        # compute output volume from the input and kernel parameters
        _input_n, _, _input_h, _input_w = _input.shape
        
        out_n = int(_input_n)
        out_c = int(self.out_channels)
        out_h = int((_input_h - self.kernel_size[0])/self.stride) + 1
        out_w = int((_input_w - self.kernel_size[1])/self.stride) + 1
        return out_n, out_c, out_h, out_w
    
    def forward(self, _input):
        # create output from the input and kernel parameters 
        _input = self.add_padding(_input)
        if self.debug:
            for b in range(_input.shape[0]):
                _input[b] = (b+1) * np.ones_like(_input[b]) # define an image of all ones (twos etc.) based on the input parameters
        kernels = self.create_kernels()
        out_n, out_c, out_h, out_w = self.compute_out_vol(_input)
        output = np.zeros([out_n, out_c, out_h, out_w])
        # parse through every element of the output and compute the convolution value for that element
        for b in range(out_n):
            for k in range(out_c):
                for h in range(out_h):
                    for w in range(out_w):
                        # output[b, k, h, w] += self.convolve(h, w, k, b, _input, kernels)
                        # convolve kernel over the input slices
                        self.debugprint('kernel indices, image indices')
                        self.debugprint('[c, h, w]', '[n, c, h, w]')
                        convol_sum = 0
                        ker_c = self.in_channels
                        ker_h = self.kernel_size[0]
                        ker_w = self.kernel_size[1]
                        for c_ker in range(ker_c):
                            for h_ker in range(ker_h):
                                for w_ker in range(ker_w):
                                    self.debugprint([c_ker, h_ker, w_ker], [b, c_ker, h_ker + self.stride*h, w_ker + self.stride*w])
                                    convol_sum += kernels[k][c_ker][h_ker][w_ker] * _input[b][c_ker][h_ker + self.stride*h][w_ker + self.stride*w]
                        self.debugprint('\n')
                        output[b, k, h, w] += convol_sum
        self.verboseprint('*** output ***')
        output_shape = output.shape
        self.verboseprint('output batches: {}, ouput channels: {}, output height: {}, output weight: {}'.format(output_shape[0], output_shape[1], output_shape[2], output_shape[3]))
        assert((out_n, out_c, out_h, out_w) == output_shape)
        self.verboseprint(output)
        self.verboseprint('\n')
        return output

In [3]:
in_channels = 2 # input channels
out_channels = 3 # output channels
kernel_size = (2, 2) # kernel size

padding = 0 # padding (optional)
stride = 2 # stride (optional)
dilation = 1 # dilation factor (optional)
        
debug = True

in_batches = 2 # input batches
in_h = 4 # input height
in_w = 4 # input weight

conv2dpyt = Conv2DPyt(in_channels, out_channels, kernel_size, stride = stride, padding = padding, dilation = dilation, debug=debug) # call an instance of the class with the input parameters 
_input = np.random.rand(in_batches, in_channels, in_h, in_w) # define a random image based on the input parameters

_output = conv2dpyt.forward(_input) # perform convolution

*** parameters ***
in_channels: 2, out_channels: 3, kernel_size: (2, 2)
padding: 0, stride: 2, dilation factor: 1
groups: 1, bias: True, padding_mode: zeros, device: None, dtype: None


*** kernels ***
kernels: 3, kernel channels: 2, kernel height: 2, kernel weight: 2
kernel 0
[[[0. 0.]
  [0. 0.]]

 [[0. 0.]
  [0. 0.]]]
kernel 1
[[[1. 1.]
  [1. 1.]]

 [[1. 1.]
  [1. 1.]]]
kernel 2
[[[2. 2.]
  [2. 2.]]

 [[2. 2.]
  [2. 2.]]]


*** dilated kernels ***
kernels: 3, dilation factor: 1, kernel channels: 2, kernel height: 2, kernel weight: 2
kernel 0
[[[0.0, 0.0], [0.0, 0.0]], [[0.0, 0.0], [0.0, 0.0]]]
kernel 1
[[[1.0, 1.0], [1.0, 1.0]], [[1.0, 1.0], [1.0, 1.0]]]
kernel 2
[[[2.0, 2.0], [2.0, 2.0]], [[2.0, 2.0], [2.0, 2.0]]]


kernel indices, image indices
[c, h, w] [n, c, h, w]
[0, 0, 0] [0, 0, 0, 0]
[0, 0, 1] [0, 0, 0, 1]
[0, 1, 0] [0, 0, 1, 0]
[0, 1, 1] [0, 0, 1, 1]
[1, 0, 0] [0, 1, 0, 0]
[1, 0, 1] [0, 1, 0, 1]
[1, 1, 0] [0, 1, 1, 0]
[1, 1, 1] [0, 1, 1, 1]


kernel indices, image indices
[c

In [4]:
# get PyTorch output with the same debug mode inputs

import torch

x = torch.FloatTensor(_input)

weights = torch.stack([k * torch.ones([in_channels, kernel_size[0], kernel_size[1]]) for k in range(out_channels)])

output = torch.nn.functional.conv2d(x, weights, stride=stride, padding=padding, dilation=dilation)

print(output)

tensor([[[[ 0.,  0.],
          [ 0.,  0.]],

         [[ 8.,  8.],
          [ 8.,  8.]],

         [[16., 16.],
          [16., 16.]]],


        [[[ 0.,  0.],
          [ 0.,  0.]],

         [[16., 16.],
          [16., 16.]],

         [[32., 32.],
          [32., 32.]]]])


In [89]:
# compare outputs of conv-Numpy and PyTorch
print(torch.equal(torch.FloatTensor(_output), output))

True
