<p align="center">
  <img src="VGG-11.jpg">
</p>

In [1]:
import numpy as np
from numba import cuda
import time
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import to_categorical
import os
import pickle

In [None]:
# @title Build VGG-11 model
class Layer:
    """Common interface for the layers stacked inside the network.

    Every hook defined here is a no-op returning ``None``; concrete layers
    override the ones they need.
    """

    def forward(self, inputs):
        """Compute the layer output for ``inputs`` (no-op in the base class)."""
        return None

    def backward(self, output_gradient, learning_rate):
        """Back-propagate ``output_gradient`` (no-op in the base class)."""
        return None

    def get_out_shape(self):
        """Report the shape produced by the layer (no-op in the base class)."""
        return None

    def init_weight(self):
        """Create the layer parameters (no-op in the base class)."""
        return None

class VGG11:
    """Sequential container that chains layers and trains them with SGD.

    On construction the output shape of each layer is pushed into the
    ``input_shape`` of the next one and every layer re-initialises its
    weights, so only the first layer needs an explicit ``input_shape``.
    """

    def __init__(self, layers: "list[Layer] | None" = None):
        """Wire up ``layers`` in order.

        Args:
            layers: Layer objects exposing ``init_weight``, ``get_out_shape``,
                ``forward`` and ``backward``. ``None`` (the default) builds an
                empty model; a fresh list is created per instance so models
                never share a default list.
        """
        layers = [] if layers is None else layers
        if layers:
            pre_layer = layers[0]
            pre_layer.init_weight()
            for layer in layers[1:]:
                # Shape inference: a layer consumes what its predecessor emits.
                layer.input_shape = pre_layer.get_out_shape()
                layer.init_weight()
                pre_layer = layer
        self.layers: "list[Layer]" = layers

    def forward(self, X):
        """Run ``X`` through every layer in order and return the last output."""
        output = X
        for layer in self.layers:
            output = layer.forward(output)
        return output

    def backward(self, out_grad, learning_rate):
        """Propagate ``out_grad`` from the last layer back to the first.

        Each layer updates its own parameters with ``learning_rate`` and
        returns the gradient for the layer before it.

        Returns:
            The gradient with respect to the model input.
        """
        for layer in reversed(self.layers):
            out_grad = layer.backward(out_grad, learning_rate)
        return out_grad

    def fit(self, X_train, Y_train, epochs=1, batch_size=32, learning_rate=0.001):
        """Train with mini-batch gradient descent on a squared-error loss.

        Every sample is visited once per epoch, including the final batch,
        which may be smaller than ``batch_size``.

        Args:
            X_train: Training inputs, indexed by sample along the first axis.
            Y_train: One-hot targets aligned with ``X_train``.
            epochs: Number of passes over the data.
            batch_size: Number of samples per gradient step.
            learning_rate: Step size handed to every layer's ``backward``.
        """
        n_samples = len(X_train)
        num_batch = (n_samples - 1) // batch_size + 1
        for i_epoch in range(epochs):
            print(f"\nEpoch {i_epoch+1}/{epochs}:")
            train_loss = 0
            n_correct = 0
            progress = '.'*30
            for i in range(num_batch):
                batch_start = i * batch_size
                batch_end = batch_start + batch_size
                batch_X = X_train[batch_start: batch_end]
                batch_Y = Y_train[batch_start: batch_end]

                predictions = self.forward(batch_X)
                residual = predictions - batch_Y
                # Gradient of sum((p - y)^2) with respect to p.
                self.backward(2.0 * residual, learning_rate)

                # print result
                hits = np.argmax(predictions, axis=1) == np.argmax(batch_Y, axis=1)
                acc_batch = np.mean(hits)
                # Count hits (not per-batch means) so a short last batch is
                # weighted correctly in the epoch accuracy.
                n_correct += np.sum(hits)
                loss = np.sum(residual ** 2)
                train_loss += loss
                i_str = int(i/num_batch*30)
                progress = progress[:i_str] + ">" + progress[i_str+1:]
                print(
                    f"\r {i}/{num_batch} [{progress}] accuaray: {acc_batch:.5f}, train loss = {loss/len(batch_Y):.5f}", end='')
                progress = progress[:i_str] + "=" + progress[i_str+1:]

            # max(..., 1) keeps an empty training set from dividing by zero.
            denominator = max(n_samples, 1)
            train_loss /= denominator
            epoch_acc = n_correct / denominator

            print(
                f"\r {num_batch}/{num_batch} [{progress}] accuaray: {epoch_acc:.5f}, train loss = {train_loss:.5f}", end='')

    def predict(self, X):
        """Return the model output for ``X`` (a plain forward pass)."""
        return self.forward(X)

    def use_device(self, value):
        """Set the ``use_device`` attribute on every layer to ``value``."""
        for layer in self.layers:
            layer.use_device = value

