In [1157]:
# import libraries
import numpy as np
import os
import cv2
import matplotlib.pyplot as plt
import pandas as pd
import math

In [1158]:
class ConvolutionLayer:
 
    def __init__(self, num_of_filters, kernel_size, padding, stride=1):
        self.num_of_filters = num_of_filters
        self.kernel_size_h = kernel_size
        self.kernel_size_w = kernel_size
        self.stride = stride
        self.padding = padding
        self.weights = None
        self.biases = None
 
    def forward(self, input_data):
        self.input_shape = input_data.shape
        self.input_data = input_data
        batch_size, height, width, channels = input_data.shape
 
        # print("input data shape: ", input_data.shape)
        # print(batch_size, height, width, channels)
        output_height = int(math.floor(
            (height - self.kernel_size_h + 2 * self.padding)) / self.stride + 1)
        output_width = int(math.floor(
            (width - self.kernel_size_w + 2 * self.padding)) / self.stride + 1)
        output_shape = (batch_size, output_height,
                        output_width, self.num_of_filters)
 
        if self.weights is None:
 
            # initialize weights and biases
            # also include channels in the shape
            # do i have to also include the channel numbers when initializing the weights?
            self.weights = np.random.randn(self.num_of_filters, self.kernel_size_h,
                                           self.kernel_size_w, channels) / np.sqrt(self.kernel_size_h * self.kernel_size_w * channels)
 
            # print("weights shape: ", self.weights.shape)
 
 
 
        if self.biases is None:
            self.biases = np.zeros(self.num_of_filters)
 
        # pad the input data
        self.input_data_padded = np.pad(input_data, ((
            0, 0), (self.padding, self.padding), (self.padding, self.padding), (0, 0)), 'constant')
 
 
        output = np.zeros(output_shape)
 
        for i in range(output_height):
            for j in range(output_width):
                input_matrix = self.input_data_padded[:, i * self.stride: i * self.stride +
                                                      self.kernel_size_h, j * self.stride: j * self.stride + self.kernel_size_w, :]
                input_matrix = input_matrix.reshape(batch_size, -1)
                # print("input matrix: ", input_matrix.shape)
                # print("weights: ", self.weights.shape)
                temp_weight = self.weights.reshape(self.num_of_filters, -1)
                # print("temp weight: ", temp_weight.shape)
                output[:, i, j, :] = np.dot(
                    input_matrix, temp_weight.T) + self.biases
 
        return output
 
    def backprop(self, output_error, learning_rate=0.05):
        lr = learning_rate
        batch_size, height, width, channels = self.input_shape
        output_height, output_width, output_channels = output_error.shape[1:]
 
        # print("output error shape: ", output_error.shape)
        # print("input shape: ", self.input_shape)
 
        # initialize the weight error and bias error
        weight_error = np.zeros(self.weights.shape)
        bias_error = np.zeros(self.biases.shape)
 
        # initialize the input error
        input_error = np.zeros(self.input_data.shape)
 
        # initialize the input error for padding
        input_error_padded = np.zeros(self.input_data_padded.shape)
 
        for i in range(output_height):
            for j in range(output_width):
                input_matrix = self.input_data_padded[:, i * self.stride: i * self.stride +
                                                      self.kernel_size_h, j * self.stride: j * self.stride + self.kernel_size_w, :]
                input_matrix = input_matrix.reshape(batch_size, -1)
                # print("input matrix: ", input_matrix.shape)
                # print("weights: ", self.weights.shape)
                temp_weight = self.weights.reshape(self.num_of_filters, -1)
                # print("temp weight: ", temp_weight.shape)
                weight_error += np.dot(output_error[:, i, j, :].T,
                                       input_matrix).reshape(self.weights.shape)
                bias_error += np.sum(output_error[:, i, j, :], axis=0)
 
                input_error_padded[:, i * self.stride: i * self.stride + self.kernel_size_h, j * self.stride: j * self.stride + self.kernel_size_w, :] += np.dot(
                    output_error[:, i, j, :], temp_weight).reshape(batch_size, self.kernel_size_h, self.kernel_size_w, channels)
 
        # remove the padding from the input error
        input_error = input_error_padded[:, self.padding: self.padding + height,
                                         self.padding: self.padding + width, :]
 
        # update the weights and biases
        self.weights = self.update_weight(weight_error, lr)
        self.biases = self.update_bias(bias_error, lr)
 
        return input_error
 
    def update_weight(self, weight_error, lr):
        temp_weight = self.weights - lr * weight_error
        return temp_weight
 
    def update_bias(self, bias_error, lr):
        temp_bias = self.biases - lr * bias_error
        return temp_bias

