In [None]:
import numpy as np

## Layers

In [None]:
# Dense layer
class Layer_Dense:
    """Fully connected layer: ``output = inputs . weights + biases``."""

    def __init__(self, n_inputs, n_neurons):
        """Allocate the trainable parameters.

        Args:
            n_inputs: Number of features fed into the layer.
            n_neurons: Number of units (output features) in the layer.
        """
        # Biases start at zero, one row so they broadcast over a batch
        self.biases = np.zeros((1, n_neurons))
        # Weights are small random values (standard normal scaled by 0.01)
        self.weights = 0.01 * np.random.randn(n_inputs, n_neurons)

    def forward(self, inputs, training):
        """Compute the layer output and store it in ``self.output``.

        Args:
            inputs: Batch of samples to transform.
            training: Accepted for a uniform layer interface; not used here.
        """
        # The inputs are needed again in backward() for the weight gradient
        self.inputs = inputs

        weighted_sum = np.dot(inputs, self.weights)
        self.output = weighted_sum + self.biases

    def backward(self, dvalues):
        """Back-propagate ``dvalues`` through the layer.

        Sets ``self.dinputs`` (gradient w.r.t. the layer inputs) and
        ``self.dweights`` / ``self.dbiases`` (gradients w.r.t. the
        parameters).

        Args:
            dvalues: Gradient passed back from the following object.
        """
        # Gradient flowing on to the previous object
        self.dinputs = np.dot(dvalues, self.weights.T)

        # Parameter gradients; biases sum the gradient over the batch axis
        self.dbiases = np.sum(dvalues, axis=0, keepdims=True)
        self.dweights = np.dot(self.inputs.T, dvalues)

# Input "layer"
class Layer_Input:
    """Stand-in "layer" that exposes the raw samples as its output."""

    def forward(self, inputs, training):
        """Store ``inputs`` unchanged in ``self.output``.

        ``training`` is accepted only so the signature matches the other
        layers; it is not used.
        """
        self.output = inputs

## Activation Functions

Includes the ReLU activation function used in the hidden layers and the Softmax activation function used at the output layer.

In [None]:
# ReLU activation
class Activation_ReLU:
    """Rectified linear unit: ``max(0, x)`` applied element-wise."""

    def forward(self, inputs, training):
        """Apply ReLU to ``inputs`` and store the result in ``self.output``.

        ``training`` is part of the common layer signature and is unused.
        """
        # Needed in backward() to know which elements were switched off
        self.inputs = inputs
        self.output = np.maximum(0, inputs)

    def backward(self, dvalues):
        """Store in ``self.dinputs`` the gradient w.r.t. the inputs.

        The incoming gradient passes through unchanged where the forward
        input was positive and is zeroed where it was zero or negative.
        """
        # Work on a copy so the caller's array is left untouched
        gradient = dvalues.copy()
        switched_off = self.inputs <= 0
        gradient[switched_off] = 0
        self.dinputs = gradient

    def predictions(self, outputs):
        """Return ``outputs`` as-is; ReLU outputs need no post-processing."""
        return outputs


# Softmax activation
class Activation_Softmax:
    """Softmax activation turning each row of scores into probabilities."""

    def forward(self, inputs, training):
        """Compute row-wise softmax of ``inputs`` into ``self.output``.

        Args:
            inputs: 2-D batch of scores, one row per sample (the
                reductions below run over ``axis=1``).
            training: Part of the common layer signature; unused here.
        """
        # Remember input values for the backward pass
        self.inputs = inputs

        # Subtract the row maximum before exponentiating so the largest
        # exponent is 0, which prevents overflow without changing the result
        exp_values = np.exp(inputs - np.max(inputs, axis=1, keepdims=True))

        # Normalise each row so it sums to 1
        self.output = exp_values / np.sum(exp_values, axis=1, keepdims=True)

    def backward(self, dvalues):
        """Store in ``self.dinputs`` the gradient w.r.t. the softmax inputs.

        For one sample with softmax output ``s`` and incoming gradient
        ``d`` the Jacobian is ``diag(s) - s s^T``, so the product with
        ``d`` simplifies to ``s * (d - sum(s * d))``. Evaluating that
        expression for the whole batch at once avoids building a Jacobian
        matrix per sample in a Python loop, and the result is always
        floating point even when ``dvalues`` has an integer dtype.

        Args:
            dvalues: Gradient passed back from the following object, with
                the same shape as ``self.output``.
        """
        # Per-sample dot product s . d, kept as a column for broadcasting
        weighted_sum = np.sum(self.output * dvalues, axis=1, keepdims=True)

        self.dinputs = self.output * (dvalues - weighted_sum)

    def predictions(self, outputs):
        """Return the index of the highest probability in each row."""
        return np.argmax(outputs, axis=1)

