In [1]:
import numpy as np

In [196]:
class Conv2D:
    
    def __init__(self, ksize, stride, padding, activation, filters, input_size):
        self.kernels = []
        self.stride = stride
        self.padding = padding
        self.input_size = input_size
        self.ksize = ksize
        self.filters = filters
        
        self.bias = np.random.randn(filters).reshape(1, -1)
        for i in range(filters):
            self.kernels.append(np.random.randn((ksize, ksize, input_size[-1])))
        self.activation = activation
        
    @staticmethod
    def _rotate(inp):
        assert len(inp.shape) == 4, f"Shape mismatch, input map should have 4 dim, got {len(inp.shape)}"

        return np.flip(inp, axis=(1,2))

    @staticmethod
    def _inside_pad(inp, pad_width):
        assert len(inp.shape) == 4, f"Shape mismatch, input map should have 4 dim, got {len(inp.shape)}"
        
        if pad_width == 0:
            return inp
        ix = np.repeat(np.arange(1, arr.shape[1]), pad_width)

        inp = np.insert(inp, ix, 0, axis=1)
        return  np.insert(inp, ix, 0, axis=2)


    @staticmethod
    def _pad(inp, pad_width):
        assert len(inp.shape) == 4, f"Shape mismatch, input map should have 4 dim, got {len(inp.shape)}"
        if pad_width == 0:
            return inp

        return np.pad(arr, [(0, 0), (pad_width, pad_width), (pad_width, pad_width), (0,0)])

    @staticmethod
    def _convolution_op_w_kernel(inp, kernel, stride=1):
        
        assert len(inp.shape) == 4, f"Shape mismatch, input map should have 4 dim, got {len(inp.shape)}"
        assert len(kernel.shape) == 4, f"Shape mismatch, kernel should have 4 dim, got {len(kernel.shape)}"
        assert inp.shape[-1] == kernel.shape[-1], f"Shape mismatch, input map should have same channel as kernel, got {inp.shape[-1]} & {kernel.shape[-1]}"
        assert kernel.shape[0] == kernel.shape[1], "Non square kernels are not supported"
        assert (inp.shape[1] >= kernel.shape[0]) or (inp.shape[2] >= kernel.shape[1]), f"Input map shape less than kernel, got {inp.shape[1:3]} for kernel {kernel.shape[:-1]}"

        kernel = self._rotate(kernel)

        start_rloc = 0
        end_rloc = kernel.shape[1]

        oup = []

        while end_rloc <= inp.shape[1]:

            start_cloc = 0
            end_cloc = kernel.shape[2]
            output = []

            while end_cloc <= inp.shape[2]:
                conv = inp[:, start_rloc:end_rloc, start_cloc:end_cloc]*kernel
                output.append(conv.sum(axis=(1,2,3)))

                start_cloc += stride
                end_cloc += stride

            oup.append(output)

            start_rloc += stride
            end_rloc += stride        
            
        return np.moveaxis(oup, -1, 0)
    
    def _convolution_op(self, inp, stride): 
        
        feature_maps = []
        for kernel in self.kernels:
            oup = self._convolution_op_w_kernel(inp, np.expand_dims(kernel, 0), stride)
            feature_maps.append(oup)
        
        return np.stack(feature_maps, axis=-1)
    
    def get_output_size(self):
        m, n, k, p, s = self.input_size[0], self.input_size[1], self.ksize, self.padding, self.stride
        return (m-k+2*p)//s + 1, (n-k+2*p)//s + 1, self.filters

    def get_no_of_params(self):
        return (self.ksize*self.ksize*self.input_size[-1]*self.filters) + self.filters

    def eval(self, X):
        out = self._convolution_op(X.T, self.stride) + self.bias
        b, h, w, c = out.shape
        a_out = self.activation(out.reshape(b, h*w*c).T)
        
        return a_out.T.reshape(b, h, w, c)

    def grad_parameters(self, X):
        out = self._convolution_op(X.T, self.stride) + self.bias
        b, h, w, c = out.shape
        
        da_dI = self.activation.grad_input(out.reshape(b, h*w*c).T)
        da_dI = np.diagonal(da_dI, axis1=1, axis2=2)
        
        return da_dI.T.reshape(b, h, w, c), None
    
    def grad_input(self, X):
        
        return X.T
    
    @staticmethod
    def backprop_grad(abcd, grad): # abcd -> grad_loss
        pqrs = grad["w"]
        
        kernels = pqrs * abcd
        kernels = self._inside_pad(kernels, self.stride-1)
        inps = grad["input"]
        grad_ws = []
        
        #### GRAD W---------------------------
        for i in range(kernels.shape[-1]):
            kernel = kernels[..., i]
            grad_w = []
            for j in range(inps.shape[-1]):
                inp = inps[..., i]
                oup = self._convolution_op_w_kernel(inp, kernel)
                oup = self._rotate(oup).sum(axis=0)
                grad_w.append(oup)
            grad_ws.append(np.array(grad_w))
        ### -----------------------------------
        
        #### GRAD I---------------------------
        inp = self._pad(kernels, self.ksize-1)
        kernels = self.kernels
        grad_I = np.empty(grad["input"].shape, dtype="float")
        
        for i in range(self.input_size[-1]):
            kernel = np.dstack([kernels[j][...,i] for j in range(len(kernels))])
            oup = self._convolution_op_w_kernel(inp, kernel)
            grad_I[..., i] = oup
        ### -----------------------------------
        
        #### GRAD b---------------------------
        grad_bs = np.sum(pqrs * abcd, axis=(1,2,0))

        return grad_ws, grad_bs, grad_I

    def update(self, grad_w, grad_b, optimizer, method="minimize"):
        pass
        

In [None]:
# MaxPool, Flatten