In [92]:
import numpy as np
import nnfs
from nnfs.datasets import sine_data
from nnfs.datasets import spiral_data
import matplotlib.pyplot as plt
from timeit import timeit

In [93]:
nnfs.init()

In [94]:
class Layer_Dense:
    # Layer initialization
    def __init__(self, n_inputs, n_neurons, weight_regularizer_l1=0, weight_regularizer_l2=0, bias_regularizer_l1=0, bias_regularizer_l2=0) -> None:
        # Initialize weights and biases
        self.weights = 0.01 * np.random.randn(n_inputs, n_neurons)
        self.biases = np.zeros((1, n_neurons))
        # Set regularization strength
        self.weight_regularizer_l1 = weight_regularizer_l1
        self.weight_regularizer_l2 = weight_regularizer_l2
        self.bias_regularizer_l1 = bias_regularizer_l1
        self.bias_regularizer_l2 = bias_regularizer_l2

        """
        [[1, 2],
         [3, 4],
         [5, 6]]
        """

    # forward pass
    def forward(self, inputs, training):
        # Remember input values
        self.inputs = inputs

        # Calculate output values from inputs, weights and biases
        self.output = np.dot(inputs, self.weights) + self.biases
        pass

    def backward(self, dvalues):
        # Gradients on parameters
        self.dweights = np.dot(self.inputs.T, dvalues)
        self.dbiases = np.sum(dvalues, axis=0, keepdims=True)

        # Gradients on regularization
        # L1 on weights
        if self.weight_regularizer_l1 > 0:
            dL1 = np.ones_like(self.weights)
            dL1[self.weights < 0] = -1
            self.dweights += dL1
        # L2 on weights
        if self.weight_regularizer_l2 > 0:
            self.dweights += 2 * self.weights * self.weight_regularizer_l2

        # L1 on biases
        if self.bias_regularizer_l1 > 0:
            dL1 = np.ones_like(self.biases)
            dL1[self.biases < 0] = -1
            self.dbiases += dL1
        # L2 on biases
        if self.bias_regularizer_l2 > 0:
            self.dbiases += 2 * self.biases * self.weight_regularizer_l2

        # Gradients on values
        self.dinputs = np.dot(dvalues, self.weights.T)

In [95]:
class Activation_ReLU:
    # Forward pass
    def forward(self, inputs, training):
        # Remember inputs for backward pass
        self.inputs = inputs

        self.output = np.maximum(0, inputs)
    
    def backward(self, dvalues):
        self.dinputs = dvalues.copy()

        # Zero gradient where iunput values were negative
        self.dinputs[self.inputs <= 0] = 0

    def predictions(self, outputs):
        return outputs

In [96]:
class Activation_Softmax:
    def forward(self, inputs, training):
        exp_values = np.exp(inputs - np.max(inputs, axis=1, keepdims=True))
        probabilities = exp_values / np.sum(exp_values, axis=1, keepdims=True)
        self.output = probabilities

    def backward (self, dvalues):

        # Create uninitialized array
        self.dinputs = np.empty_like(dvalues)

        # Enumerate outputs and gradients
        for index, (single_output, single_dvalues) in enumerate(zip(self.output, dvalues)):
            # Flatten output array
            single_output = single_output.reshape(-1, 1)
            # Calculate Jacobian matrix of the output
            jacobian_matrix =  np.diagflat(single_output) - np.dot(single_output, single_output.T)
            # Calculate sample-wise gradient and add it to the array of sample gradients
            self.dinputs[index] = np.dot(jacobian_matrix, single_dvalues)

    def predictions (self, outputs):
        return np.argmax(outputs, axis=1)

