In [24]:

# matplotlib for plotting
import matplotlib
matplotlib.rcParams['figure.figsize'] = (10.0, 10.0)
from matplotlib import pyplot as plt

# numpy for vector and matrix manipulations
import numpy as np

# for data manipulation
from sklearn.metrics import accuracy_score
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split



In [25]:
def activation(z, type):
    """
    Activation function that takes input z and returns activated z 
    Sigmoid: output lies between zero and 1
    Tanh: output lies between -1 and 1
    """
    if type == 'sigmoid':
        return 1 / (1 + np.exp(-z))
    if type == 'tanh':
        return np.tanh(z)

def activation_derivative(z, type):
    '''
    Finds the derivative of the activation function, given z
    From the definition f'(x) = f(x) * (1 - f(x))
    '''
    if type == 'sigmoid':
        sig = activation(z, 'sigmoid')
        return sig * (1 - sig)
    if type == 'tanh':
        tanh = activation(z, 'tanh')
        return tanh * (1 - tanh)

def mse(y_true, y_pred):
    """
    Calculates the Mean Squared Error between the predicted value and the ground truth
    - for calculating loss
    """
    n = y_pred.shape[1]
    cost = (1./(2*n)) * np.sum((y_true - y_pred) ** 2)
    return cost

def sensitivity(y_true, y_pred):
    """
    Calculates the difference between predicted and ground truth 
    """
    cost_prime = y_pred - y_true
    return cost_prime


