In [41]:
import numpy as np

In [135]:
class Conv2D:
    
    def __init__(self, ksize, filters, input_size, activation, stride=1, padding=0):
        if input_size[0] <= 0 or input_size[1] <= 0:
            raise ValueError(f"Input image size is invalid, got {input_size}")
        self.ksize = ksize
        self.filters = filters # no. of kernels in a layer -> no. of channels in each output
        self.stride = stride
        self.padding = padding
        self.input_size = input_size # to decide no. of channels in the kernel
        self.channels = input_size[-1]
        self.activation = activation
        self.kernels = []
        for i in range(self.filters):
            k = np.random.randn(ksize, ksize, self.channels)
            self.kernels.append(k)
        self.bias = np.random.randn(1,self.filters)
        
    @staticmethod
    def _rotate(inp):
        assert len(inp.shape)==4, f"No. of dim in inp not equal to 4, got {inp.shape}"
        return np.flip(inp, axis=(1,2))

    @staticmethod
    def _convolution_op_helper(inp, kernel, stride=1):
        # inp shape -> 4 dim
        assert len(inp.shape)==4, f"No. of dim in inp not equal to 4, got {inp.shape}"
        # kernel shouldhave 4 dim
        assert len(kernel.shape)==4, f"No. of dim in kernel not equal to 4, got {kernel.shape}"

        # no. of chanels in kernel and that in inp it should be same
        assert inp.shape[-1] == kernel.shape[-1], f"Mismatch in no. of channels in inp and kernel, got inp {inp.shape[-1]}, kernel {kernel.shape[-1]}"
        # non-square kernels are not allowed
        assert kernel.shape[1] == kernel.shape[2], f"dim 0 of kernel doesn't match dim 1, got {kernel.shape}"
        # inp shape square
        assert inp.shape[1]>=kernel.shape[1] and inp.shape[2]>=kernel.shape[2], f"Inp map dim(1,2) < kernel dim(1,2), got inp map dim 1, 2 {inp.shape[1:-1]}, kernel dim 1,2 {kernel.shape[1:-1]}"

        # flip the kernel
        kernel = Conv2D._rotate(kernel)

        oup = []
        start_rloc = 0
        end_rloc = kernel.shape[1]
        while end_rloc <= inp.shape[1]:
            output = []
            start_cloc = 0
            end_cloc = kernel.shape[2]
            while end_cloc <= inp.shape[2]:
                conv = (inp[:,start_rloc:end_rloc, start_cloc:end_cloc]*kernel).sum(axis=(1,2,3))
                output.append(conv)

                start_cloc += stride
                end_cloc += stride
            oup.append(output)
            start_rloc += stride
            end_rloc += stride
        return np.moveaxis(oup, -1, 0)
    
    def _convolution_op(self, inp):
        output = []
        for kernel in self.kernels:
            o = Conv2D._convolution_op_helper(inp, np.expand_dims(kernel, axis=0), self.stride)
            output.append(o)
        output = np.stack(output, axis=-1)
        return output
    
    def _pad_grad_I(self, grad_I):
        return np.pad(grad_I, [(0, 0), (0, self.input_size[0] - grad_I.shape[1]), (0, self.input_size[1] - grad_I.shape[2]), (0,0)])
            
    @staticmethod
    def _pad(inp, pad_width):   
        assert len(inp.shape)==4, f"No. of dim in inp not equal to 4, got {inp.shape}"
        return np.pad(inp, ((0,0), (pad_width,pad_width), (pad_width,pad_width), (0,0)))

    @staticmethod
    def _inside_pad(inp, pad_width):
        assert len(inp.shape)==4, f"No. of dim in inp not equal to 4, got {inp.shape}"
        ix = np.repeat(np.arange(1, inp.shape[1]), pad_width)
        inp = np.insert(inp, ix, 0, axis=1)
        return np.insert(inp, ix, 0, axis=2)
        

    def eval(self, X):
        o_ = self._convolution_op(X) + self.bias
        return self.activation(o_)

    def grad_activation(self, X): #pqrs
        o_ = self._convolution_op(X) + self.bias # shape: m, h, w, c; eg (50, 3,3,2)
        m, h, w, c = o_.shape # (50, 2,2, 5)
        do_do_ = self.activation.grad_input(o_.reshape(m, h*c*w)) # shape of do_do-: (50, 20, 20)
        return np.diagonal(do_do_, axis1=1, axis2=2).reshape(o_.shape)
    
    
    def gradient_dict(self, X):
        g = {}
        g['activation'] = self.grad_activation(X) # do_do_
        g['input'] = self.get_input(X)
        return g
        
    def get_input(self, X):
        out_h, out_w, _ = self.get_output_size()
        h = (out_h-1)*self.stride - 2*self.padding + self.ksize
        w = (out_w-1)*self.stride - 2*self.padding + self.ksize
        return Conv2D._rotate(X[:, :h, :w, :]) # flip input

    def backprop_grad(self, grad_loss, grad): # abcd
        # to find dL_dwi and dL_dbi, we need dL_do and do_do_. 
        
        """grad: dictionary, keys: activation, input"""
        do_do_ = grad['activation'] # pqrs
