# Implementing a neural network from scratch

Object oriented approach.

Technically a basic multilayer perceptron.

Testing this out on MNIST (the handwritten digit dataset), then will convert it into a Tensorflow NN. Current plan is to use Tensorflow.js and create a web app that shows the training process in real-time with D3 + React. Might also do a visualization in 3dsmax/cinema4d or something.

The data that this network exports can come in two forms

1. Entire training history: on each training iteration, all of the weights, the activation values and processed inputs for each neuron, the prediction, cost, etc are all recorded. This is essentially a snapshot of the entire network.

2. Results on each iteration: only the cost, prediction, and expected value are recorded. This is much lighter weight and can be exported as a csv.

TO IMPLEMENT:
1. Stochastic gradient descent. Currently the network is just randomly shuffling all of the data and going through each example.

In [1]:
import math
import numpy as np
from sklearn.datasets import fetch_mldata
from sklearn.utils import shuffle
import json
import pandas as pd

In [2]:
"""
Load the MNIST dataset
Contains 70,000 examples of hand-written digits in 28x28 pixel form (784 item array)
So the shape is (70000, 784)
"""
mnist = fetch_mldata('MNIST original')

# The original data
# Stored as a 70,000 x 784 array
data = mnist["data"]

# Normalize the data because every grayscale value is out of 255
data = data/255

# Take first 60,000 samples for training, last 10,000 for testing
sample_size = 60000
training_data = data[:sample_size]
testing_data = data[sample_size:]

print("Total data length:",len(data))
print("Training data length:",len(training_data))
print("Testing data length:",len(testing_data))

# The target labels for classifying the data
# These are used to actually train the network for a given input
targets = mnist["target"]
target_vals = list(set(targets))

# use the training targets to calculate output error during back propogation
training_targets = targets[0 : sample_size]

# use the testing targets to predict accuracy
testing_targets = targets[sample_size : len(targets)]

# shuffle the training data and targets so it doesn't overfit
training_data, training_targets = shuffle(training_data, training_targets, random_state=0)

print("Targets: ", target_vals)

Total data length: 70000
Training data length: 60000
Testing data length: 10000
Targets:  [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]


