In [1]:
import os
import random

import cv2
import numpy as np
import numpy.typing as npt

In [2]:
class Utils:
    """Helpers for reading a folder-per-class image dataset from disk."""

    @staticmethod
    def load_dataset(dataset_path: str) -> tuple[npt.NDArray, npt.NDArray, dict]:
        """Collect image file paths and integer class labels.

        Every sub-directory of ``dataset_path`` is treated as one class. The
        sub-directories are visited in sorted order and numbered from 0; the
        entries inside each one are also listed in sorted order.

        Args:
            dataset_path: Directory whose sub-directories each hold the files
                of one class.

        Returns:
            A tuple ``(paths, labels, class_dictionary)`` where ``paths`` is an
            array of file paths, ``labels`` is an ``int16`` array with the
            class index of each path, and ``class_dictionary`` maps the class
            index (as a string) to the sub-directory name.
        """
        # Only directories can be class folders. Skipping plain files keeps a
        # stray README or .DS_Store in the dataset root from crashing the
        # nested os.listdir call below.
        class_names = sorted(
            entry
            for entry in os.listdir(dataset_path)
            if os.path.isdir(os.path.join(dataset_path, entry))
        )

        image_paths = []
        # Seeded with an empty int16 chunk so that the final concatenate also
        # works (and keeps the dtype) when there are no classes at all.
        label_chunks = [np.array([], dtype=np.int16)]
        class_dictionary = {}
        for class_index, class_name in enumerate(class_names):
            class_folder_path = os.path.join(dataset_path, class_name)
            image_names = sorted(os.listdir(class_folder_path))

            image_paths.extend(
                os.path.join(class_folder_path, image_name)
                for image_name in image_names
            )
            label_chunks.append(
                np.full(len(image_names), class_index, dtype=np.int16)
            )
            class_dictionary[str(class_index)] = class_name

        # Concatenate once at the end instead of re-allocating the label array
        # on every iteration.
        class_label = np.concatenate(label_chunks, axis=0)
        return np.asarray(image_paths), class_label, class_dictionary

    @staticmethod
    def convert_image_to_matrix(folder_path: npt.ArrayLike) -> npt.NDArray:
        """Load images, convert them to RGB and resize them to 256x256.

        Args:
            folder_path: Iterable of image file paths (for example the first
                value returned by ``load_dataset``).

        Returns:
            An array of shape ``(batch_size, channels, height, width)``. An
            empty input yields an empty ``(0, 3, 256, 256)`` array.

        Raises:
            ValueError: If a file cannot be read as an image.
        """
        size = (256, 256)
        image_matrices = []

        for file_img in folder_path:
            image = cv2.imread(file_img, 1)
            if image is None:
                # cv2.imread signals failure by returning None; report the
                # offending path here instead of failing inside cvtColor.
                raise ValueError(f"Could not read image file: {file_img}")
            image_matrix = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            image_matrix = cv2.resize(image_matrix, size)
            image_matrices.append(image_matrix)

        if not image_matrices:
            # np.array([]) is 1-D, so the 4-axis transpose below would fail.
            return np.empty((0, 3, size[1], size[0]), dtype=np.uint8)

        # The returned NDArray is transposed from (batch_size, height, width,
        # channels) into (batch_size, channels, height, width) to ease further
        # operations
        return np.array(image_matrices).transpose(0, 3, 1, 2)


# For testing purposes
# if __name__ == "__main__":
#     folder_path, class_label, class_dictionary = Utils.load_dataset("./dataset")
#     print(Utils.convert_image_to_matrix(folder_path))

