# In [71]:
import os
import numpy as np
from sklearn.datasets import load_diabetes
from PIL import Image
from sklearn.preprocessing import OneHotEncoder

# Dense Layer implementation

# In [72]:
class DenseLayer:
    """
    Fully connected (dense) layer of a neural network.

    The forward pass computes ``activation(X @ w + bias)`` for a batch ``X``
    holding one sample per row. A layer flagged as ``input_layer`` owns no
    parameters: it forwards its input and passes gradients through untouched.

    Args:
        size (int): Number of neurons in the layer.
        input_layer (bool, optional): Whether the layer is an input layer. Defaults to False.
        activation (str, optional): Activation function for the layer, one of
            "linear", "relu" or "sigmoid". Defaults to "linear".
        use_bias (bool, optional): Whether to use biases in the layer. Defaults to True.

    Raises:
        ValueError: If ``activation`` is not a supported name.
    """

    _ACTIVATIONS = ("linear", "relu", "sigmoid")

    def __init__(
            self,
            size,
            *,
            input_layer: bool = False,
            activation: str = "linear",
            use_bias: bool = True,
            ):
        if activation not in self._ACTIVATIONS:
            raise ValueError(
                f"Unsupported activation {activation!r}; expected one of {self._ACTIVATIONS}"
            )

        self.size = size
        self.input_layer = input_layer
        self.activation = activation
        self.use_bias = use_bias

        self._input = None  # Input of the last forward pass
        self._output = None  # Output of the last forward pass

        self.w = None  # Weights matrix, shape (input_size, size)
        self._weight_gradient = None  # Gradient of weights matrix

        self.bias = None  # Bias row vector, shape (1, size); stays None without bias
        self._bias_gradient = None  # Gradient of biases

    def _weightInit(self, input_size):
        """
        Initialize the weights and biases of the layer.

        Args:
            input_size (int): Number of neurons in the previous layer.
        """
        if self.input_layer:
            return  # Input layer doesn't require weights

        # Normal distribution with mean 0 and variance 1 / input_size.
        # `scale` is the standard deviation, hence the square root.
        self.w = np.random.normal(
            loc=0,
            scale=np.sqrt(1 / input_size),
            size=(input_size, self.size),
        )

        if self.use_bias:
            self.bias = np.zeros((1, self.size))

    def activationFunction(self, z):
        """
        Applies the activation function to the input.

        Args:
            z (ndarray): Input values.

        Returns:
            ndarray: Output values after applying the activation function.

        Raises:
            ValueError: If the configured activation is not supported.
        """
        if self.activation == "linear":
            return z

        if self.activation == "relu":
            return np.maximum(z, 0)

        if self.activation == "sigmoid":
            return 1 / (1 + np.exp(-z))

        raise ValueError(f"Unsupported activation {self.activation!r}")

    def _activationDerivative(self):
        """
        Computes the derivative of the activation function.

        The derivative is expressed through the output stored by the last
        forward pass, so the layer must have been called beforehand.

        Returns:
            ndarray or int: Derivative of the activation function for the layer's output.

        Raises:
            ValueError: If the configured activation is not supported.
        """
        if self.activation == "linear":
            return 1

        if self.activation == "relu":
            return (self._output > 0) * 1

        if self.activation == "sigmoid":
            return self._output * (1 - self._output)

        raise ValueError(f"Unsupported activation {self.activation!r}")

    def _setGrad(self, grad):
        """
        Sets the gradients of weights and biases based on the given gradient.

        Args:
            grad (ndarray): Gradient of the loss with respect to this layer's output.

        Returns:
            ndarray: Gradient of the loss with respect to this layer's input.
        """
        if self.input_layer:
            # No parameters and no transformation: the gradient passes through.
            return grad

        grad = grad * self._activationDerivative()
        self._weight_gradient = self._input.T @ grad

        if self.use_bias:
            self._bias_gradient = grad.sum(axis=0, keepdims=True)

        return grad @ self.w.T

    def _updateGrad(self, learning_rate):
        """
        Updates the weights and biases using gradient descent.

        Args:
            learning_rate (float): Learning rate for gradient descent.
        """
        if self.input_layer:
            return  # Nothing to update

        self.w -= learning_rate * self._weight_gradient
        if self.use_bias:
            self.bias -= learning_rate * self._bias_gradient

    def __call__(self, X):
        """
        Computes the output of the layer given an input.

        Args:
            X (ndarray): Input data with one sample per row.

        Returns:
            ndarray: Output of the layer.
        """
        if self.input_layer:
            return X

        self._input = X

        pre_activation = X @ self.w
        if self.use_bias:
            pre_activation = pre_activation + self.bias

        self._output = self.activationFunction(pre_activation)

        return self._output