## Optimizers

In [None]:
class Optimizer_Adam:
    """Adam optimizer with optional inverse-time learning-rate decay.

    Per-layer state (momentums and caches) is stored as attributes on the
    layer objects themselves.
    """

    def __init__(self, learning_rate=0.001, decay=0., epsilon=1e-7,
                 beta_1=0.9, beta_2=0.999):
        """Store the hyperparameters and reset the step counter.

        Args:
            learning_rate: Initial (undecayed) learning rate.
            decay: Decay factor; a falsy value disables decay.
            epsilon: Small constant added to the denominator of the update.
            beta_1: Averaging coefficient for the gradient momentums.
            beta_2: Averaging coefficient for the squared-gradient caches.
        """
        # Hyperparameters
        self.learning_rate = learning_rate
        self.decay = decay
        self.epsilon = epsilon
        self.beta_1 = beta_1
        self.beta_2 = beta_2

        # Running state
        self.current_learning_rate = learning_rate
        self.iterations = 0

    def pre_update_params(self):
        """Refresh the decayed learning rate; call once before updating."""
        if not self.decay:
            return
        shrink = 1. / (1. + self.decay * self.iterations)
        self.current_learning_rate = self.learning_rate * shrink

    def update_params(self, layer):
        """Apply one Adam step to ``layer.weights`` and ``layer.biases``.

        Reads ``layer.dweights`` / ``layer.dbiases`` and updates the
        parameters in place.
        """
        # First visit to this layer: create zero-filled state arrays
        if not hasattr(layer, 'weight_cache'):
            layer.weight_momentums = np.zeros_like(layer.weights)
            layer.weight_cache = np.zeros_like(layer.weights)
            layer.bias_momentums = np.zeros_like(layer.biases)
            layer.bias_cache = np.zeros_like(layer.biases)

        beta_1 = self.beta_1
        beta_2 = self.beta_2

        # self.iterations is 0 on the first pass but the bias correction
        # needs a step number starting at 1
        step = self.iterations + 1
        momentum_correction = 1 - beta_1 ** step
        cache_correction = 1 - beta_2 ** step

        # Running averages of the gradients
        layer.weight_momentums = beta_1 * layer.weight_momentums + \
            (1 - beta_1) * layer.dweights
        layer.bias_momentums = beta_1 * layer.bias_momentums + \
            (1 - beta_1) * layer.dbiases

        # Running averages of the squared gradients
        layer.weight_cache = beta_2 * layer.weight_cache + \
            (1 - beta_2) * layer.dweights**2
        layer.bias_cache = beta_2 * layer.bias_cache + \
            (1 - beta_2) * layer.dbiases**2

        # Bias-corrected estimates
        weight_m_hat = layer.weight_momentums / momentum_correction
        bias_m_hat = layer.bias_momentums / momentum_correction
        weight_v_hat = layer.weight_cache / cache_correction
        bias_v_hat = layer.bias_cache / cache_correction

        # Step against the momentum, normalised by the root of the cache
        rate = self.current_learning_rate
        layer.weights += -rate * weight_m_hat / \
            (np.sqrt(weight_v_hat) + self.epsilon)
        layer.biases += -rate * bias_m_hat / \
            (np.sqrt(bias_v_hat) + self.epsilon)

    def post_update_params(self):
        """Advance the step counter; call once after updating."""
        self.iterations += 1

