In [47]:
import numpy as np
import unittest
import math
from sklearn import datasets
from sklearn import metrics
from sklearn import model_selection
from sklearn import preprocessing
from tqdm.notebook import tqdm_notebook as tqdm

In [2]:
class FC:
  def __init__(self, n_in, n_out, activation = None):
    self.n_in = n_in
    self.n_out = n_out
    self.activation = activation
    self.W = np.random.randn(n_in, n_out)* 0.01
    self.dW = np.zeros((n_in ,n_out))

  @staticmethod
  def stable_sigmoid(z):
    x = np.zeros_like(z)
    x[z >= 0] = 1.0/(1 + np.exp(-z[z>=0]))
    x[z < 0] = np.exp(z[z < 0])
    x[z < 0] = x[z < 0]/(1 + x[z < 0])
    return x

  @staticmethod
  def ReLU(x):
    z = x.copy()
    z[z < 0] = 0
    return z

  def __activation(self, a):
    if self.activation is None:
      return a.copy()
    elif self.activation == 'Sigmoid':
      return self.stable_sigmoid(a)
    elif self.activation == 'ReLU':
      return self.ReLU(a)
    else:
      raise NotImplementedError('Using Sigmoid or ReLu')

  def __dactivation(self, df, f, a):
    if self.activation is None:
      return df.copy()
    elif self.activation == 'Sigmoid':
      return f*(1-f)*df
    elif self.activation == 'ReLU':
      da = df.copy()
      da[a < 0] = 0
      return da

  def forward(self, x):
      self.x = x.copy()
      self.a = np.matmul(x, self.W)
      self.f = self.__activation(self.a)
      return self.f

  def backward(self, df):
      da = self.__dactivation(df, self.f, self.a)
      self.dW = np.einsum('ij,ik ->jk', self.x, da)
      self.dx = np.matmul(da, self.W.T)
      self.df = df
      self.da = da
      return self.dx

  def parameters(self):
      return [self.W]

  def grads(self):
      return [self.dW]

  def train(self):
      pass

  def eval(self):
      pass



In [33]:
class TestFC(unittest.TestCase):
  def test_fc_init(self):
    fc = FC(n_in = 10, n_out = 5, activation = 'Sigmoid')
    self.assertEqual(fc.n_in, 10)
    self.assertEqual(fc.n_out, 5)
    self.assertEqual(fc.W.shape, (10,5))
    self.assertEqual(fc.dW.shape, (10,5))
    self.assertEqual(fc.activation, 'Sigmoid')

  def test_forward_shape(self):
    N, n_in, n_out = 4, 5, 5
    fc = FC(n_in = n_in, n_out = n_out, activation = None)
    x = np.random.randn(N, n_in)
    out = fc.forward(x)
    self.assertEqual(out.shape, (N, n_out))

  def test_forward_linear(self):
    fc = FC(n_in =2 , n_out = 2, activation = None)
    fc.W = np.array([
        [1., 2.,],
        [3., 4.]
    ])
    x = np.array([5. , 6.])
    out = fc.forward(x)
    self.assertEqual(out[0], 23)
    self.assertEqual(out[1], 34)

  def test_relu_activation(self):
    fc = FC(n_in = 3, n_out = 3, activation = 'ReLU')
    a = np.array([[-1, 2, -3]])
    f = fc._FC__activation(a)
    np.testing.assert_array_equal(f, np.array([[0., 2., 0.]]))

  def test_relu_forward(self):
    fc = FC(n_in = 2, n_out =2 , activation = 'ReLU')
    fc.W = np.array([
        [6. ,1. ],
        [2. , 9.]
    ])
    x = np.array([-2, 4])
    out = fc.forward(x)
    np.testing.assert_array_equal(out, np.array([0, 34]))

  def test_sigmoid(self):
    a = np.array([[-1000., 0., 1000.]])
    f = FC.stable_sigmoid(a)
    self.assertTrue(np.all(f >= 0))
    self.assertTrue(np.all(f <= 1))

  def test_relu_backward(self):
    fc = FC(3,3, activation = 'ReLU')
    a = np.array([[-1., 2., -3. ]])
    f = fc._FC__activation(a)
    df = np.array([[10., 20., 30.]])
    da = fc._FC__dactivation(df, f, a)
    np.testing.assert_array_equal(da, np.array([[0., 20., 0.]]))

  def test_dW_computation(self):
    fc = FC(2, 2, activation=None)

    x = np.array([[1., 2.]])
    da = np.array([[3., 4.]])

    fc.x = x
    fc.a = x @ fc.W

    dW = x.T @ da

    np.testing.assert_array_equal(dW,
                                  np.array([[3., 4.],
                                            [6., 8.]]))