class Flatten(Layer):
    """Collapse every non-batch axis of the input into a single axis."""

    def __init__(self, input_shape=(28, 28, 1)):
        # Per-sample shape expected by forward(); compared against inputs.shape[1:].
        self.input_shape = input_shape

    def init_weight(self):
        """Nothing to initialise: the layer has no parameters."""
        return None

    def get_out_shape(self):
        """Return the flattened length, i.e. the product of ``input_shape``."""
        n_features = 1
        for extent in self.input_shape:
            n_features = n_features * extent
        return n_features

    def forward(self, inputs):
        """Remember ``inputs`` and return them reshaped to (batch, -1)."""
        self.inputs = inputs
        per_sample_shape = inputs.shape[1:]
        assert self.input_shape == per_sample_shape, "Input shape incorrect"
        n_samples = inputs.shape[0]
        return inputs.reshape(n_samples, -1)

    def backward(self, output_gradient, learning_rate):
        """Reshape the gradient back to the shape of the remembered inputs."""
        return output_gradient.reshape(self.inputs.shape)

In [None]:
class Convolution(Layer):
    """2-D convolution over batches laid out as (batch, channels, height, width).

    Supports ``'valid'`` (no padding) and ``'same'`` (zero padding so that the
    output is ``input // stride`` in each spatial dimension) and an optional
    ReLU activation.
    """

    def __init__(self, n_filters=32, filter_size=3, stride=1, activation=None, input_shape=(1, 32, 32), padding='valid'):
        """Store the hyper-parameters and create the initial parameters.

        Args:
            n_filters: Number of output channels.
            filter_size: Side length of the square kernel.
            stride: Step between neighbouring windows.
            activation: ``"relu"`` or ``None``.
            input_shape: Per-sample shape ``(channels, height, width)``.
            padding: ``'valid'`` or ``'same'``.
        """
        self.input_shape = input_shape
        self.n_filters = n_filters
        self.filter_size = filter_size
        self.stride = stride
        self.activation = activation
        self.padding = padding
        self.bias = np.zeros((n_filters, 1))
        self.init_weight()

    def get_out_shape(self):
        """Return the per-sample output shape ``(n_filters, height, width)``.

        Raises:
            ValueError: If ``padding`` is neither ``'valid'`` nor ``'same'``.
        """
        if self.padding == 'valid':
            output_width = (self.input_shape[2] - self.filter_size) // self.stride + 1
            output_height = (self.input_shape[1] - self.filter_size) // self.stride + 1
        elif self.padding == 'same':
            output_width = self.input_shape[2] // self.stride
            output_height = self.input_shape[1] // self.stride
        else:
            raise ValueError(f"Unsupported padding mode: {self.padding!r}")
        return (self.n_filters, output_height, output_width)

    def init_weight(self):
        """(Re)create the kernels for the current ``input_shape``.

        NOTE(review): this reseeds NumPy's global RNG with a fixed seed, so
        layers with identical shapes start from identical kernels and later
        global random draws become deterministic too.
        """
        np.random.seed(10)
        self.weights = np.random.randn(self.n_filters, self.input_shape[0], self.filter_size, self.filter_size)/(self.filter_size**2)

    def _pad_amounts(self, in_size, out_size):
        """Return (before, after) zero padding along one spatial axis."""
        if self.padding != 'same':
            return 0, 0
        total = max((out_size - 1) * self.stride + self.filter_size - in_size, 0)
        before = total // 2
        # An odd total puts the extra row/column at the end so the last
        # window is always complete.
        return before, total - before

    def _pad(self, array, output_height, output_width):
        """Zero-pad ``array`` spatially; return (padded, top, left)."""
        top, bottom = self._pad_amounts(array.shape[2], output_height)
        left, right = self._pad_amounts(array.shape[3], output_width)
        if top or bottom or left or right:
            array = np.pad(array, ((0, 0), (0, 0), (top, bottom), (left, right)), mode='constant')
        return array, top, left

    def forward(self, inputs):
        """Convolve ``inputs`` with the kernels, add the bias, apply activation.

        Args:
            inputs: Array of shape ``(batch, *input_shape)``.

        Returns:
            Array of shape ``(batch, *get_out_shape())``.
        """
        self.inputs = inputs
        assert tuple(self.input_shape) == tuple(inputs.shape[1:]), "Input shape incorrect"

        n_batchs = inputs.shape[0]
        output_height, output_width = self.get_out_shape()[1:]
        outputs = np.zeros((n_batchs, self.n_filters, output_height, output_width))
        inputs_padded, _, _ = self._pad(inputs, output_height, output_width)

        # Convolution operation: one tensordot per output position handles
        # every sample and every filter at once.
        for row in range(output_height):
            row_start = row * self.stride
            row_end = row_start + self.filter_size
            for col in range(output_width):
                col_start = col * self.stride
                col_end = col_start + self.filter_size
                patch = inputs_padded[:, :, row_start:row_end, col_start:col_end]
                outputs[:, :, row, col] = np.tensordot(patch, self.weights, axes=([1, 2, 3], [1, 2, 3]))

        outputs += self.bias.reshape(1, -1, 1, 1)
        if self.activation == "relu":
            outputs = np.maximum(0, outputs)
        # Kept so backward() can gate the gradient through the activation.
        self.outputs = outputs
        return outputs

    def backward(self, output_gradient, learning_rate):
        """Back-propagate through the layer and update kernels and bias.

        Args:
            output_gradient: Gradient with respect to the value returned by
                the preceding ``forward`` call, same shape as that value.
            learning_rate: Step size; parameter gradients are averaged over
                the batch before being applied.

        Returns:
            Gradient with respect to the inputs, same shape as the inputs.
        """
        n_batchs, _, input_height, input_width = self.inputs.shape
        _, _, output_height, output_width = output_gradient.shape

        if self.activation == "relu":
            # ReLU passes gradient only where its output was positive.
            output_gradient = output_gradient * (self.outputs > 0)

        input_padded, top, left = self._pad(self.inputs, output_height, output_width)
        input_gradient_padded = np.zeros(input_padded.shape)
        filter_gradient = np.zeros(self.weights.shape)

        # Backpropagation
        for row in range(output_height):
            row_start = row * self.stride
            row_end = row_start + self.filter_size
            for col in range(output_width):
                col_start = col * self.stride
                col_end = col_start + self.filter_size
                patch = input_padded[:, :, row_start:row_end, col_start:col_end]
                grad_here = output_gradient[:, :, row, col]  # (batch, n_filters)
                # Sum over the batch -> (n_filters, channels, k, k).
                filter_gradient += np.tensordot(grad_here, patch, axes=([0], [0]))
                # Sum over the filters -> (batch, channels, k, k).
                input_gradient_padded[:, :, row_start:row_end, col_start:col_end] += np.tensordot(grad_here, self.weights, axes=([1], [0]))

        # Remove the padding: keep exactly the region covered by the inputs.
        input_gradient = input_gradient_padded[:, :, top:top + input_height, left:left + input_width]

        if n_batchs:
            self.weights -= learning_rate * filter_gradient / n_batchs
            bias_gradient = output_gradient.sum(axis=(0, 2, 3)).reshape(-1, 1)
            self.bias -= learning_rate * bias_gradient / n_batchs
        return input_gradient