In [97]:
class Loss:

    def remember_trainable_layers(self, trainable_layers):
        self.trainable_layers = trainable_layers

    def calculate(self, output, truth, *, include_regularization=False):
        sample_losses = self.forward(output, truth)

        data_loss = np.mean(sample_losses)

        if not include_regularization:
            return data_loss
        else: return data_loss, self.regularization_loss()

    def regularization_loss(self):

        regularization_loss = 0

        for layer in self.trainable_layers:
            if layer.weight_regularizer_l1 > 0:
                regularization_loss += layer.weight_regularizer_l1 * (np.sum(np.abs(layer.weights)))

            if layer.weight_regularizer_l2 > 0:
                regularization_loss += layer.weight_regularizer_l2 * (np.sum(layer.weights * layer.weights))

            if layer.bias_regularizer_l1 > 0:
                regularization_loss += layer.bias_regularizer_l1 * (np.sum(np.abs(layer.biases)))

            if layer.bias_regularizer_l1 > 0:
                regularization_loss += layer.bias_regularizer_l1 * (np.sum(layer.biases * layer.biases))

        return regularization_loss

In [98]:
class Loss_CategoricalCrossEntropy(Loss):     
    def forward(self, output, truth):
        clipped_output = np.clip(output, 1e-7, 1 - 1e-7)

        # Output should be an array of prediction arrays. Truth can either be a an array of indexes to query for loss, or a matrix of equal size with ground truths
        if (len(truth.shape) == 1):
            correct_confidences = clipped_output[range(len(clipped_output)), truth]
            losses = -np.log(correct_confidences)
            mean_loss = np.mean(losses)
            return mean_loss

        elif (len(truth.shape) == 2):
            correct_confidences = np.sum(clipped_output * truth, axis=1)
            losses = -np.log(correct_confidences)
            mean_loss = np.mean(losses)
            return mean_loss
        
    def backward(self, predicted_values, y_true):

        # Number of samples
        samples = len(predicted_values)
        # Number of labels in every sample
        labels = len(predicted_values[0])

        # If labels are sparse, turn them into one-hot vector
        if len(y_true.shape) == 1:
            y_true = np.eye(labels)[y_true]

        # Calculate gradient
        self.dinputs = -y_true / predicted_values
        # Normalize gradient
        self.dinputs = self.dinputs / samples

In [99]:
class Optimizer_SGD:

    # Initialize optimizer - set settings, learning rate of 1. is default for the optimizer
    def __init__(self, learning_rate=1.0, decay=0., momentum = 0.):
        self.learning_rate = learning_rate
        self.current_learning_rate = learning_rate
        self.decay = decay
        self.iterations = 0
        self.momentum = momentum

    
    # Call once before any parameter updates
    def pre_update_params(self):
        if self.decay:
            self.current_learning_rate = self.learning_rate * (1. / (1 + self.decay * self.iterations))

    # Update parameters
    def update_params(self, layer):

        if self.momentum:
            if not hasattr(layer, 'weight_momentums'):
                layer.weight_momentums = np.zeros_like(layer.weights)
                layer.bias_momentums = np.zeros_like(layer.biases)

            # Build weight updates with momentum - take previous updates multiplied by retain factor and update with current gradients
            weight_updates = self.momentum * layer.weight_momentums - (self.current_learning_rate * layer.dweights)
            layer.weight_momentums = weight_updates

            bias_updates = self.momentum * layer.bias_momentums - (self.current_learning_rate * layer.dbiases)
            layer.bias_momentums = bias_updates  

        # Vanilla SGD
        else:
            weight_updates = -self.current_learning_rate * layer.dweights
            bias_updates += -self.current_learning_rate * layer.dbiases
        
        layer.weights += weight_updates
        layer.biases += bias_updates

    def post_update_params(self):
        self.iterations += 1
        

In [100]:
class Optimizer_Adagrad(Optimizer_SGD):
    def __init__(self, learning_rate=1.0, decay=0., epsilon = 1e-7):
        super().__init__(learning_rate, decay, 0)
        self.epsilon = epsilon
    
    def update_params(self, layer):
        
        if not hasattr(layer, 'weight_cache'):
            layer.weight_cache = np.zeros_like(layer.weights)
            layer.bias_cache = np.zeros_like(layer.biases)
        
        # Update cache with squared current gradients
        layer.weight_cache += layer.dweights ** 2
        layer.bias_cache += layer.dbiases ** 2

        # Vanilla SGD paramter update + normalization with square rooted cache
        layer.weights += -self.current_learning_rate * layer.dweights / (np.sqrt(layer.weight_cache) + self.epsilon)
        layer.biases += -self.current_learning_rate * layer.dbiases / (np.sqrt(layer.bias_cache) + self.epsilon)