In [35]:
class MLP:
  def __init__(self, n_in , hiddens, activation = None, last_layer_linear = True):
    self.n_in = n_in
    self.hiddens = hiddens
    self.layers = [
        FC(
            n_in = hiddens[i-1] if i > 0 else n_in,
            n_out = hiddens[i],
            activation = activation if i < len(hiddens) - 1 or (not last_layer_linear) else None
        )
        for i in range(len(hiddens))
    ]

  def forward(self,x):
    out = x
    for layer in self.layers:
      out = layer.forward(out)
    return out

  def backward(self, dout):
    for layer in self.layers[::-1]:
      dout = layer.backward(dout)
    return dout

  def parameters(self):
    return sum([layer.parameters() for layer in self.layers], [])

  def grads(self):
    return sum([layer.grads() for layer in self.layers], [])

  def train(self):
    for layer in self.layers:
      layer.train()

  def eval(self):
    for layer in self.layers:
      layer.eval()

In [42]:
class TestMLP(unittest.TestCase):
  def setup(self):
    np.random.seed(42)

  def test_mlp_forward(self):
    mlp = MLP(n_in = 4, hiddens = [10, 5,1 ], activation = 'Sigmoid')
    x = np.random.randn(3,4)
    out = mlp.forward(x)
    self.assertEqual(out.shape, (3,1))

  def test_backward_shape(self):
    mlp = MLP(n_in = 4, hiddens = [10, 5, 1], activation = 'Sigmoid')
    x = np.random.randn(2,4)
    out = mlp.forward(x)
    dout = np.ones_like(out)
    dx = mlp.backward(dout)
    self.assertEqual(dx.shape, x.shape)

  def test_parameters_grads(self):
    mlp = MLP(n_in = 3, hiddens = [4, 2, 1])
    params = mlp.parameters()
    grads = mlp.grads()

    self.assertEqual(len(params), len(grads))

  def test_no_nan_backward(self):
    mlp = MLP(n_in = 4, hiddens = [8, 4, 1], activation = 'Sigmoid')
    x = np.random.randn(5,4)
    out = mlp.forward(x)
    dx = mlp.backward(np.ones_like(out))

    for g in mlp.grads():
      self.assertFalse(np.isnan(g).any())
      self.assertFalse(np.isinf(g).any())

  def test_number_layers(self):
    hiddens = [10, 5, 3, 1]
    mlp = MLP(4, hiddens = hiddens)

    self.assertEqual(len(mlp.layers), len(hiddens))

In [43]:
unittest.main(argv=[''], verbosity=2, exit=False)

test_dW_computation (__main__.TestFC.test_dW_computation) ... ok
test_fc_init (__main__.TestFC.test_fc_init) ... ok
test_forward_linear (__main__.TestFC.test_forward_linear) ... ok
test_forward_shape (__main__.TestFC.test_forward_shape) ... ok
test_relu_activation (__main__.TestFC.test_relu_activation) ... ok
test_relu_backward (__main__.TestFC.test_relu_backward) ... ok
test_relu_forward (__main__.TestFC.test_relu_forward) ... ok
test_sigmoid (__main__.TestFC.test_sigmoid) ... ok
test_backward_shape (__main__.TestMLP.test_backward_shape) ... ok
test_mlp_forward (__main__.TestMLP.test_mlp_forward) ... ok
test_no_nan_backward (__main__.TestMLP.test_no_nan_backward) ... ok
test_number_layers (__main__.TestMLP.test_number_layers) ... ok
test_parameters_grads (__main__.TestMLP.test_parameters_grads) ... ok

----------------------------------------------------------------------
Ran 13 tests in 0.078s

OK


<unittest.main.TestProgram at 0x7c61509a9040>

In [44]:
class CrossEntropy:
  @staticmethod
  def stable_softmax(X):
    exps = np.exp(X - np.max(X , axis = 1, keepdims = True))
    return exps/np.sum(exps, axis =1, keepdims = True)

  def forward(self, y_pred, y_true):
    n, n_out = y_pred.shape
    self.y_pred = y_pred
    self.y_true = y_true
    self.mu = self.stable_softmax(y_pred)
    mu_ytrue = self.mu[range(n), y_true]
    mu_ytrue[mu_ytrue < 1e-8] = 1e-8
    loss = np.sum(-np.log(mu_ytrue))
    return loss

  def backward(self):
    n, n_out = self.y_pred.shape
    d_ypred = self.mu.copy()
    d_ypred[range(n), self.y_true] -= 1
    return d_ypred

In [45]:
class SGD:
  def __init__(self, model, learning_rate, regularization =0.0):
    self.model = model
    self.learning_rate = learning_rate
    self.regularization = regularization
    self.current_step = 0

  def parameters(self):
    return self.model.parameters()

  def grads(self):
    return self.model.grads()

  def zero_grad(self):
    for g in self.grads():
      g.fill(0)

  def step(self):
    self.current_step += 1
    for p, g in zip(self.parameters(), self.grads()):
      g = self.regularization*p + g
      g = np.clip(g, -1, 1)
      p -= 1.0 / math.sqrt(self.current_step)*self.learning_rate