In [None]:
# Timing run of the host (NumPy) Convolution forward pass on random data:
# a batch of 32 samples shaped (3, 32, 32), integer draws scaled by 1/255.
input_shape=(3,32,32)
inputs = np.random.randint(0,255,(32,*input_shape))/255
conv = Convolution(n_filters = 32,
                   filter_size = 3,
                   stride = 1,
                   padding = 'same',
                   input_shape = input_shape)
# out_host is reused below as the gradient fed to conv.backward.
%time out_host = conv.forward(inputs)

In [None]:
# Time one backward pass, using the forward output as a stand-in gradient.
# Note: backward() also updates conv.weights in place.
%time in_grad_host=conv.backward(out_host, 0.0001)

In [None]:
# Repeat of the backward timing run with the same arguments.
%time in_grad_host=conv.backward(out_host, 0.0001)

In [None]:
# @title Maxpooling Layer

class MaxPool2D(Layer):
    """Max pooling over square windows of (batch, channels, height, width) data."""

    def __init__(self, pool_size=2, stride=2, input_shape=(1,28, 28)):
        self.input_shape = input_shape
        self.pool_size = pool_size
        self.stride = stride
        self.use_device = False
        self.inputs = None
        self.inputs_device = None

    def init_weight(self):
        """Nothing to initialise: pooling has no parameters."""
        return None

    def get_out_shape(self):
        """Return the per-sample output shape ``(channels, height, width)``."""
        channels = self.input_shape[0]
        pooled_height = (self.input_shape[1] - self.pool_size) // self.stride + 1
        pooled_width = (self.input_shape[2] - self.pool_size) // self.stride + 1
        return (channels, pooled_height, pooled_width)

    def forward(self, inputs):
        """Remember ``inputs`` and return the maximum of every pooling window."""
        n_samples, n_channels, _, _ = inputs.shape
        assert self.input_shape == inputs.shape[1:], "Input shape incorrect"
        self.inputs = inputs
        _, pooled_height, pooled_width = self.get_out_shape()

        pooled = np.zeros((n_samples, n_channels, pooled_height, pooled_width))
        for out_r, out_c in np.ndindex(pooled_height, pooled_width):
            top = out_r * self.stride
            left = out_c * self.stride
            window = inputs[:, :, top:top + self.pool_size, left:left + self.pool_size]
            pooled[:, :, out_r, out_c] = window.max(axis=(2, 3))
        return pooled

    def backward(self, output_gradient, learning_rate):
        """Send each output gradient to the window positions equal to the max.

        Every position that ties for the window maximum receives the full
        gradient; contributions from overlapping windows are accumulated.
        """
        _, _, pooled_height, pooled_width = output_gradient.shape
        input_gradient = np.zeros(self.inputs.shape)
        for out_r, out_c in np.ndindex(pooled_height, pooled_width):
            top = out_r * self.stride
            bottom = top + self.pool_size
            left = out_c * self.stride
            right = left + self.pool_size
            window = self.inputs[:, :, top:bottom, left:right]
            is_max = window == window.max(axis=(2, 3), keepdims=True)
            upstream = output_gradient[:, :, out_r, out_c, np.newaxis, np.newaxis]
            input_gradient[:, :, top:bottom, left:right] += is_max * upstream
        return input_gradient



