# CNN From Scratch Using NumPy 


In [1]:
import numpy as np
from six.moves import cPickle
import matplotlib.pyplot as plt
from sklearn.metrics import log_loss
np.seterr(all='ignore')
%matplotlib inline

# Utility Functions

In [2]:
def relu(X):
    return np.maximum(X, 0)

def d_relu(X):
    return 1. * (X > 0)


# Feed Forward Methods

In [3]:
def conv_forward(X, K, stride):
    N, C, H, W = X.shape
    C, kernel_height, kernel_width = K.shape
    num_filters = 1
    
    if ((int) ((H - kernel_height) % stride) != 0) or ((int) ((W - kernel_width) % stride) != 0):
        print("Invalid dimension for convolution")
        return

    out_height = (int) ((H - kernel_height) / stride) + 1
    out_width = (int) ((W - kernel_width) / stride) + 1
    
    def conv_f(N, num_filters, i, j):
        i = int(i)
        j = int(j)
        N = int(N)
        A = X[N, :, i*stride : i*stride + kernel_height, j*stride : j*stride + kernel_width]
        t = np.multiply(A, K)
        #Sum the result across all channels
        t1 = np.sum(t, axis=0)
        #Sum of all elements of the matrix
        out = t1.sum()
        return out

    f = np.vectorize(conv_f)
    Z = np.fromfunction(f, (N, num_filters, out_height, out_width) )
    
    #Save information in cache for the backprop
    cache = (X, K, stride)
    
    return Z, cache


def max_pool_forward(X, size, stride):
    N, C, H, W = X.shape
    
    if ((int) ((H - size) % stride) != 0) or ((int) ((W - size) % stride) != 0):
        print("Invalid dimension for convolution")
        return
    
    out_height = (int) ((H - size) / stride) + 1
    out_width = (int) ((W - size) / stride) + 1

    def pool_f(N, C, i, j):
        i = int(i)
        j = int(j)
        N = int(N)
        A = X[N, :, i*stride : i*stride + size, j*stride : j*stride + size]
        out = np.amax(A)
        return out

    f = np.vectorize(pool_f)
    output_pool = np.fromfunction(f, (N, C, out_height, out_width) )
    
    #Save information in cache for the backprop
    cache = (X, size, stride) 
    
    return output_pool, cache

# Back Propogation Methods

In [4]:
def create_mask_from_window(x):
    mask = (x == np.max(x))
    return mask

def pool_backward(dA, cache):
    A_prev, size, stride = cache
    m, n_C_prev , n_H_prev, n_W_prev = A_prev.shape
    m, n_C, n_H, n_W = dA.shape
    
    dA_prev = np.zeros(A_prev.shape)
    
    def pool_b(N,C,i,j):
        i = int(i)
        j = int(j)
        C = int(C)
        N = int(N)
        a_prev_slice = A_prev[N, :, i * stride : i * stride + size, j * stride : j * stride + size]
        mask = create_mask_from_window(a_prev_slice)
        dA_prev[N, :, i * stride : i * stride + size, j * stride : j * stride + size] += np.multiply(mask, dA[N, C, i, j])
        
    b = np.vectorize(pool_b)
    np.fromfunction(b, shape=(dA.shape))
   
    return dA_prev

def conv_backward(dZ, cache):
    A_prev, W, stride = cache
    C, f, f = W.shape
    
    dA_prev = np.zeros(A_prev.shape)                           
    dW = np.zeros(W.shape)
    
    def conv_b(N, C, i, j):
        i = int(i)
        j = int(j)
        N = int(N)
        C = int(C)
        a_slice = A_prev[N, :, i * stride : i * stride + f, j * stride : j * stride + f]
        dA_prev[N, :, i * stride : i * stride + f, j * stride : j * stride + f] += W[C, :, :] * dZ[N, C, i, j]
        dW[:,:,:] += a_slice * dZ[N, C, i, j]
    
    b = np.vectorize(conv_b)
    np.fromfunction(b, shape=(dZ.shape))

    return dA_prev, dW

# Initialize Parameters

In [5]:
C = 3
# Weight of convolution layer 1
k1 = [[1, 4, 6, 4, 1],
      [4, 16, 24, 16, 4],
      [6, 24, 36, 24, 6],
      [4, 16, 24, 16, 4],
      [1, 4, 6, 4, 1]]
np.divide(k1,256)
K1 = np.tile(k1, (C, 1))
K1 = K1.reshape(C, len(k1), len(k1[0]))
# Stride of convolution layer 1
stride_c1 = 1
# Size of Pool layer 1
size1 = 2
# Stride of Pooling layer 1
stride_p1 = 2