In [26]:
class NeuralNetwork(object):

    def __init__(self, size, seed=86):
        '''
        Instantiate the weights and biases of the network
        that will be updated during training
        '''
        self.seed = seed
        np.random.seed(self.seed)
        self.size = size
        self.weights = [np.random.randn(self.size[i], self.size[i-1]) * np.sqrt(1 / self.size[i-1]) for i in range(1, len(self.size))]
        self.biases = [np.random.rand(n, 1) for n in self.size[1:]]

    def forward_propagation(self, input):
        '''
        Computes the forward propagation matrix calculations for a given input
        '''
        a = input
        pre_activations = []
        activations = [a]
        for w, b in zip(self.weights, self.biases):
            z = np.dot(w, a) + b
            a  = activation(z, 'sigmoid')
            pre_activations.append(z)
            activations.append(a)
        return a, pre_activations, activations

    def compute_deltas(self, pre_activations, y_true, y_pred):
        """
        Calculates the delta between layers
        """
        delta_L = sensitivity(y_true, y_pred) * activation_derivative(pre_activations[-1], 'sigmoid')
        deltas = [0] * (len(self.size) - 1)
        deltas[-1] = delta_L
        for l in range(len(deltas) - 2, -1, -1):
            delta = np.dot(self.weights[l + 1].transpose(), deltas[l + 1]) * activation_derivative(pre_activations[l], 'sigmoid') 
            deltas[l] = delta
        return deltas

    def backpropagate(self, deltas, pre_activations, activations):
        """
        Computes the derivative of the loss w.r.t weight and bias
        """
        dW = []
        db = []
        deltas = [0] + deltas
        for l in range(1, len(self.size)):
            dW_l = np.dot(deltas[l], activations[l-1].transpose()) 
            db_l = deltas[l]
            dW.append(dW_l)
            db.append(np.expand_dims(db_l.mean(axis=1), 1))
        return dW, db

    def train(self, X, y, batch_size, epochs, learning_rate, validation_split=0.2, print_every=10, plot_every=None):
        """
        Trains the network and outputs accuracy and loss
        """

        x_train, x_test, y_train, y_test = train_test_split(X.T, y.T, test_size=validation_split, )
        x_train, x_test, y_train, y_test = x_train.T, x_test.T, y_train.T, y_test.T 
            
        epoch_iterator = range(epochs)

        for e in epoch_iterator:

            # calculate the number of batches
            if x_train.shape[1] % batch_size == 0:
                n_batches = int(x_train.shape[1] / batch_size)
            else:
                n_batches = int(x_train.shape[1] / batch_size ) - 1

            # randomize and transpose data
            x_train, y_train = shuffle(x_train.T, y_train.T)
            x_train, y_train = x_train.T, y_train.T

            # list of batches based on the inputted batch size
            batches_x = [x_train[:, batch_size*i:batch_size*(i+1)] for i in range(0, n_batches)]
            batches_y = [y_train[:, batch_size*i:batch_size*(i+1)] for i in range(0, n_batches)]

            # for tracking accuracy/loss
            train_losses, train_accuracies, test_losses, test_accuracies = [], [], [], []

            # hold averages
            average_train_losses, average_train_accuracies, average_test_losses, average_test_accuracies = [], [], [], []

            dw_per_epoch = [np.zeros(w.shape) for w in self.weights]
            db_per_epoch = [np.zeros(b.shape) for b in self.biases] 
            
            for batch_x, batch_y in zip(batches_x, batches_y):
                batch_y_pred, pre_activations, activations = self.forward_propagation(batch_x)
                deltas = self.compute_deltas(pre_activations, batch_y, batch_y_pred)
                dW, db = self.backpropagate(deltas, pre_activations, activations)
                for i, (dw_i, db_i) in enumerate(zip(dW, db)):
                    dw_per_epoch[i] += dw_i / batch_size
                    db_per_epoch[i] += db_i / batch_size

                batch_y_train_pred = self.predict(batch_x)

                train_loss = mse(batch_y, batch_y_train_pred)
                train_losses.append(train_loss)
                train_accuracy = accuracy_score(batch_y.T, batch_y_train_pred.T)
                train_accuracies.append(train_accuracy)

                batch_y_test_pred = self.predict(x_test)

                test_loss = mse(y_test, batch_y_test_pred)
                test_losses.append(test_loss)
                test_accuracy = accuracy_score(y_test.T, batch_y_test_pred.T)
                test_accuracies.append(test_accuracy)


            # weight update
            for i, (dw_epoch, db_epoch) in enumerate(zip(dw_per_epoch, db_per_epoch)):
                self.weights[i] = self.weights[i] - learning_rate * dw_epoch
                self.biases[i] = self.biases[i] - learning_rate * db_epoch

            average_train_losses.append(np.mean(train_losses))
            average_train_accuracies.append(np.mean(train_accuracies))
            
            average_test_losses.append(np.mean(test_losses))
            average_test_accuracies.append(np.mean(test_accuracies))

        history = {'epochs': epochs,
                   'train_loss': average_train_losses, 
                   'train_acc': average_train_accuracies,
                   'test_loss': average_test_losses,
                   'test_acc': average_test_accuracies
                   }
        return history

    def predict(self, a):
        '''
        Make a prediction on the trained network for a given state 'a'
        '''
        for w, b in zip(self.weights, self.biases):
            z = np.dot(w, a) + b
            a = activation(z, 'sigmoid')
        predictions = (a > 0.5).astype(int)
        return predictions

In [27]:
'''
Used for testing against sklearn datasets
'''
from sklearn import datasets

data = datasets.make_moons(n_samples=1000, noise=0.1)
X = data[0].T
y = np.expand_dims(data[1], 1).T

# takes a list with num layers = length of list and num nodes for layer i = list[i-1]
neural_net = NeuralNetwork([2, 4, 2, 1], seed=2)
history = neural_net.train(X=X, y=y, batch_size=32, epochs=500, learning_rate=0.4, print_every=200, validation_split=0.2, plot_every=15)
print('Test Accuracy:', history['test_acc'][-1],
	'\nTrain Accuracy:', history['train_acc'][-1],
	'\nTest Loss:', history['test_loss'][-1],
	'\nTrain Loss:', history['train_loss'][-1])

Test Accuracy: 0.98 
Train Accuracy: 0.985 
Test Loss: 0.01 
Train Loss: 0.0075