# Convolution Layer implementation

# In [73]:
class Conv2d:
    """
    Represents a 2D convolutional layer in a neural network.

    Tensors use the layout ``(height, width, channels, images)``. The forward
    pass computes ``activation(conv(input, kernel) + bias)`` with "valid"
    windows (no padding) and optionally averages every feature map down to a
    single value per filter and image.

    Args:
        size (int): Number of output channels (number of filters).
        kernel_size (tuple): Size of the convolutional kernel (height, width).
        stride (int, optional): Stride for the convolution operation. Defaults to 1.
        activation (str, optional): Activation function for the layer, one of
            "linear", "relu" or "sigmoid". Defaults to "linear".
        global_pooling (str, optional): Global pooling operation to apply.
            Only "average" has an effect; any other value disables pooling.
            Defaults to None.
        use_bias (bool, optional): Whether to use biases in the layer. Defaults to True.

    Raises:
        ValueError: If ``activation`` is not supported or ``stride`` is below 1.
    """

    _ACTIVATIONS = ("linear", "relu", "sigmoid")

    def __init__(
            self,
            size: int,
            kernel_size: tuple,
            *,
            stride: int = 1,
            activation: str = "linear",
            global_pooling: str = None,
            use_bias: bool = True,
        ):
        if activation not in self._ACTIVATIONS:
            raise ValueError(
                f"Unsupported activation {activation!r}; expected one of {self._ACTIVATIONS}"
            )
        if stride < 1:
            raise ValueError(f"stride must be a positive integer, got {stride!r}")

        self.size = size
        self.kernel_size = tuple(kernel_size)
        self.stride = stride
        self.activation = activation
        self.global_pooling = global_pooling
        self.use_bias = use_bias

        self.kernel = None  # Filters, shape (kernel_h, kernel_w, depth, size)
        self._kernel_gradient = None  # Gradient of the kernel

        self.bias = None  # Bias column vector, shape (size, 1); stays None without bias
        self._bias_gradient = None  # Gradient of biases

        self._input = None  # Input of the last forward pass
        self._output = None  # Un-pooled output of the last forward pass
        self._input_gradient = None  # Gradient w.r.t. the last input
        self.output_shape = None  # (out_h, out_w, size, images) of the last forward pass
        self._indices_axis1 = []  # Window (start, stop) pairs along the height
        self._indices_axis2 = []  # Window (start, stop) pairs along the width

    def _weightInit(self, depth):
        """
        Initialize the weights and biases of the layer.

        After this call ``kernel_size`` holds the full kernel shape
        ``(kernel_h, kernel_w, depth, size)``. Only the first two entries of
        the current ``kernel_size`` are read, so re-initialising is safe.

        Args:
            depth (int): Depth (number of channels) of the input tensor.
        """
        kernel_height, kernel_width = self.kernel_size[:2]
        self.kernel_size = (kernel_height, kernel_width, depth, self.size)

        # Zero-mean normal distribution with variance 1 / fan_in, which keeps
        # the pre-activations on the same scale as the inputs.
        fan_in = kernel_height * kernel_width * depth
        self.kernel = np.random.normal(
            loc=0,
            scale=np.sqrt(1 / fan_in),
            size=self.kernel_size,
        )

        self._kernel_gradient = np.zeros_like(self.kernel)

        if self.use_bias:
            # Initialize biases as zeros
            self.bias = np.zeros((self.size, 1))

            self._bias_gradient = np.zeros_like(self.bias)

    def activationFunction(self, z):
        """
        Applies the activation function to the input.

        Args:
            z (ndarray): Input values.

        Returns:
            ndarray: Output values after applying the activation function.

        Raises:
            ValueError: If the configured activation is not supported.
        """
        if self.activation == "linear":
            return z

        if self.activation == "relu":
            return np.maximum(z, 0)

        if self.activation == "sigmoid":
            return 1 / (1 + np.exp(-z))

        raise ValueError(f"Unsupported activation {self.activation!r}")

    def _activationDerivative(self):
        """
        Computes the derivative of the activation function.

        The derivative is expressed through the (un-pooled) output stored by
        the last forward pass.

        Returns:
            ndarray or int: Derivative of the activation function for the layer's output.

        Raises:
            ValueError: If the configured activation is not supported.
        """
        if self.activation == "linear":
            return 1

        if self.activation == "relu":
            return (self._output > 0) * 1

        if self.activation == "sigmoid":
            return self._output * (1 - self._output)

        raise ValueError(f"Unsupported activation {self.activation!r}")

    def _setGrad(self, grad):
        """
        Sets the gradients of weights and biases based on the given gradient.

        Args:
            grad (ndarray): Gradient of the loss with respect to the value
                returned by the forward pass: shape ``(images, size)`` with
                global average pooling, ``(out_h, out_w, size, images)``
                otherwise.

        Returns:
            ndarray: Gradient of the loss with respect to the layer's input,
            with the same shape as that input.
        """
        grad = np.asarray(grad)

        if self.global_pooling == "average":
            # (images, size) -> (1, 1, size, images); every spatial position
            # contributed 1 / (out_h * out_w) to the pooled mean.
            grad = grad.T[np.newaxis, np.newaxis, ...]
            pooling_scale = 1 / (self.output_shape[0] * self.output_shape[1])
        else:
            pooling_scale = 1

        # Gradient w.r.t. the pre-activation, shape (out_h, out_w, size, images).
        delta = np.ones(self.output_shape) * pooling_scale * grad * self._activationDerivative()

        if self.use_bias:
            # Each filter has one bias shared by all positions and images, so
            # its gradient is the sum over every axis except the filter axis.
            self._bias_gradient = delta.sum(axis=(0, 1, 3)).reshape(-1, 1)

        self._kernel_gradient = np.zeros_like(self.kernel)
        self._input_gradient = np.zeros(self._input.shape, dtype=float)

        for i, (x_1, x_2) in enumerate(self._indices_axis1):
            for j, (y_1, y_2) in enumerate(self._indices_axis2):
                patch = self._input[x_1:x_2, y_1:y_2]  # (kernel_h, kernel_w, depth, images)
                window_delta = delta[i, j]  # (size, images)

                # dL/dK[a,b,c,f] += sum_n patch[a,b,c,n] * delta[f,n]
                self._kernel_gradient += np.einsum("abcn,fn->abcf", patch, window_delta)
                # dL/dX[a,b,c,n] += sum_f K[a,b,c,f] * delta[f,n]
                self._input_gradient[x_1:x_2, y_1:y_2] += np.einsum(
                    "abcf,fn->abcn", self.kernel, window_delta
                )

        return self._input_gradient

    def _updateGrad(self, learning_rate):
        """
        Update the weights and biases based on their gradients.

        The stored gradients are reset to zero afterwards.

        Args:
            learning_rate (float): Learning rate for the update.
        """
        self.kernel -= learning_rate * self._kernel_gradient
        self._kernel_gradient = np.zeros_like(self.kernel)

        if self.use_bias:
            self.bias -= learning_rate * self._bias_gradient
            self._bias_gradient = np.zeros_like(self.bias)

    def __call__(self, tensor):
        """
        Performs the forward pass of the convolutional layer.

        Args:
            tensor (ndarray): Input tensor of shape (height, width, depth, images).

        Returns:
            ndarray: Output of the convolutional layer, shape
            ``(out_h, out_w, size, images)``, or ``(images, size)`` when
            global average pooling is enabled.

        Raises:
            ValueError: If the input is not 4-dimensional or is spatially
                smaller than the kernel.
        """
        tensor = np.asarray(tensor)
        if tensor.ndim != 4:
            raise ValueError(
                f"Expected a (height, width, depth, images) tensor, got shape {tensor.shape}"
            )

        kernel_height, kernel_width = self.kernel_size[:2]
        height, width, _, n_images = tensor.shape
        if height < kernel_height or width < kernel_width:
            raise ValueError(
                f"Input of spatial size {(height, width)} is smaller than the kernel "
                f"{(kernel_height, kernel_width)}"
            )

        self._input = tensor

        # (start, stop) bounds of every window along each spatial axis.
        self._indices_axis1 = [
            (stop - kernel_height, stop)
            for stop in range(kernel_height, height + 1, self.stride)
        ]
        self._indices_axis2 = [
            (stop - kernel_width, stop)
            for stop in range(kernel_width, width + 1, self.stride)
        ]

        # -> (feature_map_shape, filters, images)
        self.output_shape = (
            len(self._indices_axis1),
            len(self._indices_axis2),
            int(self.size),
            int(n_images),
        )

        pre_activation = np.zeros(self.output_shape)

        for i, (x_1, x_2) in enumerate(self._indices_axis1):
            for j, (y_1, y_2) in enumerate(self._indices_axis2):
                # Correlate the window with every filter for all images at once.
                pre_activation[i, j] = np.einsum(
                    "abcn,abcf->fn", tensor[x_1:x_2, y_1:y_2], self.kernel
                )

        if self.use_bias:
            # The bias joins the pre-activation so that the activation
            # derivative taken from the output stays valid.
            pre_activation += self.bias

        self._output = self.activationFunction(pre_activation)

        if self.global_pooling == "average":
            return self._output.mean(axis=(0, 1)).T

        return self._output