## Loss Function

In [None]:
# Cross-entropy loss
class Loss_CategoricalCrossentropy:
    """Categorical cross-entropy loss with running totals across batches."""

    def __init__(self):
        # Start with empty running totals so calculate() works even when
        # new_pass() has not been called yet
        self.accumulated_sum = 0
        self.accumulated_count = 0

    def remember_trainable_layers(self, trainable_layers):
        """Keep a reference to the model's trainable layers."""
        self.trainable_layers = trainable_layers

    def calculate(self, output, y):
        """Return the mean loss of one batch and add it to the totals.

        Args:
            output: Predicted probabilities, one row per sample.
            y: Ground-truth labels, sparse (1-D) or one-hot (2-D).

        Returns:
            Mean of the per-sample losses for this batch.
        """
        sample_losses = self.forward(output, y)

        # Accumulate so calculate_accumulated() can average over batches
        self.accumulated_sum += np.sum(sample_losses)
        self.accumulated_count += len(sample_losses)

        return np.mean(sample_losses)

    def calculate_accumulated(self):
        """Return the mean loss over all samples seen since ``new_pass()``.

        Divides by the accumulated sample count, so at least one batch
        must have been passed to ``calculate()`` first.
        """
        return self.accumulated_sum / self.accumulated_count

    def new_pass(self):
        """Reset the running totals used by ``calculate_accumulated()``."""
        self.accumulated_sum = 0
        self.accumulated_count = 0

    def forward(self, y_pred, y_true):
        """Return the negative log-likelihood of each sample.

        Args:
            y_pred: Predicted probabilities, one row per sample.
            y_true: Class indices (1-D) or one-hot rows (2-D).

        Raises:
            ValueError: If ``y_true`` is neither 1-D nor 2-D.
        """
        # Accept plain lists as well as arrays
        y_true = np.asarray(y_true)
        samples = len(y_pred)

        # Clip so log(0) cannot occur; both ends are clipped so the mean
        # is not dragged towards either value
        y_pred_clipped = np.clip(y_pred, 1e-7, 1 - 1e-7)

        if y_true.ndim == 1:
            # Sparse labels: pick the probability of the target class
            correct_confidences = y_pred_clipped[range(samples), y_true]
        elif y_true.ndim == 2:
            # One-hot labels: the mask keeps only the target probability
            correct_confidences = np.sum(y_pred_clipped * y_true, axis=1)
        else:
            raise ValueError(
                'y_true must be 1-D (sparse) or 2-D (one-hot), '
                f'got {y_true.ndim} dimensions'
            )

        return -np.log(correct_confidences)

    def backward(self, dvalues, y_true):
        """Store in ``self.dinputs`` the gradient w.r.t. the predictions.

        ``dvalues`` is divided into the labels without clipping, so
        predictions that are exactly zero produce ``inf``/``nan`` entries.

        Args:
            dvalues: Predicted probabilities, one row per sample.
            y_true: Class indices (1-D) or one-hot rows (2-D).
        """
        y_true = np.asarray(y_true)
        samples = len(dvalues)

        # Number of classes, counted from the first sample
        labels = len(dvalues[0])

        # Sparse labels are expanded to one-hot rows
        if y_true.ndim == 1:
            y_true = np.eye(labels)[y_true]

        self.dinputs = -y_true / dvalues

        # Normalise so the summed gradient does not grow with batch size
        self.dinputs = self.dinputs / samples