In [101]:
class Optimizer_RMSProp(Optimizer_SGD):
    def __init__(self, learning_rate=0.001, decay=0, epsilon = 1e-7, rho=0.9):
        super().__init__(learning_rate, decay, 0.)
        self.epsilon = epsilon
        self.rho = rho

    def update_params(self, layer):
        if not hasattr(layer, 'weight_cache'):
            layer.weight_cache = np.zeros_like(layer.weights)
            layer.bias_cache = np.zeros_like(layer.biases)

        layer.weight_cache = self.rho * layer.weight_cache + (1-self.rho) * (layer.dweights ** 2)
        layer.bias_cache = self.rho * layer.bias_cache + (1-self.rho) * (layer.dbiases ** 2)

        layer.weights += -self.current_learning_rate * layer.dweights / (np.sqrt(layer.weight_cache) + self.epsilon)
        layer.biases += -self.current_learning_rate * layer.dbiases / (np.sqrt(layer.bias_cache) + self.epsilon)

In [102]:
class Optimizer_Adam(Optimizer_SGD):
    def __init__(self, learning_rate=0.001, decay=0, epsilon=1e-7, beta_1=0.9, beta_2=0.999 ):
        super().__init__(learning_rate, decay)
        self.epsilon=epsilon
        self.beta_1=beta_1
        self.beta_2=beta_2

    def update_params(self, layer):
        
        # If layer does not contain cache arrays, generate and fill with zeros
        if not hasattr(layer, 'weight_cache'):
            layer.weight_cache = np.zeros_like(layer.weights)
            layer.weight_momentums = np.zeros_like(layer.weights)
            layer.bias_cache = np.zeros_like(layer.biases)
            layer.bias_momentums = np.zeros_like(layer.biases)

        # Update momentum with current gradients
        layer.weight_momentums = self.beta_1 * layer.weight_momentums + (1 - self.beta_1) * layer.dweights
        layer.bias_momentums = self.beta_1 * layer.bias_momentums + (1 - self.beta_1) * layer.dbiases

        # Get corrected momentum
        # self.iteration is 0 at first and we need to start with 1 here
        weight_momentums_corrected = layer.weight_momentums / (1 - self.beta_1 ** (self.iterations + 1))
        bias_momentums_corrected = layer.bias_momentums / (1 - self.beta_1 ** (self.iterations + 1))

        # Update cache with squared current gradients
        layer.weight_cache = self.beta_2 * layer.weight_cache + (1 - self.beta_2) * layer.dweights ** 2
        layer.bias_cache = self.beta_2 * layer.bias_cache + (1 - self.beta_2) * layer.dbiases ** 2

        # Get corrected cache
        weight_cache_corrected = layer.weight_cache / (1 - self.beta_2 ** (self.iterations + 1))
        bias_cache_corrected = layer.bias_cache / (1 - self.beta_2 ** (self.iterations + 1))

        # Vanilla SGD parameter update + normalization with square rooted cache
        layer.weights += -self.current_learning_rate * weight_momentums_corrected / (np.sqrt(weight_cache_corrected) + self.epsilon)
        layer.biases += -self.current_learning_rate * bias_momentums_corrected / (np.sqrt(bias_cache_corrected) + self.epsilon)

