In [506]:
!pip install tqdm



In [1]:
from tqdm import tqdm

In [3]:
import pickle
import numpy as np
import pandas as pd
import keras
import matplotlib.pyplot as plt
from keras.datasets import cifar10
import tensorflow as tf
from tensorflow.keras.optimizers import Adam

2023-04-28 16:26:33.938680: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [5]:
(x_train, y_train), (x_test, y_test) = cifar10.load_data()
print('x_train shape:', x_train.shape)
print('y_train shape:', y_train.shape)
print(x_train.shape[0], 'train samples')
print(x_test.shape[0], 'test samples')

x_train shape: (50000, 32, 32, 3)
y_train shape: (50000, 1)
50000 train samples
10000 test samples


In [6]:
x = np.transpose(x_train, (0, 3, 1, 2))

In [7]:
x_val = np.transpose(x_test, (0, 3, 1, 2))

In [8]:
y_train_one_hot = np.eye(10)[y_train.reshape(-1)]
y_test_one_hot = np.eye(10)[y_test.reshape(-1)]

In [9]:
y_train_one_hot

array([[0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 1.],
       [0., 0., 0., ..., 0., 0., 1.],
       ...,
       [0., 0., 0., ..., 0., 0., 1.],
       [0., 1., 0., ..., 0., 0., 0.],
       [0., 1., 0., ..., 0., 0., 0.]])

In [10]:
class Adam:
    def __init__(self, learning_rate=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8):
        self.learning_rate = learning_rate
        self.beta1 = beta1
        self.beta2 = beta2
        self.epsilon = epsilon
        self.m = None
        self.v = None
        self.t = 0
        
    def update(self, weights, gradients):
        m = self.m
        v = self.v
        learning_rate = self.learning_rate
        epsilon = self.epsilon
        m = np.zeros_like(weights)
        v = np.zeros_like(weights)
        b1 = self.beta1
        b2 = self.beta2
        
        for t in range(1,100):

            # Update the m and v parameter
            m = [b1*m_i + (1 - b1)*g_i for m_i, g_i in zip(m, gradients)]
            v = [b2*v_i + (1 - b2)*(g_i**2) for v_i, g_i in zip(v, gradients)]

            # Bias correction for m and v
            m_cor = [m_i / (1 - (b1**t)) for m_i in m]
            v_cor = [v_i / (1 - (b2**t)) for v_i in v]

            # Update the parameter
            weights = [weight - (learning_rate / (np.sqrt(v_cor_i) + epsilon))*m_cor_i for weight, v_cor_i, m_cor_i in zip(weights, v_cor, m_cor)]
        weights = np.array(weights)
        return weights
        



In [11]:
class ConvLayer:
    def __init__(self, input_channels, num_filters, kernel_size):

        self.input_channels = input_channels
        self.num_filters = num_filters
        self.kernel_size = kernel_size
        self.weights = np.random.randn(num_filters, input_channels, kernel_size, kernel_size) * 0.001
        self.biases = np.zeros((num_filters, 1))

    def forward(self, X):
        #print("Enter Convolution")
        # Save input for backpropagation
        self.X = X
