In [None]:
import numpy as np 
import matplotlib.pyplot as plt
from scipy import ndimage
import pandas as pd

In [None]:
mnist_data = pd.read_csv("mnist-data/digits.csv")

In [None]:
def pool_forward(x,mode="max"):
    #m*n_c*w*h
    x_patches = x.reshape(x.shape[0],x.shape[1]//2, 2,x.shape[2]//2, 2,x.shape[3])
    if mode=="max":
        out = x_patches.max(axis=2).max(axis=3)
        mask  =np.isclose(x,np.repeat(np.repeat(out,2,axis=1),2,axis=2)).astype(int)
    elif mode=="average": 
        out =  x_patches.mean(axis=3).mean(axis=4)
        mask = np.ones_like(x)*0.25
    return out,mask

In [None]:
def pool_backward(dx, mask):
    return mask*(np.repeat(np.repeat(dx,2,axis=1),2,axis=2))

In [None]:
def relu(x, deriv=False):
    if deriv:
        return (x>0)
    return np.multiply(x, x>0)

In [None]:
def conv_forward(x,w,b,padding="same"):
    if padding=="same": 
        pad = (w.shape[0]-1)//2
    else: #padding is valid - i.e no zero padding
        pad =0 
    n = (x.shape[1]-w.shape[0]+2*pad) +1 #ouput width/height
    y = np.zeros((x.shape[0],n,n,w.shape[3]))
    x = np.pad(x,((0,0),(pad,pad),(pad,pad),(0,0)),'constant', constant_values = 0)
    w = np.flip(w,0)
    w = np.flip(w,1)
    
    f = w.shape[0]
        
    for i in range(x.shape[0]):
        for k in range(w.shape[3]):
            for d in range(x.shape[3]):
                y[i,:,:,k]+=ndimage.convolve(x[i,:,:,d],w[:,:,d,k])[f//2:-(f//2),f//2:-(f//2)]
                #ndimage.convolve starts convolution from centre of kernel and zero pads but we don't want this
                #since we want to manually decide if we want to pad or not
    y = y + b
    return y

In [None]:
def fc_forward(x,w,b):
    return relu(w.dot(x)+b)


In [None]:
def fc_backward(dA,a,x,w,b):
    m = dA.shape[1]
    dZ = dA*relu(a, deriv = True)
    dW = (1/m)*dZ.dot(x.T)
    db = (1/m)*np.sum(dZ,axis=1,keepdims=True)
    dx =  np.dot(w.T,dZ)
    return dx, dW,db

In [None]:
def softmax_forward(x,w,b):
    z = w.dot(x)+b
    a = np.exp(z)
    a = a/np.sum(a,axis=0,keepdims=True)
    return a

In [None]:
def softmax_backward(y_hat, y, w, b, x):
    m = y.shape[1]
    dZ = y_hat - y
    dW = (1/m)*dZ.dot(x.T)
    db = (1/m)*np.sum(dZ,axis=1,keepdims=True)
    dx =  np.dot(w.T,dZ)
    return dx, dW,db

In [None]:
def conv_backward(dZ,x,w,padding="same"):
    m = x.shape[0]
    db = (1/m)*np.sum(dZ, axis=(0,1,2), keepdims=True)
    
    if padding=="same": 
        pad = (w.shape[0]-1)//2
    else: #padding is valid - i.e no zero padding
        pad =0 
    x_padded = np.pad(x,((0,0),(pad,pad),(pad,pad),(0,0)),'constant', constant_values = 0)
    
    #this will allow us to broadcast operation
    x_padded_bcast = np.expand_dims(x_padded, axis=-1) 
    dZ_bcast = np.expand_dims(dZ, axis=-2)
    
    dW = np.zeros_like(w)
    f=w.shape[0]
    w_x = x_padded.shape[1]
    for i in range(f):
        for j in range(f):
            dW[i,j,:,:] = (1/m)*np.sum(dZ_bcast*x_padded_bcast[:,i:w_x-(f-1 -i),j:w_x-(f-1 -j),:,:],axis=(0,1,2))  
    
    dx = np.ones_like(x_padded)*0.0
    Z_pad = f-1
    dZ_padded = np.pad(dZ,((0,0),(Z_pad,Z_pad),(Z_pad,Z_pad),(0,0)),'constant', constant_values = 0)  
    for i in range(x.shape[0]):
        for k in range(w.shape[3]):
            for d in range(x.shape[3]):
                dx[i,:,:,d]+=ndimage.convolve(dZ_padded[i,:,:,k],w[:,:,d,k])[f//2:-(f//2),f//2:-(f//2)]
    dx = dx[:,pad:dx.shape[1]-pad,pad:dx.shape[2]-pad,:]
    return dx,dW,db

Next to define the model:


In [None]:
def init_conv_parameters(f, n_c, k):
    
    return 0.001*np.random.rand(f,f,n_c,k), np.ones((1,1,1,k))
                                                                      
def init_fc_parameters(n_x,n_y):
    return 0.001*np.random.rand(n_y,n_x),np.zeros((n_y,1))

In [None]:
def initialise_parameters():    
    parameters={}
    parameters["W_conv1"], parameters["b_conv1"] = init_conv_parameters(5, 1, 16)
    parameters["W_conv2"], parameters["b_conv2"] = init_conv_parameters(3, 16, 16)

    parameters["W_conv3"], parameters["b_conv3"] = init_conv_parameters(3, 16, 32)
    parameters["W_conv4"], parameters["b_conv4"] = init_conv_parameters(3, 32,32)

    parameters["W_fc1"],parameters["b_fc1"] = init_fc_parameters(1568,512)
    parameters["W_softmax"],parameters["b_softmax"] = init_fc_parameters(512,10)

    return parameters


In [None]:
def forward_prop(X,parameters):
    
    cache={}
    
    cache["z_conv1"] = conv_forward(X,parameters["W_conv1"], parameters["b_conv1"])
    cache["a_conv1"] = relu(cache["z_conv1"])

    cache["z_conv2"] = conv_forward(cache["a_conv1"],parameters["W_conv2"], parameters["b_conv2"])
    cache["a_conv2"] = relu(cache["z_conv2"])

    cache["z_pool1"], cache["mask_pool1"] = pool_forward(cache["a_conv2"])
    
 

    cache["z_conv3"] = conv_forward(cache["z_pool1"],parameters["W_conv3"], parameters["b_conv3"])
    cache["a_conv3"] = relu(cache["z_conv3"])
    
 

    cache["z_conv4"] = conv_forward(cache["a_conv3"],parameters["W_conv4"], parameters["b_conv4"] )
    cache["a_conv4"] = relu(cache["z_conv4"])
    
 
    cache["z_pool2"], cache["mask_pool2"] = pool_forward(cache["a_conv4"])


 

    cache["a_flatten"] = np.reshape(cache["z_pool2"], (cache["z_pool2"].shape[0],-1)).T

 
    cache["a_fc1"] = fc_forward( cache["a_flatten"],parameters["W_fc1"],parameters["b_fc1"])
    
 
    return softmax_forward(cache["a_fc1"],parameters["W_softmax"],parameters["b_softmax"]),cache

In [None]:
def backprop(X,Y,Y_pred,parameters,cache,lambd):
    grads = {}
    
    dA, grads["dW_softmax"],grads["db_softmax"] =softmax_backward(Y_pred, Y, parameters["W_softmax"],parameters["b_softmax"],cache["a_fc1"])

    dA, grads["dW_fc1"],grads["db_fc1"] = fc_backward(dA,cache["a_fc1"],cache["a_flatten"],parameters["W_fc1"],parameters["b_fc1"])
    
    dA = np.reshape(dA.T,cache["z_pool2"].shape)
    dA = pool_backward(dA, cache["mask_pool2"])
    
    dA = dA*relu(cache["z_conv3"],deriv=True)
    dA, grads["dW_conv4"],grads["db_conv4"] = conv_backward(dA,cache["a_conv3"],parameters["W_conv4"])
    
    dA = dA*relu(cache["z_conv3"],deriv=True)
    dA, grads["dW_conv3"],grads["db_conv3"] = conv_backward(dA,cache["z_pool1"],parameters["W_conv3"])
    
    dA = pool_backward(dA, cache["mask_pool1"])
    
    dA = dA*relu(cache["z_conv2"],deriv=True)
    dA, grads["dW_conv2"],grads["db_conv2"] = conv_backward(dA,cache["a_conv1"],parameters["W_conv2"])
    
    dA = dA*relu(cache["z_conv1"],deriv=True)
    _, grads["dW_conv1"],grads["db_conv1"] = conv_backward(dA,X,parameters["W_conv1"])
    
    #regularisation term
    for key in grads:
        if "W" in key:
            grads[key]+= (lambd/X.shape[0])*parameters[key[1:]] 
    return grads        

In [None]:
def loss_function(y_pred,y,parameters,lambd):
    m = y.shape[1]
    cost = (-1/m)*np.sum(y*np.log(y_pred))
    
    regularisation_term = 0
    for key in parameters:
        if "W" in key:
            regularisation_term += np.sum(np.square(parameters[key]))
    
    regularised_cost = cost + (lambd/(2*m))*regularisation_term
    
    return regularised_cost

In [None]:
def accuracy(y_pred,y):
    preds = np.argmax(y_pred,axis=0)
    truth = np.argmax(y,axis=0)
    return np.mean(np.equal(preds,truth).astype(int))

In [None]:
%matplotlib notebook
def train_model(X_train, Y_train, X_dev, Y_dev,num_epochs,batch_size,lambd,learning_rate,parameters = initialise_parameters() ):
    train_costs = []
    train_evals = []
    dev_evals = []
    fig, (ax1, ax2,ax3) = plt.subplots(1,3,figsize=(10, 3))
    
    ax1.set_xlabel('Number of iterations')
    ax1.set_ylabel('Error')
    ax1.set_title('Training Set Error')
    
    ax2.set_xlabel('Number of iterations')
    ax2.set_ylabel('Accuracy')
    ax2.set_title('Training Set Accuracy')
    
    ax3.set_xlabel('Number of iterations')
    ax3.set_ylabel('Accuracy')
    ax3.set_title('Dev Set Accuracy')

    plt.tight_layout()
    plt.ion()

    fig.show()
    fig.canvas.draw()
    for epoch in range (num_epochs):
        print("Training the model, epoch: " + str(epoch+1))
        #cycle through the entire training set in batches
        for i in range(0,X_train.shape[0]//batch_size):
            
            
            #get the next minibatch to train on
            X_train_minibatch = X_train[i*batch_size:(i+1)*batch_size]
            Y_train_minibatch = Y_train[:,i*batch_size:(i+1)*batch_size]
            
            
            
            #perform one cycle of forward and backward propagation to get the partial derivatives w.r.t. the weights
            #and biases. Calculate the cost - used to monitor training
            y_pred, cache = forward_prop(X_train_minibatch,parameters)
            minibatch_cost = loss_function(y_pred,Y_train_minibatch,parameters,lambd)
            minibatch_grads = backprop(X_train_minibatch,Y_train_minibatch,y_pred,parameters, cache,lambd)
            
            
            #update the parameters using gradient descent
            for param in parameters.keys():
                parameters[param] = parameters[param] - learning_rate*minibatch_grads["d"+param]
            
            train_costs.append(minibatch_cost)
            ax1.plot(train_costs)
            fig.canvas.draw()
            
            
            train_eval_metric = accuracy(y_pred,Y_train_minibatch)
            train_evals.append(train_eval_metric)
            ax2.plot(train_evals)
            fig.canvas.draw()
            
            #periodically output an update on the current cost and performance on the dev set for visualisation
            if(i%100 == 0):
                print("Training set error: "+ str(minibatch_cost))
                print("Training set accuracy: "+ str(train_eval_metric))
                y_dev_pred,_ = forward_prop(X_dev,parameters)
                dev_eval_metric = accuracy(y_dev_pred,Y_dev)
                dev_evals.append(dev_eval_metric)
                print("Accuracy on dev set: "+ str(dev_eval_metric))
                ax3.plot(dev_evals)
                fig.canvas.draw()
    print("Training complete!")
    #return the trained parameters 
    return parameters

In [None]:
mnist_data = mnist_data.reindex(np.random.permutation(mnist_data.index)) #shuffle data

In [None]:
X = mnist_data.loc[:,mnist_data.columns!="label"].values
X = np.reshape(X,(X.shape[0], 28,28,1))
X= X/255 #normalise input features 
Y = mnist_data.loc[:,["label"]].values
Y = np.eye(10)[Y.reshape(-1)].T

In [None]:
parameters =train_model(X[:10000],Y[:,:10000],X[28000:28500],Y[:,28000:28500],
                        num_epochs=1,batch_size=64,lambd=5,learning_rate=0.3)

In [None]:
accuracy(forward_prop(X[-1000:],parameters)[0],Y[:,-1000:])