In [None]:
# Timing run of the host MaxPool2D forward pass: 64 random samples shaped
# (32, 200, 200), pooled with a 2x2 window and stride 2.
input_shape=(32,200,200)
inputs = np.random.randint(0,255,(64,*input_shape))/255
maxp = MaxPool2D(2,2,input_shape=input_shape)
%time out_host=maxp.forward(inputs)

In [None]:
# Time the pooling backward pass, using the forward output as a stand-in
# gradient (learning_rate is accepted but not used by MaxPool2D.backward).
%time in_grad_host=maxp.backward(out_host,0.0001)

In [None]:
# @title Dense Layer
class Dense(Layer):
    """Fully connected layer: ``outputs = inputs @ weights + biases``.

    Supports ``"relu"`` and ``"softmax"`` activations (or ``None``).
    """

    def __init__(self, num_outputs, activation=None, input_shape=100):
        """Store the hyper-parameters and create the initial parameters.

        Args:
            num_outputs: Number of output units.
            activation: ``"relu"``, ``"softmax"`` or ``None``.
            input_shape: Number of input features (an int).
        """
        self.num_outputs = num_outputs
        self.biases = np.zeros((1, num_outputs))
        self.activation = activation
        self.use_device = False
        self.inputs = None
        self.outputs = None
        self.input_shape = input_shape
        self.init_weight()

    def init_weight(self):
        """(Re)draw the weight matrix for the current ``input_shape``."""
        self.weights = np.random.randn(
            self.input_shape, self.num_outputs) / self.num_outputs

    def get_out_shape(self):
        """Return the number of output units."""
        return self.num_outputs

    def forward(self, inputs):
        """Apply the affine map and the activation to ``inputs``.

        Args:
            inputs: Array whose last axis has ``input_shape`` features.

        Returns:
            Array with ``num_outputs`` features on the last axis.
        """
        self.inputs = inputs
        assert self.input_shape == inputs.shape[-1], "Input shape incorrect"
        outputs = np.dot(inputs, self.weights) + self.biases
        if self.activation == "softmax":
            outputs = self.softmax(outputs)
        if self.activation == "relu":
            outputs = np.maximum(0, outputs)
        # Kept so backward() can gate the gradient through ReLU.
        self.outputs = outputs
        return outputs

    def softmax(self, x):
        """Row-wise softmax, shifted by the row maximum for numerical stability."""
        e_x = np.exp(x-np.max(x, axis=1, keepdims=True))
        return e_x/e_x.sum(axis=1, keepdims=True)

    def backward(self, output_gradient, learning_rate):
        """Back-propagate through the layer and update weights and biases.

        For ``"relu"`` the gradient is zeroed wherever the forward output was
        not positive. For ``"softmax"`` the incoming gradient is applied to
        the pre-activation values as-is (no softmax Jacobian is applied).

        Args:
            output_gradient: Gradient with respect to the layer output.
            learning_rate: Step size for the parameter update.

        Returns:
            Gradient with respect to the layer inputs.
        """
        if self.activation == "relu":
            output_gradient = output_gradient * (self.outputs > 0)
        # Use the weights from before the update for the input gradient.
        input_grad = np.dot(output_gradient, self.weights.T)
        weights_gradient = np.dot(self.inputs.T, output_gradient)
        biases_gradient = np.sum(output_gradient, axis=0, keepdims=True)
        self.weights -= learning_rate * weights_gradient
        self.biases -= learning_rate * biases_gradient
        return input_grad