In [1159]:
class ReLULayer:
    def __init__(self):
        self.last_input = None

    def forward(self, input):
        self.last_input = input
        return np.maximum(0, input)

    def backprop(self, d_L_d_out, learning_rate):
        d_L_d_input = d_L_d_out.copy()
        d_L_d_input[self.last_input <= 0] = 0
        return d_L_d_input

In [1160]:
class MaxPoolingLayer:
 
    def __init__(self, pool_size , stride=2):
        pool_size = (pool_size, pool_size)
        self.pool_size = pool_size
 
        self.stride = stride
 
    def forward(self, input_data):
 
        self.input_shape = input_data.shape
 
        self.input_data = input_data
 
        batch_size, height, width, channels = input_data.shape
 
 
 
        output_height = int((height - self.pool_size[0]) / self.stride + 1)
 
        output_width = int((width - self.pool_size[1]) / self.stride + 1)
 
        output_shape = (batch_size, output_height, output_width, channels)
 
        output = np.zeros(output_shape)
 
        for b in range(batch_size):
 
            h_indices = np.arange(
                0, height - self.pool_size[0] + 1, self.stride)
 
            w_indices = np.arange(
                0, width - self.pool_size[1] + 1, self.stride)
 
            for i in h_indices:
 
                for j in w_indices:
 
 
                    current_region = input_data[b, i:i +
                                                self.pool_size[0], j:j+self.pool_size[1], :]
 
                    output[b, int(i/self.stride), int(j/self.stride),
                           :] = np.max(current_region, axis=(0, 1))
 
 
        return output
 
 
 
    def backprop(self, output_error, learning_rate):
 
        batch_size, height, width, channels = output_error.shape
 
    # initialize the gradient of the input data
 
        input_data_gradient = np.zeros(self.input_shape)
 
        stride = self.stride
        pool_size = self.pool_size
 
        for b in range(batch_size):
 
            for m in range(channels):
 
                for i in range(0, height - pool_size[0] + 1, stride):
 
                    for j in range(0, width - pool_size[1] + 1, stride):
 
                        current_region = self.input_data[b, i:i + pool_size[0], j:j+pool_size[1], m]
 
                        max_value = np.max(current_region)
                        max_index = np.argmax(current_region)
 
                        k, l = np.unravel_index(max_index, pool_size)
 
                        input_data_gradient[b, i+k, j+l, m] = output_error[b, int(i/stride), int(j/stride), m]
 
 
        return input_data_gradient

In [1161]:
# Flatten Layer
class FlattenLayer:
    def __init__(self):
        self.last_input_shape = None
    
    def forward(self, input):
        self.last_input_shape = input.shape
        return input.reshape(input.shape[0], -1)
    
    def backprop(self, grad_output, learning_rate):
        return grad_output.reshape(self.last_input_shape)

In [1162]:
# Fully Connected Layer
class FullyConnectedLayer:
    def __init__(self, output_size):
        self.weights = None
        self.bias = None
        self.output_size = output_size
        self.input = None
    
    def forward(self, input):
        # initialize weights and bias
        if self.weights is None:
            self.weights = np.random.randn(input.shape[1], self.output_size)
        if self.bias is None:
            self.bias = np.random.randn(self.output_size)

        self.input = input

        return np.dot(input, self.weights) + self.bias

    def backprop(self, grad_output, learning_rate):
        grad_input = np.dot(grad_output, self.weights.T)
        grad_weights = np.dot(self.input.T, grad_output)
        grad_bias = np.sum(grad_output, axis=0)

        self.weights -= learning_rate * grad_weights
        self.bias -= learning_rate * grad_bias

        return grad_input


