# 2. Neural Networks

In this notebook, we will implement a fully-connected feed-forward neural network on the MNIST dataset and regularize it with early stopping, norm penalty, dropout and batch normalization.

### Import Statements

In [2]:
import gzip
import pickle
import random
from datetime import datetime

import numpy as np
from scipy import io

# Additional Functions
def sigmoid(z):
    """Logistic function 1 / (1 + exp(-z)), applied elementwise to `z`."""
    denominator = 1.0 + np.exp(-z)
    return 1.0 / denominator

def sigmoid_prime(z):
    """First derivative of the logistic function, s(z) * (1 - s(z))."""
    s = sigmoid(z)
    return s * (1 - s)

# Device label, hard-coded to 'cpu'; nothing in the visible cells reads it.
DEVICE = 'cpu'

### Global Variables

In [19]:
# Global Variables
# Training hyper-parameters.
BATCH_SIZE = 10
LEARNING_RATE = 0.5
N_EPOCHS = 50
RANDOM_SEED = 32
N_HIDDEN = 30

# Network shape: 784 input values per sample, one output unit per class.
N_CLASSES = 10
N_INPUTS = 784
N_OUTPUTS = N_CLASSES  # width of the output layer

# Seed both random number generators so that weight initialisation,
# shuffling and dropout masks are reproducible after Restart & Run All.
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)

### Neural Network