class Flatten(Layer):
    """Reshape (batch, d1, d2, ...) inputs into (batch, d1*d2*...)."""

    def __init__(self, input_shape=(28, 28, 1)):
        # Expected per-sample shape; forward() asserts inputs match it.
        self.input_shape = input_shape

    def get_out_shape(self):
        """Return the product of the entries of ``input_shape``."""
        total = 1
        for axis_len in self.input_shape:
            total = total * axis_len
        return total

    def forward(self, inputs):
        """Store ``inputs`` and return a view/copy flattened per sample."""
        self.inputs = inputs
        assert self.input_shape == inputs.shape[1:], "Input shape incorrect"
        batch = inputs.shape[0]
        flat = inputs.reshape(batch, -1)
        return flat

    def backward(self, output_gradient, learning_rate):
        """Undo the flattening on the incoming gradient."""
        original_shape = self.inputs.shape
        restored = output_gradient.reshape(original_shape)
        return restored

    def init_weight(self):
        """No parameters to create."""
        return None

In [None]:
# Timing run of the host Dense forward pass: 256 random samples with 10000
# features each, mapped to 1024 outputs.
inputs=np.random.randint(1,255, (256,10000))/255
dense=Dense(1024, input_shape= 10000)
%time out_host=dense.forward(inputs)

