<a href="https://colab.research.google.com/github/podo47/DL_HW2_Handcraft_LeNet5-Computational_Graph/blob/main/DL_HW2_Handcraft_Lenet5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# LeNet-5

## Mount to drive

In [1]:
# Mount Google Drive inside the Colab VM; every data path used below lives
# under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive') 

Mounted at /content/drive


## Part 1 : Data preprocessing

### Import data

In [None]:
# Annotation files are space-separated with no header row: column 0 holds the
# image paths, column 1 the matching targets (presumably class labels).
train = pd.read_csv('/content/drive/MyDrive/images/train.txt', sep=" ", header=None)
train_dir, train_y = np.array(train[0]), np.array(train[1])

valid = pd.read_csv('/content/drive/MyDrive/images/val.txt', sep=" ", header=None)
valid_dir, valid_y = np.array(valid[0]), np.array(valid[1])

test = pd.read_csv('/content/drive/MyDrive/images/test.txt', sep=" ", header=None)
test_dir, test_y = np.array(test[0]), np.array(test[1])

### Read images to array

In [None]:
def read_image(imgname, root='/content/drive/MyDrive/', size=(28, 28)):
    """Read one image from disk and resize it.

    Args:
        imgname: Image path relative to ``root``.
        root: Prefix prepended to ``imgname`` (defaults to the Drive mount
            used throughout this notebook).
        size: Target size handed to ``cv2.resize``; defaults to (28, 28).

    Returns:
        The resized image as returned by ``cv2.resize``.

    Raises:
        FileNotFoundError: If OpenCV could not read the file.
    """
    path = root + imgname
    img = cv2.imread(path)
    # cv2.imread signals failure by returning None instead of raising; without
    # this check the error would surface later as an opaque cv2.resize failure.
    if img is None:
        raise FileNotFoundError(f"Could not read image: {path}")
    return cv2.resize(img, size)

def read_images_to_array(data_dir, processes=2):
    """Load every image listed in ``data_dir`` and stack them into one array.

    Args:
        data_dir: Iterable of image paths, each passed to ``read_image``.
        processes: Number of pool workers (default 2). ThreadPool workers are
            threads, despite the parameter name used by the pool API.

    Returns:
        ``np.array`` built from the per-image results, in input order.
    """
    # The context manager tears the pool down even when a worker raises, so a
    # single unreadable image no longer leaves worker threads behind.
    with ThreadPool(processes=processes) as pool:
        images = pool.map(read_image, data_dir)
    return np.array(images)

#### Save data (optional: skip if `rgb_dataset.npz` already exists on Drive)

In [None]:
# Optional step: the arrays produced here have already been stored, so this
# (slow) pass over every image file can be skipped to save time.
X_train, X_valid, X_test = [
    read_images_to_array(paths) for paths in (train_dir, valid_dir, test_dir)
]


In [None]:
# Cache the three image arrays in a single .npz archive on Drive
# (keys: traindata / validdata / testdata) so that later sessions can load
# them directly instead of re-reading every image file.
np.savez("/content/drive/MyDrive/images/rgb_dataset.npz", traindata=X_train, validdata=X_valid, testdata=X_test)

#### Load data

In [None]:
# Restore the image arrays from the .npz archive on Drive.
# NOTE(review): the original comment said this cell can be skipped, but that
# only holds when X_train / X_valid / X_test were just built in this session;
# if the image-reading cells were skipped, this cell is what defines them.

with np.load("/content/drive/MyDrive/images/rgb_dataset.npz") as data:
    X_train = data["traindata"]
    X_valid = data["validdata"]
    X_test = data["testdata"]


In [None]:
# Sanity check: report the shape of each split's image array.
print('X_train shape : ', X_train.shape)
print('X_valid shape : ',X_valid.shape)
print('X_test shape : ',X_test.shape)

X_train shape :  (63325, 28, 28, 3)
X_valid shape :  (450, 28, 28, 3)
X_test shape :  (450, 28, 28, 3)