C = 1
# Weight of convolution layer 2
k2 = [[0, -1, 0],
      [-1, 5, -1],
      [0, -1, 0]]
K2 = np.tile(k2, (C, 1))
K2 = K2.reshape(C, len(k2), len(k2[0]))
# Stride of convolution layer 2
stride_c2 = 1
# Size of Pool layer 1
size2 = 2
# Stride of Pooling layer 1
stride_p2 = 1

# Fully Connected Layer length
num_fc = 64
out_height = 11
out_width = 11
# Weights of fully connected layer
W3 = np.random.standard_normal(size=(out_height*out_width, num_fc))
# Bias of fully connected layer
b3 = np.random.standard_normal(size=num_fc)

# Output Layer length
num_outputs = 10
# Weights of Softmax layer 
W4 = np.random.standard_normal(size=(num_fc, num_outputs))
# Bias of softmax layer
b4 = np.random.standard_normal(size=num_outputs)

# Learning rate
eta = 0.001

# Model

In [6]:
# Model Architecture
# CONV -> RELU -> MAXPOOL -> CONV -> RELU -> MAXPOOL -> FLATTEN -> FULLYCONNECTED -> SOFTMAX

def model(X, Y):
    ###### LAYER 1 ######
    N, C, H, W = X.shape

    X_conv1, cache_conv1 = conv_forward(X, K1, stride_c1)
    print("Convolved shape 1: ", X_conv1.shape)

    X_activation1 = relu(X_conv1)
    print("Activated shape 1: ", X_activation1.shape)

    X_pool1, cache_pool1 = max_pool_forward(X_activation1, size1, stride_p1)
    print("Pooled shape 1: ", X_pool1.shape)


    ###### LAYER 2 ######
    N, C, H, W = X_pool1.shape

    X_conv2, cache_conv2 = conv_forward(X_pool1, K2, stride_c2)
    print("Convolved shape 2: ", X_conv2.shape)

    X_activation2 = relu(X_conv2)
    print("Activated shape 2: ", X_activation2.shape)

    X_pool2, cache_pool2 = max_pool_forward(X_activation2, size2, stride_p2)
    print("Pooled shape 2: ", X_pool2.shape)


    ###### Fully Connected + Softmax Layer ######
    N, C, out_height, out_width = X_pool2.shape

    X_flat = X_pool2.reshape(N, out_height*out_width)
    print("Flattened shape: " , np.array(X_flat.shape))

    X_fc = np.dot(X_flat, W3) + b3
    print("FC shape: ", X_fc.shape)

    X_fc_act = relu(X_fc)
    print("Activated shape: ", X_fc_act.shape)

    y_pred = np.dot(X_fc_act, W4) + b4
    print("y_pred shape: ", y_pred.shape)
    
    cache = (cache_conv1, cache_pool1, cache_conv2, cache_pool2, X_flat, X_fc_act)
        
    return y_pred, cache

In [7]:
###### Calculate Loss ######
def calculate_loss(N, y_pred, Y):
    exp_scores = np.exp(y_pred - np.max(y_pred, axis=1, keepdims=True), casting="unsafe")

    # Softmax activation
    probs = exp_scores/np.sum(exp_scores, axis=1, keepdims=True)

    # Log loss of the correct class of each of samples
    epsilon = 1e-4
    correct_logprobs = -np.log(probs[np.arange(N), Y] + epsilon)

    # Compute the average loss
    loss = np.sum(correct_logprobs) / N

    return loss, probs

def one_hot(a, num_classes):
    return np.squeeze(np.eye(num_classes)[a])

