In [6]:
from __future__          import division
from scipy.stats         import zscore
import matplotlib.pyplot as plt
import numpy             as np
import array
import math
import struct
import random

TRAIN_IMAGES  = "C:\\Users\\oop\\Desktop\\Winter 2016\\train-images.idx3-ubyte"
TRAIN_LABELS  = "C:\\Users\\oop\\Desktop\\Winter 2016\\train-labels.idx1-ubyte"
TEST_IMAGES = "C:\\Users\\oop\\Desktop\\Winter 2016\\t10k-images.idx3-ubyte"
TEST_LABELS  = "C:\\Users\\oop\\Desktop\\Winter 2016\\t10k-labels.idx1-ubyte"

def read_mnist(images_file, labels_file): 
    f1 = open(labels_file, 'rb')
    magic_number, size = struct.unpack(">II", f1.read(8))
    labels = array.array("b", f1.read())
    f1.close()
    
    f2 = open(images_file, 'rb')
    magic_number, size, rows, cols = struct.unpack(">IIII", f2.read(16))
    raw_images = array.array("B", f2.read())
    f2.close()

    N = len(labels)
    images = np.zeros((N, rows*cols), dtype=np.uint8)
    for i in range(N):
        images[i] = np.array(raw_images[ i*rows*cols : (i+1)*rows*cols ])

    return images, labels

def sigmoid(x):
    if type(x) is np.ndarray or type(x) is list:
        return np.array([sigmoid(ele) for ele in x])
    else:
        return 1.0 / (1.0 + math.exp(-x))

def sigmoid_derivate(x):
    if type(x) is np.ndarray or type(x) is list:
        return np.array([sigmoid_derivate(ele) for ele in x])
    else:
        a = sigmoid(x)
        return a * (1-a)

def tanhx_activation_fn(x):
    return np.tanh(x)

def tanhx_activation_fn_derivative(x):
    return 1 - (np.tanh(x) ** 2)

def max_activation_fn(x):
    if type(x) is np.ndarray or type(x) is list:
        return np.array([max(0, ele) for ele in x])
    else:
        return max(0, x)

def max_activation_fn_derivative(x):
    if type(x) is np.ndarray or type(x) is list:
        return np.array([1 if ele > 0 else 0 for ele in x])
    else:
        return 1 if x > 0 else 0
    