In [20]:
"""Multi-class classification using Neural Network"""
class NN_iter(object):
    """Fully-connected sigmoid network for multi-class classification.

    The network is trained with mini-batch stochastic gradient descent and
    supports L1/L2 norm penalties, dropout on the hidden layers and early
    stopping on a validation set.

    Data format:
      * training samples are ``(x, y)`` pairs, ``x`` a column vector of shape
        ``(sizes[0], 1)`` and ``y`` a one-hot column vector of shape
        ``(sizes[-1], 1)``;
      * evaluation samples are ``(x, label)`` pairs, ``label`` being the
        class index.
    """

    def __init__(self, sizes, l1 = 0, l2 = 0, batch_size = 64, learning_rate = 0.001,
                 epochs = 100, loss = "cross", esp = 10, dropout_p = 0, batch_n = False):
        """Create a network with randomly initialised weights and biases.

        Args:
            sizes: number of neurons in each layer, including the input and
                output layers.
            l1, l2: coefficients of the L1 and L2 norm penalties.
            batch_size: number of samples per mini-batch.
            learning_rate: SGD step size.
            epochs: number of passes over the training data made by `fit`.
            loss: "mse" for quadratic cost; any other value selects
                cross-entropy.
            esp: early-stopping patience, the number of consecutive epochs
                the validation error may increase before training stops.
            dropout_p: probability of dropping a hidden unit during training.
            batch_n: stored on the instance only.
                NOTE(review): batch normalisation is not implemented, so this
                flag currently has no effect.

        Raises:
            ValueError: if `dropout_p` is outside [0, 1) or `batch_size` < 1.
        """
        if not 0 <= dropout_p < 1:
            raise ValueError("dropout_p must be in [0, 1), got %r" % (dropout_p,))
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1, got %r" % (batch_size,))
        self.num_layers = len(sizes)
        self.sizes = sizes
        self.batch_size = batch_size
        self.alpha = learning_rate
        self.epochs = epochs
        self.loss = loss
        self.l1_coeff = l1
        self.l2_coeff = l2
        self.esp = esp
        self.dropout_p = dropout_p
        self.batch_n = batch_n
        self.biases = [np.random.randn(y, 1) for y in sizes[1:]]
        self.weights = [np.random.randn(y, x)
                        for x, y in zip(sizes[:-1], sizes[1:])]

    def feedforward(self, x):
        """Return the network output for input column vector `x`.

        Dropout is not applied here: training uses inverted dropout, so the
        activations need no rescaling at inference time.
        """
        for b, w in zip(self.biases, self.weights):
            x = sigmoid(np.dot(w, x)+b)
        return x

    def fit(self, train_data):
        """Train for `self.epochs` epochs on `train_data`.

        `train_data` may be any iterable of ``(x, y)`` pairs; it is copied
        into a list first, so the caller's container is never reordered.
        """
        samples = list(train_data)
        for _ in range(self.epochs):
            self._run_epoch(samples)

    def fit_early_stop (self, train_data, valid_data):
        """Train like `fit`, stopping early when validation error keeps rising.

        After every epoch the misclassification rate on `valid_data`
        (``(x, label)`` pairs) is compared with the previous epoch's value.
        Training stops once it has increased for more than `self.esp`
        consecutive epochs.
        """
        samples = list(train_data)
        validation = list(valid_data)
        previous_error = self.error(validation)
        worse_epochs = 0
        for _ in range(self.epochs):
            self._run_epoch(samples)
            current_error = self.error(validation)
            worse_epochs = worse_epochs + 1 if current_error > previous_error else 0
            if worse_epochs > self.esp:
                break
            previous_error = current_error

    def _run_epoch(self, samples):
        """Shuffle `samples` in place and run one SGD step per mini-batch."""
        random.shuffle(samples)
        n = len(samples)
        for k in range(0, n, self.batch_size):
            self.SGD(samples[k:k + self.batch_size], n_train = n)

    def predict (self, test_data):
        """Return a list of ``(predicted_class, y)`` pairs for `test_data`."""
        return [(np.argmax(self.feedforward(x)), y)
                        for (x, y) in test_data]

    def evaluate(self, test_data):
        """Return the number of samples in `test_data` classified correctly."""
        test_results = self.predict(test_data)
        return sum(int(x == y) for (x, y) in test_results)

    def error(self, data):
        """Return the fraction of ``(x, label)`` samples that are misclassified.

        Returns 0.0 for empty `data`.
        """
        samples = list(data)
        if not samples:
            return 0.0
        return 1.0 - self.evaluate(samples) / float(len(samples))

    def SGD(self, mini_batch, n_train = None):
        """Apply one stochastic gradient descent step for `mini_batch`.

        The data gradient is averaged over the mini-batch. The norm penalty
        gradient is divided by `n_train` (the size of the full training set)
        when it is given, following the usual ``lambda / n`` convention; when
        `n_train` is None the penalty coefficients are used as they are.
        Biases are not penalised.
        """
        batch_len = len(mini_batch)
        if batch_len == 0:
            return
        del_b = [np.zeros(b.shape) for b in self.biases]
        del_w = [np.zeros(w.shape) for w in self.weights]
        for x, y in mini_batch:
            addn_del_b, addn_del_w = self.backprop(x, y)
            del_b = [db+adb for db, adb in zip(del_b, addn_del_b)]
            del_w = [dw+adw for dw, adw in zip(del_w, addn_del_w)]
        step = self.alpha / batch_len
        penalty_step = self.alpha if n_train is None else self.alpha / float(n_train)
        self.weights = [w - step*dw - penalty_step*self.norm_pen_grad(w)
                        for w, dw in zip(self.weights, del_w)]
        self.biases = [b - step*db
                       for b, db in zip(self.biases, del_b)]

    def norm_pen_grad (self, weight):
        """Return the gradient of the L1 + L2 norm penalty with respect to `weight`."""
        return self.l1_coeff*np.sign(weight) + self.l2_coeff*weight

    def backprop(self, x, y):
        """Return ``(del_b, del_w)``, the cost gradient for one sample.

        `del_b` and `del_w` are layer-by-layer lists shaped like
        `self.biases` and `self.weights`. When `self.dropout_p` > 0, each
        hidden activation is dropped with that probability and the survivors
        are scaled by ``1 / (1 - dropout_p)`` (inverted dropout); the same
        mask is applied to the error signal on the way back.
        """
        del_b = [np.zeros(b.shape) for b in self.biases]
        del_w = [np.zeros(w.shape) for w in self.weights]
        keep_prob = 1.0 - self.dropout_p
        last_layer = len(self.weights) - 1
        # feedforward
        activation = x
        activations = [x] # activations (after dropout), layer by layer
        zs = [] # weighted inputs, layer by layer
        masks = [] # dropout mask per layer, None where no dropout is applied
        for layer, (b, w) in enumerate(zip(self.biases, self.weights)):
            z = np.dot(w, activation)+b
            zs.append(z)
            activation = sigmoid(z)
            mask = None
            if self.dropout_p > 0 and layer < last_layer:
                # The output layer is never dropped.
                mask = np.random.binomial(1, keep_prob, size = activation.shape) / keep_prob
                activation = activation * mask
            masks.append(mask)
            activations.append(activation)
        # backward pass
        delta = self.delta_loss(zs[-1], activations[-1], y)
        del_b[-1] = delta
        del_w[-1] = np.dot(delta, activations[-2].transpose())
        for l in range(2, self.num_layers):
            delta = np.dot(self.weights[-l+1].transpose(), delta)
            if masks[-l] is not None:
                delta = delta * masks[-l]
            delta = delta * sigmoid_prime(zs[-l])
            del_b[-l] = delta
            del_w[-l] = np.dot(delta, activations[-l-1].transpose())
        return (del_b, del_w)

    def delta_loss (self, z, a, y):
        """Return the output-layer error, dC/dz, for the configured loss.

        For "mse" this is ``(a - y) * sigmoid'(z)``. For cross-entropy with a
        sigmoid output the ``sigmoid'`` factor cancels, leaving ``a - y``.
        """
        if (self.loss == "mse"):
            return (a-y) * sigmoid_prime (z)
        else:
            return (a-y)