### Mini-batch

In [None]:
# generate random-shuffled mini-batches
def random_mini_batches(image, label, mini_batch_size = 256):
    """Shuffle a dataset and cut it into consecutive mini-batches.

    Args:
        image: 4-D array of samples, indexed along axis 0.
        label: Array of targets aligned with ``image`` along axis 0.
        mini_batch_size: Number of samples per batch (default 256).

    Returns:
        List of ``(image_batch, label_batch)`` tuples. Every batch holds
        ``mini_batch_size`` samples except possibly the last one, which takes
        whatever remains.
    """
    dataset_size = image.shape[0]

    # One shared permutation keeps every image paired with its own label.
    order = np.random.permutation(dataset_size)
    shuffled_image, shuffled_label = image[order, :, :, :], label[order]

    # Start offsets of the full batches, plus one more for the leftover samples.
    full_batches, leftover = divmod(dataset_size, mini_batch_size)
    starts = [index * mini_batch_size for index in range(full_batches)]
    if leftover != 0:
        starts.append(full_batches * mini_batch_size)

    return [
        (
            shuffled_image[start: start + mini_batch_size, :, :, :],
            shuffled_label[start: start + mini_batch_size],
        )
        for start in starts
    ]


### Data preprocessing

#### Zero pad

In [None]:
# padding for the matrix of images
def zero_pad(X, pad):
    """Surround axes 1 and 2 of a 4-D array with ``pad`` zeros on each side.

    Axes 0 and 3 are left untouched.
    """
    untouched = (0, 0)
    border = (pad, pad)
    return np.pad(X, (untouched, border, border, untouched), mode = "constant", constant_values = 0)

#### Normalization

In [None]:
# normalise the dataset
def normalise(image):
    """Rescale an array to [0, 1], then standardise it to zero mean / unit std.

    Statistics are taken over the whole array, not per sample or per channel.
    The input is left untouched: the previous in-place ``-=`` silently
    overwrote the caller's array with its min-shifted version.

    Args:
        image: NumPy array of pixel values.

    Returns:
        A new floating-point array with the same shape as ``image``. A
        constant input (no spread to normalise) yields all zeros instead of
        the NaNs that a division by zero would produce.
    """
    # Out-of-place subtraction allocates a fresh array, protecting the input.
    shifted = image - image.min()
    peak = shifted.max()
    if peak == 0:
        # Every element equals the minimum: both the range and the standard
        # deviation are zero, so return zeros rather than dividing by zero.
        return shifted * 0.0
    scaled = shifted / peak
    return (scaled - np.mean(scaled)) / np.std(scaled)

#### Load dataset

## Part 2 : CNN Layer

#### Initialisation 

In [None]:
# Initialisation of the weights & bias
def initialise(kernel_shape, sigma = 0.01, bias_factor = 0.001):
    """Create the parameters of one layer.

    Args:
        kernel_shape: Shape of the weight tensor; its last entry is the
            number of output units/channels.
        sigma: Standard deviation of the zero-mean normal used for weights.
        bias_factor: Constant every bias entry is set to.

    Returns:
        ``(weight, bias)``. The bias has shape ``(1, 1, 1, out)`` for a 4-D
        kernel shape and ``(out,)`` otherwise.
    """
    outputs = kernel_shape[-1]
    if len(kernel_shape) == 4:
        bias_shape = (1, 1, 1, outputs)
    else:
        bias_shape = (outputs, )
    weight = np.random.normal(0, sigma, kernel_shape)
    bias = np.full(bias_shape, bias_factor, dtype = np.float64)
    return weight, bias

#### Softmax activation function