In [None]:
# Assemble the network: 8 convolution layers interleaved with 5 max-pool
# layers, then Flatten and 3 dense layers ending in a 10-way softmax.
# Only the first layer is given an input_shape; VGG11.__init__ overwrites
# input_shape on every later layer from its predecessor's get_out_shape()
# and re-initialises the weights.
model_1 = VGG11([
    Convolution(n_filters=64, filter_size=3, stride=1,activation='relu', input_shape=(3,32,32), padding = 'same'),
    MaxPool2D(pool_size=2),
    Convolution(n_filters=128, filter_size=3, stride=1,activation='relu', padding = 'same'),
    MaxPool2D(pool_size=2),
    Convolution(n_filters=256, filter_size=3, stride=1,activation='relu', padding = 'same'),
    Convolution(n_filters=256, filter_size=3, stride=1,activation='relu', padding = 'same'),
    MaxPool2D(pool_size=2),
    Convolution(n_filters=512, filter_size=3, stride=1,activation='relu', padding = 'same'),
    Convolution(n_filters=512, filter_size=3, stride=1,activation='relu', padding = 'same'),
    MaxPool2D(pool_size=2),
    Convolution(n_filters=512, filter_size=3, stride=1,activation='relu', padding = 'same'),
    Convolution(n_filters=512, filter_size=3, stride=1,activation='relu', padding = 'same'),
    MaxPool2D(pool_size=2),
    Flatten(),
    Dense(4096, activation='relu'),
    Dense(4096, activation='relu'),
    Dense(10, activation='softmax'),
])

In [2]:
def single_batch_cifar10(file):
    """Load one CIFAR-10 batch file.

    Args:
        file: Path to a pickled batch dict with a ``'data'`` entry (one
            flattened 3x32x32 image per row) and a ``'labels'`` entry.

    Returns:
        Tuple ``(x, y)``: ``x`` is a float array of shape (N, 32, 32, 3) and
        ``y`` an array with the N labels. N is taken from the file rather
        than assumed to be 10000.
    """
    # NOTE(security): pickle.load can execute arbitrary code; only use this
    # on dataset files from a trusted source.
    with open(file, 'rb') as f_single_batch:
        d_single_batch = pickle.load(f_single_batch, encoding='latin1')

    x = np.asarray(d_single_batch['data'])
    y = np.array(d_single_batch['labels'])
    # Rows are stored channel-first; move the channel axis last.
    x = x.reshape(-1, 3, 32, 32).transpose(0, 2, 3, 1).astype('float')  # (N, 32, 32, 3)
    return x, y

def whole_cifar10(data_dir='./dataset/cifar-10-batches-py'):
    """Load the five CIFAR-10 training batches and the test batch.

    Args:
        data_dir: Directory holding ``data_batch_1`` .. ``data_batch_5`` and
            ``test_batch``. Defaults to the path the notebook has always used.

    Returns:
        Tuple ``(x_train, y_train, x_test, y_test)`` where the training
        arrays are the five batches concatenated in order.
    """
    x_collect = []
    y_collect = []
    for k in range(1, 6):
        filename = os.path.join(data_dir, 'data_batch_' + str(k))
        x, y = single_batch_cifar10(filename)
        x_collect.append(x)
        y_collect.append(y)
    x_train = np.concatenate(x_collect)  # (50000, 32, 32, 3) for the standard dataset
    y_train = np.concatenate(y_collect)  # (50000,) for the standard dataset

    filename = os.path.join(data_dir, 'test_batch')
    x_test, y_test = single_batch_cifar10(filename)
    return x_train, y_train, x_test, y_test