#         print("1. X shape", self.X.shape)
#         print("2. self.weights shape",self.weights.shape)
        # Get dimensions of input
        m, input_channels, input_height, input_width  = X.shape
        
        if input_height < self.kernel_size or input_width < self.kernel_size:
            raise ValueError("Input dimensions are too small for given kernel size")
        # Compute output dimensions
        
        output_height = input_height - self.kernel_size + 1
        output_width = input_width - self.kernel_size + 1

        # Initialize output tensor
        output = np.zeros((m, self.num_filters, output_height, output_width))

        # Perform convolution
        for i in range(output_height):
            for j in range(output_width):
                for f in range(self.num_filters):

                    #Construct the square of the kernel
                    h_start = i
                    h_end = h_start + self.kernel_size
                    w_start = j
                    w_end = w_start + self.kernel_size
                    #selecting all samples and over the whole depth, this particualr kernel square
                    X_slice = X[:, :, h_start:h_end, w_start:w_end]      
                    #weight at the f-th depth considered.            
                    output[:, f, i, j] = np.sum(X_slice * self.weights[f, :, :, :], axis=(1, 2, 3))

        # Add biases and apply activation function
        output += self.biases.reshape(1, self.num_filters, 1, 1)
        #activation function
        output = np.maximum(output, 0)

       
        #print("2. output shape",output.shape)
        return output
                  
    
    def backward(self, dL_dZ):
        # Get dimensions of input
        m, input_channels, input_height, input_width = self.X.shape
        #print("backward - Conv")
        # Get dimensions of output
        m, num_filters, output_height, output_width = dL_dZ.shape
        #print("1. Input dL_dZ shape", dL_dZ.shape)
        # Initialize gradients
        dL_dX = np.zeros_like(self.X)
        dL_dW = np.zeros_like(self.weights)
        dL_db = np.zeros_like(self.biases)
        
        # Compute gradients

        for i in range(output_height):
            for j in range(output_width):
                for f in range(num_filters):

                    h_start = i
                    h_end = h_start + self.kernel_size
                    w_start = j
                    w_end = w_start + self.kernel_size

                    X_slice = self.X[:, :, h_start:h_end, w_start:w_end]
                    dL_dX = dL_dX.astype('float64')
                    dL_dX[:, :, h_start:h_end, w_start:w_end] += np.sum(self.weights[f] * dL_dZ[:, f, i, j][:, np.newaxis, np.newaxis, np.newaxis], axis=0)
                    dL_dW[f] += np.sum(X_slice * dL_dZ[:, f, i, j][:, np.newaxis, np.newaxis, np.newaxis], axis=0)
                    dL_db[f] += np.sum(dL_dZ[:, f, i, j])
        #print("1. Output shape", dL_dX.shape)
        self.weights_grad = dL_dW
        self.bias_grad = dL_db
        return dL_dX, dL_dW, dL_db

In [12]:
class MaxPoolLayer:
    def __init__(self, pool_size):
        self.pool_size = pool_size
    
    def forward(self, X):
        # Save input for backpropagation
        self.X = X
        
        # Get dimensions of input
        m, input_channels, input_height, input_width = X.shape
        
        #print("Enter Pooling") 
        #print("1. X shape", self.X.shape)
        
        output_height = int(input_height / self.pool_size)
        output_width = int(input_width / self.pool_size)
        
        # Initialize output tensor
        output = np.zeros((m, input_channels, output_height, output_width))
        self.max_indices = np.zeros((m, input_channels, output_height*output_width), dtype=int)
        
        # Perform pooling
        for i in range(output_height):
            for j in range(output_width):
                h_start = i * self.pool_size
                h_end = h_start + self.pool_size
                w_start = j * self.pool_size
                w_end = w_start + self.pool_size
                
                X_slice = X[:, :, h_start:h_end, w_start:w_end]
                max_vals = np.amax(X_slice, axis=(2,3), keepdims=True)
                output[:, :, i, j] = max_vals.squeeze()
                for k in range(m):
                    for l in range(input_channels):
                        
                        max_index = np.argmax(X_slice[k,l,:,:], keepdims = True).flatten()
                        #self.max_indices[k,l,i,j] = max_index
                        self.max_indices[:, :, i * output_width + j] = max_index
    
                        
        #print("1. output shape", output.shape)     
        return output
    def backward(self, dL_dZ):
        # Get dimensions of input
        #print("backward - Pooling")
        m, input_channels, input_height, input_width = self.X.shape

        # Get dimensions of output
        m, num_filters, output_height, output_width = dL_dZ.shape
        #print("1. Input dL_dZ shape", dL_dZ.shape)
        # Initialize gradients
        dL_dX = np.zeros_like(self.X)
        max_indices_reshaped = self.max_indices.reshape(m, input_channels, output_height, output_width)
        # Compute gradients
        for i in range(output_height):
            for j in range(output_width):
                h_start = i * self.pool_size
                h_end = h_start + self.pool_size
                w_start = j * self.pool_size
                w_end = w_start + self.pool_size

                dL_dZ_slice = dL_dZ[:, :, i, j][:, :, np.newaxis, np.newaxis]
                X_slice = self.X[:, :, h_start:h_end, w_start:w_end]
                max_vals = np.amax(X_slice, axis=(2,3), keepdims=True)
                dL_dX_mask = (X_slice == max_vals) * dL_dZ_slice
                dL_dX[:, :, h_start:h_end, w_start:w_end] += dL_dX_mask
        return dL_dX, dL_dX, dL_dX
 