In [117]:
class Activation_Softmax_Loss_CategoricalCrossEntropy():

    def backward(self, predicted_values, y_true):
        # Number of samples
        samples = len(predicted_values)

        # If labels are one-hot encoded, turn them into discrete values
        if len(y_true.shape) == 2:
            y_true = np.argmax(y_true, axis=1)

        # Copy to safely modify
        self.dinputs = predicted_values.copy()
        # Calculate gradient
        self.dinputs[range(samples), y_true] -= 1
        # Normalize gradient
        self.dinputs = self.dinputs / samples

In [104]:
class Activation_Sigmoid:

    # Forward pass
    def forward(self, inputs, training):
        self.inputs = inputs
        self.output = 1 / (1 + np.exp(-inputs))

    # Backward pass
    def backward(self, dvalues):
        # Derivative of sigmoid
        self.dinputs = dvalues * self.output * (1 - self.output)

    def predictions(self, outputs):
        return (outputs > 0.5) * 1

In [105]:
class Activation_Linear:
    def forward(self, inputs, training):
        self.inputs = inputs
        self.output = inputs
    
    def backward(self, dvalues):
        self.dinputs = dvalues.copy()

    def predictions(self, outputs):
        return outputs

In [106]:
class Loss_MeanSquaredError(Loss):

    def forward(self, y_pred, y_true, training):
        sample_losses = np.mean((y_true - y_pred) ** 2, axis=1)
        return sample_losses
    
    def backward(self, dvalues, y_true):
        samples = len(dvalues)
        outputs = len(dvalues[0])

        # Calculate gradient
        self.dinputs = -2 * (y_true - dvalues) / outputs

        # Normalize gradient
        self.dinputs = self.dinputs/samples

In [107]:
class Loss_MeanAbsoluteError(Loss):

    def forward(self, y_pred, y_true, training):
        sample_losses = np.mean(np.abs(y_true - y_pred), axis=1)
        return sample_losses
    
    def backward(self, dvalues, y_true):
        samples = len(dvalues)
        outputs = len(dvalues[0])

        # Calculate gradient
        self.dinputs = np.sign(y_true - dvalues) / outputs

        # Normalize gradient
        self.dinputs = self.dinputs/samples

In [108]:
class Loss_BinaryCrossEntropy(Loss):

    def forward(self, y_pred, y_true):
        # Clip to prevent division by zero
        y_pred_clipped = np.clip(y_pred, 1e-7, 1-1e-7)

        # Calculate sample-wise loss
        sample_losses = -(y_true * np.log(y_pred_clipped) + (1 - y_true) * np.log(1 - y_pred_clipped))
        sample_losses = np.mean(sample_losses, axis=-1)

        return sample_losses
    
    def backward(self, dvalues, y_true):
        # Number of samples
        samples = len(dvalues)
        # Number of outputs in each sample
        outputs = len(dvalues[0])

        # Clip data
        clipped_dvalues = np.clip(dvalues, 1e-7, 1-1e-7)
        
        # Calculate gradient
        self.dinputs = -(y_true/clipped_dvalues - (1-y_true)/(1-clipped_dvalues)) / outputs

        # Normalize gradient
        self.dinputs = self.dinputs / samples

In [109]:
class Layer_Dropout:
    def __init__(self, rate):
        self.rate = 1 - rate

    def forward(self, inputs, training):
        self.inputs = inputs

        if not training:
            self.output = inputs.copy()
            return
        
        self.binary_mask = np.random.binomial(1, self.rate, size=inputs.shape) / self.rate
        self.output = inputs * self.binary_mask

    def backward(self, dvalues):
        self.dinputs = dvalues * self.binary_mask

In [110]:
class Layer_Input:
    def forward(self, inputs, training):
        self.output = inputs

In [111]:
class Accuracy:
    def calculate(self, predictions, y):
        comparisons = self.compare(predictions, y)
        accuracy = np.mean(comparisons)
        return accuracy

In [112]:
class Accuracy_Regression(Accuracy):
    def __init__(self):
        self.precision = None

    def init(self, y, reinit=False):
        if self.precision is None or reinit:
            self.precision = np.std(y) / 250

    def compare(self, predictions, y):
        return np.absolute(predictions - y) < self.precision