def pre_process_cifar10():
    """Load CIFAR-10 and return normalised, channel-first train/val/test splits.

    Steps: scale pixels by 1/255, carve the last 1000 training samples off as
    a validation split, keep the first 1000 test samples, standardise every
    split with the per-pixel mean and std of the training split (also saved
    to ``./dataset/mean_and_std.pickle``), move channels first, and one-hot
    encode the train and test labels. Validation labels are returned as
    plain class indices.

    Returns:
        Dict with keys ``x_train``, ``y_train``, ``x_validation``,
        ``y_validation``, ``x_test`` and ``y_test``.
    """
    x_train, y_train, x_test, y_test = whole_cifar10()

    # Bring pixel values into [0, 1].
    x_train /= 255.0
    x_test /= 255.0

    # Index arrays (rather than slices) so every split is an independent copy.
    validation_idx = np.arange(49000, 50000)
    x_validation = x_train[validation_idx]  # (1000, 32, 32, 3)
    y_validation = y_train[validation_idx]  # (1000,)
    train_idx = np.arange(49000)
    x_train = x_train[train_idx]  # (49000, 32, 32, 3)
    y_train = y_train[train_idx]  # (49000,)
    test_idx = np.arange(1000)
    x_test = x_test[test_idx]  # (1000, 32, 32, 3)
    y_test = y_test[test_idx]  # (1000,)

    # Per-pixel statistics of the training split, each of shape (32, 32, 3).
    mean_image = x_train.mean(axis=0)
    std = x_train.std(axis=0)
    with open('./dataset/mean_and_std.pickle', 'wb') as f_mean_std:
        pickle.dump({'mean_image': mean_image, 'std': std}, f_mean_std)

    # Standardise every split in place with the training statistics.
    for split in (x_train, x_validation, x_test):
        split -= mean_image
    for split in (x_train, x_validation, x_test):
        split /= std

    # Channels-last -> channels-first.
    channel_first = (0, 3, 1, 2)
    x_train = np.transpose(x_train, channel_first)  # (49000, 3, 32, 32)
    x_test = np.transpose(x_test, channel_first)  # (1000, 3, 32, 32)
    x_validation = np.transpose(x_validation, channel_first)  # (1000, 3, 32, 32)

    # One-hot encode by picking rows of the 10x10 identity matrix.
    identity = np.eye(10)
    train_y = identity[y_train]
    test_y = identity[y_test]

    return {
        'x_train': x_train,
        'y_train': train_y,
        'x_validation': x_validation,
        'y_validation': y_validation,
        'x_test': x_test,
        'y_test': test_y,
    }


# Preprocess CIFAR-10 and print the shape of every array in the result
# (i is the dict key, j the corresponding array).
data = pre_process_cifar10()
for i, j in data.items():
    print(i + ':', j.shape)

# Save the preprocessed splits to a pickle file.
with open('./dataset/data.pickle', 'wb') as f:
    pickle.dump(data, f)

x_train: (49000, 3, 32, 32)
y_train: (49000, 10)
x_validation: (1000, 3, 32, 32)
y_validation: (1000,)
x_test: (1000, 3, 32, 32)
y_test: (1000, 10)


In [None]:
%%time
# Train model_1 for one epoch on the training split with mini-batches of 64;
# learning_rate is left at fit()'s default.
model_1.fit(data['x_train'], data['y_train'], epochs=1, batch_size=64)