### Importing Dataset

In [32]:
def one_hot (y, n_classes = None):
    """Return a one-hot column vector of shape (n_classes, 1) for label `y`.

    Args:
        y: class index of the entry to set to 1.0.
        n_classes: length of the vector; defaults to the notebook-wide
            N_CLASSES when omitted.
    """
    if n_classes is None:
        n_classes = N_CLASSES
    oh = np.zeros((n_classes, 1))
    oh[y] = 1.0
    return oh

def reshape_inputs_tiny (xtrain, ytrain, xtest, ytest):
    """Pair flattened inputs with their targets for training and testing.

    Returns:
        (train_set, test_set): two lists. `train_set` holds
        ``(x, one_hot(label))`` pairs and `test_set` holds ``(x, label)``
        pairs, with every ``x`` reshaped to a column of shape (N_INPUTS, 1).

    Lists are returned rather than `zip` objects because the training code
    needs `len()`, slicing and repeated iteration, none of which a one-shot
    Python 3 iterator supports.
    """
    train_set = [(np.reshape(xi, (N_INPUTS, 1)), one_hot(yi))
                 for xi, yi in zip(xtrain, ytrain)]
    test_set = [(np.reshape(xi, (N_INPUTS, 1)), yi)
                for xi, yi in zip(xtest, ytest)]
    return train_set, test_set

def reshape_inputs_mnist (train, test):
    """Convert MNIST ``(images, labels)`` splits into lists of sample pairs.

    Args:
        train, test: sequences whose element 0 holds the images and element 1
            the labels.

    Returns:
        (train_set, test_set): two lists. `train_set` holds
        ``(x, one_hot(label))`` pairs and `test_set` holds ``(x, label)``
        pairs, with every ``x`` reshaped to a column of shape (N_INPUTS, 1).

    Lists are returned rather than `zip` objects because the training code
    needs `len()`, slicing and repeated iteration.
    """
    train_set = [(np.reshape(xi, (N_INPUTS, 1)), one_hot(yi))
                 for xi, yi in zip(train[0], train[1])]
    test_set = [(np.reshape(xi, (N_INPUTS, 1)), yi)
                for xi, yi in zip(test[0], test[1])]
    return train_set, test_set