# Softmax classifier - combined Softmax activation
# and cross-entropy loss for faster backward step
class Activation_Softmax_Loss_CategoricalCrossentropy:
    """Combined Softmax + categorical cross-entropy backward step.

    Taking the derivative of both together reduces to "predicted
    probabilities minus one-hot targets", which is cheaper than running
    the two backward passes separately.
    """

    def backward(self, dvalues, y_true):
        """Store in ``self.dinputs`` the gradient w.r.t. the softmax inputs.

        Args:
            dvalues: Softmax outputs, one row per sample.
            y_true: Class indices (1-D) or one-hot rows (2-D).
        """
        sample_count = len(dvalues)

        # One-hot targets are reduced to class indices
        if len(y_true.shape) == 2:
            target_index = np.argmax(y_true, axis=1)
        else:
            target_index = y_true

        # Copy first so the caller's array is not modified
        gradient = dvalues.copy()

        # Subtracting 1 at the target class gives (prediction - one_hot)
        gradient[range(sample_count), target_index] -= 1

        # Average over the batch
        self.dinputs = gradient / sample_count

## Accuracy

In [None]:
# Accuracy calculation for classification model
class Accuracy_Categorical:
    """Classification accuracy with running totals across batches."""

    def __init__(self):
        # Start with empty running totals so calculate() works even when
        # new_pass() has not been called yet
        self.accumulated_sum = 0
        self.accumulated_count = 0

    def init(self, y):
        """Do nothing; categorical accuracy needs no setup from ``y``."""
        pass

    def calculate(self, predictions, y):
        """Return the accuracy of one batch and add it to the totals.

        Args:
            predictions: Predicted class index for each sample.
            y: Ground-truth labels, sparse (1-D) or one-hot (2-D).

        Returns:
            Fraction of samples in this batch predicted correctly.
        """
        comparisons = self.compare(predictions, y)

        # Accumulate so calculate_accumulated() can average over batches
        self.accumulated_sum += np.sum(comparisons)
        self.accumulated_count += len(comparisons)

        return np.mean(comparisons)

    def calculate_accumulated(self):
        """Return the accuracy over all samples seen since ``new_pass()``.

        Divides by the accumulated sample count, so at least one batch
        must have been passed to ``calculate()`` first.
        """
        return self.accumulated_sum / self.accumulated_count

    def new_pass(self):
        """Reset the running totals used by ``calculate_accumulated()``."""
        self.accumulated_sum = 0
        self.accumulated_count = 0

    def compare(self, predictions, y):
        """Return an element-wise ``predictions == y`` comparison.

        One-hot (2-D) labels are reduced to class indices first. Plain
        lists are accepted as labels as well as arrays.
        """
        y = np.asarray(y)
        if y.ndim == 2:
            y = np.argmax(y, axis=1)
        return predictions == y

## Neural Network Model Class


A model class to make it easier to build and train individual models