class MultiLayerNeuralNetwork:
    
    def __init__(self, inputs, outputs, learning_rate, layers, 
                 activation_fn, activation_derivative_fn, 
                 validation_size, *args, **kwargs):
        train_data_size = len(inputs) - validation_size
        self.inputs = inputs[:train_data_size]
        self.outputs = outputs[:train_data_size]
        self.cross_validation_inputs = inputs[train_data_size:]
        self.cross_validation_outputs = outputs[train_data_size:]
        self.learning_rate = learning_rate
        self.layers = layers
        self.activation_fn = activation_fn
        self.activation_derivative_fn = activation_derivative_fn
    
    def get_zero_weights(self):
        weights = []
        for i in range(len(self.layers)-1):
            weights.append(np.zeros((self.layers[i]+1, 
                                     self.layers[i+1])))
        return weights
    
    def get_random_weights(self):
        weights = []
        for i in range(len(self.layers)-1):
            weights.append(0.1 * np.random.random((self.layers[i]+1, 
                                             self.layers[i+1])))
        return weights
    
    def get_gradients(self, X, Y, weights):
        # Forward propogation.
        a,z = self.forward_prop(X, Y, weights)
            
        # Backward error propogation.
        deltas = []
        deltas.append((Y - a[-1]))
        for l in reversed(range(1, len(self.layers)-1)):
            g = np.apply_along_axis(self.activation_derivative_fn, axis=1, arr=z[l])
            delta = np.matrix(np.array(np.matmul(deltas[-1], weights[l].transpose()))*np.array(g))
            deltas.append(np.delete(delta, 0, axis=1)) # Note that we remove deltas calculated for bias node.
        deltas.reverse()
            
        gradients = []
        for i in range(len(weights)):
            gradients.append(np.matmul(a[i].transpose(), deltas[i]))
        return gradients
            
    def train(self, 
              weights=None, 
              max_iterations=10000, 
              momentum=0.95, 
              lambda_val=0.001,
              batch_learning=False,
              mini_batch_learning=True,
              online_learning=False):
        """
        Trains the data using multilayered.
        """
        weights     = weights or self.get_random_weights()
        prev_weights = weights
        train_error = []
        cross_validation_error = []
        test_error  = []
        min_cross_validation_error = float("inf")
        optimum_weights = None
        count = 0
        k = 0
        for itr in range(0, max_iterations+1):
            if itr != 0:
                X = self.inputs
                Y = self.inputs
                if online_learning:
                    i = random.randint(0,len(self.inputs)-1)
                    X = self.inputs[i]
                    Y = self.outputs[i]
                    
                if mini_batch_learning:
                    # Mini Batch learning.
                    batch = list(set([random.randint(0,len(self.inputs)-1) 
                                      for i in range(500)]))
                    X = []
                    Y = []
                    for i in batch:
                        X.append(self.inputs[i])
                        Y.append(self.outputs[i])
                    X = np.matrix(np.array(X))
                    Y = np.matrix(np.array(Y))
                
                # Use gradient descent algorithm to update
                # accordingly due to error derivatives.
                gradients = self.get_gradients(X, Y, weights)
                for i in range(len(weights)):
                    gradienti = gradients[i] + lambda_val * weights[i]
                    new_weights     = (weights[i] + (self.learning_rate * gradienti)
                                       + (momentum * (weights[i] - prev_weights[i])))
                    prev_weights[i] = weights[i]
                    weights[i]      = new_weights
            
        
            error = self.test(self.inputs, 
                              self.outputs,
                              weights)
            
            train_error.append(error)

            error = self.test(self.cross_validation_inputs, 
                              self.cross_validation_outputs,
                              weights)
            if error < min_cross_validation_error:
                min_cross_validation_error = error
                count = 0
                optimum_weights = weights
            else:
                count += 1
            cross_validation_error.append(error)

            error = self.test(X_test, 
                              Y_test,
                              weights)
            test_error.append(error)
            
            print train_error[-1], cross_validation_error[-1], test_error[-1]

            # Validation set error is increasing.
            if count > 10:
                break
                
        self.weights     = optimum_weights
        self.train_error = train_error
        self.test_error  = test_error
        
        return weights
    
    def test_gradient(self):
        weights = self.get_random_weights()
        gradients = self.get_gradients(self.inputs,
                                       self.outputs, 
                                       weights)
        
        epsilon = 10**-5
        approximate_gradients = self.get_random_weights()
        for l in range(len(self.layers)-1):
            for i in range(len(weights[l])):
                for j in range(len(weights[l][i])):
                    w = weights[l][i][j]
                    weights[l][i][j] = w + epsilon
                    val1 = self.cross_entropy(weights)
                    weights[l][i][j] = w - epsilon
                    val2 = self.cross_entropy(weights)
                    weights[l][i][j] = w
                    approximate_gradients[l][i][j] = (val1 - val2) / (2*epsilon)
        
        # Find the min,max and avg difference 
        # between actual and approximate gradient
        no_of_units = 0
        diff_sum = 0
        max_diff = float("-inf")
        min_diff = float("inf")
        for l in range(len(gradients)):
            g = np.abs(gradients[l].flatten())
            a = np.abs(approximate_gradients[l].flatten())
            diff = np.abs(g-a)
            max_diff = max(max_diff, diff.max())
            min_diff = min(min_diff, diff.min())
            diff_sum += diff.sum()
            no_of_units += g.shape[0] * g.shape[1]
        avg_diff = diff_sum / no_of_units
        return max_diff, min_diff, avg_diff
            
    def cross_entropy(self, weights):
        a,z = self.forward_prop(self.inputs, self.outputs, weights)
        return -np.multiply(np.log(a[-1]), self.outputs).sum()

    def forward_prop(self, inputs, outputs, weights):
        """
        Calculates the output at each layer of the network.
        """
        a = [np.insert(inputs, 0, 1, axis=1)]
        z = [np.insert(inputs, 0, 1, axis=1)]
        for l in range(len(self.layers)-1):
            zl = np.matmul(z[l], weights[l])
            if l == (len(self.layers)-2):
                output = np.exp(zl)
                output = output / output.sum(axis=1)
                a.append(output)
                z.append(zl)
            else:
                z.append(np.insert(zl, 0, 1, axis=1))
                al = np.apply_along_axis(self.activation_fn, axis=1, arr=zl)
                a.append(np.insert(al, 0, 1, axis=1))
        return a,z
    
    def test(self, test_inputs, test_outputs, weights):
        a,z = self.forward_prop(test_inputs, test_outputs, weights)
        predicted_digits = a[-1].argmax(axis=1)
        actual_digits = test_outputs.argmax(axis=1)
        error = (predicted_digits != actual_digits).sum()
        return error * 100 / len(test_inputs)