In [None]:
# Disabled cell: the TinyImageNet loading code below is wrapped in a string
# literal, so none of it executes.
# NOTE(review): if this is re-enabled, `np.load(filepath).tolist` on the
# training labels is missing its call parentheses.
'''
import os

# Getting the TinyImageNet Dataset
here = os.path.dirname(os.path.realpath('__file__'))
subdir = "TinyImageNet"

# Training Set  features for Standard Neural Network (NN) 
filepath = os.path.join(here, subdir, "train_x.npy")
xtrain_cnn = np.load(filepath)
xtrain_bnn = xtrain_cnn.reshape(xtrain_cnn.shape[0], -1).tolist()

# Training Set labels 
filepath = os.path.join(here, subdir, "train_y.npy")
ytrain = np.load(filepath).tolist

# Test Set features for Standard Neural Network (NN)
filepath = os.path.join(here, subdir, "test_x.npy")
xtest_cnn = np.load(filepath)
xtest_bnn = xtest_cnn.reshape(xtest_cnn.shape[0], -1).tolist()

# Test Set labels 
filepath = os.path.join(here, subdir, "test_y.npy")
ytest = np.load(filepath)
print (ytest.shape)
ytest = ytest.tolist()

# Training Set and Test Set for NN
train_bnn, test_bnn = reshape_inputs_tiny(xtrain_bnn, ytrain, xtest_bnn, ytest)
#random.shuffle(train_bnn)
#random.shuffle(test_bnn)

#print (len(train_bnn))
#print (xtest_bnn.size)
#print (ytrain)
#print (ytest.size)
#file = open (filepath)
'''

In [None]:
# Load the pickled MNIST splits (train, validation, test). The context manager
# closes the archive even if unpickling fails.
# NOTE(review): unpickling can execute arbitrary code - only load archives
# obtained from a trusted source.
with gzip.open('../data/mnist.pkl.gz', 'rb') as f:
    # encoding='latin1' is needed under Python 3 in case the archive was
    # written by Python 2; it does not affect archives written by Python 3.
    trd1, vd1, ted1 = pickle.load(f, encoding='latin1')

train_set, test_set = reshape_inputs_mnist (trd1, ted1)


In [None]:
def get_accuracy_train(model, train_data):
    '''
    Compute the accuracy of `model` over the training set.

    `train_data` holds (x, one_hot_target) pairs; a prediction counts as
    correct when the argmax of the network output equals the argmax of the
    target. Returns a float in [0, 1], or 0.0 for an empty data set.
    '''
    samples = list(train_data)
    if not samples:
        return 0.0
    correct = sum(int(np.argmax(model.feedforward(x)) == np.argmax(y))
                  for (x, y) in samples)
    return float(correct)/float(len(samples))


def get_accuracy_test (model, test_data):
    '''
    Compute the accuracy of `model` over the test set.

    `test_data` holds (x, label) pairs with `label` the class index; a
    prediction counts as correct when the argmax of the network output equals
    the label. Returns a float in [0, 1], or 0.0 for an empty data set.
    '''
    samples = list(test_data)
    if not samples:
        return 0.0
    correct = sum(int(np.argmax(model.feedforward(x)) == y)
                  for (x, y) in samples)
    return float(correct)/float(len(samples))


def plot_losses(train_losses, valid_losses):
    '''
    Plot the per-epoch training and test losses on one set of axes.

    The seaborn style is applied only while the figure is drawn; the style
    that was active before the call is restored afterwards.
    '''
    # NOTE(review): `plt` (matplotlib.pyplot) is not imported in any visible
    # cell - confirm it is imported before this function is called.

    # Newer matplotlib releases renamed the 'seaborn' style to 'seaborn-v0_8'.
    style = 'seaborn' if 'seaborn' in plt.style.available else 'seaborn-v0_8'

    with plt.style.context(style):
        fig, ax = plt.subplots(figsize = (8, 4.5))

        ax.plot(np.asarray(train_losses), color='green', label='Training loss')
        ax.plot(np.asarray(valid_losses), color='orange', label='Testing loss')
        ax.set(title="Loss across epochs",
                xlabel='Epoch',
                ylabel='Loss')
        ax.legend()
        # plt.show() also works with the notebook's inline backend, where
        # fig.show() only emits a "non-GUI backend" warning.
        plt.show()

def train (train_set, model):
    '''
    Run `model.fit` on `train_set`, then measure the training loss.

    `train_set` must be a sequence (it needs `len()`) of (x, one_hot_target)
    pairs. The loss is the squared error between the network output and the
    target, summed over output units and averaged over samples.

    Returns (model, epoch_loss); epoch_loss is 0.0 for an empty training set.
    '''

    model.fit(train_set)

    n_samples = len(train_set)
    if n_samples == 0:
        return model, 0.0

    running_loss = 0.0
    for X, y_true in train_set:
        y_hat = model.feedforward(X)
        running_loss += float(np.sum((y_hat - y_true) ** 2))

    epoch_loss = running_loss / n_samples
    return model, epoch_loss