In [3]:
class Model:
    def __init__(self) -> None:
        self._layers = []

    class Layer:
        def __init__(self, name):
            self.name = name

        def feed_forward(self):
            print(f"Performing feed forward on {self.name} layer...")
            print()

    class ConvolutionLayer(Layer):
        def __init__(
            self,
            filter_size: list[tuple[int]],
            padding_size: int,
            stride_size: list[int],
        ) -> None:
            super().__init__("convolution")
            self._filter_weight = np.array(
                [
                    [
                        [random.random() for _ in range(filter_size[i][1])]
                        for _ in range(filter_size[i][0])
                    ]
                    for i in range(len(filter_size))
                ]
            )
            self._padding_size = padding_size
            self._stride_size = stride_size

        @staticmethod
        def _pad_weights(
            weights: npt.NDArray[npt.NDArray[npt.NDArray[float]]],
            padding_size: int,
        ) -> npt.NDArray[npt.NDArray[npt.NDArray[float]]]:
            padded_weights = []

            for i in range(len(weights)):
                weight_height = len(weights[i])
                weight_width = len(weights[i][0])
                padded_height = weight_height + 2 * padding_size
                padded_width = weight_width + 2 * padding_size

                new_weight = [
                    [
                        weights[i][j - padding_size][k - padding_size]
                        if padding_size <= j < weight_height + padding_size
                        or padding_size <= k < weight_width + padding_size
                        else 0.0
                        for k in range(padded_width)
                    ]
                    for j in range(padded_height)
                ]

                padded_weights.append(new_weight)

            return np.array(padded_weights)

        def convolute(
            self,
            weights: npt.NDArray[npt.NDArray[npt.NDArray[float]]],
        ) -> npt.NDArray[npt.NDArray[npt.NDArray[float]]]:
            feature_maps = []
            print("Randomly initialised filter weight: ")
            print(self._filter_weight)
            print()
            weights = self._pad_weights(weights, self._padding_size)
            for i in range(len(weights)):
                feature_map = []
                for j in range(
                    0,
                    len(weights[i]) - len(self._filter_weight[i]) + 1,
                    self._stride_size[i],
                ):
                    feature_row = []
                    for k in range(
                        0,
                        len(weights[i][j]) - len(self._filter_weight[i][0]) + 1,
                        self._stride_size[i],
                    ):
                        field = weights[
                            i,
                            j : j + len(self._filter_weight[i]),
                            k : k + len(self._filter_weight[i][0]),
                        ]
                        feature = field * self._filter_weight[i]
                        feature_row.append(np.sum(feature))
                    feature_map.append(feature_row)
                feature_maps.append(feature_map)
            return np.array(feature_maps)

        def feed_forward(
            self, weights: npt.NDArray[npt.NDArray[npt.NDArray[float]]]
        ) -> npt.NDArray[npt.NDArray[npt.NDArray[float]]]:
            super().feed_forward()
            result = self.convolute(weights)
            print("Convolution result: ")
            print(result)
            print()
            return result

    class DetectorLayer(Layer):
        def __init__(self):
            super().__init__("detector")

        @staticmethod
        def detect(feature: npt.NDArray) -> npt.NDArray:
            return np.maximum(feature, 0)

        def feed_forward(self, feature: npt.NDArray) -> npt.NDArray:
            super().feed_forward()
            result = self.detect(feature)
            print("Detector result: ")
            print(result)
            print()
            return result

    class PoolingLayer(Layer):
        def __init__(
            self, filter_size: int, stride_size: int, mode: str = "max"
        ) -> None:
            super().__init__("pooling")
            self.filter_size = filter_size
            self.stride_size = stride_size
            self.mode = mode

        def average(
            self, input_matrix: npt.NDArray, d: int, h: int, w: int
        ) -> npt.NDArray:
            h_start = h * self.stride_size
            w_start = w * self.stride_size
            h_end = h_start + self.filter_size
            w_end = w_start + self.filter_size
            return np.average(input_matrix[d, h_start:h_end, w_start:w_end])

        def max(self, input_matrix: npt.NDArray, d: int, h: int, w: int) -> npt.NDArray:
            h_start = h * self.stride_size
            w_start = w * self.stride_size
            h_end = h_start + self.filter_size
            w_end = w_start + self.filter_size
            return np.max(input_matrix[d, h_start:h_end, w_start:w_end])

        def pool(self, input_matrix: npt.NDArray) -> npt.NDArray:
            depth, height, width = input_matrix.shape
            filter_height = (height - self.filter_size) // self.stride_size + 1
            filter_width = (width - self.filter_size) // self.stride_size + 1
            pooled = np.zeros([depth, filter_height, filter_width], dtype=np.double)
            for d in range(0, depth):
                for h in range(0, filter_height):
                    for w in range(0, filter_width):
                        if self.mode == "average":
                            pooled[d, h, w] = self.average(input_matrix, d, h, w)
                        elif self.mode == "max":
                            pooled[d, h, w] = self.max(input_matrix, d, h, w)
            return pooled

        def feed_forward(self, input_matrix: npt.NDArray) -> npt.NDArray:
            super().feed_forward()
            result = self.pool(input_matrix)
            print("Pooling result: ")
            print(result)
            print()
            return result

    class DenseLayer(Layer):
        def __init__(self, unit_count: int, activation: str = "sigmoid") -> None:
            super().__init__("dense")
            self.unit_count = unit_count
            self.activation = activation
            self.bias = np.zeros(unit_count)
            self.weight = np.random.randn(unit_count)

        def dense(self, input_matrix: npt.NDArray) -> float:
            result = np.zeros(self.unit_count)

            for i in range(self.unit_count):
                input_weight = np.sum(self.weight[i] * input_matrix)
                result[i] = input_weight + self.bias[i]

            if self.activation == "sigmoid":
                return 1 / (1 + np.exp(-result))
            elif self.activation == "relu":
                return np.maximum(result, 0)

        def feed_forward(self, input_matrix: npt.NDArray) -> float:
            super().feed_forward()
            result = self.dense(input_matrix)
            print("Dense result: ")
            print(result)
            print()
            return result

    class FlattenLayer(Layer):
        def __init__(self):
            super().__init__("flatten")

        @staticmethod
        def flatten(input_matrix: npt.NDArray) -> npt.NDArray:
            return input_matrix.flatten()

        def feed_forward(self, input_matrix: npt.NDArray) -> npt.NDArray:
            super().feed_forward()
            result = self.flatten(input_matrix)
            print("Flatten result: ")
            print(result)
            print()
            return result

    def add_layer(self, name: str, **kwargs):
        match name:
            case "convolution":
                self._layers.append(self.ConvolutionLayer(**kwargs))
            case "detector":
                self._layers.append(self.DetectorLayer())
            case "pooling":
                self._layers.append(self.PoolingLayer(**kwargs))
            case "dense":
                self._layers.append(self.DenseLayer(**kwargs))
            case "flatten":
                self._layers.append(self.FlattenLayer())

    def feed_forward(self, tensor: npt.NDArray) -> None:
        for layer in self._layers:
            tensor = layer.feed_forward(tensor)
        print("Feedforward result: ")
        print(tensor)

    def back_propagate(self) -> None:
        pass