In [1163]:
# Softmax Layer
class SoftmaxLayer:
    def __init__(self):
        self.last_input = None
    
    def forward(self, input):
        self.last_input = input
        exp = np.exp(input - np.max(input, axis=1, keepdims=True))
        return exp / np.sum(exp, axis=1, keepdims=True)
    
    def backprop(self, grad_output, learning_rate):
        return grad_output


# Load Data 

In [1164]:
# load images from folders
def load_data_from_folder(image_folder, label_path):
    images = []
    for filename in os.listdir(image_folder):
        img = cv2.imread(os.path.join(image_folder, filename))
        if img is not None:
            

            # convert to grayscale
            img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

            # resize to 28x28x1
            img = cv2.resize(img, (28, 28), interpolation=cv2.INTER_CUBIC)

            # convert to float32
            img = img.astype(np.float32)

            images.append(img)

            if len(images) == 700:
                break

    df = pd.read_csv(label_path)
    labels = df['digit'].values
    labels = labels[:len(images)]


    # rotate images iwth 90, 180, 270 degrees and coreesponding labels
    for i in range(len(images)):
        images.append(np.rot90(images[i], 1))
        labels = np.append(labels, labels[i])
        images.append(np.rot90(images[i], 1))
        labels = np.append(labels, labels[i])
        images.append(np.rot90(images[i], 1))
        labels = np.append(labels, labels[i])

        


    # CONVERT TO NUMPY ARRAY
    images = np.array(images)
    labels = np.array(labels)

    return images, labels

In [1165]:
# load images
train_images, train_labels = load_data_from_folder('training-a', 'training-a.csv')

# reshape images to 28x28x1
train_images = train_images.reshape(train_images.shape[0], 28, 28, 1)


In [1166]:
# print shape of images and labels
print(train_images.shape)
print(train_labels.shape)

#print shape of first image
print(train_images[0].shape)

(2800, 28, 28, 1)
(2800,)
(28, 28, 1)


# Preprocessing Data

In [1167]:
# normalize images with std and mean
train_images = (train_images - np.mean(train_images)) / np.std(train_images)

# Split Data for train and test

In [1168]:
# split data into train and validation
train_percent = 0.8
X_train = train_images[:int(train_percent*len(train_images))]
y_train = train_labels[:int(train_percent*len(train_labels))]
X_val = train_images[int(train_percent*len(train_images)):]
y_val = train_labels[int(train_percent*len(train_labels)):]

# Model Class Definition

In [1169]:
# model class definition
class Model:
    def __init__(self, num_classes):
        self.layers = []
        self.num_classes = num_classes

    def add(self, layer):
        self.layers.append(layer)

    def forward(self, input):
        for layer in self.layers:
            input = layer.forward(input)
        return input

    def one_hot_encode(self, labels):
        one_hot = np.zeros((len(labels), self.num_classes))
        for i, label in enumerate(labels):
            one_hot[i][label] = 1
        return one_hot
    
    def cross_entropy(self, y_true, y_pred):
        eps = 1e-9
        return -np.sum(y_true * np.log(y_pred + eps))

    def predict(self, X):
        y_pred = self.forward(X)
        return np.argmax(y_pred, axis=1)

    def evaluate(self, X, y):
        y_pred = self.predict(X)
        #print (y_pred), (y)
        print("y_pred: ", y_pred)
        print("y: ", y)
        accuracy = np.mean(y_pred == y)
        return accuracy 

    def train(self, X_train, y_train, X_val, y_val, learning_rate, epochs, batch_size):
        for epoch in range(epochs):
            print("epoch: ", epoch)

            # split data into batches
            batches = []
            for i in range(0, len(X_train), batch_size):
                batches.append((X_train[i:i+batch_size], y_train[i:i+batch_size]))

            # train model
            for i in range(len(batches)):
                X_batch, y_batch = batches[i]
                print("\tbatch: ", i)
                y_batch_one_hot = self.one_hot_encode(y_batch)
                y_pred = self.forward(X_batch)
                loss = self.cross_entropy(y_batch_one_hot, y_pred)
                print("\t\tloss: ", loss)
                grad = y_pred - y_batch_one_hot
                for layer in reversed(self.layers):
                    grad = layer.backprop(grad, learning_rate)
            
            # evaluate model
            accuracy = self.evaluate(X_val, y_val)
            print("accuracy: ", accuracy)

