In [1]:
import pickle
import gzip
import random

from tqdm import tqdm
import numpy as np

## 1. Dataset
In order to train the model we we were use the MNIST data set. 
The first step is to load the input data.

In [2]:
def load_data(path):
    f = gzip.open(path)
    training_data, validation_data, test_data = pickle.load(f,encoding="latin1")
    f.close()
    return (training_data, validation_data, test_data)

training_data, validation_data, test_data = load_data('mnist.pkl.gz')


## 2. Data processing
The MNIST dataset requires minimal processing. We will transform the labels into one hot vectors for convienient comparisons with the model output.

In [3]:
def one_hot(j):
    r = np.zeros((10,1))
    r[j] = 1
    return r

def process_data(data, one_hot_output=False):
    input_data = [np.reshape(i, (1, 784)) for i in data[0]]
    
    if one_hot_output:
        output_data = [one_hot(l) for l in data[1]]
    else:
        output_data = [l for l in data[1]]
    
    return (input_data, output_data)


## 3. Batch generation

We will also define a function that produces random batches of training data, this will be important later for implementing the stochastic gradient descent.

In [4]:
def get_batch(input_data, output_data, batch_size):
    
    n = len(input_data)
    idx = np.random.choice(n, batch_size)
    input_batch = [input_data[i] for i in idx]
    output_batch = [output_data[i] for i in idx]
    
    x_batch = np.reshape(input_batch, [batch_size, 784])
    y_batch = np.reshape(output_batch, [batch_size, 10])
    
    return x_batch, y_batch 

## 4. Model definition

First we need to define a cost functions. For experimentation we will use both MSE and Cross Entropy cost functions.

In [5]:
class MSE(object):
    @staticmethod
    def delta(output_activations, y, z):
        """
        Returns derivative of cost function with respect to the activation 
        multiplied by the derivative of the activation with respect to z(z = w*aL-1 + b).
        delta = dC/daL * daL/dzL
        """
        return (output_activations-y) * sigmoid_prime(z)
    
class CrossEntropy:
    @staticmethod
    def delta(output_activations, y, z):
        """
        Returns derivative of cost function with respect to the activation 
        multiplied by the derivative of the activation with respect to z(z = w*aL-1 + b).
        delta = dC/daL * daL/dzL
        """
        return (output_activations-y)
    

Now we are ready to start building the neural network used for digit recognition. We will first define the model class that will hold information about the cost function, layers arrangement, weights and biases. The weights and biases will be initialized with random numbers on model creation. 

In [6]:
class Model(object):
    def __init__(self, layers, cost=CrossEntropy):
        self.layers_count = len(layers)
        self.layers = layers
        self.cost = cost
        self.biases = [np.random.randn(1, layer) for layer in layers[1:]]
        self.weights = [np.random.randn(y, x) for x, y in zip(layers[:-1], layers[1:])]


In order to define forward propagation and backprogagation we will also need some activation function. The sigmoid function was chosen for this purposes.

In [7]:
def sigmoid(z):
    return 1.0/(1.0+np.exp(-z))

def sigmoid_prime(z):
    return sigmoid(z)*(1-sigmoid(z))


## 5. Forward pass

Now we are ready to define the forward pass:

In [8]:
def call(model, x):
    
    for w, b in zip(model.weights, model.biases):
        z = np.dot(x, w.T) + b
        x = sigmoid(z)
    return x


## 6. Training definition

In order to obtain some meanigful results, we first need to train the model. Here we define a single training step. The function is called with a batch of inputs and labels. Then it uses backpropagation to compute the gradient and update the weights and biases accordingly.

In [9]:
def training_step(model, input_batch, output_batch, learning_rate):
            
    delta_nabla_b, delta_nabla_w = backprop(model, input_batch, output_batch)
        
    model.weights = [w-(learning_rate/len(input_batch))*nw
        for w, nw in zip(model.weights, delta_nabla_w)]
    model.biases = [b-(learning_rate/len(input_batch))*nb
        for b, nb in zip(model.biases, delta_nabla_b)]


The core of the trainig_step function is the backpropagation algorithm, which is implemented by the backprop function.
First we initialize the forward pass trough the network saving the resulting activations. Then in the reverse order we compute the partial derivates and store them in the gradient vector.

In [10]:
def backprop(model, x, y):
        
    nabla_b = [np.zeros(b.shape) for b in model.biases]
    nabla_w = [np.zeros(w.shape) for w in model.weights]
    activation = x
    activations = [x]
    zs = []
    
    
    #Forward pass    
    for w, b in zip(model.weights, model.biases):
        z = np.dot(activation, w.T) + b
        zs.append(z)
        activation = sigmoid(z)
        activations.append(activation)
            
    #Backward pass        
    delta = model.cost.delta(activations[-1], y, zs[-1])
    nabla_b[-1] = np.sum(delta, axis=0)
    nabla_w[-1] = np.dot(delta.T, activations[-2])
    
    for l in range(2, model.layers_count):
        z = zs[-l]
        delta = np.dot(delta, model.weights[-l+1]) * sigmoid_prime(z)
        nabla_b[-l] = np.sum(delta, axis=0)
        nabla_w[-l] = np.dot(delta.T, activations[-l-1])
        
    return (nabla_b, nabla_w)


## 7. Evaluation definition

In order to test our solution we will define a simple function that check model's output against the correct labels.

In [11]:
def evaluate(model, test_inputs, test_outputs):
    n = len(test_inputs)
    return sum([test_outputs[i] == np.argmax(call(model, test_inputs[i])) for i in range(n)])

# 6. Training

Now we are ready to start training our model. We need to specify the batch size, number of iterations and the learning rate, and launch the training.

In [12]:
batch_size = 30
max_iterations = 100000
learning_rate = 2.0
epoch = 5000

prev_correct = 0
break_counter = 0
max_decrease = 2

model = Model([784,300,100,10])
training_inputs, training_outputs = process_data(training_data, one_hot_output=True)
validatation_inputs, validation_outputs = process_data(validation_data, one_hot_output=False)

if hasattr(tqdm, '_instances'): tqdm._instances.clear()
for i in tqdm(range(max_iterations)):
    x, y = get_batch(training_inputs, training_outputs, batch_size)
    training_step(model, x, y, learning_rate)

    #Early stopping, if the accuracy on validation set is decrasing stop the training
    if i % epoch==0:
        correct = evaluate(model, validatation_inputs, validation_outputs)
        if correct < prev_correct:
            break_counter+=1
        else:
            break_counter = 0
        if break_counter == max_decrease:
            print("Stopped early")
            break;
        prev_correct = correct


 55%|████████████████████████████████████████▋                                 | 55000/100000 [02:16<01:51, 403.99it/s]

Stopped early





# 7. Testing
After the training is completed, we can use the test set in order to check how good our model has become at recognizing digits.

In [13]:
test_inputs, test_outputs = process_data(test_data, one_hot_output=False)
correct = evaluate(model, test_inputs, test_outputs)


print(f'The model classified correctly {correct} out of {len(test_inputs)} test examples')


The model classified correctly 9717 out of 10000 test examples