In [None]:
# Softmax activation function for the output layer
def softmax(X):
    """Row-wise softmax of a 2-D array of logits.

    Args:
        X: Array of shape (rows, classes).

    Returns:
        Array of the same shape whose rows are non-negative and sum to 1.
    """
    # Softmax is invariant to adding a constant to a row, so subtract each
    # row's maximum first: the largest exponent becomes exp(0) = 1 and np.exp
    # can no longer overflow to inf (which used to give inf / inf = NaN).
    shifted = X - np.max(X, axis = 1, keepdims = True)
    exponentials = np.exp(shifted)
    return exponentials / np.sum(exponentials, axis = 1, keepdims = True)

#### Convolution Layer

In [None]:
class Conv_Layer:
    """Convolution layer whose backward pass also applies a gradient-descent step.

    Arrays are indexed as (batch, height, width, channels); the weight tensor
    is indexed as (kernel, kernel, in_channels, out_channels). Only the first
    weight dimension is used as the kernel size, so the kernel is treated as
    square.
    """

    def __init__(self, kernel_shape, stride = 1, pad = 0, sigma = 0.01, bias_factor = 0.001):
        self.stride = stride
        self.pad = pad
        self.weight, self.bias = initialise(kernel_shape, sigma, bias_factor)

    def _window(self, row, col):
        """Return the (row, column) slices of the patch feeding output cell (row, col)."""
        kernel = self.weight.shape[0]
        top, left = row * self.stride, col * self.stride
        return slice(top, top + kernel), slice(left, left + kernel)

    def forward_propagation(self, input_map):
        """Convolve ``input_map`` with the kernels and add the bias.

        The input is cached on the instance for the following backward pass.
        """
        self.input_map = input_map
        samples, in_height, in_width, _ = input_map.shape
        kernel, _, _, filters = self.weight.shape
        out_height = int((in_height + 2 * self.pad - kernel) / self.stride + 1)
        out_width = int((in_width + 2 * self.pad - kernel) / self.stride + 1)
        padded = zero_pad(input_map, self.pad)
        output_map = np.zeros((samples, out_height, out_width, filters))
        # np.ndindex walks the output grid row by row.
        for row, col in np.ndindex(out_height, out_width):
            rows, cols = self._window(row, col)
            patch = padded[:, rows, cols, :]
            # Contract patch (batch, k, k, in) with weight (k, k, in, out) -> (batch, out).
            output_map[:, row, col, :] = np.tensordot(patch, self.weight, axes = ([1, 2, 3], [0, 1, 2])) + self.bias
        return output_map

    def back_propagation(self, d_output_map, learning_rate):
        """Back-propagate ``d_output_map`` and update weight and bias in place.

        Returns the gradient with respect to the cached input, which is then
        released.
        """
        _, out_height, out_width, filters = d_output_map.shape
        grad_weight = np.zeros(self.weight.shape)
        grad_bias = np.zeros((1, 1, 1, filters))
        grad_input = np.zeros(self.input_map.shape)
        if self.pad != 0:
            padded_input = zero_pad(self.input_map, self.pad)
            padded_grad = zero_pad(grad_input, self.pad)
        else:
            padded_input, padded_grad = self.input_map, grad_input
        for row, col in np.ndindex(out_height, out_width):
            rows, cols = self._window(row, col)
            upstream = d_output_map[:, row, col, :]
            patch = padded_input[:, rows, cols, :]
            # weight (k, k, in, out) . upstream.T (out, batch) -> (k, k, in, batch),
            # reordered to (batch, k, k, in) to match the input layout.
            padded_grad[:, rows, cols, :] += np.transpose(np.dot(self.weight, upstream.T), (3, 0, 1, 2))
            grad_weight += np.dot(np.transpose(patch, (1, 2, 3, 0)), upstream)
            grad_bias += np.sum(upstream, axis = 0)
        if self.pad == 0:
            d_input_map = padded_grad
        else:
            # Drop the gradient that landed on the zero padding.
            d_input_map = padded_grad[:, self.pad: -self.pad, self.pad: -self.pad, :]
        # Parameters are updated only after the input gradient has been formed
        # with the pre-update weights.
        self.weight -= learning_rate * grad_weight
        self.bias -= learning_rate * grad_bias
        self.input_map = None
        return d_input_map

