In [1]:
import numpy as np
from scipy.signal import correlate2d
import tensorflow as tf
from tensorflow import keras

## **Input Data**

In [2]:
(train_images, train_labels), (test_images, test_labels) = keras.datasets.fashion_mnist.load_data()

X_train = train_images[:5000] / 255.0
y_train = train_labels[:5000]

X_test = train_images[5000:10000] / 255.0
y_test = train_labels[5000:10000]

X_train.shape

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-labels-idx1-ubyte.gz
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-images-idx3-ubyte.gz
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-labels-idx1-ubyte.gz
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-images-idx3-ubyte.gz


(5000, 28, 28)

In [3]:
from keras.utils import to_categorical

y_train = to_categorical(y_train)
y_test = to_categorical(y_test)

y_test[0]

array([0., 0., 0., 0., 1., 0., 0., 0., 0., 0.], dtype=float32)

## **Convolution Layer**

In [9]:
class Convolution:
  def __init__(self, input_shape, filter_size, num_filters):
    input_height, input_width = input_shape
    self.num_filters = num_filters
    self.input_shape = input_shape

    #Size of outputs and filters
    self.filter_shape = (num_filters, filter_size, filter_size) #(3,3)
    self.output_shape = (num_filters, input_height - filter_size + 1, input_width - filter_size + 1)

    #Initialize the convolutional filters and biases with random values
    self.filters = np.random.randn(*self.filter_shape) #shape is determined by filter shape
    self.biases = np.random.randn(*self.output_shape) #shape is determined by output shape


  def forward(self, input_data):
    #Storing the input data for later use
    self.input_data = input_data

    #shape of this 0 filled output array is determined by the output shape
    output = np.zeros(self.output_shape)

    #Performing Convolution between the input data and the current filter (convolution operation for each filter)
    for i in range(self.num_filters):
      output[i] = correlate2d(self.input_data, self.filters[i], mode="valid")

    #Applying Relu Activtion function(element wise). This sets all -ve values to 0 and introduces non-linearity to our model
    output = np.maximum(output, 0)
    return output


  def backward(self, dL_dout, lr):
    #Initialize arrays to store the gradients of the loss w.r.t the input data and the filters
    dL_dinput = np.zeros_like(self.input_data)
    dL_dfilters = np.zeros_like(self.filters)

    #Calculate gradient of the loss w.r.t the filters(for updating the filters during optimization filters)
    for i in range(self.num_filters):
      dL_dfilters[i] = correlate2d(self.input_data, dL_dout[i],mode="valid")

      #Calculating the gradient of loss w.r.t inputs
      dL_dinput += correlate2d(dL_dout[i],self.filters[i], mode="full")

    #Updating the filters and bias with learning rate and the gradients
    self.filters -= lr * dL_dfilters
    self.biases -= lr * dL_dout

    #returning the gradient of inputs
    return dL_dinput

## **Max-Pooling Layer**

In [11]:
class MaxPool:
  def __init__(self, pool_size):
    self.pool_size = pool_size

  def forward(self, input_data):
    self.input_data = input_data
    self.num_channels, self.input_height, self.input_width = input_data.shape
    self.output_height = self.input_height // self.pool_size
    self.output_width = self.input_width // self.pool_size

    #An array filled with 0 to store the output of Max-Pooling function. Shape is determined by the 3 parameters
    self.output = np.zeros((self.num_channels, self.output_height, self.output_width))

    #Iterating over each channels of input_data
    for c in range(self.num_channels):

      #Iterate through the height of output feature map
      for i in range(self.output_height):

        #Iterate through the width of output feature map
        for j in range(self.output_width):

          #Calculates the starting and ending positions of the current patch in the input data
          start_i = i * self.pool_size
          start_j = j * self.pool_size
          end_i = start_i + self.pool_size
          end_j = start_j + self.pool_size

          #Extracts a patch from the input data based on the calculated coordinates
          patch = input_data[c, start_i:end_i, start_j:end_j]

          #perform the max-pooling operation for the current patch and Finding the maximum value from each patch/window
          self.output[c, i, j] = np.max(patch)
    return self.output


  def backward(self, dL_dout, lr):
    #Initialize an array to store the gradient of the loss w.r.t the input data.
    dL_dinput = np.zeros_like(self.input_data)

    for c in range(self.num_channels):
      for i in range(self.output_height):
        for j in range(self.output_width):
          start_i = i * self.pool_size
          start_j = j * self.pool_size
          end_i = start_i + self.pool_size
          end_j = start_j + self.pool_size

          patch = self.input_data[c, start_i:end_i, start_j:end_j]

          #Creates a binary mask where the maximum value in the patch is marked as True
          mask = patch == np.max(patch)

          #Assigns the gradient of the loss w.r.t the output multiplied by the binary mask to the corresponding positions in the gradient array
          dL_dinput[c,start_i:end_i, start_j:end_j] = dL_dout[c, i, j] * mask

    return dL_dinput