In [8]:
###### BackProp ######
def backprop(N, probs, cache, Y):
    cache_conv1, cache_pool1, cache_conv2, cache_pool2, X_flat, X_fc_act = cache
    delta4 = probs
    delta4[range(N), Y] -= 1

    dW4 = (d_relu(X_fc_act).T).dot(delta4)
    db4 = np.sum(delta4, axis=0, keepdims=True).reshape(-1)
    print("Weights shape softmax layer: ", dW4.shape, db4.shape)

    global W4
    global W3
    global b4
    global b3
    global K2
    global K1
    delta3 = delta4.dot(W4.T) * (1 - np.power(X_fc_act, 2))

    dW3 = np.dot(X_flat.T, delta3)
    db3 = np.sum(delta3, axis=0, keepdims=True).reshape(-1)
    print("Weights shape FC layer: ", dW3.shape, db3.shape)

    delta2 = delta3.dot(W3.T) * (1 - np.power(X_flat, 2))
    delta2 = delta2.reshape(N, 1, out_height, out_width)

    da_pool2_prev = pool_backward(delta2, cache_pool2)
    print("Pooled layer 2 shape: ", da_pool2_prev.shape)

    da_conv2_prev, dK2 = conv_backward(da_pool2_prev, cache_conv2)
    print("Convolved layer 2 shape: ", da_conv2_prev.shape)
    print("Convolved layer 2 weight shape: ", dK2.shape)

    da_pool1_prev = pool_backward(da_conv2_prev, cache_pool1)
    print("Pooled layer 1 shape: ", da_pool1_prev.shape)

    da_conv1_prev, dK1 = conv_backward(da_pool1_prev, cache_conv1)
    print("Convolved layer 1 shape: ", da_conv1_prev.shape)
    print("Convolved layer 1 weight shape: ", dK1.shape)

    # Gradient descent parameter update
    W4 -= eta * dW4
    b4 -= eta * db4
    W3 -= eta * dW3
    b3 -= eta * db3
    
    np.subtract(K2, np.multiply(dK2, eta, casting='unsafe'), out = K2, casting = 'unsafe')
    np.subtract(K1, np.multiply(dK1, eta, casting='unsafe'), out = K1, casting = 'unsafe')

# Training Model

In [9]:
epochs = 5
for i in range(epochs):
    print("Epoch ", i+1)
    # Load dataset batch i
    f = open("./data/cifar-10-batches-py/data_batch_{}".format(i+1), 'rb')
    datadict = cPickle.load(f,encoding='latin1')
    f.close()
    X_raw = datadict["data"]
    Y = datadict['labels']
    X = X_raw.reshape(10000, 3, 32, 32)
    batch_size = 10000
    print("Feed Forward:")
    y_pred, cache = model(X, Y)
    loss, probs = calculate_loss(batch_size, y_pred, Y)
    print("Batch {} Loss ".format(i+1), loss)
    print("Back Propogation:")
    backprop(batch_size, probs, cache, Y)

Epoch  1
Feed Forward:
Convolved shape 1:  (10000, 1, 28, 28)
Activated shape 1:  (10000, 1, 28, 28)
Pooled shape 1:  (10000, 1, 14, 14)
Convolved shape 2:  (10000, 1, 12, 12)
Activated shape 2:  (10000, 1, 12, 12)
Pooled shape 2:  (10000, 1, 11, 11)
Flattened shape:  [10000   121]
FC shape:  (10000, 64)
Activated shape:  (10000, 64)
y_pred shape:  (10000, 10)
Batch 1 Loss  8.283770071058346
Back Propogation:
Weights shape softmax layer:  (64, 10) (10,)
Weights shape FC layer:  (121, 64) (64,)
Pooled layer 2 shape:  (10000, 1, 12, 12)
Convolved layer 2 shape:  (10000, 1, 14, 14)
Convolved layer 2 weight shape:  (1, 3, 3)
Pooled layer 1 shape:  (10000, 1, 28, 28)
Convolved layer 1 shape:  (10000, 3, 32, 32)
Convolved layer 1 weight shape:  (3, 5, 5)
Epoch  2
Feed Forward:
Convolved shape 1:  (10000, 1, 28, 28)
Activated shape 1:  (10000, 1, 28, 28)
Pooled shape 1:  (10000, 1, 14, 14)
Convolved shape 2:  (10000, 1, 12, 12)
Activated shape 2:  (10000, 1, 12, 12)
Pooled shape 2:  (10000, 1

# Testing Model

In [13]:
# Calculate accuracy with test dataset
f = open("./data/cifar-10-batches-py/test_batch", 'rb')
datadict = cPickle.load(f,encoding='latin1')
f.close()
X_raw = datadict["data"]
Y_test = datadict['labels']
X_test = X_raw.reshape(10000, 3, 32, 32)
batch_size = 10000
y_pred, cache = model(X_test, Y_test)
loss, probs = calculate_loss(batch_size, y_pred, Y_test)
print("Test Data Loss: ", loss)

Convolved shape 1:  (10000, 1, 28, 28)
Activated shape 1:  (10000, 1, 28, 28)
Pooled shape 1:  (10000, 1, 14, 14)
Convolved shape 2:  (10000, 1, 12, 12)
Activated shape 2:  (10000, 1, 12, 12)
Pooled shape 2:  (10000, 1, 11, 11)
Flattened shape:  [10000   121]
FC shape:  (10000, 64)
Activated shape:  (10000, 64)
y_pred shape:  (10000, 10)
Test Data Loss:  8.289296335278532


# Model Accuracy

In [14]:
from sklearn.metrics import accuracy_score
print("Accuracy : ",accuracy_score(Y_test, np.argmax(y_pred, axis=1))*100)

Accuracy :  10.0