In [3]:
class NeuralNetwork:
    """
    A basic 3 layer feed forward neural network
    
    The goal here is to abstract each component of the network (neuron, layer, synapses)
    to a high level to easily visualize how the network operates and how it trains
    
    The network records the state of the weights/neurons at each epoch and exports
    a JSON file representing the entire training process
    """
        
    def __init__(self, input_size, hidden_size, output_size):
        """
        Initializes the network given input layer,
        hidden layer, and output layer neuron size
        """
        
        # DEFINE LAYERS
        # Each layer is an array of Neuron objects
        self.input_layer = Layer(size=input_size, name="Input")
        self.hidden_layer = Layer(size=hidden_size, name="Hidden")
        self.output_layer = Layer(size=output_size, name="Output")
        
        # DEFINE SYNAPSES
        # Represented by matrices
        # input -> hidden layer synapses
        self.input_to_hidden_synapses = Synapses(input_size, hidden_size)
        
        # hidden -> output synapses
        self.hidden_to_output_synapses = Synapses(hidden_size, output_size)
        
        # States stores "snapshots" of the network at each training iteration
        self.training_states = []

        # Results stores the results of eachiteration of the network (a slightly minified state)
        self.results = []
        
        #store the initial state of the network
        self.initial_state = {
            "w0": self.input_to_hidden_synapses.weights.tolist(),
            "w1": self.hidden_to_output_synapses.weights.tolist()
        }
    
    def __str__(self):
        """
        Returns a string representation of the network
        Prints the layers with the synapses in between
        """
        delim = "\n"
        return delim.join([str(layer) for layer in self.layers])
        
    #########################################
    # MARK: Training and processing functions
    #########################################
    
    def test(self, data, targets):
        """
        Tests the network on an input data set
        """
        # keeps the count of correct guesses
        correct = 0
        
        for i in range(len(targets)):
            
            test_sample = data[i]
            prediction = self.forward_propagate(test_sample)
            
            expected_digit = targets[i]
            predicted_digit = np.argmax(prediction)
                        
            if expected_digit == predicted_digit:
                correct+=1
        
        accuracy = correct / len(targets)
        return accuracy
            
        
    def train(self, data, targets, iterations, learning_rate):
        """
        Trains the network
        
        Data:
        The training data represented as 784-dimensional vectors
        
        Targets:
        The sample labels
        """
                
        for i in range(iterations):
                        
            # grab the expected result from the targets/labels array
            self.expected = self.vectorize_digit(int(targets[i]))
            
            # grab the relevant training sample to test against the expected label
            self.training_sample = data[i]
            
            # calculate the predicted value via forward propogation
            self.prediction = self.forward_propagate(self.training_sample)

            # cost vector
            costv = self.expected - self.prediction
            
            # cost is the sum of the squares of the differences
            self.cost = sum([(num)**2 for num in costv])
                
            # update the weights of the network via backward propogation
            self.back_propagate(self.training_sample, self.expected, self.prediction, learning_rate)
            
            # Save the state every 1000 iterations
            # print out the cost of the network every 1000 iterations
            if i % 1000 == 0:
                print("Iteration {} | cost: {}".format(i, self.cost))
                self.save_state()

            # on every iteration, save the predictions, costs, and expected
            self.save_result()
            
    def forward_propagate(self, data):
        """
        Passes an input data vector (an image grid, a list of tuples, etc) through
        the network and outputs a prediction from the output layer
        
        rewrite eventually to handle amount of layers
        """
                    
        # First, multiply the data and synapse matrices to sum up each
        # input / weight combination
        self.input_to_hidden_sum = np.dot(data,
                                          self.input_to_hidden_synapses.weights)

        # Pass the entire summed matrix into the sigmoid function
        self.input_to_hidden_activated = self.activate(self.input_to_hidden_sum)

        # Multiply the activated input/hidden layer by
        # the second set of weights

        self.hidden_to_output_sum = np.dot(self.input_to_hidden_activated,
                                           self.hidden_to_output_synapses.weights)
        
        # Set the states of each neuron after processing and activation
        for i, n in enumerate(self.hidden_layer.neurons):
            processed = self.input_to_hidden_sum[i]
            activated = self.input_to_hidden_activated[i]
            n.set_state(processed,activated)

        # Finally, pass the last sum of hidden -> output neurons
        # into the sigmoid
        output = self.activate(self.hidden_to_output_sum)
        
        return output
    
    
    # FIXME: Implement learning rate
    def back_propagate(self, data, expected, output, learning_rate):
        """
        Updates the weights of the network based on training data to make the
        network more accurate        
        """
        data = data.reshape(len(data), 1)
            
        output_err = expected - output
        output_delta = output_err*self.activate(output, deriv=True)
        
        # Calculate the hidden layer errors and deltas
        hidden_layer_err = np.dot(output_delta, self.hidden_to_output_synapses.weights.T)
        hidden_layer_delta = hidden_layer_err*self.activate(self.input_to_hidden_activated, deriv=True)
        
        # MARK: Reshape
        # Need to convert into 1-dimensional vectors so the matrix multiplication works
        hidden_layer_delta = hidden_layer_delta.reshape(1, len(hidden_layer_delta))
        output_delta = output_delta.reshape(1, len(output_delta))
        
        hidden_to_input = self.input_to_hidden_activated.T
        hidden_to_input = hidden_to_input.reshape(len(hidden_to_input), 1)
        
        # Final new calculations
        new_input_to_hidden = np.dot(data, hidden_layer_delta)
        new_hidden_to_output = np.dot(hidden_to_input, output_delta)

        # Update the weights
        self.input_to_hidden_synapses.update(new_input_to_hidden)
        self.hidden_to_output_synapses.update(new_hidden_to_output)
    
    
    def activate(self, x, deriv=False):
        """
        activation sigmoid function
        takes in the summed weights * inputs
        """
        if deriv:
            return x*(1-x)
        return 1/(1+np.exp(-x))
    
    
    def vectorize_digit(self, digit):
        """ 
        converts a digit into the 10 dimensional column vector used by the NN
        """
        v = np.zeros(10)
        v[digit] = 1
        return v
    
    ##########################################
    # MARK: Export and visualization functions
    ##########################################
    
    def save_state(self):
        """
        Saves the current state of the network
        Essentially a snapshot of the network throughout a single training example
        """
        current_state = {
            "w0": self.input_to_hidden_synapses.weights,
            "w1": self.hidden_to_output_synapses.weights,
            "hidden": [n.state for n in self.hidden_layer.neurons],
            "data": self.training_sample,
            "prediction": self.prediction,
            "expected": self.expected,
            "cost": self.cost
        }
        self.training_states.append(current_state)

    def save_result(self):
        current_result = {
            "prediction": self.prediction,
            "expected":self.expected,
            "cost": self.cost
        }
        self.results.append(current_result)
    

    def export_predictions(self, file):
        """
        Exports the predictions, expectations, and the cost of the network
        as a CSV 
        """
        df = pd.DataFrame(self.results)
        df.to_csv("{}.csv".format(file))


    def export_network(self, file):
        """
        Exports the network as a list of objects that
        represent the entire state of the network
        
        Can use this for visualization purposes
        
        1. Network object:
        {
           { Initial state }, -> Object that represents the initial weights of the nework
            [ -> List of training states, costs, predictions
                { State 1 }, 
                { State 2 },
                { State 3 },
                ...
                { State n (n = number of epochs) }
            ]
        }
    
            Internal state object:
            {
                 input_to_hidden_synapses: [], -> matrix of weights
                 hidden_to_output_synapses: [], -> matrix of weights
                 hidden_layer: [] -> list of tuples: (sum, activated_sum)
            }
        """
        
        # Store the entire network as an object
        n = {
            "initial_state": self.initial_state,
            "training_states": self.training_states
        }
                
        print("Exporting network")

        # Exports a pretty printed version of the network
        with open("{}-indented.json".format(file), 'w') as f:
            output = json.dump(n, f, cls=NumpyEncoder, indent=4, ensure_ascii=False, separators=(',', ': '))

        # Exports a minified version of the network (about half the size)
        with open("{}-min.json".format(file), 'w') as f:
            output = json.dump(n, f, cls=NumpyEncoder, ensure_ascii=False)
            
        print("Done exporting")