In [61]:
class AdaGrad:
  def __init__(self, model, learning_rate, regularization = 0.0):
    self.model = model
    self.learning_rate = learning_rate
    self.regularization = regularization
    self.current_step = 0
    self.sum_grad = [np.zeros_like(p) for p in model.parameters()]


  def parameters(self):
    return self.model.parameters()

  def grads(self):
    return self.model.grads()

  def zero_grad(self):
    for g in self.grads():
      g.fill(0)

  def step(self):
    eps = 1e-8
    for p, g, G in zip(self.parameters(), self.grads() , self.sum_grad):
      g = self.regularization*p + g
      g = np.clip(g, -1, 1)
      G += g*g
      p -= (self.learning_rate/np.sqrt(G + eps)) * g


In [62]:
class Accumulator:
  def __init__(self):
    self.total_sample = 0
    self.key_values = {}

  def __call__(self, n_sample, **kwargs):
    for k, v in kwargs.items():
      if k not in self.key_values:
        self.key_values[k] = v
      else:
        self.key_values[k] += v
    self.total_sample += n_sample

  def mean(self, key):
    return self.key_values[key] / self.total_sample

In [65]:
def prepare_data():
  X, y = datasets.load_digits(return_X_y= True)
  X, Xtest, y, ytest = model_selection.train_test_split(X, y, test_size = 0.3, random_state = 42)
  transform = preprocessing.MinMaxScaler()
  X = transform.fit_transform(X)
  X_test = transform.transform(Xtest)
  return X, Xtest, y, ytest

def prepare_trainer(model):
  sgd = AdaGrad(model= model, learning_rate = 0.01, regularization = 0.03)
  loss_func = CrossEntropy()
  return sgd, loss_func

def prepare_data_loader(X, y, batch_size):
  n = X.shape[0]
  permutation = np.random.permutation(n)
  for i in range(0, n, batch_size):
    j = i + batch_size if i + batch_size <= n else n
    batch_x = X[permutation[i:j]]
    batch_y = y[permutation[i:j]]
    yield batch_x, batch_y

def get_model(n_in ,n_out):
  np.random.seed(42)
  model = MLP(n_in = n_in, hiddens = [128, 64, 10], activation = 'ReLU')
  return model

class Config:
  n_epoch = 5000
  batch_size = 128

def main():
  X, Xtest, y, ytest = prepare_data()
  config = Config()

  model = get_model(n_in = X.shape[1], n_out = 10)
  sgd, loss_func = prepare_trainer(model)

  pbar = tqdm(range(config.n_epoch))
  val_acc = 0
  for epoch in pbar:
    data_loader = prepare_data_loader(X, y, config.batch_size)

    # set train mode
    model.train()
    accumulator = Accumulator()
    for step, (batch_x, batch_y) in enumerate(data_loader):
      # forward pass
      batch_yp = model.forward(batch_x)
      loss = loss_func.forward(batch_yp, batch_y)

      # backward pass and an optimization step
      sgd.zero_grad()
      dout = loss_func.backward()
      dx = model.backward(dout)
      sgd.step()

      # log training progress
      n_correct = np.sum(np.argmax(batch_yp, axis = 1) == batch_y)
      accumulator(len(batch_y), correct=n_correct, loss=loss)
      pbar.set_description(f"epoch {epoch} step {step+1} train_loss {accumulator.mean('loss'):.4f}\
                train_acc {accumulator.mean('correct')*100:.2f}% val_acc {val_acc*100:.2f}")
    model.eval() # set evaluation mode
    val_acc = np.sum(np.argmax(model.forward(Xtest), axis =1)== ytest)/len(ytest)

  ypred = np.argmax(model.forward(Xtest), axis=1)
  print(metrics.classification_report(ytest, ypred))
  print(metrics.confusion_matrix(ytest, ypred))

main()







  0%|          | 0/5000 [00:00<?, ?it/s]

              precision    recall  f1-score   support

           0       0.96      0.98      0.97        53
           1       0.89      0.98      0.93        50
           2       0.94      0.96      0.95        47
           3       0.93      0.96      0.95        54
           4       0.98      0.93      0.96        60
           5       0.97      0.92      0.95        66
           6       0.96      0.98      0.97        53
           7       0.98      0.98      0.98        55
           8       0.95      0.91      0.93        43
           9       0.95      0.92      0.93        59

    accuracy                           0.95       540
   macro avg       0.95      0.95      0.95       540
weighted avg       0.95      0.95      0.95       540

[[52  0  0  0  1  0  0  0  0  0]
 [ 0 49  0  0  0  0  1  0  0  0]
 [ 0  1 45  1  0  0  0  0  0  0]
 [ 0  0  1 52  0  1  0  0  0  0]
 [ 0  2  0  0 56  0  1  0  0  1]
 [ 0  1  1  0  0 61  0  1  1  1]
 [ 1  0  0  0  0  0 52  0  0  0]
 [ 0  0  0