def train_es (train_set, test_set, model):
    '''
    Run `model.fit_early_stop` on `train_set`, validating on `test_set`,
    then measure the training loss.

    `train_set` must be a sequence (it needs `len()`) of (x, one_hot_target)
    pairs. The loss is the squared error between the network output and the
    target, summed over output units and averaged over samples.

    Returns (model, epoch_loss); epoch_loss is 0.0 for an empty training set.
    '''

    model.fit_early_stop (train_set, test_set)

    n_samples = len(train_set)
    if n_samples == 0:
        return model, 0.0

    running_loss = 0.0
    for X, y_true in train_set:
        y_hat = model.feedforward(X)
        running_loss += float(np.sum((y_hat - y_true) ** 2))

    epoch_loss = running_loss / n_samples
    return model, epoch_loss

def test_loss (test_set, model):
    '''
    Function for the getting test loss for each epoch
    '''
    running_loss = 0
    
    for X, y_true in train_set:  
        y_hat = np.argmax(self.feedforward(X)) 
        running_loss += (y_hat-y_true)**2
        
    epoch_loss = running_loss / len(test_set)
    return model, epoch_loss

def training_loop (model, train_set, test_set, epochs, print_every=1, es = False):
    '''
    Train `model` for up to `epochs` rounds, recording losses after each one.

    Each round calls `train` (or `train_es` when `es` is true) and then
    `test_loss`. When `es` is true and the model has an `esp` attribute, the
    loop also stops once the test loss has failed to improve on its best
    value for more than `model.esp` consecutive rounds.

    Args:
        model: network to train.
        train_set: training samples.
        test_set: samples used for the test loss and accuracy.
        epochs: maximum number of rounds.
        print_every: print metrics every this many rounds (0 disables).
        es: enable early stopping.

    Returns:
        (model, (train_losses, test_losses)) with one loss per completed round.
    '''

    # set objects for storing metrics
    best_loss = float('inf')
    rounds_without_improvement = 0
    patience = getattr(model, 'esp', None)
    train_losses = []
    test_losses = []

    # Train model
    for epoch in range(0, epochs):

        # training
        if es:
            model, train_loss = train_es (train_set, test_set, model)
        else:
            model, train_loss = train (train_set, model)
        train_losses.append(train_loss)

        # test - the result gets its own name so the test_loss function is
        # not shadowed by a local variable
        model, epoch_test_loss = test_loss(test_set, model)
        test_losses.append(epoch_test_loss)

        if print_every and epoch % print_every == (print_every - 1):

            train_acc = get_accuracy_train(model, train_set)
            test_acc = get_accuracy_test(model, test_set)

            print(f'{datetime.now().time().replace(microsecond=0)}     '
                  f'Epoch: {epoch}\t'
                  f'Train loss: {train_loss:.3f}\t'
                  f'Test loss: {epoch_test_loss:.3f}\t'
                  f'Train accuracy: {100 * train_acc:.3f}\t'
                  f'Test accuracy: {100 * test_acc:.3f}')

        # early stopping bookkeeping
        if epoch_test_loss < best_loss:
            best_loss = epoch_test_loss
            rounds_without_improvement = 0
        else:
            rounds_without_improvement += 1
        if es and patience is not None and rounds_without_improvement > patience:
            print(f'Early stopping after epoch {epoch}')
            break

    plot_losses(train_losses, test_losses)

    return model, (train_losses, test_losses)

## Implementing Neural Network

### Unregularized

In [None]:
# Baseline network without any regularization. epochs = 1 because
# training_loop drives the epochs itself, one fit call per round.
model = NN_iter([N_INPUTS, N_HIDDEN, N_CLASSES], learning_rate = LEARNING_RATE,
                epochs = 1, batch_size = BATCH_SIZE)

# training_loop returns the model together with its loss histories.
model, (train_losses, test_losses) = training_loop(model, train_set, test_set, N_EPOCHS, print_every = 1)

### Early Stopping