#         print("pqrs:", do_do_.shape)
        ##################################
        #                                #
        #          dL_dbi                #
        #                                #
        ##################################
        b, h, w, c = grad_loss.shape
        dL_do_ = grad_loss * do_do_[:,:h, :w,:]
#         print("dL_do_", dL_do_.shape)
        dL_dbi = []
        for c in range(dL_do_.shape[-1]):
            b = dL_do_[:,:,:,c].sum(axis =(1, 2, 0))
            dL_dbi.append(b)
        dL_dbi = np.array(dL_dbi).reshape(1,-1)
        
        ##################################
        #                                #
        #          dL_dwi                #
        #                                #
        ##################################
        kernels = Conv2D._inside_pad(dL_do_, self.stride-1) # abcd*pqrs -> act as a kernel while computing dL_dwi # 18,18,5
        inps = grad['input'] # 20, 20,10
#         print("grad_input:", inps.shape)
        dL_dwi = [] # len should be same no. of filters in this layer
        for i in range(dL_do_.shape[-1]): # 5 times
            kernel = kernels[:,:,:,i] # 1, 18,18, 1
            dwi = []
            for j in range(inps.shape[-1]): # 10 times
                inp = inps[...,j] # 1, 20, 20, 1
                conv = Conv2D._convolution_op_helper(np.expand_dims(inp,axis=-1) , np.expand_dims(kernel, axis=-1))
                dwi.append(conv)
