#MNIST Classification using Fully Connected Neural Nets

This follows the same neural nets as implemented in [mnist0.ipynb](https://colab.research.google.com/drive/1gmXteU_qhI1RP6CDhEoHFcSK_MsYfqZX?authuser=1#scrollTo=Iaxyao85dvcX), but using Keras API for loading data only. The construction and training of the neural nets are custom implementations herein. 

In [4]:
import tensorflow as tf
import numpy as np

Load the MNIST dataset. 

In [5]:
mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
# x_train, x_test: uint8 arrays of grayscale image data with shapes  (num_samples, 28, 28).  
# y_train, y_test: uint8 arrays of digit labels (integers in range 0-9) with shapes (num_samples,).  
print('x_train (training data) dimensions: ', x_train.shape)
print('x_test (test data) dimensions: ', x_test.shape)
print('y_train (training labels) dimensions: ', y_train.shape)
print('y_test (test labels) dimensions: ', y_test.shape)

# Reshape 2D input images into 1D vectors.
image_vector_length = x_train.shape[1] * x_train.shape[2]
x_train = x_train.reshape(x_train.shape[0],image_vector_length)
x_test = x_test.reshape(x_test.shape[0], image_vector_length)
print('x_train reshaped dimensions: ', x_train.shape)
print('x_test reshaped dimensions: ', x_test.shape)

# Transform output training label into one-hot encoded vectors.
num_classes = 10;
y_train = tf.keras.utils.to_categorical(y_train, num_classes)
y_test = tf.keras.utils.to_categorical(y_test, num_classes)

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
x_train (training data) dimensions:  (60000, 28, 28)
x_test (test data) dimensions:  (10000, 28, 28)
y_train (training labels) dimensions:  (60000,)
y_test (test labels) dimensions:  (10000,)
x_train reshaped dimensions:  (60000, 784)
x_test reshaped dimensions:  (10000, 784)


In [3]:
# from google.colab import drive
# drive.mount('/content/drive', force_remount=True)

# %cd drive/My\ Drive/mousenets/mnist
# %ls

Mounted at /content/drive
/content/drive/My Drive/mousenets/mnist
__init__.py   mnist1.ipynb      mousenets.py
mnist0.ipynb  mousenets.ipynbs  [0m[01;34m__pycache__[0m/


In [12]:
class Layer:
  """
  A single layer in an MLP, f(h) = f(w.dot(x))
  """
  # Definitions:

  # n = layer width
  # x = input vector
  # n_x = input vector length
  # w = weights matrix
  # h = intermediate variable, w.dot(x)
  # y = output vector

  # activation = activation function name
  # f = the activation function
  # dfdh = the derivative of the activation function

  # dLdx = backpropagation from dLdy to dLdx
  # dLdw = backpropagation from dLdy to dLdw

  def __init__(self, n_x, n, activation):

    assert(isinstance(n, int) and n > 0), 'n should be an integer and >0.'
    assert(isinstance(n_x, int) and n_x > 0), 'n_x should be an integer and >0.'

    self.n_x = n_x
    self.n = n
    self.activation = activation
    if self.activation == 'sigmoid':
      self.f = self._f_sigmoid
      self.dfdh = self._dfdh_sigmoid
    elif self.activation == "softmax":
      self.f = self._f_softmax
      self.dfdh = self._dfdh_softmax
    else:
      print("Error: activation function %s is not implemented.",
            self.activation)

    # Initialize w as a zero matrix/vector
    self.w = np.random.rand(self.n, self.n_x)

  def forward_pass(self, x):
    """Saves input x as the new self.x. Calculate and return y."""
    assert(isinstance(x, np.ndarray)), 'x should be a numpy array.'
    assert(len(x.shape)==1), 'x should be a vector.'
    self.x = x
    self.h = self.w.dot(self.x)
    self.y = self.f(self.h)
    return self.y

  def backprop(self, dLdy):
    """Calculate dLdw and dLdx, update w using dLdw, return dLdx."""
    # dLdy = (n * 1)
    # dydh = dfdh = (n * n) diagonal
    # dhdx = (n * n_x) = w
    # dhdw = (n_x * 1) = x
    # dLdw = dydh * dLdy * dhdw.T
    # dLdx = dhdx.T * dydh * dLdy
    dydh = self.dfdh(self.h)
    self.dLdw = np.outer(dydh @ dLdy, self.x.T)
    self.dLdx = self.w.T @ dydh @ dLdy
    self.w = self.w + self.dLdw
    return self.dLdx

  def get_layer_width(self):
    return self.n

  def get_weights(self):
    return self.w

  def _f_sigmoid(self, h):
    """Evaluate the sigmoid function for h, where h is a vector."""
    assert(len(h.shape)==1), 'Input arg h should be a vector.'
    h = self.normalize_vector(h)
    return 1 / (1 + np.exp(-h))

  def _dfdh_sigmoid(self, h):
    """Evaluate the gradient of sigmoid function at h, where h is a vector."""
    assert(len(h.shape)==1), 'Input arg h should be a vector.'
    f_h = self._f_sigmoid(h)
    return np.diag((1 - f_h)*f_h)

  def _f_softmax(self, h):
    """Evaluate the softmax function for h, where h is a vector."""
    assert(len(h.shape)==1), 'Input arg h should be a vector.'
    h = self.normalize_vector(h)
    exp_h = np.exp(h)
    return exp_h/np.sum(exp_h)

  def _dfdh_softmax(self, h):
    """Evaluate the gradient of softmax function at h, where h is a vector."""
    assert(len(h.shape)==1), 'Input arg h should be a vector.'
    f_h = self._f_softmax(h)
    return np.diag(f_h * (1 - f_h))

  def normalize_vector(self, h):
    """Normalize a vector h for numerical stability"""
    if abs(np.max(h)) != 0:
      return h / abs(np.max(h))
    else:
      return h

class LossFunction:
  """
  Loss function, specific implementations include (categorical) cross-entropy
  """
  # self.f = the loss function
  # self.dfdx = the gradient of the loss function at x
  def __init__(self, loss_fxn_type):
    if loss_fxn_type == "cce":
      self.f = self._f_cce
      self.dfdx = self._dfdx_cce
    else:
      assert(False), 'loss function %s is not implemented.' % loss_fxn_type

  def evaluate(self, x, y):
    return self.f(x, y)

  def get_gradient(self, x, y):
    return self.dfdx(x, y)

  def _f_cce(self, x, y):
    """Evaluate the categorical cross-entropy loss for x against the ground truth y."""
    # TODO: this implementation is for one-hot category only, add multi-class
    return -1*y.dot(np.log(x))

  def _dfdx_cce(self, x, y):
    """Evaluate the gradient of categorical cross-entropy loss at x"""
    # TODO: this implementation is for one-hot category only, add multi-class
    return x - y


class MLP:
  """
  A multi-layer perception
  """
  # Definitions:
  #
  # n_x0 = input data dimension
  # n_y = output and training data dimension
  # x0 = MLP input, (n_x0 * 1)
  # xn = MLP output, (n_y * 1). xn is the output of the n-th layer.
  #      The last layer output as the same dimensions as the training data
  # y = training data, (n_y * 1).
  # layers = a list of Layer objects, dynamically expand using add_layer()
  # loss = the loss function, used to evaluate the output xn

  def __init__(self, input_dimension, output_dimension):
    self.layers = []
    self.n_x0 = input_dimension
    self.n_y = output_dimension
    self.x0 = [0]*self.n_x0
    self.xn = [0]*self.n_y

  def add_layer(self, n, activation):
    """Augment the MLP by a new layer of width n."""
    # Get the last layer's output dimension.
    if len(self.layers) == 0:
      in_dimension = self.n_x0
    else:
      in_dimension = self.layers[-1].get_layer_width()
    # Automatically extend n by +1 for bias term. 
    in_dimension += 1
    # Append the new layer.
    self.layers.append(Layer(in_dimension, n, activation))

  def define_loss_function(self, loss_fxn_type):
    """Define the loss function object, to evaluate the output y."""
    self.loss = LossFunction(loss_fxn_type)

  def forward_pass(self, x0):
    """Perform forward pass using input x0, return output xN."""
    x = x0
    for l in self.layers:
      # Append +1 for the bias term. TODO: keep appending each time is not efficient.
      x = np.append(x, 1)
      # Run forward pass
      x = l.forward_pass(x)
    self.xn = x
    return self.xn

  def backprop(self, dLdxn):
    """Backpropagate loss error dLdxn to update the MLP."""
    dLdx = dLdxn
    for l in reversed(self.layers):
      dLdx = l.backprop(dLdx)
      # The last element of dLdx is the bias term, need not be propagated to the previous layer
      dLdx = dLdx[:-1]

  def calculate_loss_gradient(self, xn, y):
    """Calculate dL/dxn, gradient of loss at xn from the previous data sample, using training outcome y."""
    return self.loss.get_gradient(xn, y)

  def evaluate_loss(self, xn, y):
    """Evaluate the loss of a net output xn against the corresponding desired output y."""
    return self.loss.evaluate(xn, y)

  def get_layer_width(self, i):
    """Get the width of the i-th layer. 0-th is the input layer, -1 for the output layer."""
    if i == 0:
      return self.n_x0
    elif i == -1:
      return self.layers[i].get_layer_width()
    else:
      return self.layers[i-1].get_layer_width()

  def print_weights(self, i):
    """Print weights for the i-th layer. 0-th is the input layer, -1 for the output layer."""
    if i == 0:
      print('Weights for the input layer: N/A.\n')
    elif i == -1:
      print(self.layers[i].get_weights())
    else:
      print(self.layers[i-1].get_weights())



class NetTrainer:
  """
  Train a neural net.
  """
  
  def sgd(self, nn, x_train, y_train, epochs, batch_size, eta):
    """Train a neural net nn using batched stochastic gradient descent."""
    # eta is the learning rate.

    # Check input argument consistency
    assert(isinstance(nn, MLP)), 'Input neural net nn is not an instance of MLP class.'

    assert(x_train.shape[0] == y_train.shape[0]), 'x_train and y_train should have the same number of samples.'

    input_width = nn.get_layer_width(0)
    assert(x_train.shape[1] == input_width), \
      'x_train data has dimension %d, inconsistent with the neural net\'s input dimension %d.' \
      % (x_train.shape[1], input_width)

    output_width = nn.get_layer_width(-1)
    assert(y_train.shape[1] == output_width), \
      'y_train data has dimension %d, inconsistent with the neural net\'s output dimension %d.' \
      % (y_train.shape[1], output_width)

    num_samples = x_train.shape[0]
    assert(batch_size <= num_samples), 'batch_size [%d] > number of samples in x/y_train [%d].' \
      % (batch_size, num_samples)

    # Do batched sgd
    for i in range(epochs):
      cumulative_loss = 0
      cumulative_loss_gradient = [0]*output_width

      # Evaluate loss and loss gradient for a batch
      for j in range(batch_size):
        s = self._select_sample(j, num_samples)
        res = nn.forward_pass(x_train[s])
        cumulative_loss += nn.evaluate_loss(res, y_train[s])
        cumulative_loss_gradient += nn.calculate_loss_gradient(res, y_train[s])

      # Train for this epoch
      cumulative_loss = cumulative_loss / batch_size
      cumulative_loss_gradient = cumulative_loss_gradient / batch_size
      nn.backprop(cumulative_loss_gradient * eta)
      #nn.print_weights(1)
      #nn.print_weights(2)
      print('Epoch #%d: loss = %f\n' % (i, cumulative_loss))

  def _select_sample(self, count, num_samples):
    """Helper function to select sample from num_samples."""
    # Currently round-robin across all num_samples. Can also select at random.
    return count % num_samples


In [16]:
# Construct the MLP
l1_width = 32 # the first layer has 32 nodes
l2_width = 10 # the second and output layer has 10 nodes
mlp = MLP(input_dimension = image_vector_length, output_dimension = num_classes)
mlp.add_layer(l1_width, 'sigmoid')
mlp.add_layer(l2_width, 'softmax')
mlp.define_loss_function('cce')

# Train the MLP using sgd
trainer = NetTrainer()
trainer.sgd(mlp, x_train, y_train, epochs=5, batch_size=128, eta=1)

Epoch #0: loss = 2.303992

Epoch #1: loss = 2.305385

Epoch #2: loss = 2.306727

Epoch #3: loss = 2.308015

Epoch #4: loss = 2.309258