In [None]:
class Model:
    """Sequential container that wires layers together and trains them.

    Typical use: ``add()`` the layers in order, ``set()`` the loss,
    optimizer and accuracy objects, call ``finalize()``, then ``train()``
    and/or ``evaluate()``.

    The backward pass always starts from the combined Softmax /
    categorical cross-entropy gradient and skips the last layer's own
    ``backward``, so the last added object is expected to be the softmax
    output activation.
    """

    def __init__(self):
        # Network objects in the order they were added
        self.layers = []
        # Combined softmax + cross-entropy object used by backward()
        self.softmax_classifier_output = \
                Activation_Softmax_Loss_CategoricalCrossentropy()

    def add(self, layer):
        """Append a layer or activation object to the network."""
        self.layers.append(layer)

    def set(self, *, loss, optimizer, accuracy):
        """Attach the loss, optimizer and accuracy objects."""
        self.loss = loss
        self.optimizer = optimizer
        self.accuracy = accuracy

    def finalize(self):
        """Link the added objects into a chain and collect trainable layers.

        Every object gets ``prev`` and ``next`` attributes. The first
        object's ``prev`` is a fresh input layer and the last object's
        ``next`` is the loss; the last object is also stored as
        ``self.output_layer_activation``. A model with a single object is
        handled like any other (that object is both first and last).
        """
        self.input_layer = Layer_Input()
        self.trainable_layers = []

        last_index = len(self.layers) - 1

        for index, layer in enumerate(self.layers):

            # The first object reads from the input layer
            layer.prev = self.layers[index - 1] if index > 0 \
                else self.input_layer

            if index < last_index:
                layer.next = self.layers[index + 1]
            else:
                # The last object feeds the loss and provides the
                # model's output
                layer.next = self.loss
                self.output_layer_activation = layer

            # An object with a "weights" attribute is trainable; checking
            # for weights alone is enough, biases come with them
            if hasattr(layer, 'weights'):
                self.trainable_layers.append(layer)

        # Tell the loss object once, after the list is complete
        self.loss.remember_trainable_layers(self.trainable_layers)

    @staticmethod
    def _count_steps(sample_count, batch_size):
        """Return how many batches are needed to cover ``sample_count``."""
        if batch_size is None:
            return 1
        full_batches, remainder = divmod(sample_count, batch_size)
        # A trailing partial batch still needs its own step
        return full_batches + 1 if remainder else full_batches

    @staticmethod
    def _get_batch(X, y, step, batch_size):
        """Return the ``step``-th batch of ``(X, y)``, or all of it."""
        if batch_size is None:
            return X, y
        start = step * batch_size
        stop = start + batch_size
        return X[start:stop], y[start:stop]

    def train(self, X, y, *, epochs=1, batch_size=None,
              print_every=1, validation_data=None):
        """Train the model and print progress.

        Args:
            X: Training samples.
            y: Training labels.
            epochs: Number of passes over the training data.
            batch_size: Samples per step; ``None`` uses the full dataset
                in a single step.
            print_every: Print a step summary every this many steps (the
                last step of an epoch is always printed).
            validation_data: Optional ``(X_val, y_val)`` pair evaluated
                after every epoch.
        """
        self.accuracy.init(y)

        train_steps = self._count_steps(len(X), batch_size)

        for epoch in range(1, epochs+1):

            print('==============================================')
            print(f'epoch: {epoch}')

            # Reset accumulated values in loss and accuracy objects
            self.loss.new_pass()
            self.accuracy.new_pass()

            for step in range(train_steps):

                batch_X, batch_y = self._get_batch(X, y, step, batch_size)

                # Forward pass, loss and accuracy for this batch
                output = self.forward(batch_X, training=True)
                loss = self.loss.calculate(output, batch_y)
                predictions = self.output_layer_activation.predictions(
                                  output)
                accuracy = self.accuracy.calculate(predictions,
                                                   batch_y)

                # Backward pass
                self.backward(output, batch_y)

                # Optimize (update parameters)
                self.optimizer.pre_update_params()
                for layer in self.trainable_layers:
                    self.optimizer.update_params(layer)
                self.optimizer.post_update_params()

                # Print a summary
                if not step % print_every or step == train_steps - 1:
                    print(f'step: {step}, ' +
                          f'acc: {accuracy:.3f}, ' +
                          f'loss: {loss:.3f} ' +
                          f'lr: {self.optimizer.current_learning_rate}')

            # Epoch-level loss and accuracy
            epoch_loss = self.loss.calculate_accumulated()
            epoch_accuracy = self.accuracy.calculate_accumulated()

            print(f'training, ' +
                  f'acc: {epoch_accuracy:.3f}, ' +
                  f'loss: {epoch_loss:.3f} ' +
                  f'lr: {self.optimizer.current_learning_rate}')

            if validation_data is not None:
                self.evaluate(*validation_data,
                              batch_size=batch_size)

    def forward(self, X, training):
        """Run ``X`` through every object and return the final output.

        Requires ``finalize()`` to have been called, since each object
        reads its input from ``layer.prev.output``.
        """
        # The input layer provides the "prev" output for the first object
        self.input_layer.forward(X, training)

        layer = self.input_layer
        for layer in self.layers:
            layer.forward(layer.prev.output, training)

        # "layer" is now the last object in the chain
        return layer.output

    def backward(self, output, y):
        """Back-propagate from the model output through all objects.

        Args:
            output: Model output from the matching forward pass.
            y: Ground-truth labels for that batch.
        """
        # The combined activation/loss object produces the gradient
        # w.r.t. the inputs of the last (softmax) object
        self.softmax_classifier_output.backward(output, y)

        # The last object's own backward() is skipped, so its dinputs
        # is set directly from the combined object
        self.layers[-1].dinputs = \
            self.softmax_classifier_output.dinputs

        # Walk the remaining objects in reverse order
        for layer in reversed(self.layers[:-1]):
            layer.backward(layer.next.dinputs)

    def evaluate(self, X_val, y_val, *, batch_size=None):
        """Compute and print loss and accuracy on a dataset.

        Args:
            X_val: Samples to evaluate on.
            y_val: Labels for those samples.
            batch_size: Samples per step; ``None`` uses the full dataset
                in a single step.
        """
        validation_steps = self._count_steps(len(X_val), batch_size)

        # Reset accumulated values in loss and accuracy objects
        self.loss.new_pass()
        self.accuracy.new_pass()

        for step in range(validation_steps):

            batch_X, batch_y = self._get_batch(
                X_val, y_val, step, batch_size)

            output = self.forward(batch_X, training=False)

            # Only the accumulated values matter here
            self.loss.calculate(output, batch_y)
            predictions = self.output_layer_activation.predictions(
                              output)
            self.accuracy.calculate(predictions, batch_y)

        validation_loss = self.loss.calculate_accumulated()
        validation_accuracy = self.accuracy.calculate_accumulated()

        # Print a summary
        print(f'validation, ' +
              f'acc: {validation_accuracy:.3f}, ' +
              f'loss: {validation_loss:.3f}')