In [None]:
# Same network as the baseline, trained with early stopping (es = True).
model = NN_iter([N_INPUTS, N_HIDDEN, N_CLASSES], learning_rate = LEARNING_RATE,
                epochs = 1, batch_size = BATCH_SIZE)

# training_loop returns the model together with its loss histories.
model, (train_losses, test_losses) = training_loop(model, train_set, test_set, N_EPOCHS, print_every = 1, es = True)

### Norm Penalty

In [None]:
# L1 norm penalty. The coefficient is a constructor argument of NN_iter;
# training_loop does not accept it.
model = NN_iter([N_INPUTS, N_HIDDEN, N_CLASSES], l1 = 5.0, learning_rate = LEARNING_RATE,
                epochs = 1, batch_size = BATCH_SIZE)

# training_loop returns the model together with its loss histories.
model, (train_losses, test_losses) = training_loop(model, train_set, test_set, N_EPOCHS, print_every = 1)

In [None]:
# L2 norm penalty. The coefficient is a constructor argument of NN_iter;
# training_loop does not accept it.
model = NN_iter([N_INPUTS, N_HIDDEN, N_CLASSES], l2 = 5.0, learning_rate = LEARNING_RATE,
                epochs = 1, batch_size = BATCH_SIZE)

# training_loop returns the model together with its loss histories.
model, (train_losses, test_losses) = training_loop(model, train_set, test_set, N_EPOCHS, print_every = 1)

### Batch Normalization

In [None]:
# Batch normalization flag. batch_n is a constructor argument of NN_iter;
# training_loop does not accept it.
# NOTE(review): NN_iter only stores batch_n and never reads it, so this run
# currently trains the same network as the unregularized baseline.
model = NN_iter([N_INPUTS, N_HIDDEN, N_CLASSES], batch_n = True, learning_rate = LEARNING_RATE,
                epochs = 1, batch_size = BATCH_SIZE)

# training_loop returns the model together with its loss histories.
model, (train_losses, test_losses) = training_loop(model, train_set, test_set, N_EPOCHS, print_every = 1)

### Dropout

In [None]:
# Dropout. The probability is the dropout_p constructor argument of NN_iter;
# training_loop accepts neither dropout_p nor drop_p.
model = NN_iter([N_INPUTS, N_HIDDEN, N_CLASSES], dropout_p = 0.015, learning_rate = LEARNING_RATE,
                epochs = 1, batch_size = BATCH_SIZE)

# training_loop returns the model together with its loss histories.
model, (train_losses, test_losses) = training_loop(model, train_set, test_set, N_EPOCHS, print_every = 1)

# 3. Learning ++

Here we reuse the neural network implemented in Part 2 of the assignment as one of the two classifiers in 3.1 and as the classifier in 3.4.

## 3.1 Training on Three MNIST classes

Here we train our neural network on 3 classes of the MNIST dataset, taken in the ratio 70:25:5 (3500, 1250 and 250 samples respectively), using the Cross Entropy and Mean Squared Error loss functions, and then use L2 regularization to improve the accuracy metrics.

### Importing 3 classes from MNIST

We create a dataset with a 70:25:5 split among 3 classes of the MNIST dataset and then do an 80:20 train-test split.

In [None]:
def reshape_inputs_mnist_3class(tr_d, te_d):
    """Build an imbalanced 3-class MNIST data set and split it 80:20.

    Up to 3500 samples of digit 1, 1250 of digit 2 and 250 of digit 3 are
    taken from `tr_d` (images at index 0, labels at index 1), shuffled, and
    split into training and test parts.

    Args:
        tr_d: ``(images, labels)`` source split.
        te_d: accepted but not used; the test part is carved out of `tr_d`.

    Returns:
        (training_data, test_data): lists of ``[x, one_hot(label)]`` and
        ``[x, label]`` items, with every ``x`` of shape (784, 1).
    """
    quotas = {1: 3500, 2: 1250, 3: 250}
    taken = {label: 0 for label in quotas}
    tot_data = list()
    for x, y in zip(tr_d[0], tr_d[1]):
        label = int(y)
        if label in quotas and taken[label] < quotas[label]:
            tot_data.append([np.reshape(x, (784, 1)), label])
            taken[label] += 1
    random.shuffle (tot_data)
    # The first 80% of the shuffled samples train, the rest test.
    split = int(0.8 * len(tot_data))
    training_data = [[x, one_hot(y)] for x, y in tot_data[:split]]
    test_data = [[x, y] for x, y in tot_data[split:]]
    return (training_data, test_data)