In [113]:
class Accuracy_Categorical(Accuracy):
    def __init__(self, *, binary=False):
        # Binary mode?
        self.binary = binary

    # No initialization needed
    def init(self, y):
        pass

    def compare(self, predictions, y):
        if not self.binary and len(y.shape) == 2:
            y = np.argmax(y, axis=1)
        return predictions == y

In [116]:
# Model class
class Model:
    def __init__(self):
        # Create a list of network obejects
        self.layers = []
        self.softmax_classifier_output = None

    def set(self, *, loss, optimizer, accuracy):
        self.loss = loss
        self.optimizer = optimizer
        self.accuracy = accuracy
    
    def add(self, layer):
        self.layers.append(layer)

    def train(self, X, y, *, epochs=1, print_every=100, validation_data=None):
        self.accuracy.init(y)

        for epoch in range (1, epochs+1):
            output = self.forward(X, training=True)
            
            # Calculate loss
            data_loss, regularization_loss = self.loss.calculate(output, y, include_regularization=True)
            loss = data_loss + regularization_loss

            # Get predictions and calculate accuracy
            predictions = self.output_layer_activation.predictions(output)
            accuracy = self.accuracy.calculate(predictions, y)

            # Perform backward pass
            self.backward(output, y)

            # Optimize layers
            self.optimizer.pre_update_params()
            for layer in self.trainable_layers:
                self.optimizer.update_params(layer)
            self.optimizer.post_update_params()

            if not epoch % print_every:
                print(f'epoch: {epoch},', f'acc: {accuracy:.3f},', f'loss: {loss:.3f},', f'data_loss: {data_loss:.3f},', f'reg_loss: {regularization_loss:.3f},', f'lr: {self.optimizer.current_learning_rate}')

        if validation_data is not None:
            X_val, y_val = validation_data
            output = self.forward(X_val, training=False)
            loss = self.loss.calculate(output, y_val)
            predictions = self.output_layer_activation.predictions(output)
            accuracy = self.accuracy.calculate(predictions, y_val)
            print(f'Validation,', f'acc: {accuracy:.3f},', f'loss: {loss:.3f}')
    
    def forward(self, X, training):
        # Call forward method on input layer to set output property that the first layer "prev" is expecting
        self.input_layer.forward(X, training)

        # Call forward method of every object in a chain, pass output of the previous object as a parameter
        for layer in self.layers:
            layer.forward(layer.prev.output, training)

        # "layer" is now the last object in the list, return its output
        return layer.output
    
    def backward(self, output, y):
        if self.softmax_classifier_output is not None:
            self.softmax_classifier_output.backward(output, y)
            self.layers[-1].dinputs = self.softmax_classifier_output.dinputs

            for layer in reversed(self.layers[:-1]):
                layer.backward(layer.next.dinputs)
            
            return
        
        # first call backward method on the loss, this will set dinputs property that the last layer will try to access
        self.loss.backward(output, y)

        # backpropagate
        for layer in reversed(self.layers):
            layer.backward(layer.next.dinputs)
        
    def finalize(self):
        self.input_layer = Layer_Input()
        layer_count = len(self.layers)
        self.trainable_layers = []

        for i in range(layer_count):
            # If it's the first layer, the previous layer object is the input layer
            if i == 0:
                self.layers[i].prev = self.input_layer
                self.layers[i].next = self.layers[i+1]
            elif i < layer_count - 1:
                    self.layers[i].prev = self.layers[i-1]
                    self.layers[i].next = self.layers[i+1]
            else:
                self.layers[i].prev = self.layers[i-1]
                self.layers[i].next = self.loss
                self.output_layer_activation = self.layers[i]

            if hasattr(self.layers[i], 'weights'):
                self.trainable_layers.append(self.layers[i])

        self.loss.remember_trainable_layers(self.trainable_layers)

        if isinstance(self.layers[-1], Activation_Softmax) and isinstance(loss, Loss_CategoricalCrossEntropy):
            self.softmax_classifier_output = Activation_Softmax_Loss_CategoricalCrossEntropy()


