In [1]:
import pandas as pd
from sklearn import datasets
import matplotlib.pyplot as plt
import numpy as np
from numpy.random import ranf
from sklearn import svm
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import sys

import warnings

warnings.filterwarnings('ignore', 'Solver terminated early.*')

In [2]:
def load_mnist(path, kind='train'):
    from numpy import fromfile, uint8
    import os
    import struct

    labels_path = os.path.join(path, '%s-labels-idx1-ubyte' % kind)
    images_path = os.path.join(path, '%s-images-idx3-ubyte' % kind)
    with open(labels_path, 'rb') as lbpath:
        magic, n = struct.unpack('>II', lbpath.read(8))
        labels = fromfile(lbpath, dtype=uint8)
        with open(images_path, 'rb') as imgpath:
            magic, num, rows, cols = struct.unpack(">IIII", imgpath.read(16))
            images = fromfile(imgpath, dtype=uint8).reshape(len(labels), 784)
            images = ((images / 255.) - .5) * 2
    return images, labels

In [3]:
X_train_mnist, y_train_mnist = load_mnist('/Users/mukulsherekar/Downloads/mnist/', kind='train')
print(f'Rows= {X_train_mnist.shape[0]}, columns= {X_train_mnist.shape[1]}')

X_test_mnist, y_test_mnist = load_mnist('/Users/mukulsherekar/Downloads/mnist/', kind='t10k')
print(f'Rows= {X_test_mnist.shape[0]}, columns= {X_test_mnist.shape[1]}')

np.savez_compressed('mnist_scaled.npz',
                    X_train=X_train_mnist,
                    y_train=y_train_mnist,
                    X_test=X_test_mnist,
                    y_test=y_test_mnist)
mnist = np.load('mnist_scaled.npz')
X_train, y_train, X_test, y_test = [mnist[f] for f in ['X_train', 'y_train',
                                                       'X_test', 'y_test']]

print(X_train_mnist.shape)


Rows= 60000, columns= 784
Rows= 10000, columns= 784
(60000, 784)


In [4]:
### Implementing a multi-layer perceptron
class NeuralNetMLP(object):

    def __init__(self, n_hidden=30, epochs=100, eta=0.001, minibatch_size=1, seed=None):
        self.random = np.random.RandomState(seed)  # used to randomize weights
        self.n_hidden = n_hidden  # size of the hidden layer
        self.epochs = epochs  # number of iterations
        self.eta = eta  # learning rate
        self.minibatch_size = minibatch_size  # size of training batch - 1 would not work
        self.w_out, self.w_h1, self.w_h2 = None, None, None

    @staticmethod
    def onehot(_y, _n_classes):  # one hot encode the input class y
        onehot = np.zeros((_n_classes, _y.shape[0]))
        for idx, val in enumerate(_y.astype(int)):
            onehot[val, idx] = 1.0
        return onehot.T

    @staticmethod
    def sigmoid(_z):  # Eq 1
        return 1.0 / (1.0 + np.exp(-np.clip(_z, -250, 250)))

    def _forward(self, _X):  # Eq 2
        z_h1 = np.dot(_X, self.w_h1)  # input into hidden layer-1
        a_h1 = self.sigmoid(z_h1)  # activation of hidden layer-1

        z_h2 = np.dot(a_h1, self.w_h2)  # input into hidden layer-2
        a_h2 = self.sigmoid(z_h2)  # activation of hidden layer-2

        z_out = np.dot(a_h2, self.w_out) # input into output layer
        a_out = self.sigmoid(z_out)      # activation of output layer
        return z_h1, a_h1, z_h2, a_h2, z_out, a_out

    @staticmethod
    def compute_cost(y_enc, output):  # Eq 4
        term1 = -y_enc * (np.log(output))
        term2 = (1.0 - y_enc) * np.log(1.0 - output)
        cost = np.sum(term1 - term2)
        return cost

    def predict(self, _X):
        z_h1, a_h1, z_h2, a_h2, z_out, a_out = self._forward(_X)
        ypred = np.argmax(z_out, axis=1)
        return ypred

    def fit(self, _X_train, _y_train, _X_valid, _y_valid):
        import sys
        n_output = np.unique(_y_train).shape[0]  # number of class labels
        n_features = _X_train.shape[1]
        
        # weights of two hidden layers
        self.w_h1 = self.random.normal(loc=0.0, scale=0.1, size=(n_features, self.n_hidden))
        self.w_h2 = self.random.normal(loc=0.0, scale=0.1, size=(self.n_hidden, n_output))
        
        # weights of output layer
        self.w_out = self.random.normal(loc=0.0, scale=0.1, size=(n_output, self.n_hidden))

        y_train_enc = self.onehot(_y_train, self.n_hidden)  # one-hot encode original y
        
        for ei in range(self.epochs):  # Ideally must shuffle at every epoch
            indices = np.arange(_X_train.shape[0])
            for start_idx in range(0, indices.shape[0] - self.minibatch_size + 1, self.minibatch_size):
                batch_idx = indices[start_idx:start_idx + self.minibatch_size]

                # Backpropagation
                z_h1, a_h1, z_h2, a_h2, z_out, a_out = self._forward(_X_train[batch_idx])  # neural network model

                # output layer into 2nd hidden layer
                delta_out = a_out - y_train_enc[batch_idx]  

                grad_w_out = np.dot(a_h2.T, delta_out)  
                self.w_out -= self.eta * grad_w_out

                # 2nd hidden layer into 1st hidden layer
                sigmoid_derivative_h2 = a_h2 * (1.0 - a_h2)  
                delta_h2 = (np.dot(delta_out, self.w_out.T) * sigmoid_derivative_h2)  
                grad_w_h2 = np.dot(a_h1.T, delta_h2)  
                self.w_h2 -= self.eta * grad_w_h2  

                # 1st hidden layer into input layer
                sigmoid_derivative_h1 = a_h1 * (1.0 - a_h1)  
                delta_h1 = (np.dot(delta_h2, self.w_h2.T) * sigmoid_derivative_h1)  
                grad_w_h1 = np.dot(_X_train[batch_idx].T, delta_h1)  
                self.w_h1 -= self.eta * grad_w_h1

            # Evaluation after each epoch during training
            z_h1, a_h1, z_h2, a_h2, z_out, a_out = self._forward(_X_train)
            cost = self.compute_cost(y_enc=y_train_enc, output=a_out)
            y_train_pred = self.predict(_X_train)  # monitoring training progress through reclassification
            y_valid_pred = self.predict(_X_valid)  # monitoring training progress through validation
            train_acc = ((np.sum(_y_train == y_train_pred)).astype(float) / _X_train.shape[0])
            valid_acc = ((np.sum(_y_valid == y_valid_pred)).astype(float) / _X_valid.shape[0])
            sys.stderr.write('\r%d/%d | Cost: %.2f ' '| Train/Valid Acc.: %.2f%%/%.2f%% ' %
                             (ei + 1, self.epochs, cost, train_acc * 100, valid_acc * 100))
            sys.stderr.flush()
        return self


# Define and fit the neural network
nn = NeuralNetMLP(n_hidden=20, epochs=300, eta=0.0005, minibatch_size=100, seed=123)
nn.fit(X_train[:55000], y_train[:55000], X_train[55000:], y_train[55000:])


def get_acc(_y_test, _y_pred):
    return (np.sum(_y_test == _y_pred)).astype(float) / _y_test.shape[0]


300/300 | Cost: 13503.52 | Train/Valid Acc.: 96.66%/94.98% 