In [4]:
# Single-channel 6x6 input holding the integers 1..36 in row-major order.
input_tensor = np.arange(1, 37).reshape(1, 6, 6)

# Forward-only demo pipeline: convolution -> ReLU -> average pooling ->
# dense -> flatten. Each layer prints its own intermediate result.
model = Model()
model.add_layer(
    "convolution",
    filter_size=[(3, 3)],
    padding_size=0,
    stride_size=[1],
)
model.add_layer("detector")
model.add_layer("pooling", filter_size=2, stride_size=1, mode="average")
# The dense layer runs before flatten here; it accepts the 3-D pooled tensor
# because it sums over every input element.
model.add_layer("dense", unit_count=2, activation="sigmoid")
model.add_layer("flatten")
model.feed_forward(input_tensor)

Performing feed forward on convolution layer...

Randomly initialised filter weight: 
[[[0.2844609  0.36497594 0.8775294 ]
  [0.24961977 0.15627734 0.88738319]
  [0.13822589 0.50672309 0.97418619]]]

Convolution result: 
[[[ 38.13485931  42.574241    47.0136227   51.45300439]
  [ 64.77114947  69.21053117  73.64991286  78.08929456]
  [ 91.40743964  95.84682134 100.28620303 104.72558473]
  [118.04372981 122.4831115  126.9224932  131.36187489]]]

Performing feed forward on detector layer...

Detector result: 
[[[ 38.13485931  42.574241    47.0136227   51.45300439]
  [ 64.77114947  69.21053117  73.64991286  78.08929456]
  [ 91.40743964  95.84682134 100.28620303 104.72558473]
  [118.04372981 122.4831115  126.9224932  131.36187489]]]

Performing feed forward on pooling layer...

Pooling result: 
[[[ 53.67269524  58.11207693  62.55145863]
  [ 80.30898541  84.7483671   89.18774879]
  [106.94527557 111.38465727 115.82403896]]]

Performing feed forward on dense layer...

Dense result: 
[9.261339