## **Dense(Softmax) Layer**

In [12]:
class Fully_Connected:
  def __init__(self, input_size, output_size):
    self.input_size = input_size
    self.output_size = output_size

    #Initialize random values of weights and biases
    self.weights = np.random.randn(output_size, self.input_size)
    self.biases = np.random.rand(output_size, 1)

  def softmax(self, z):
    #Shift the input values to avoid numerical instability to prevent large exponentiated values
    shifted_z = z - np.max(z)
    exp_values = np.exp(shifted_z)

    #Sum used in the softmax denominator
    sum_exp_values = np.sum(exp_values, axis=0)

    #Log useful for numerical stability
    log_sum_exp = np.log(sum_exp_values)

    #Calculate the softmax probabilities(resulting probabilities sum to 1)
    probabilities = exp_values / sum_exp_values

    return probabilities

  #calculate the derivative of the softmax activation function
  def softmax_derivative(self, s):

    #Create a diagonal matrix with the elements of the input vector s and subtract the outer product of the softmax probabilities s
    return np.diagflat(s) - np.dot(s, s.T)

  def forward(self, input_data):
    self.input_data = input_data

    #Flattens/converts the input data into a 1D array(vector)
    flattened_input = input_data.flatten().reshape(1, -1)

    #Computing the linear transformation
    self.z = np.dot(self.weights, flattened_input.T) + self.biases

    #Applying Softmax to to the abouve output(sums to 1)
    self.output = self.softmax(self.z)
    return self.output

  def backward(self, dL_dout, lr):

    #Calculate the gradient of the loss using chain rule w.r.t the pre-activation z
    dL_dy = np.dot(self.softmax_derivative(self.output), dL_dout)

    #Calculate the gradient of the loss w.r.t weights
    dL_dw = np.dot(dL_dy, self.input_data.flatten().reshape(1, -1))

    #Calculate the gradient of the loss w.r.t biases
    dL_db = dL_dy

    #Calculate the gradient of the loss w.r.t input data
    dL_dinput = np.dot(self.weights.T, dL_dy)
    dL_dinput = dL_dinput.reshape(self.input_data.shape)

    #Update the weights and biases based on the learning rate and gradients
    self.weights -= lr * dL_dw
    self.biases -= lr * dL_db

    #Return the gradient of the loss w.r.t the input data
    return dL_dinput


## **Cross-Entropy Loss**

In [13]:
def cross_entropy_loss(predictions, targets):
  num_samples = 10

  #Using epsilon value to avoid numerical instability
  epsilon = 1e-7
  predictions = np.clip(predictions, epsilon, 1 - epsilon)

  #Calcultae cross entropy loss
  loss = -np.sum(targets * np.log(predictions)) / num_samples
  return loss

def cross_entropy_loss_gradient(actual_labels, predicted_probs):
  num_samples = actual_labels.shape[0]
  gradient = -actual_labels / (predicted_probs + 1e-7) / num_samples

  return gradient


## **Training the Model**

In [14]:
#initialize a convolutional layer with the specified input shape, number of filters, and filter size
conv = Convolution(X_train[0].shape, 6, 1)

#initialize a max-pooling layer with a pooling size of 2
pool = MaxPool(2)

#initialize a fully connected layer with the specified input and output size (10 no of neurons)
full = Fully_Connected(121, 10)