In [7]:
# Read training data.
images_train, labels_train = read_mnist(TRAIN_IMAGES, TRAIN_LABELS)

# Read Test data.
images_test, labels_test = read_mnist(TEST_IMAGES, TEST_LABELS)

X_train = np.matrix(zscore(images_train, axis=1))
Y_train = np.matrix([np.array([1 if i == label else 0 
                               for i in range(10)]) 
                     for label in labels_train])
X_test = np.matrix(zscore(images_test, axis=1))
Y_test = np.matrix([np.array([1 if i == label else 0 
                              for i in range(10)]) 
                    for label in labels_test])

def get_mnn_nework():
    network = MultiLayerNeuralNetwork(inputs=X_train,
                                      outputs=Y_train,
                                      learning_rate=0.0003,
                                      layers=[784,150,10],
                                      activation_fn=sigmoid,
                                      activation_derivative_fn=sigmoid_derivate,
                                      validation_size=10000)
    return network

def test_cases():
    
    #Case d iii)
    network = get_mnn_nework()
    weights = network.train(mini_batch_learning=True, 
                            momentum=0, lambda_val=0)

    # Case e) 
    network = get_mnn_nework()
    weights = network.train(mini_batch_learning=True, 
                            momentum=0, 
                            lambda_val=0.001)
    weights = network.train(mini_batch_learning=True, 
                            momentum=0, lambda_val=0.0001)

    # Case f)
    network = get_mnn_nework()
    weights = network.train(mini_batch_learning=True, 
                            momentum=0.9, lambda_val=0)

    # Case g)
    network = get_mnn_nework()
    network.activation_fn = tanhx_activation_fn
    network.activation_derivative_fn = tanhx_activation_fn_derivative
    weights = network.train(mini_batch_learning=True, momentum=0, lambda_val=0)

    network.activation_fn = max_activation_fn
    network.activation_derivative_fn = max_activation_fn_derivative
    weights = network.train(mini_batch_learning=True, momentum=0, lambda_val=0)

    # Case h)
    # Hidden units are reduced by half i.e to 75.
    network = get_mnn_nework()
    network.layers=[784,75,10]
    weights = network.train(mini_batch_learning=True, momentum=0, lambda_val=0)

    # Hidden units are increased by double i.e to 300
    network = get_mnn_nework()
    network.layers=[784,300,10]
    weights = network.train(mini_batch_learning=True, momentum=0, lambda_val=0)


    # Hidden units are too low i.e 5(assuming that 5 is very low)
    network = get_mnn_nework()
    network.layers=[784,5,10]
    weights = network.train(mini_batch_learning=True, momentum=0, lambda_val=0)

    # Hidden units are too high i.e 10000(assuming that 10000 is high)
    network = get_mnn_nework()
    network.layers=[784,10000,10]
    weights = network.train(mini_batch_learning=True, momentum=0, lambda_val=0)
    
    # 2 hidden layers.
    network = get_mnn_nework()
    network.layers=[784,100,100,10]
    weights = network.train(mini_batch_learning=True, momentum=0, lambda_val=0)
    