# Load the pickled MNIST splits (train, validation, test). The context manager
# closes the archive even if unpickling fails.
# NOTE(review): unpickling can execute arbitrary code - only load archives
# obtained from a trusted source.
with gzip.open('../data/mnist.pkl.gz', 'rb') as f:
    # encoding='latin1' is needed under Python 3 in case the archive was
    # written by Python 2; it does not affect archives written by Python 3.
    trd2, vd2, ted2 = pickle.load(f, encoding='latin1')

train_set_mnist3, test_set_mnist3 = reshape_inputs_mnist_3class (trd2, ted2)

### Unregularized Cross Entropy Loss

In [None]:
# Cross-entropy loss (the NN_iter default), no regularization.
model = NN_iter([N_INPUTS, N_HIDDEN, N_CLASSES], learning_rate = LEARNING_RATE,
                epochs = 1, batch_size = BATCH_SIZE)

# training_loop returns the model together with its loss histories.
model, (train_losses, test_losses) = training_loop(model, train_set_mnist3, test_set_mnist3, N_EPOCHS, print_every = 1)

### Unregularized MSE Loss

In [None]:
# MSE loss, no regularization. The loss is a constructor argument of
# NN_iter; training_loop does not accept it.
model = NN_iter([N_INPUTS, N_HIDDEN, N_CLASSES], loss = "mse", learning_rate = LEARNING_RATE,
                epochs = 1, batch_size = BATCH_SIZE)

# training_loop returns the model together with its loss histories.
model, (train_losses, test_losses) = training_loop(model, train_set_mnist3, test_set_mnist3, N_EPOCHS, print_every = 1)

### Regularized Cross Entropy Loss

In [None]:
# Cross-entropy loss with an L2 norm penalty. The coefficient is a
# constructor argument of NN_iter; training_loop does not accept it.
model = NN_iter([N_INPUTS, N_HIDDEN, N_CLASSES], l2 = 5.0, learning_rate = LEARNING_RATE,
                epochs = 1, batch_size = BATCH_SIZE)

# training_loop returns the model together with its loss histories.
model, (train_losses, test_losses) = training_loop(model, train_set_mnist3, test_set_mnist3, N_EPOCHS, print_every = 1)

### Regularized MSE Loss

In [None]:
# MSE loss with an L2 norm penalty. Both are constructor arguments of
# NN_iter; training_loop accepts neither.
model = NN_iter([N_INPUTS, N_HIDDEN, N_CLASSES], l2 = 5.0, loss = "mse", learning_rate = LEARNING_RATE,
                epochs = 1, batch_size = BATCH_SIZE)

# training_loop returns the model together with its loss histories.
model, (train_losses, test_losses) = training_loop(model, train_set_mnist3, test_set_mnist3, N_EPOCHS, print_every = 1)

## 3.2 Cross Training on MNIST and SVHN

Here we will train our neural network on MNIST and test on SVHN and vice-versa

### Importing SVHN

In [None]:
def _svhn_images_to_vectors(images):
    """Turn an image array into a list of (784, 1) column vectors.

    An array laid out as (32, 32, 3, n_samples) is treated as a stack of RGB
    images: the sample axis is moved first, the colour channels are averaged
    to grey, the central 28x28 window is kept, and values above 1 are scaled
    by 1/255. Any other input is iterated along its first axis and each item
    is reshaped to (784, 1) directly.
    """
    images = np.asarray(images, dtype = np.float32)
    if images.ndim == 4 and images.shape[:3] == (32, 32, 3):
        samples = np.transpose(images, (3, 0, 1, 2))
        grey = samples.mean(axis = 3)
        cropped = grey[:, 2:30, 2:30]
        if cropped.size and cropped.max() > 1.0:
            cropped = cropped / 255.0
        return [np.reshape(img, (784, 1)) for img in cropped]
    return [np.reshape(img, (784, 1)) for img in images]