# Model Building

In [1170]:


# use lenet-5 model
model = Model(10)
model.add(ConvolutionLayer(6, 5, 1, 1))
model.add(ReLULayer())
model.add(MaxPoolingLayer(pool_size=2, stride=2))
model.add(ConvolutionLayer(16, 5, 1, 1))
model.add(ReLULayer())
model.add(MaxPoolingLayer(pool_size=2, stride=2))
model.add(FlattenLayer())
model.add(FullyConnectedLayer(output_size=120))
model.add(ReLULayer())
model.add(FullyConnectedLayer(output_size=84))
model.add(ReLULayer())
model.add(FullyConnectedLayer(output_size=10))
model.add(SoftmaxLayer())




# model = Model(10)
# model.add(ConvolutionLayer(6, 6, 1, 1))
# model.add(ReLULayer())
# model.add(MaxPoolingLayer(2, 2))
# model.add(ConvolutionLayer(16, 10, 1, 1))
# model.add(ReLULayer())
# model.add(MaxPoolingLayer(2, 2))
# model.add(FlattenLayer())
# model.add(FullyConnectedLayer(90))
# model.add(ReLULayer())
# model.add(FullyConnectedLayer(74))
# model.add(ReLULayer())
# model.add(FullyConnectedLayer(10))
# model.add(ReLULayer())
# model.add(SoftmaxLayer())


# Train Model

In [1171]:
# train
model.train(X_train=X_train, y_train=y_train, X_val=X_val, y_val=y_val, learning_rate=0.0000008, epochs=10, batch_size=128)

epoch:  0
	batch:  0
		loss:  2321.0057737219977
	batch:  1
		loss:  2341.7290395599443
	batch:  2
		loss:  2397.2724347826897
	batch:  3
		loss:  2383.175571235837
	batch:  4
		loss:  2286.148265269617
	batch:  5
		loss:  2265.869516824513
	batch:  6
		loss:  2196.6661786943196
	batch:  7
		loss:  2218.57000851474
	batch:  8
		loss:  2351.921880914376
	batch:  9
		loss:  2403.8988370737834
	batch:  10
		loss:  2528.253642556271
	batch:  11
		loss:  2548.884480081096
	batch:  12
		loss:  2279.5592420473404
	batch:  13
		loss:  2466.0686106298044
	batch:  14
		loss:  2403.898837073092
	batch:  15
		loss:  2341.729039551882
	batch:  16
		loss:  2155.219680617822
	batch:  17
		loss:  1077.6098283181811
y_pred:  [2 2 8 8 8 8 8 8 2 2 2 2 2 2 4 4 4 3 3 3 1 1 1 1 1 1 2 2 2 2 2 2 6 6 6 7 7
 7 2 2 2 6 6 6 1 1 1 2 2 2 4 4 4 1 1 1 2 2 2 4 4 4 1 1 1 4 4 4 2 2 2 1 1 1
 7 7 7 1 1 1 1 1 1 1 1 1 4 4 4 4 4 4 4 4 4 2 2 2 2 2 2 4 4 4 2 2 2 2 2 2 2
 2 2 5 5 5 5 5 5 2 2 2 0 0 0 2 2 2 2 2 2 2 2 2 2 2 2 8 8 