#### Sigmoid Activation Layer

In [None]:
class Sigmoid_Layer:
    """Element-wise logistic sigmoid activation."""

    def forward_propagation(self, input_map):
        """Apply the sigmoid and cache the result for the backward pass."""
        activated = 1 / (1 + np.exp(-input_map))
        self.output_map = activated
        return activated

    def back_propagation(self, d_output_map):
        """Scale the upstream gradient by s * (1 - s), then drop the cache."""
        activated = self.output_map
        local_slope = activated * (1 - activated)
        d_input_map = d_output_map * local_slope
        self.output_map = None
        return d_input_map

#### Max-Pooling Layer

In [None]:
class MaxPool_Layer:
    """Max-pooling over f x f windows of a (batch, height, width, channels) array."""

    def __init__(self, stride = 2, f = 2):
        self.stride = stride
        self.f = f

    def forward_propagation(self, input_map):
        """Return the per-window maximum; the input is cached for the backward pass."""
        self.input_map = input_map
        batch_size, height_input, width_input, channel = input_map.shape
        height_output = int(1 + (height_input - self.f) / self.stride)
        width_output = int(1 + (width_input - self.f) / self.stride)
        output_map = np.zeros((batch_size, height_output, width_output, channel))
        for height in range(height_output):
            for width in range(width_output):
                vertical_start, vertical_end = height * self.stride, height * self.stride + self.f
                horizontal_start, horizontal_end = width * self.stride, width * self.stride + self.f
                input_map_slice = input_map[:, vertical_start: vertical_end, horizontal_start: horizontal_end, :]
                output_map[:, height, width, :] = np.max(input_map_slice, axis = (1, 2))
        return output_map

    def back_propagation(self, d_output_map):
        """Route each upstream gradient to the element that produced the maximum.

        When several elements of a window tie for the maximum, only the first
        one (row-major order) receives the gradient. The previous equality
        mask handed the full gradient to every tied element, so a flat window
        back-propagated f * f times the gradient it received.

        Returns the gradient with respect to the cached input, which is then
        released.
        """
        _, height_output, width_output, _ = d_output_map.shape
        d_input_map = np.zeros(self.input_map.shape)
        for height in range(height_output):
            for width in range(width_output):
                vertical_start, vertical_end = height * self.stride, height * self.stride + self.f
                horizontal_start, horizontal_end = width * self.stride, width * self.stride + self.f
                window = self.input_map[:, vertical_start: vertical_end, horizontal_start: horizontal_end, :]
                batch_size, window_height, window_width, channel = window.shape
                window_size = window_height * window_width
                # Flatten each window to (batch, channel, f * f) so argmax picks
                # exactly one winner per sample and channel.
                flat = np.transpose(window, (0, 3, 1, 2)).reshape(batch_size, channel, window_size)
                winner = np.argmax(flat, axis = 2)
                # One-hot mask over the window positions, restored to the
                # (batch, f, f, channel) layout of the input slice.
                mask = np.arange(window_size) == winner[:, :, np.newaxis]
                mask = np.transpose(mask.reshape(batch_size, channel, window_height, window_width), (0, 2, 3, 1))
                upstream = d_output_map[:, height, width, :][:, np.newaxis, np.newaxis, :]
                d_input_map[:, vertical_start: vertical_end, horizontal_start: horizontal_end, :] += upstream * mask
        self.input_map = None
        return d_input_map

#### Fully Connected Layer