def train_network(X, y, conv, pool, full, lr=0.01, epochs=50):
  for epoch in range(epochs):
    total_loss = 0.0
    correct_predictions = 0

    for i in range(len(X)):

      #Each input sample X[i] is passed through the convolutional layer, the max-pooling layer, and the fully connected layer
      conv_out = conv.forward(X[i])
      pool_out = pool.forward(conv_out)
      full_out = full.forward(pool_out)

      #Calculate the cross entropy loss
      loss = cross_entropy_loss(full_out.flatten(), y[i])
      total_loss += loss

      #Converting to One-Hot encoding to compare with the ground truth values
      one_hot_pred = np.zeros_like(full_out)

      #find the max value in the output layer and set it to 1
      one_hot_pred[np.argmax(full_out)] = 1

      #Flatten to match the output shape of the ground truth labels
      one_hot_pred = one_hot_pred.flatten()

      #Find the index of the max value in predictions and ground truth labels
      num_pred = np.argmax(one_hot_pred)
      num_y = np.argmax(y[i])

      if num_pred == num_y:
        correct_predictions += 1

      #perform backward propagation
      gradient = cross_entropy_loss_gradient(y[i], full_out.flatten()).reshape((-1, 1))
      full_back = full.backward(gradient, lr)
      pool_back = pool.backward(full_back, lr)
      conv_back = conv.backward(pool_back, lr)

    #Printing the epoch statistics
    average_loss = total_loss / len(X)
    accuracy = correct_predictions / len(X_train) * 100.0
    print(f"Epoch {epoch + 1}/{epochs} - Loss: {average_loss:.4f} - Accuracy: {accuracy:.2f}%")

## **Predictions**

In [15]:
def predict(input_sample, conv, pool, full):

  #Perform forward pass through Convolution and pooling
  conv_out = conv.forward(input_sample)
  pool_out = pool.forward(conv_out)

  #Flattening the layer to match the output shape
  flattened_output = pool_out.flatten()

  #Perform forward pass through fully connected layer
  predictions = full.forward(flattened_output)
  return predictions

In [16]:
train_network(X_train, y_train, conv, pool, full)


Epoch 1/20 - Loss: 1.0678 - Accuracy: 22.50%
Epoch 2/20 - Loss: 0.7336 - Accuracy: 31.74%
Epoch 3/20 - Loss: 0.3990 - Accuracy: 28.56%
Epoch 4/20 - Loss: 0.2169 - Accuracy: 36.08%
Epoch 5/20 - Loss: 0.1669 - Accuracy: 45.72%
Epoch 6/20 - Loss: 0.1445 - Accuracy: 51.70%
Epoch 7/20 - Loss: 0.1282 - Accuracy: 56.16%
Epoch 8/20 - Loss: 0.1173 - Accuracy: 59.12%
Epoch 9/20 - Loss: 0.1102 - Accuracy: 61.62%
Epoch 10/20 - Loss: 0.1050 - Accuracy: 63.24%
Epoch 11/20 - Loss: 0.1011 - Accuracy: 64.52%
Epoch 12/20 - Loss: 0.0979 - Accuracy: 65.68%
Epoch 13/20 - Loss: 0.0952 - Accuracy: 66.50%
Epoch 14/20 - Loss: 0.0930 - Accuracy: 67.52%
Epoch 15/20 - Loss: 0.0910 - Accuracy: 68.54%
Epoch 16/20 - Loss: 0.0892 - Accuracy: 69.26%
Epoch 17/20 - Loss: 0.0876 - Accuracy: 69.60%
Epoch 18/20 - Loss: 0.0861 - Accuracy: 70.22%
Epoch 19/20 - Loss: 0.0849 - Accuracy: 70.46%
Epoch 20/20 - Loss: 0.0837 - Accuracy: 71.00%


In [17]:
predictions = []

for data in X_test:
  pred = predict(data, conv, pool, full)
  one_hot_pred = np.zeros_like(pred)
  one_hot_pred[np.argmax(pred)] = 1
  predictions.append(one_hot_pred.flatten())

predictions = np.array(predictions)

predictions

array([[0., 0., 0., ..., 0., 0., 0.],
       [1., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 1.],
       ...,
       [0., 0., 0., ..., 0., 0., 0.],
       [1., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.]])

In [18]:
from sklearn.metrics import accuracy_score

accuracy_score(predictions, y_test)

0.6818