## Training and Evaluation: Iris dataset

In [None]:
from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

# Load the Iris dataset: feature matrix and integer class labels
iris = datasets.load_iris()
X = iris.data
y = iris.target

# Split into training, validation and test sets before any scaling:
# 20% is held out for testing, then 25% of the remainder for validation
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.20, random_state=123)

X_train, X_validate, y_train, y_validate = train_test_split(
    X_train, y_train, test_size=0.25, random_state=456)

# Fit the scaler on the training split only, so that no statistics of the
# validation/test samples leak into preprocessing, then apply the same
# transform to all three splits. Training features land in [-1, 1];
# validation/test features can fall slightly outside that range.
scaler = MinMaxScaler(feature_range=(-1, 1))
X_train = scaler.fit_transform(X_train)
X_validate = scaler.transform(X_validate)
X_test = scaler.transform(X_test)

# Instantiate the model
modelx = Model()

# Add layers: two hidden layers of 128 ReLU units and a 3-way softmax output
modelx.add(Layer_Dense(X_train.shape[1], 128))
modelx.add(Activation_ReLU())
modelx.add(Layer_Dense(128, 128))
modelx.add(Activation_ReLU())
modelx.add(Layer_Dense(128, 3))
modelx.add(Activation_Softmax())

# Set loss, optimizer and accuracy objects
modelx.set(
    loss=Loss_CategoricalCrossentropy(),
    optimizer=Optimizer_Adam(decay=1e-4),
    accuracy=Accuracy_Categorical()
)

# Finalize the model
modelx.finalize()

# Train the model, validating after every epoch
modelx.train(X_train, y_train, validation_data=(X_validate, y_validate),
            epochs=150, batch_size=32, print_every=1)

# Evaluate the model on the held-out test set
print("Evaluation on test set")
modelx.evaluate(X_test, y_test)

epoch: 1
step: 0, acc: 0.312, loss: 1.099 lr: 0.001
step: 1, acc: 0.656, loss: 1.098 lr: 0.000999900009999
step: 2, acc: 0.308, loss: 1.098 lr: 0.0009998000399920016
training, acc: 0.433, loss: 1.098 lr: 0.0009998000399920016
validation, acc: 0.200, loss: 1.098
epoch: 2
step: 0, acc: 0.375, loss: 1.097 lr: 0.000999700089973008
step: 1, acc: 0.406, loss: 1.097 lr: 0.0009996001599360256
step: 2, acc: 0.308, loss: 1.096 lr: 0.0009995002498750624
training, acc: 0.367, loss: 1.097 lr: 0.0009995002498750624
validation, acc: 0.467, loss: 1.096
epoch: 3
step: 0, acc: 0.625, loss: 1.095 lr: 0.0009994003597841295
step: 1, acc: 0.719, loss: 1.093 lr: 0.0009993004896572402
step: 2, acc: 0.615, loss: 1.092 lr: 0.0009992006394884093
training, acc: 0.656, loss: 1.093 lr: 0.0009992006394884093
validation, acc: 0.533, loss: 1.092
epoch: 4
step: 0, acc: 0.656, loss: 1.090 lr: 0.0009991008092716555
step: 1, acc: 0.719, loss: 1.086 lr: 0.0009990009990009992
step: 2, acc: 0.615, loss: 1.085 lr: 0.000998901