# Network implementation

# In [107]:
class NeuralNetwork:
    """
    A class representing a Neural Network.

    Attributes:
        layers (list): List of Layer objects representing the network layers.
        loss_function (str): Loss function to be used for training (default: "mse").
        learning_rate (float): Learning rate for gradient descent optimization (default: 0.01).
        verbose (bool): Flag indicating whether to print progress during training (default: True).
        input_depth (int): Number of channels in the input data (default: 3).
        epochs (int): Number of training epochs (default: 1).
        batch_size (int): Size of the training batches (default: 32).

    Methods:
        lossFunction(y_true, y_pred):
            Computes the loss function value for the given true and predicted labels.

        fit(X, y):
            Trains the neural network on the provided input and output data.

        predict(X):
            Performs forward pass and returns the network output for the input data.

        forward(X):
            Performs forward propagation through the network layers and returns the output.

        backward(y_pred, y_true):
            Performs backward propagation to update the gradients and weights of the network layers.
    """

    _LOSS_FUNCTIONS = ("mse", "cross_entropy")

    def __init__(
            self,
            layers: list,
            loss_function: str = "mse",
            learning_rate=0.01,
            verbose: bool = True,
            input_depth: int = 3,
            epochs: int = 1,
            batch_size: int = 32,
    ):
        """
        Initializes a NeuralNetwork instance with the provided parameters.

        Every layer's weights are initialised here: the first layer receives
        ``input_depth`` and each following layer the ``size`` of its predecessor.

        Args:
            layers (list): List of Layer objects representing the network layers.
            loss_function (str): Loss function to be used for training,
                "mse" or "cross_entropy" (default: "mse").
            learning_rate (float): Learning rate for gradient descent optimization (default: 0.01).
            verbose (bool): Flag indicating whether to print progress during training (default: True).
            input_depth (int): Number of channels in the input data (default: 3).
            epochs (int): Number of training epochs (default: 1).
            batch_size (int): Size of the training batches (default: 32).

        Raises:
            ValueError: If ``layers`` is empty, ``loss_function`` is not
                supported, or ``batch_size`` is below 1.
        """
        if not layers:
            raise ValueError("A network needs at least one layer")
        if loss_function not in self._LOSS_FUNCTIONS:
            raise ValueError(
                f"Unsupported loss function {loss_function!r}; expected one of {self._LOSS_FUNCTIONS}"
            )
        if batch_size < 1:
            raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

        self.layers = layers
        self.loss_function = loss_function
        self.learning_rate = learning_rate
        self.verbose = verbose
        self.input_depth = input_depth
        self.epochs = epochs
        self.batch_size = batch_size

        # Initialize weights for each layer
        previous_size = self.input_depth
        for layer in self.layers:
            layer._weightInit(previous_size)
            previous_size = layer.size

    @staticmethod
    def _softmax(logits):
        """Return the row-wise softmax of ``logits``."""
        # Subtracting the row maximum avoids overflow in the exponential.
        exponentials = np.exp(logits - logits.max(axis=1, keepdims=True))
        return exponentials / exponentials.sum(axis=1, keepdims=True)

    def lossFunction(self, y_true, y_pred):
        """
        Computes the loss function value for the given true and predicted labels.

        For "cross_entropy" the predictions are treated as logits: a softmax
        is applied and its result is also stored in ``self.probabilities_``.

        Args:
            y_true: True labels (one-hot rows for "cross_entropy").
            y_pred: Predicted labels, one row per sample.

        Returns:
            The computed loss function value.

        Raises:
            ValueError: If the configured loss function is not supported.
        """
        if self.loss_function == "mse":
            return 0.5 * np.mean(np.linalg.norm(y_pred - y_true, axis=1) ** 2)

        if self.loss_function == "cross_entropy":
            self.probabilities_ = self._softmax(y_pred)
            rows = np.arange(y_true.shape[0])
            true_classes = np.argmax(y_true, axis=1)

            return -(np.log(self.probabilities_[rows, true_classes])).mean()

        raise ValueError(f"Unsupported loss function {self.loss_function!r}")

    def _lossFunctionDerivative(self, y_pred, y_true):
        """
        Computes the derivative of the loss function with respect to the predicted labels.

        The derivative is computed from its arguments alone, so it does not
        depend on ``lossFunction`` having been called first.

        Args:
            y_pred: Predicted labels.
            y_true: True labels.

        Returns:
            The computed derivative of the loss function.

        Raises:
            ValueError: If the configured loss function is not supported.
        """
        if self.loss_function == "mse":
            return 1 / len(y_pred) * (y_pred - y_true)

        if self.loss_function == "cross_entropy":
            # Gradient of softmax + cross-entropy w.r.t. the logits.
            return 1 / len(y_true) * (self._softmax(y_pred) - y_true)

        raise ValueError(f"Unsupported loss function {self.loss_function!r}")

    def fit(self, X, y):
        """
        Trains the neural network on the provided input and output data.

        Samples are indexed along the last axis of ``X`` and the first axis of
        ``y``. Each epoch visits the samples in a fresh random order, in
        batches of ``batch_size``.

        Args:
            X: Input data.
            y: Output data.

        Raises:
            ValueError: If ``X`` and ``y`` hold a different number of samples.
        """
        X = np.asarray(X)
        y = np.asarray(y)

        n_samples = X.shape[-1]
        if len(y) != n_samples:
            raise ValueError(
                f"X holds {n_samples} samples along its last axis but y holds {len(y)}"
            )

        # (start, stop) bounds of every batch within the shuffled index array
        batch_separation = [
            (start, start + self.batch_size)
            for start in range(0, n_samples, self.batch_size)
        ]
        epoch_len = len(batch_separation)
        if epoch_len == 0:
            return  # Nothing to train on

        indices = np.arange(n_samples)

        for epoch in range(self.epochs):
            np.random.shuffle(indices)  # Shuffle the training data

            for batch_number, (start, stop) in enumerate(batch_separation):
                batch_indices = indices[start:stop]
                X_ = X[..., batch_indices]  # Get current batch
                y_ = y[batch_indices]  # Get current batch

                pred = self.forward(X_)

                loss = self.lossFunction(y_, pred)

                if self.verbose:
                    accuracy = (pred.argmax(axis=1) == y_.argmax(axis=1)).mean()
                    process_percent = int(batch_number / epoch_len * 10)
                    print(
                        f"\r Epoch {epoch + 1}/{self.epochs}; Batch {batch_number}/{epoch_len}: [{process_percent * '=' + '>' + (10 - process_percent) * '-'}] - loss: {loss}; accuracy: {accuracy}",
                        end='',
                    )

                self.backward(pred, y_)

            if self.verbose:
                # Final line of the epoch reports the metrics of the last batch.
                print(
                    f"\r Epoch {epoch + 1}/{self.epochs}; Batch {batch_number + 1}/{epoch_len}: [{11 * '='}] - loss: {loss}; accuracy: {accuracy}"
                )

    def predict(self, X):
        """
        Performs forward pass and returns the network output for the input data.

        Args:
            X: Input data.

        Returns:
            The raw output of the last layer.
        """
        return self.forward(X)

    def forward(self, X):
        """
        Performs forward propagation through the network layers and returns the output.

        Args:
            X: Input data.

        Returns:
            The output of the network.
        """
        # Work on a copy so the caller's array is never aliased by a layer.
        X_ = np.copy(X)

        for layer in self.layers:
            X_ = layer(X_)

        return X_

    def backward(self, y_pred, y_true):
        """
        Performs backward propagation to update the gradients and weights of the network layers.

        Layers are visited last to first; each computes its gradients from
        the incoming gradient and is updated right away.

        Args:
            y_pred: Predicted labels.
            y_true: True labels.
        """
        gradient = self._lossFunctionDerivative(y_pred, y_true)

        for layer in reversed(self.layers):
            gradient = layer._setGrad(gradient)
            layer._updateGrad(self.learning_rate)