In [None]:
class FC_Layer:
    """Fully connected layer whose backward pass also applies a gradient-descent step."""

    def __init__(self, weight_shape, sigma = 0.1, bias_factor = 0.01):
        parameters = initialise(weight_shape, sigma, bias_factor)
        self.weight, self.bias = parameters

    def forward_propagation(self, input_array):
        """Return ``input_array @ weight + bias``; the input is cached for backprop."""
        self.input_array = input_array
        return input_array @ self.weight + self.bias

    def back_propagation(self, d_output_array, learning_rate):
        """Back-propagate ``d_output_array`` and update the parameters in place.

        Returns the gradient with respect to the cached input, which is then
        released.
        """
        grad_weight = np.matmul(self.input_array.T, d_output_array)
        # Summing the transposed gradient over axis 1 adds up the batch.
        grad_bias = np.sum(d_output_array.T, axis = 1)
        # Must be formed before the weights change below.
        d_input_array = np.matmul(d_output_array, self.weight.T)
        self.weight -= learning_rate * grad_weight
        self.bias -= learning_rate * grad_bias
        self.input_array = None
        return d_input_array

#### Fully Connected Output Layer

In [None]:
class FC_Output_Layer:
    """Fully connected output layer with softmax and cross-entropy.

    The backward pass also applies the gradient-descent step.
    """

    def __init__(self, weight_shape, sigma = 0.1, bias_factor = 0.01):
        self.weight, self.bias = initialise(weight_shape, sigma, bias_factor)

    def forward_propagation(self, input_array, labels, mode):
        """Run the layer and return a loss or an accuracy count.

        Args:
            input_array: Array of shape (batch, features).
            labels: Integer class index for each row of ``input_array``.
            mode: ``"train"`` or ``"test"``.

        Returns:
            In ``"train"`` mode, the cross-entropy summed over the batch.
            In ``"test"`` mode, ``(acc, predictions)`` where ``acc`` is the
            number of correct predictions and ``predictions`` the arg-max class
            of every row.

        Raises:
            ValueError: If ``mode`` is neither ``"train"`` nor ``"test"``.
                Such a call used to fall through and return None.
        """
        # Validate before touching any cached state.
        if mode not in ("train", "test"):
            raise ValueError('mode must be "train" or "test", got {!r}'.format(mode))
        self.input_array = input_array
        self.labels = labels
        self.output_array = np.matmul(input_array, self.weight) + self.bias
        output = softmax(self.output_array)
        if mode == "train":
            # Probability assigned to the true class of every sample.
            cost_value = -np.log(output[range(output.shape[0]), labels])
            return np.sum(cost_value)
        predictions = np.argmax(output, axis = 1)
        acc = np.sum(labels == predictions)
        return acc, predictions

    def back_propagation(self, learning_rate):
        """Back-propagate the cross-entropy loss and update the parameters in place.

        Uses the input, labels and logits cached by the last forward pass and
        releases them afterwards. Returns the gradient with respect to the
        layer input.

        NOTE(review): the gradient is averaged over the batch here, whereas the
        training loss returned by the forward pass is a sum; confirm the
        caller accounts for that.
        """
        # Gradient of softmax + cross-entropy w.r.t. the logits: p - one_hot.
        d_output_array = softmax(self.output_array)
        d_output_array[range(d_output_array.shape[0]), self.labels] -= 1
        d_output_array = d_output_array / d_output_array.shape[0]
        d_input_array = np.matmul(d_output_array, self.weight.T)
        d_weight = np.matmul(self.input_array.T, d_output_array)
        d_bias = np.sum(d_output_array.T, axis = 1)
        self.weight -= learning_rate * d_weight
        self.bias -= learning_rate * d_bias
        self.input_array, self.labels, self.output_array = None, None, None
        return d_input_array

In [None]:
def load_dataset(X_dataset, label):
    """Zero-pad the images by 2 pixels per side, normalise them, and pair them with ``label``.

    Padding happens before normalisation, so the padded border takes part in
    the normalisation as well.

    Returns:
        ``(preprocessed_images, label)``; ``label`` is passed through unchanged.
    """
    padded = zero_pad(X_dataset, 2)
    prepared = normalise(padded)
    return prepared, label