def reshape_inputs_svhn (tr_d, te_d):
    """Convert SVHN ``[images, labels]`` splits into lists of sample pairs.

    Args:
        tr_d, te_d: sequences whose element 0 holds the images and element 1
            the labels (any shape; they are flattened).

    Returns:
        (training_data, test_data): lists of ``(x, one_hot(label))`` and
        ``(x, label)`` pairs, with every ``x`` of shape (784, 1) so the
        samples fit a network built for 28x28 inputs.
    """
    training_inputs = _svhn_images_to_vectors(tr_d[0])
    training_results = [one_hot(int(y)) for y in np.ravel(tr_d[1])]
    training_data = list(zip(training_inputs, training_results))
    test_inputs = _svhn_images_to_vectors(te_d[0])
    test_labels = [int(y) for y in np.ravel(te_d[1])]
    test_data = list(zip(test_inputs, test_labels))
    return (training_data, test_data)

# Load the SVHN cropped-digit .mat files; 'X' holds the images and 'y' the labels.
train_data_raw = io.loadmat('../data/SVHN/train_32x32.mat')
test_data_raw = io.loadmat('../data/SVHN/test_32x32.mat')

# flatten() returns a new array, so its result has to be kept.
tr_X = train_data_raw['X']
tr_y = train_data_raw['y'].flatten()
tr_y[tr_y == 10] = 0  # label 10 is remapped to class 0

te_X = test_data_raw['X']
te_y = test_data_raw['y'].flatten()
te_y[te_y == 10] = 0  # mask with the test labels, not the training labels

trd3 = [tr_X.astype(np.float32), tr_y]
ted3 = [te_X.astype(np.float32), te_y]

train_set_svhn, test_set_svhn = reshape_inputs_svhn (trd3, ted3)

## 3.4 Training on 5 classes

Here we use 5 classes of the MNIST dataset to train the neural network and then test it on the test set.

### Importing Dataset

We create a dataset such that the training data has 5 classes and the test data has the other five.

In [None]:
def reshape_inputs_mnist_5class(tr_d = None, te_d = None):
    """Build a training set of digits 0-4 and a test set of digits 5-9.

    Args:
        tr_d, te_d: ``(images, labels)`` splits. Whichever is omitted is read
            from '../data/mnist.pkl.gz'.

    Returns:
        (training_data, test_data): lists of ``[x, one_hot(label)]`` for
        training labels below 5 and ``[x, label]`` for test labels of 5 or
        more, with every ``x`` of shape (784, 1).
    """
    if tr_d is None or te_d is None:
        # NOTE(review): unpickling can execute arbitrary code - only load
        # archives obtained from a trusted source.
        with gzip.open('../data/mnist.pkl.gz', 'rb') as archive:
            loaded_tr, _, loaded_te = pickle.load(archive, encoding='latin1')
        tr_d = loaded_tr if tr_d is None else tr_d
        te_d = loaded_te if te_d is None else te_d
    training_data = [[np.reshape(x, (784, 1)), one_hot(y)]
                     for x, y in zip(tr_d[0], tr_d[1]) if y < 5]
    test_data = [[np.reshape(x, (784, 1)), y]
                 for x, y in zip(te_d[0], te_d[1]) if y >= 5]
    return (training_data, test_data)

# Load the pickled MNIST splits; the context manager closes the archive even
# if unpickling fails.
with gzip.open('../data/mnist.pkl.gz', 'rb') as f:
    # encoding='latin1' is needed under Python 3 in case the archive was
    # written by Python 2; it does not affect archives written by Python 3.
    trd4, vd4, ted4 = pickle.load(f, encoding='latin1')

# Use the 5-class helper here, not the 3-class one.
train_set_mnist5, test_set_mnist5 = reshape_inputs_mnist_5class (trd4, ted4)

### Training and Testing

In [None]:
# Train on digits 0-4 and evaluate on digits 5-9.
model = NN_iter([N_INPUTS, N_HIDDEN, N_CLASSES], learning_rate = LEARNING_RATE,
                epochs = 1, batch_size = BATCH_SIZE)

# training_loop returns the model together with its loss histories.
model, (train_losses, test_losses) = training_loop(model, train_set_mnist5, test_set_mnist5, N_EPOCHS, print_every = 1)