# Building CNN from Scratch

## Basic Introduction

* CNN widely known for their applications in
  * Image recognition
  * Object detection
* Basic CNN architecture can be shown as
  * <img src = "visuals/cnn_basic_architecture.jpg" width=800>

# Implementation

## Importing Libraries

In [42]:
import numpy as np
from scipy.signal import correlate2d
import tensorflow.keras as keras
from sklearn.metrics import accuracy_score
from tensorflow.keras.utils import to_categorical

## Making Convolution Class

**what do we need for the convolution layers ???**
* we need <u>number of filters</u> that we want to slide over the input data to extract different features.
  * we need to tell what is the <u>input size</u> to scan on.
  * we need <u>filter size</u> of these filters. Filters are square size, so we need only 1 dimension.
  * we need to tell <u>output shape</u> (shape obtained after convolution). Formula is, $$\text{Output shape} = \frac{\text{Input shape} - \text{filter size} + 2*\text{padding}}{\text{Stride}}+1$$
    > If stride increases, output dimension will decrease. However, if padding increases, output dimension will increase. 

In [43]:
''' Initialize basic attributes of a convolution class '''
class Convolution:
    def __init__(self, input_shape, filter_size, num_filters):
        # get height and width from input_shape
        input_height, input_width = input_shape
        self.input_shape = input_shape    # input to scan on?
        self.num_filters = num_filters    # how many filters?
        self.filter_shape = (num_filters, filter_size, filter_size) 

        # with zero padding and a stride of 1
        self.output_shape = (num_filters, 
                             input_height - filter_size + 1, 
                             input_width - filter_size + 1)
        
        self.filters = np.random.randn(*self.filter_shape)   # weights
        self.biases = np.random.randn(*self.output_shape)    # and biases

### Forward Pass in Covolution

Let's see how convolution happens in forward pass,
As an example, consider an input as follows,
$$ \begin{bmatrix} 1 & 2 & 3 & 0\\ 0 & 1 & 2 & 3\\ 3 & 2 & 1 & 0\\ 0 & 1 & 2 & 1\end{bmatrix} $$
and kernel as follows,
$$ \begin{bmatrix} 1 & 0 \\ 0 & -1\end{bmatrix} $$

Sliding this kernel on top-left window, i.e.,
$$ \begin{bmatrix} 1 & 2 \\ 0 & 1\end{bmatrix} $$

will give, $1*1 + 2*0 + 0*0 + 1*-1 = 0$ as the element in (1, 1) of output.

In [44]:
''' This is forward pass in convolution - effectively the conv process '''
def conv_forward(self, input_data):
    # assign input data to class attribute
    self.input_data = input_data
    
    # Initialized the input value
    output = np.zeros(self.output_shape)

    # each convolution will give 1 layer, and thus each will be stacked.
    # output[0] will be first layer, output[1] will be second layer and so on ...
    for i in range(self.num_filters):
        output[i] = correlate2d(self.input_data, self.filters[i], mode="valid")
    
    # Applying Relu Activation function
    ### ReLU is an element-wise operation. It simply replaces all 
    ### negative values with zero and leaves positive values unchanged, 
    ### keeping the shape intact.
    output = np.maximum(output, 0)
    
    return output 

# assign to Convolution class
Convolution.forward = conv_forward

<img src= "visuals/cnn_forward.jpg" width=800>

### Backward Pass in Covolution

<img src= "visuals/cnn_backward.jpg" width=800>

In [45]:
''' Method for backward pass in convolution layer '''
def conv_backward(self, dL_dz, lr):
    # Create a random dL_dout array to accommodate output gradients
    dL_dinput = np.zeros_like(self.input_data)
    dL_dw = np.zeros_like(self.filters)

    # we will do this for all layers of filters
    for i in range(self.num_filters):
        # Calculating the gradient of loss with respect to filter
        ### Question is "How much does changing this filter affect the output?"
        dL_dw[i] = correlate2d(self.input_data, 
                               dL_dz[i],
                               mode="valid")

        # Calculating the gradient of loss with respect to inputs
        dL_dinput += correlate2d(dL_dz[i],
                                 self.filters[i], 
                                 mode="full")

    # Updating the parameters with learning rate
    self.filters -= lr * dL_dw
    self.biases -= lr * dL_dz

    # returning the gradient of inputs
    return dL_dinput

Convolution.backward = conv_backward

## Building the Max-Pooling layer

In [46]:
''' This is class for max pooling layer '''
class MaxPool:
    def __init__(self, pool_size):
        self.pool_size = pool_size

### Forward pass in Max-Pooling layer

During the forward pass of Max Pooling, the input is taken from the output of the convolution operation