In [4]:
class Layer:
    """
    Stores a list of neurons
    Initialized with the number of neurons in the layer
    """
    def __init__(self, size, name):
        self.name = name
        self.neurons = [Neuron(neuron_id=i) for i in range(size)]
    
    def __str__(self):
        """
        Prints out the name of the layer and the neurons
        """
        s = "Layer: {}\n".format(self.name)
        delim = "\n"           
        return s + delim.join([str(n) for n in self.neurons])

In [5]:
class Neuron:
    """
    Stores all of the previous values for visualization
    This way we can visualize the process of the data actually being passed into the neuron
    and then being processed
    """
    
    # 'states' stores all of the previous values for visualization
    # Format: [(sum, activated_sum)] list of tuples containing the sum, then activated sum
    # This way we can visualize the process of the data actually being passed into the neuron
    # and then being processed
        
    def __init__(self, neuron_id):
        # Stores a single integer as the neuron's ID to identify which weights are relevant to it
        # during forward propogation
        self.neuron_id = neuron_id
        
        # Stores all of the previous values for visualization
        # Format: [(sum, activated_sum)] list of tuples containing the sum, then activated sum
        # This way we can visualize the process of the data actually being passed into the neuron
        # and then being processed
        self.state = { "processed": 0, "activated" : 0 }
    
    def set_state(self, processed, activated):
        """
        Saves the current state of the neuron into the
        states array given the processed and activated values
        """
        self.state["processed"] = processed
        self.state["activated"] = activated     
    
    def __str__(show_data=False):
        """
        Prints out the number of the neuron
        Shows the current sum and activated sum if specified
        """
        space = "    "
        return space + self.state

In [6]:
class Synapses:
    """ 
    The connections in between layers
    Represented as a matrix of weights
    
    The synapses store all of its previous weights to visualize the training process
    """
    def __init__(self, rows, colums):
        # this initializes the weights to a random Column-d array
        self.weights = np.random.randn(rows, colums)
        
    # updates the weights
    def update(self, new_weights):
        self.weights += new_weights

In [7]:
class NumpyEncoder(json.JSONEncoder):
    """ Special json encoder for numpy types """
    def default(self, obj):
        if isinstance(obj, (np.int_, np.intc, np.intp, np.int8,
            np.int16, np.int32, np.int64, np.uint8,
            np.uint16, np.uint32, np.uint64)):
            return int(obj)
        elif isinstance(obj, (np.float_, np.float16, np.float32, 
            np.float64)):
            return float(obj)
        elif isinstance(obj,(np.ndarray,)): #### This is the fix
            return obj.tolist()
        return json.JSONEncoder.default(self, obj)

In [8]:
# Create an example network 
# input = 784 dimensional image vector
# 15 hidden neurons
# output = 10 neurons for each possible digit

nn = NeuralNetwork(input_size=784,
                   hidden_size=15,
                   output_size=10)

In [9]:
# Train the network
nn.train(data=training_data,
         targets=training_targets,
         iterations=60000,
         learning_rate=0.01)

Iteration 0 | cost: 4.4666539435599235
Iteration 1000 | cost: 1.0298489734064151
Iteration 2000 | cost: 0.15361936086584804
Iteration 3000 | cost: 0.4933474858380852
Iteration 4000 | cost: 0.03721624842163593
Iteration 5000 | cost: 0.320524226943579
Iteration 6000 | cost: 0.13110371556643413
Iteration 7000 | cost: 0.026976019033628
Iteration 8000 | cost: 0.0056304523954631555
Iteration 9000 | cost: 0.752487029584936
Iteration 10000 | cost: 0.0017410397266728393
Iteration 11000 | cost: 1.0383672372902366
Iteration 12000 | cost: 0.006072066707377591
Iteration 13000 | cost: 0.4736000157292362
Iteration 14000 | cost: 0.009530840103221874
Iteration 15000 | cost: 0.025570190964187983
Iteration 16000 | cost: 0.0011512122574391405
Iteration 17000 | cost: 0.0005504488141769038
Iteration 18000 | cost: 0.00048519265853783913
Iteration 19000 | cost: 0.02254373945039315
Iteration 20000 | cost: 0.0026891240344524593
Iteration 21000 | cost: 0.0007933383551697551
Iteration 22000 | cost: 0.000447622785

In [10]:
# Test the network
accuracy = nn.test(data=testing_data,
                   targets=testing_targets)

print("Network accuracy: ", accuracy)

Network accuracy:  0.8928


In [11]:
nn.export_predictions("trained")