# Make Data and testing

# In [75]:
# Build the training set from the image files found in `path`.
# The class name of an image is the part of its file name before the first '_'.
ohe = OneHotEncoder(sparse_output=False)

# NOTE: this mapping is not used below; the one-hot targets come from `ohe`.
labels = {
    "cucumber": 0,
    "eggplant": 1,
    "mushroom": 2,
}

path = "./test_data/train/"

# List the directory once (in sorted order) so that images and labels are
# derived from the same sequence of file names and stay aligned.
image_names = sorted(os.listdir(path))

images = []
for image_name in image_names:
    # The context manager closes the file handle once the pixels are copied.
    with Image.open(os.path.join(path, image_name)) as image:
        # Scale pixel values by 1 / 255 and add a trailing "image" axis.
        images.append(np.asarray(image)[..., np.newaxis] / 255)

# Stack along the last axis -> (height, width, channels, images)
X = np.concatenate(images, axis=3)

y_cat = np.array([image_name.split('_')[0] for image_name in image_names]).reshape(-1, 1)
y = ohe.fit_transform(y_cat)

# NOTE: something is wrong with this implementation (known issue; the training run below does not behave as expected)

# In [113]:
# Train a single convolutional layer whose three globally average-pooled
# filter responses act as the class logits.
# Alternatives that were tried and are currently disabled:
#   - a leading Conv2d(64, (15, 15), stride=10, activation='relu')
#   - a trailing DenseLayer(3, activation='relu')
nn = NeuralNetwork(
    epochs=5,
    batch_size=8,
    verbose=True,
    learning_rate=0.001,
    loss_function="cross_entropy",
    layers=[
        Conv2d(
            size=3,
            kernel_size=(15, 15),
            stride=10,
            activation='relu',
            global_pooling="average",
        ),
    ],
)

nn.fit(X=X, y=y)