In [47]:
def pool_forward(self, input_data):
    self.input_data = input_data

    # num_channels is actually the number of filters
    # input_height and input_width are dimensions as a result of convolution
    self.num_channels, self.input_height, self.input_width = input_data.shape
    
    # each pooling window covers pool_size × pool_size pixels 
    # and produces one output, reducing the spatial size by that factor
    self.output_height = self.input_height // self.pool_size
    self.output_width = self.input_width // self.pool_size

    # Determining the output shape
    self.output = np.zeros((self.num_channels, 
                            self.output_height, 
                            self.output_width))

    # Iterating over different channels
    for c in range(self.num_channels):
        # Looping through the height
        for i in range(self.output_height):
            # looping through the width
            for j in range(self.output_width):

                # Starting postition
                start_i = i * self.pool_size
                start_j = j * self.pool_size

                # Ending Position
                end_i = start_i + self.pool_size
                end_j = start_j + self.pool_size

                # Creating a patch from the input data
                patch = input_data[c, start_i:end_i, start_j:end_j]

                #Finding the maximum value from each patch/window
                self.output[c, i, j] = np.max(patch)

    return self.output

MaxPool.forward = pool_forward

* In the above code, we have multiple kernels/filters, resulting in multiple feature maps (multiple channels).
* To handle this, we need three nested loops.
* The outermost loop iterates over each of these feature maps, while the other two loops traverse the height and width of the feature maps.

### Backward pass in Max-Pooling layer

* **We do not calculate gradients in max pooling layer.**
* Instead, we transmit the maximum gradients obtained from the previous layer directly to the corresponding locations in the next layer.
* This process ensures that the maximum gradient values flow through the MaxPooling layer and continue propagating through the network. 

In [48]:
''' This method covers backward pass through the max pooling layer '''
def pool_backward(self, dL_dz, lr):
    # initialize gradients wrt inputs
    dL_dinput = np.zeros_like(self.input_data)

    for c in range(self.num_channels):
        for i in range(self.output_height):
            for j in range(self.output_width):
                start_i = i * self.pool_size
                end_i = start_i + self.pool_size
                
                start_j = j * self.pool_size                
                end_j = start_j + self.pool_size
                
                patch = self.input_data[c, start_i:end_i, start_j:end_j]

                # this creates a binary mask of 1 where max value occurred,
                # 0 elsewhere
                mask = patch == np.max(patch)

                # dL_dz[c,i,j] is the incoming gradient and it is only
                # assigned to max location in the patch
                # In other words, The gradient from the next layer is 
                # broadcasted back only to the position that won in max pooling.
                dL_dinput[c,start_i:end_i, start_j:end_j] = dL_dz[c, i, j] * mask

    return dL_dinput

MaxPool.backward = pool_backward

## Fully Connected (Dense) Layer

In [49]:
''' This class if for fully connected layer '''
class Fully_Connected:
    def __init__(self, input_size, output_size):
        self.input_size = input_size # Size of the inputs coming
        self.output_size = output_size # Size of the output producing
        self.weights = np.random.randn(output_size, self.input_size)
        self.biases = np.random.rand(output_size, 1)

### Activation function

$$
\text{Softmax}(z_i) = \frac{e^{z_i}}{\sum_{j=1}^{K} e^{z_j}}
$$

In [50]:
def fc_softmax(self, z):
    # Shift the input values to avoid numerical instability
    shifted_z = z - np.max(z)
    exp_values = np.exp(shifted_z)
    sum_exp_values = np.sum(exp_values, axis=0)
    log_sum_exp = np.log(sum_exp_values)

    # Compute the softmax probabilities
    probabilities = exp_values / sum_exp_values

    return probabilities

Fully_Connected.softmax = fc_softmax

In [51]:
def fc_softmax_derivative(self, s):
    return np.diagflat(s) - np.dot(s, s.T)

Fully_Connected.softmax_derivative = fc_softmax_derivative

### Forward pass in dense layer (including flattening)

In [52]:
def fc_forward(self, input_data):
    self.input_data = input_data

    # flatten layer
    flattened_input = input_data.flatten().reshape(1, -1)

    # z = w.a + b
    self.z = np.dot(self.weights, flattened_input.T) + self.biases

    # Applying Softmax
    self.output = self.softmax(self.z)
    return self.output

Fully_Connected.forward = fc_forward

### Backward pass in dense layer

In [53]:
def fc_backward(self, dL_dout, lr):
    # Calculate the gradient of the loss with respect to the pre-activation (z)
    dL_dy = np.dot(self.softmax_derivative(self.output), dL_dout)
    
    # Calculate the gradient of the loss with respect to the weights (dw)
    dL_dw = np.dot(dL_dy, self.input_data.flatten().reshape(1, -1))

    # Calculate the gradient of the loss with respect to the biases (db)
    dL_db = dL_dy

    # Calculate the gradient of the loss with respect to the input data (dL_dinput)
    dL_dinput = np.dot(self.weights.T, dL_dy)
    dL_dinput = dL_dinput.reshape(self.input_data.shape)

    # Update the weights and biases based on the learning rate and gradients
    self.weights -= lr * dL_dw
    self.biases -= lr * dL_db

    # Return the gradient of the loss with respect to the input data
    return dL_dinput

Fully_Connected.backward = fc_backward

## Model Loss

In [54]:
def cross_entropy_loss(predictions, targets):
    num_samples = 10

    # Avoid numerical instability by adding a small epsilon value
    epsilon = 1e-7
    predictions = np.clip(predictions, epsilon, 1 - epsilon)
    loss = -np.sum(targets * np.log(predictions)) / num_samples
    return loss