In [13]:
class FCLayer:
    def __init__(self, input_size, output_size, activation=None):
        # Initialize weights and biases
        self.weights = np.random.randn(output_size, input_size) * 0.01
        self.biases = np.zeros((output_size, 1))
        self.activation = activation
        
    def forward(self, X):
        # Save input for backpropagation
        #print("Enter Fully Connected")
        self.X = X
        #print("1. Original X",  X.shape)
        # Flatten input
        X_flat = X.reshape(X.shape[0], -1)
        
#         print("2. Flattened X",  X_flat.shape)
#         print("3. Transpose weights shape", self.weights.T.shape)
#         print("4. Transpose biases shape", self.biases.T.shape)
        # Compute output
        output = np.dot(X_flat, self.weights.T) + self.biases.T      
        if self.activation is not None:
            output = self.activation(output,0)
#         print("5. output shape", output.shape)
        return output
    
    def backward(self, dL_dZ):
        # Flatten input
#         print("backward - FC")
#         print("1. Input dldlz shape ", dL_dZ.shape)
#         print("2. X shape ", self.X.shape)
        
        X_flat = self.X.reshape(self.X.shape[0], -1)
#         print("3. X_flat shape ", X_flat.shape)
        dL_dX_flat = np.dot(dL_dZ, self.weights)
        dL_dW = np.dot(dL_dZ.T, X_flat)
        dL_db = np.sum(dL_dZ, axis=0, keepdims=True).T
        
        # Reshape gradients
        dL_dX = dL_dX_flat.reshape(self.X.shape)
#         print("4. dL_dX shape, input to next ", dL_dX.shape)
        self.weights_grad = dL_dW
        self.bias_grad = dL_db
        return dL_dX, dL_dW, dL_db

In [14]:
def categorical_crossentropy(y_pred, y_true):
    num_samples = y_pred.shape[0]
    num_classes = y_pred.shape[1]
    loss = -np.sum(y_true * np.log(y_pred + 1e-10)) / num_samples / num_classes
    return loss