## Training and Evaluation: MNIST dataset

In [None]:
from tensorflow.keras.datasets import mnist

def _flatten_and_scale(images):
    """Flatten each sample to one float32 row, then shift and scale it.

    Subtracting 127.5 and dividing by 127.5 maps values in 0..255 onto
    the range [-1, 1].
    """
    flat = images.reshape(images.shape[0], -1).astype(np.float32)
    return (flat - 127.5) / 127.5


# Load MNIST and carve a validation set (20%) out of the training data
(X_train, Y_train), (X_test, Y_test) = mnist.load_data()
X_train, X_validate, Y_train, Y_validate = train_test_split(
    X_train, Y_train, test_size=0.2, random_state=789)

# Preprocessing: the same flatten-and-scale step for every split
X_train = _flatten_and_scale(X_train)
X_validate = _flatten_and_scale(X_validate)
X_test = _flatten_and_scale(X_test)

# Build the network: two hidden layers of 128 ReLU units
# and a 10-way softmax output
modelr = Model()
for network_object in (
        Layer_Dense(X_train.shape[1], 128),
        Activation_ReLU(),
        Layer_Dense(128, 128),
        Activation_ReLU(),
        Layer_Dense(128, 10),
        Activation_Softmax(),
):
    modelr.add(network_object)

# Attach loss, optimizer and accuracy objects, then link everything up
modelr.set(
    loss=Loss_CategoricalCrossentropy(),
    optimizer=Optimizer_Adam(decay=1e-4),
    accuracy=Accuracy_Categorical()
)
modelr.finalize()

# Train, validating after every epoch
modelr.train(X_train, Y_train, validation_data=(X_validate, Y_validate),
            epochs=62, batch_size=128, print_every=200)

# Final check on the held-out test set
print("Evaluation on test set")
modelr.evaluate(X_test, Y_test)

epoch: 1
step: 0, acc: 0.078, loss: 2.302 lr: 0.001
step: 200, acc: 0.891, loss: 0.379 lr: 0.000980392156862745
step: 374, acc: 0.875, loss: 0.502 lr: 0.0009639483323693849
training, acc: 0.789, loss: 0.670 lr: 0.0009639483323693849
validation, acc: 0.895, loss: 0.358
epoch: 2
step: 0, acc: 0.914, loss: 0.360 lr: 0.0009638554216867469
step: 200, acc: 0.914, loss: 0.251 lr: 0.0009456264775413711
step: 374, acc: 0.906, loss: 0.383 lr: 0.0009303190994511118
training, acc: 0.910, loss: 0.305 lr: 0.0009303190994511118
validation, acc: 0.921, loss: 0.264
epoch: 3
step: 0, acc: 0.906, loss: 0.262 lr: 0.0009302325581395349
step: 200, acc: 0.930, loss: 0.201 lr: 0.0009132420091324202
step: 374, acc: 0.930, loss: 0.301 lr: 0.0008989572096368213
training, acc: 0.933, loss: 0.228 lr: 0.0008989572096368213
validation, acc: 0.936, loss: 0.215
epoch: 4
step: 0, acc: 0.938, loss: 0.216 lr: 0.0008988764044943821
step: 200, acc: 0.922, loss: 0.152 lr: 0.0008830022075055188
step: 374, acc: 0.938, loss: 0