def cross_entropy_loss_gradient(actual_labels, predicted_probs):
    num_samples = actual_labels.shape[0]
    gradient = -actual_labels / (predicted_probs + 1e-7) / num_samples

    return gradient

# Training

In [55]:
def train_network(X, y, conv, pool, full, lr=0.01, epochs=50):
    for epoch in range(epochs):
        total_loss = 0.0
        correct_predictions = 0

        for i in range(len(X)):
            # Forward pass
            conv_out = conv.forward(X[i])
            pool_out = pool.forward(conv_out)
            full_out = full.forward(pool_out)
            loss = cross_entropy_loss(full_out.flatten(), y[i])
            total_loss += loss

            # Converting to One-Hot encoding
            one_hot_pred = np.zeros_like(full_out)
            one_hot_pred[np.argmax(full_out)] = 1
            one_hot_pred = one_hot_pred.flatten()

            num_pred = np.argmax(one_hot_pred)
            num_y = np.argmax(y[i])

            if num_pred == num_y:
                correct_predictions += 1
            # Backward pass
            gradient = cross_entropy_loss_gradient(y[i], full_out.flatten()).reshape((-1, 1))
            full_back = full.backward(gradient, lr)
            pool_back = pool.backward(full_back, lr)
            conv_back = conv.backward(pool_back, lr)

        # Print epoch statistics
        average_loss = total_loss / len(X)
        accuracy = correct_predictions / len(X_train) * 100.0
        print(f"Epoch {epoch + 1}/{epochs} - Loss: {average_loss:.4f} - Accuracy: {accuracy:.2f}%")


# Prediction

In [56]:
def predict(input_sample, conv, pool, full):
    # Forward pass through Convolution and pooling
    conv_out = conv.forward(input_sample)
    pool_out = pool.forward(conv_out)
    # Flattening
    flattened_output = pool_out.flatten()
    # Forward pass through fully connected layer
    predictions = full.forward(flattened_output)
    return predictions

# Getting Data

In [57]:
# Load the Fashion MNIST dataset
(train_images, train_labels), (_,_) = keras.datasets.fashion_mnist.load_data()

# Code Run down

In [58]:
X_train = train_images[:5000] / 255.0
y_train = train_labels[:5000]

X_test = train_images[5000:10000] / 255.0
y_test = train_labels[5000:10000]

In [59]:
# Converting labels to One-Hot vectors for easy loss calculation, 
# and training stability
y_train = to_categorical(y_train)
y_test = to_categorical(y_test)

## Training

In [60]:
X_train[0].shape

(28, 28)

In [61]:
conv = Convolution(X_train[0].shape, 6, 1)
pool = MaxPool(2)
full = Fully_Connected(121, 10)

In [62]:
train_network(X_train, y_train, conv, pool, full)

Epoch 1/50 - Loss: 1.0968 - Accuracy: 22.68%
Epoch 2/50 - Loss: 0.9370 - Accuracy: 32.40%
Epoch 3/50 - Loss: 0.8111 - Accuracy: 33.80%
Epoch 4/50 - Loss: 0.7274 - Accuracy: 44.86%
Epoch 5/50 - Loss: 0.6621 - Accuracy: 49.78%
Epoch 6/50 - Loss: 0.6009 - Accuracy: 51.62%
Epoch 7/50 - Loss: 0.5486 - Accuracy: 52.18%
Epoch 8/50 - Loss: 0.4724 - Accuracy: 55.94%
Epoch 9/50 - Loss: 0.3876 - Accuracy: 56.98%
Epoch 10/50 - Loss: 0.3410 - Accuracy: 55.86%
Epoch 11/50 - Loss: 0.2467 - Accuracy: 57.74%
Epoch 12/50 - Loss: 0.1857 - Accuracy: 59.86%
Epoch 13/50 - Loss: 0.1116 - Accuracy: 64.40%
Epoch 14/50 - Loss: 0.0956 - Accuracy: 67.64%
Epoch 15/50 - Loss: 0.0883 - Accuracy: 70.18%
Epoch 16/50 - Loss: 0.0848 - Accuracy: 71.14%
Epoch 17/50 - Loss: 0.0819 - Accuracy: 72.56%
Epoch 18/50 - Loss: 0.0803 - Accuracy: 73.38%
Epoch 19/50 - Loss: 0.0784 - Accuracy: 73.78%
Epoch 20/50 - Loss: 0.0766 - Accuracy: 74.46%
Epoch 21/50 - Loss: 0.0758 - Accuracy: 74.52%
Epoch 22/50 - Loss: 0.0749 - Accuracy: 75.1

## Making Predictions

In [63]:
predictions = []

for data in X_test:
    pred = predict(data, conv, pool, full)
    one_hot_pred = np.zeros_like(pred)
    one_hot_pred[np.argmax(pred)] = 1
    predictions.append(one_hot_pred.flatten())

predictions = np.array(predictions)

In [64]:
accuracy_score(predictions, y_test)

0.7566