In [25]:

    # Define the architecture
    architecture = [
        ConvLayer(3, 32, 3), # CONV1
        MaxPoolLayer(2), # POOL1
        ConvLayer(32, 64, 5), # CONV2
        MaxPoolLayer(2), # POOL2
        ConvLayer(64, 64, 3), # CONV3
        FCLayer(576, 64), # FC1
        FCLayer(64, 10,np.maximum) # FC2
    ]
    

    # Define the forward pass function
    def forward(x, architecture):
        output = x
        for layer in architecture:
            output = layer.forward(output)
        return output

    # Define the backward pass function
    def backward(y_pred, y_true, architecture):
        grad = y_pred - y_true
        for layer in reversed(architecture):
            grad, gradw, gradb = layer.backward(grad)
            
    def train(x_train, y_train, x_val, y_val, architecture, epochs, learning_rate, batch_size):
        train_loss_history = []
        val_loss_history = []
        val_acc_history = []
        overall_val_acc_history = []
        optimizer = Adam(learning_rate=learning_rate)
        # Define a list of trainable variables
        num_batches = len(x_train) // batch_size
            
        for i in tqdm(range(epochs)):
            epoch_train_loss = 0
            epoch_val_loss = 0
            epoch_val_acc = 0
            
            batchno = 0
            for batch_idx in range(num_batches):
                batchno = batchno + 1
                start_idx = batch_idx * batch_size
                end_idx = (batch_idx + 1) * batch_size
                
                # Forward pass
                y_pred = forward(x_train[start_idx:end_idx], architecture)
                
                # Compute loss
                loss = categorical_crossentropy(y_pred, y_train[start_idx:end_idx])
                epoch_train_loss += loss
                
                # Backward pass
                backward(y_pred, y_train[start_idx:end_idx], architecture)
                
                # Update weights using Adam optimizer
                for layer in architecture:
                    if hasattr(layer, 'weights'):
                        layer.weights = optimizer.update(layer.weights, layer.weights_grad)
                        layer.biases = optimizer.update(layer.biases, layer.bias_grad)
                print("Currently on the batch - ", batchno)
            
            # Compute the average training loss for the epoch
            epoch_train_loss /= num_batches
            train_loss_history.append(epoch_train_loss)
            
            # Compute the validation loss and accuracy for the epoch
            y_val_pred = forward(x_val, architecture)
            epoch_val_loss = categorical_crossentropy(y_val_pred, y_val)
            val_loss_history.append(epoch_val_loss)
            epoch_val_acc = categorical_accuracy(y_val_pred, y_val)
            val_acc_history.append(epoch_val_acc)
            
            # Compute overall validation accuracy
            overall_val_acc = np.mean(np.argmax(y_val, axis=1) == np.argmax(y_val_pred, axis=1))
            overall_val_acc_history.append(overall_val_acc)
            
            # Print progress
            print("Epoch {}/{} - train loss: {:.4f} - val loss: {:.4f} - val accuracy: {:.4f}".format(
                epoch+1, epochs, epoch_train_loss, epoch_val_loss, overall_val_acc))
        return train_loss_history, val_loss_history, val_acc_history, overall_val_acc_history
            


In [26]:
train(x, y_train, x_val, y_test, architecture, epochs=20, learning_rate=0.001, batch_size=32)

  0%|                                                    | 0/20 [00:00<?, ?it/s]

Currently on the batch -  1
Currently on the batch -  2
Currently on the batch -  3
Currently on the batch -  4
Currently on the batch -  5
Currently on the batch -  6
Currently on the batch -  7
Currently on the batch -  8
Currently on the batch -  9
Currently on the batch -  10
Currently on the batch -  11
Currently on the batch -  12
Currently on the batch -  13
Currently on the batch -  14
Currently on the batch -  15
Currently on the batch -  16
Currently on the batch -  17
Currently on the batch -  18
Currently on the batch -  19
Currently on the batch -  20
Currently on the batch -  21
Currently on the batch -  22
Currently on the batch -  23
Currently on the batch -  24
Currently on the batch -  25
Currently on the batch -  26
Currently on the batch -  27
Currently on the batch -  28
Currently on the batch -  29
Currently on the batch -  30
Currently on the batch -  31
Currently on the batch -  32
Currently on the batch -  33
Currently on the batch -  34
Currently on the batch 

  0%|                                                 | 0/20 [18:52:13<?, ?it/s]


KeyboardInterrupt: 

In [27]:
epochs = 20
# Plot the training and validation losses
plt.plot(range(1, epochs+1), train_loss_history, label='Training Loss')
plt.plot(range(1, epochs+1), val_loss_history, label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

# Plot the validation accuracy
plt.plot(range(1, epochs+1), val_acc_history)
plt.xlabel('Epochs')
plt.ylabel('Validation Accuracy')
plt.show()

# Report the class-wise validation accuracy
y_val_pred = forward(x_val, architecture)
class_val_acc = []
for i in range(10):
    class_idx = np.where(y_val[:,i] == 1)[0]
    class_pred = np.argmax(y_val_pred[class_idx], axis=1)
    class_true = np.argmax(y_val[class_idx], axis=1)
    class_acc = np.mean(class_pred == class_true)
    class_val_acc.append(class_acc)
    print("Validation accuracy for class {}: {:.4f}".format(i, class_acc))

# Plot the epoch vs. overall validation accuracy
plt.plot(range(1, epochs+1), overall_val_acc_history)
plt.xlabel('Epochs')
plt.ylabel('Overall Validation Accuracy')
plt.show()

NameError: name 'train_loss_history' is not defined