In [121]:
X, y = spiral_data(samples=100, classes=2)
y = y.reshape(-1, 1)

X_test, y_test = spiral_data(samples=100, classes=2)
y_test = y_test.reshape(-1, 1)

model = Model()

model.add(Layer_Dense(n_inputs=2, n_neurons=64, weight_regularizer_l2=1e-4, bias_regularizer_l2=1e-4))
model.add(Activation_ReLU())
model.add(Layer_Dropout(0.1))
model.add(Layer_Dense(64, 64))
model.add(Activation_Sigmoid())

model.set(loss=Loss_BinaryCrossEntropy(), optimizer=Optimizer_Adam(learning_rate=0.05, decay=5e-5), accuracy=Accuracy_Categorical(binary=True))

model.finalize()

model.train(X, y, epochs=10000, print_every=100, validation_data=(X_test, y_test))



epoch: 100, acc: 0.770, loss: 0.470, data_loss: 0.465, reg_loss: 0.006, lr: 0.04975371909050202
epoch: 200, acc: 0.830, loss: 0.394, data_loss: 0.383, reg_loss: 0.012, lr: 0.049507401356502806
epoch: 300, acc: 0.825, loss: 0.380, data_loss: 0.364, reg_loss: 0.015, lr: 0.0492635105177595
epoch: 400, acc: 0.875, loss: 0.344, data_loss: 0.327, reg_loss: 0.017, lr: 0.04902201088288642
epoch: 500, acc: 0.865, loss: 0.341, data_loss: 0.321, reg_loss: 0.020, lr: 0.048782867456949125
epoch: 600, acc: 0.860, loss: 0.330, data_loss: 0.309, reg_loss: 0.021, lr: 0.04854604592455945
epoch: 700, acc: 0.890, loss: 0.265, data_loss: 0.243, reg_loss: 0.022, lr: 0.048311512633460556
epoch: 800, acc: 0.885, loss: 0.286, data_loss: 0.263, reg_loss: 0.023, lr: 0.04807923457858551
epoch: 900, acc: 0.870, loss: 0.276, data_loss: 0.253, reg_loss: 0.023, lr: 0.04784917938657352
epoch: 1000, acc: 0.887, loss: 0.255, data_loss: 0.232, reg_loss: 0.024, lr: 0.04762131530072861


  self.output = 1 / (1 + np.exp(-inputs))


epoch: 1100, acc: 0.905, loss: 0.261, data_loss: 0.237, reg_loss: 0.024, lr: 0.04739561116640599
epoch: 1200, acc: 0.895, loss: 0.292, data_loss: 0.267, reg_loss: 0.025, lr: 0.04717203641681212
epoch: 1300, acc: 0.910, loss: 0.308, data_loss: 0.283, reg_loss: 0.025, lr: 0.04695056105920466
epoch: 1400, acc: 0.910, loss: 0.283, data_loss: 0.258, reg_loss: 0.025, lr: 0.04673115566147951
epoch: 1500, acc: 0.915, loss: 0.257, data_loss: 0.231, reg_loss: 0.026, lr: 0.046513791339132055
epoch: 1600, acc: 0.905, loss: 0.258, data_loss: 0.232, reg_loss: 0.025, lr: 0.04629843974258068
epoch: 1700, acc: 0.920, loss: 0.238, data_loss: 0.212, reg_loss: 0.026, lr: 0.046085073044840774
epoch: 1800, acc: 0.920, loss: 0.292, data_loss: 0.267, reg_loss: 0.025, lr: 0.04587366392953806
epoch: 1900, acc: 0.925, loss: 0.338, data_loss: 0.312, reg_loss: 0.026, lr: 0.04566418557925019
epoch: 2000, acc: 0.902, loss: 0.263, data_loss: 0.237, reg_loss: 0.026, lr: 0.045456611664166556
epoch: 2100, acc: 0.925, lo