#             print(conv.shape) # 10,m, 3 ,3
            dwi = np.transpose(np.array(dwi), (1,0,2,3)).sum(axis=0) # (m, 10, 3, 3).sum(axis=0) -> 10, 3, 3
            dwi = np.transpose(dwi, (1,2,0)) 
            dL_dwi.append(dwi)
            
        ##################################
        #                                #
        #          dL_dI                 #
        #                                #
        ##################################
        inps = Conv2D._pad(kernels, self.ksize-1)
        kernels = self.kernels
        dL_dI = []
        for i in range(self.input_size[-1]):
            ## ith channel of jth kernel needs to convolve with jth channel of inp 
            kernel = [self.kernels[j][...,i] for j in range(len(self.kernels))]
            kernel = np.stack(kernel, axis=-1) # 3,3,5
            conv = Conv2D._convolution_op_helper(inps, np.expand_dims(kernel, axis=0))
            dL_dI.append(conv)
        dL_dI = np.stack(dL_dI, axis=-1)
        
        return dL_dwi, dL_dbi, self._pad_grad_I(dL_dI)
    
    def _pad_grad_I(self, grad_I):
        return np.pad(grad_I, [(0, 0), (0, self.input_size[0] - grad_I.shape[1]), (0, self.input_size[1] - grad_I.shape[2]), (0,0)])
        
    def update(self, grad, optimizer):
        """ grad: (dL_dwi, dL_dbi)"""
        self.bias = optimizer.minimize(self.bias, grad[1])
        for i in range(len(self.kernels)):
            self.kernels[i] = optimizer.minimize(self.kernels[i], grad[0][i]) 
            
    def get_parameter_shape(self):
        return self.kernels[0].shape, self.bias.shape
    
    def get_output_size(self):
        m, n, k, p, s = self.input_size[0], self.input_size[1], self.ksize, self.padding, self.stride
        return ((m-k+(2*p))//s)+1, ((n-k+(2*p))//s)+1, self.filters
    
    def get_total_parameters(self):
        return np.prod((len(self.kernels), *self.kernels[0].shape)) + np.prod(self.bias.shape)

In [3]:
class Sigmoid:

    def __call__(self, X):
        return self.eval(X)

    def eval(self, X):
        return 1/((np.e**-X) + 1)

    def grad_input(self, X):
        I = np.identity(X.shape[1])
        b = self.eval(X)*(1-self.eval(X)) # same shape as X
        return np.einsum('ij,mi->mij', I, b)


In [110]:
class Flatten:
    
    def __init__(self, input_size):
        if input_size[0] <= 0 or input_size[1] <= 0:
            raise ValueError(f"Input image size is invalid, got {input_size}")
        self.h, self.w, self.c = input_size
        
    def eval(self, X):
        return X.reshape(-1, self.h*self.w*self.c)
    
    
    def gradient_dict(self, X):
        return {}

    def backprop_grad(self, grad_loss, grad): # abcd (grad_loss: 10,1,1620)
        return None, None, grad_loss[:, 0, :].reshape(-1, self.h, self.w, self.c) # -> 10, 18, 18, 5 
        
    def update(self, grad, optimizer):
        """ grad: (dL_dwi, dL_dbi)"""
        pass
        
    def get_parameter_shape(self):
        return ("-","-")
    
    def get_output_size(self):
        return (1,self.h*self.w*self.c)
    
    def get_total_parameters(self):
        return 0

In [91]:
m = 5
f = 2
np.ones((10, 1, 1620))[:,0,:].reshape((-1, 18, 18, 5)).shape

(10, 18, 18, 5)

In [5]:
import sys
sys.path.append("E://CB-DS-LV-May21//DS//NN")

In [6]:
from activation import Sigmoid
from loss import BinaryCrossEntropy
from optimizer import GradientDescentOptimizer
from layer import Dense
from model import Sequential

### Load Data

In [7]:
from keras.datasets import mnist

In [8]:
(x_train, y_train), (x_test, y_test) = mnist.load_data()
print(x_train.shape, y_train.shape)

(60000, 28, 28) (60000,)


In [10]:
x_train = x_train[(y_train==0) | (y_train==1)].reshape(-1, 28,28,1)/255
y_train = y_train[(y_train==0) | (y_train==1)].reshape(-1,1)
print(x_train.shape, y_train.shape)

(12665, 28, 28, 1) (12665, 1)


In [136]:
model = Sequential(BinaryCrossEntropy())
model.add(Conv2D, ksize=5, filters=20, input_size=(28,28,1), activation=Sigmoid(), stride=2, padding=0)
model.add(Conv2D, ksize=5, filters=10, activation=Sigmoid(), stride=2, padding=0)
model.add(Conv2D, ksize=3, filters=5, activation=Sigmoid(), stride=1, padding=0)
model.add(Conv2D, ksize=2, filters=5, activation=Sigmoid(), stride=1, padding=0)
model.add(Flatten)
model.add(Dense, activation=Sigmoid(), units=1)
model.summary()

+---+------------+------------+---------+--------------+------------------+
| # | Layer Type |  W.shape   | b.shape | Output shape | Total parameters |
+---+------------+------------+---------+--------------+------------------+
| 1 |   Conv2D   | (5, 5, 1)  | (1, 20) | (12, 12, 20) |       520        |
| 2 |   Conv2D   | (5, 5, 20) | (1, 10) |  (4, 4, 10)  |       5010       |
| 3 |   Conv2D   | (3, 3, 10) | (1, 5)  |  (2, 2, 5)   |       455        |
| 4 |   Conv2D   | (2, 2, 5)  | (1, 5)  |  (1, 1, 5)   |       105        |
| 5 |  Flatten   |     -      |    -    |    (1, 5)    |        0         |
| 6 |   Dense    |   (5, 1)   | (1, 1)  |    (1, 1)    |        6         |
+---+------------+------------+---------+--------------+------------------+
Total no. of model parameters 6096


In [28]:
x_train[0:10].shape

(10, 28, 28, 1)

In [137]:
ypred = model.predict(x_train[0:10])
print(ypred.shape)

(10, 1)


In [138]:
model.loss(ypred, y_train[:10])

130.9201092810309

In [142]:
model.fit(x_train[:10], y_train[:10], epochs=3, optimizer=GradientDescentOptimizer, learning_rate=0.005, verbose=1, batch_size=1)

Epoch: 3 Loss: 1.20282292144554334

In [143]:
ypred = model.predict(x_train[0:10])
print(ypred.shape)

(10, 1)


In [144]:
model.loss(ypred, y_train[:10])

128.64618569567816