In [42]:
import numpy as np

In [43]:
class GetMiniBatch:
    """
Iterator to get a mini-batch
    Parameters
    ----------
    X : The following forms of ndarray, shape (n_samples, n_features)
      Training data
    y : The following form of ndarray, shape (n_samples, 1)
      Correct answer value
    batch_size : int
      Batch size
    seed : int
      NumPy random number seed
    """
    def __init__(self, X, y, batch_size = 20, seed=0):
        self.batch_size = batch_size
        np.random.seed(seed)
        shuffle_index = np.random.permutation(np.arange(X.shape[0]))
        self._X = X[shuffle_index]
        self._y = y[shuffle_index]
        self._stop = np.ceil(X.shape[0]/self.batch_size).astype(np.int)

    def __len__(self):
        return self._stop

    def __getitem__(self,item):
        p0 = item*self.batch_size
        p1 = item*self.batch_size + self.batch_size
        return self._X[p0:p1], self._y[p0:p1]        

    def __iter__(self):
        self._counter = 0
        return self
        
    def __next__(self):
        if self._counter >= self._stop:
            raise StopIteration()
        p0 = self._counter*self.batch_size
        p1 = self._counter*self.batch_size + self.batch_size
        self._counter += 1
        return self._X[p0:p1], self._y[p0:p1]

In [44]:
class Initializer:
    def __init__(self, sigma):
        self.sigma = sigma
    def W(self, n_nodes1, n_nodes2):
      """
      Weight initialization
      Parameters
      ----------
      n_nodes1 : int
        Number of nodes in the previous layer
      n_nodes2 : int
        Number of nodes in the later layer
      Returns
      ----------
      W :
      """
      pass
    def B(self, n_nodes2):
      """
      Bias initialization
      Parameters
      ----------
      n_nodes2 : int
        Number of nodes in the later layer
      Returns
      ----------
      B :
      """
      pass
class SimpleInitializer(Initializer):
    def W(self, n_nodes1, n_nodes2):
        return self.sigma * np.random.randn(n_nodes1, n_nodes2)
    
    def B(self, n_nodes2):
        return self.sigma * np.random.randn(n_nodes2)

class XavierInitializer(Initializer):
    def W(self, n_nodes1, n_nodes2):
        return self.sigma * np.random.normal(scale=np.sqrt(1/(n_nodes1+n_nodes2)), size=(n_nodes1, n_nodes2))

    def B(self, n_nodes2):
        return self.sigma * np.random.normal(scale=np.sqrt(1/n_nodes2), size=n_nodes2)


class HeInitializer(Initializer):
    def W(self, n_nodes1, n_nodes2):
        return self.sigma * np.random.normal(scale=np.sqrt(2/(n_nodes1+n_nodes2)), size=(n_nodes1, n_nodes2))
        # return self.sigma * np.random.normal( size=(n_nodes1, n_nodes2))

    def B(self, n_nodes2):
        return self.sigma * np.random.normal(scale=np.sqrt(2/n_nodes2), size=n_nodes2)
        # return self.sigma * np.random.normal(size=n_nodes2)


In [45]:
class Optimizer:
    def __init__(self, lr):
        self.lr = lr
    def update(self, layer):
        """
        Update weights and biases for a layer
        Parameters
        ----------
        layer : Instance of the layer before update
        """
        pass

class SGD(Optimizer):
    def update(self, layer):
        layer.B -= self.lr*layer.dB
        layer.W -= self.lr*layer.dW

class AdaGrad(Optimizer):
    def update(self, layer):
        layer.H += np.power(layer.dB,2)
        layer.B -= self.lr*(np.sqrt(1/(layer.H + 1e-07)))*layer.dB
        layer.W -= self.lr*layer.dW

In [46]:
class Tanh:
    def forward(self, A):
      self.A = A
      return np.tanh(A)
    
    def backward(self, dZ):
      return dZ * (1 - np.power(np.tanh(self.A),2))

class Sigmoid:
    def __sigmoid(self, X):
      return 1 / ( 1 + np.exp(-X) ) 
    def forward(self, A):
      self.A = A
      return self.__sigmoid(A)
    
    def backward(self, dZ):
      return dZ * (1 - self.__sigmoid(self.A)) * self.__sigmoid(self.A)

class ReLu:
    def forward(self, A):
      self.A = A
      return np.maximum(A, 0)
    
    def backward(self, dZ):
      return dZ * ((self.A > 0) * 1)

class Softmax:
    def forward(self, X):
      e = np.exp(X)
      return e / np.sum(e, axis=1).reshape(-1,1)
    
    def backward(self, Yhat, Y):
      n_batches = Yhat.shape[0]
      return (Yhat-Y)/n_batches

In [47]:
class FC:
    """
    Number of nodes Fully connected layer from n_nodes1 to n_nodes2
    Parameters
    ----------
    n_nodes1 : int
      Number of nodes in the previous layer
    n_nodes2 : int
      Number of nodes in the later layer
    initializer: instance of initialization method
    optimizer: instance of optimization method
    """
    def __init__(self, n_nodes1, n_nodes2, initializer, optimizer):
        self.optimizer = optimizer
        # Initialize
        # Initialize self.W and self.B using the initializer method
        self.W = initializer.W(n_nodes1, n_nodes2)
        self.B = initializer.B(n_nodes2)
        if type(optimizer) is AdaGrad:
          self.H = 0.0

    def forward(self, X):
        """
        forward
        Parameters
        ----------
        X : The following forms of ndarray, shape (batch_size, n_nodes1)
            入力
        Returns
        ----------
        A : The following forms of ndarray, shape (batch_size, n_nodes2)
            output
        """
        self.Z = X    
        A = np.matmul(X, self.W) + self.B
        return A

    def backward(self, dA):
        """
        Backward
        Parameters
        ----------
        dA : The following forms of ndarray, shape (batch_size, n_nodes2)
            Gradient flowing from behind
        Returns
        ----------
        dZ : The following forms of ndarray, shape (batch_size, n_nodes1)
            Gradient to flow forward
        """
        self.dB = np.sum(dA, axis=0)
        self.dW = np.dot(self.Z.T, dA)
        dZ = np.dot(dA, self.W.T)
        # update
        self = self.optimizer.update(self)
        return dZ

In [48]:
class ScratchDeepNeuralNetrowkClassifier:
    def __init__(self, epoch=20, batch_size=20, verbose = True):
        self.verbose = verbose
        self.epoch = epoch
        self.batch_size = batch_size

    def _feed_forward(self, X):
        A1 = self.FC1.forward(X)
        Z1 = self.activation1.forward(A1)
        A2 = self.FC2.forward(Z1)
        Z2 = self.activation2.forward(A2)
        A3 = self.FC3.forward(Z2)
        Z3 = self.activation3.forward(A3)
        return Z3

    def _backpropagation(self, y_proba, y_true):
        dA3 = self.activation3.backward(y_proba, y_true)
        dZ2 = self.FC3.backward(dA3)
        dA2 = self.activation2.backward(dZ2)
        dZ1 = self.FC2.backward(dA2)
        dA1 = self.activation1.backward(dZ1)
        dZ0 = self.FC1.backward(dA1)

    def _loss_function(self, y_proba, y_true):
        return -np.mean(y_true*np.log(y_proba + 1e-07))

    def fit(self, X, y, X_val=None, y_val=None):
        pass

    def predict_proba(self, X):
        return self._feed_forward(X)

    def predict(self, X):
        y_proba = self.predict_proba(X)
        return np.argmax(y_proba, axis=1)

## Download the dataset

In [49]:
from keras.datasets import mnist
(X_train, y_train), (X_test, y_test) = mnist.load_data()

## Smoothing

In [50]:
X_train = X_train.reshape(-1, 784)
X_test = X_test.reshape(-1, 784)

## Preprocessing

Preprocess X

In [51]:
X_train = X_train.astype(np.float)
X_test = X_test.astype(np.float)
X_train /= 255
X_test /= 255
print(X_train.max()) # 1.0
print(X_train.min()) # 0.0

1.0
0.0


Split train - val data

In [52]:
from sklearn.model_selection import train_test_split
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2)
print(X_train.shape) # (48000, 784)
print(X_val.shape) # (12000, 784)

(48000, 784)
(12000, 784)


One-hot encoding Y

In [53]:
from sklearn.preprocessing import OneHotEncoder
enc = OneHotEncoder(handle_unknown='ignore', sparse=False)
y_train_one_hot = enc.fit_transform(y_train[:, np.newaxis])
y_val_one_hot = enc.transform(y_val[:, np.newaxis])
print(y_train.shape)
print(y_train_one_hot.shape) 
print(y_train_one_hot.dtype)

(48000,)
(48000, 10)
float64


##**Problem 9**
##Learning and estimation

In [54]:
from sklearn.metrics import accuracy_score
y_val = enc.inverse_transform(y_val_one_hot)
y_val = y_val.ravel()

##SGD + SimpleInitializer + Sigmoid

In [55]:
class Model1(ScratchDeepNeuralNetrowkClassifier):
    def fit(self, X, y, X_val=None, y_val=None):
        self.sigma      = 0.01
        self.lr         = 0.01
        self.n_features = X.shape[1] # number of features
        self.n_nodes1   = 400
        self.n_nodes2   = 200
        self.n_output   = y.shape[1] # number of output classes (number of nodes in the 3rd layer)

        self.train_loss = []
        self.val_loss = []

        # Initialize
        optimizer = SGD(self.lr)
        self.FC1 = FC(self.n_features, self.n_nodes1, SimpleInitializer(self.sigma), optimizer)
        self.activation1 = Sigmoid()
        self.FC2 = FC(self.n_nodes1, self.n_nodes2, SimpleInitializer(self.sigma), optimizer)
        self.activation2 = Sigmoid()
        self.FC3 = FC(self.n_nodes2, self.n_output, SimpleInitializer(self.sigma), optimizer)
        self.activation3 = Softmax()

        for i in range(self.epoch):
            get_mini_batch = GetMiniBatch(X, y, batch_size=self.batch_size)
            for index, (X_batch, y_batch) in enumerate(get_mini_batch):
                # Forward
                Z3 = self._feed_forward(X_batch)

                # Backprop
                self._backpropagation(Z3, y_batch)

            self.train_loss.append(self._loss_function(self.predict_proba(X), y))
            if X_val is not None and y_val is not None:
                self.val_loss.append(self._loss_function(self.predict_proba(X_val), y_val))

            if self.verbose:
                print(f'epoch: {i}, loss: {self.train_loss[-1]}')

In [56]:
scratch_model_1 = Model1()
scratch_model_1.fit(X_train, y_train_one_hot, X_val, y_val_one_hot)

epoch: 0, loss: 0.23103246471570454
epoch: 1, loss: 0.23095209851645065
epoch: 2, loss: 0.23085663141794022
epoch: 3, loss: 0.2307233878422672
epoch: 4, loss: 0.23048247737548921
epoch: 5, loss: 0.2298169973698218
epoch: 6, loss: 0.22598526363889443
epoch: 7, loss: 0.19431720843375377
epoch: 8, loss: 0.1562846435117943
epoch: 9, loss: 0.12578877017756293
epoch: 10, loss: 0.1070477496767722
epoch: 11, loss: 0.08661182165086774
epoch: 12, loss: 0.07801993674737993
epoch: 13, loss: 0.07336625139136578
epoch: 14, loss: 0.06976712408023678
epoch: 15, loss: 0.06627609995603422
epoch: 16, loss: 0.06257061841373147
epoch: 17, loss: 0.05885497856905892
epoch: 18, loss: 0.055595737898015814
epoch: 19, loss: 0.05296986087107281


In [57]:
y_pred = scratch_model_1.predict(X_val)

In [58]:
accuracy_score(y_pred, y_val)

0.8395

##SGD + HeInitializer + ReLu

In [59]:
class Model2(ScratchDeepNeuralNetrowkClassifier):
    def fit(self, X, y, X_val=None, y_val=None):
        self.sigma      = 0.01
        self.lr         = 0.01
        self.n_features = X.shape[1] # number of features
        self.n_nodes1   = 400
        self.n_nodes2   = 200
        self.n_output   = y.shape[1] # number of output classes (number of nodes in the 3rd layer)

        self.train_loss = []
        self.val_loss = []

        # Initialize
        optimizer = SGD(self.lr)
        self.FC1 = FC(self.n_features, self.n_nodes1, HeInitializer(self.sigma), optimizer)
        self.activation1 = ReLu()
        self.FC2 = FC(self.n_nodes1, self.n_nodes2, HeInitializer(self.sigma), optimizer)
        self.activation2 = ReLu()
        self.FC3 = FC(self.n_nodes2, self.n_output, HeInitializer(self.sigma), optimizer)
        self.activation3 = Softmax()

        for i in range(self.epoch):
            get_mini_batch = GetMiniBatch(X, y, batch_size=self.batch_size)
            for index, (X_batch, y_batch) in enumerate(get_mini_batch):
                # Forward
                Z3 = self._feed_forward(X_batch)

                # Backprop
                self._backpropagation(Z3, y_batch)

            self.train_loss.append(self._loss_function(self.predict_proba(X), y))
            if X_val is not None and y_val is not None:
                self.val_loss.append(self._loss_function(self.predict_proba(X_val), y_val))

            if self.verbose:
                print(f'epoch: {i}, loss: {self.train_loss[-1]}')

In [60]:
scratch_model_2 = Model2()
scratch_model_2.fit(X_train, y_train_one_hot, X_val, y_val_one_hot)

epoch: 0, loss: 0.23012620954951873
epoch: 1, loss: 0.23012714675179516
epoch: 2, loss: 0.23012732204391398
epoch: 3, loss: 0.23012728820433534
epoch: 4, loss: 0.23012719780082685
epoch: 5, loss: 0.23012702740271535
epoch: 6, loss: 0.23012665205688196
epoch: 7, loss: 0.2301255529914926
epoch: 8, loss: 0.23012033065981172
epoch: 9, loss: 0.23004024526639363
epoch: 10, loss: 0.1865439105585739
epoch: 11, loss: 0.14674698648975348
epoch: 12, loss: 0.12998666921571683
epoch: 13, loss: 0.11469202122107941
epoch: 14, loss: 0.06817801718048722
epoch: 15, loss: 0.04720774886683682
epoch: 16, loss: 0.037807099736954904
epoch: 17, loss: 0.03048494604667859
epoch: 18, loss: 0.025246575653172357
epoch: 19, loss: 0.021406559205862017


In [62]:
y_pred = scratch_model_2.predict(X_val)

In [63]:
accuracy_score(y_pred, y_val)

0.9329166666666666

##SGD + XavierInitializer + Tanh

In [64]:
class Model3(ScratchDeepNeuralNetrowkClassifier):
    def fit(self, X, y, X_val=None, y_val=None):
        self.sigma      = 0.01
        self.lr         = 0.01
        self.n_features = X.shape[1] # number of features
        self.n_nodes1   = 400
        self.n_nodes2   = 200
        self.n_output   = y.shape[1] # number of output classes (number of nodes in the 3rd layer)

        self.train_loss = []
        self.val_loss = []

        # Initialize
        optimizer = SGD(self.lr)
        self.FC1 = FC(self.n_features, self.n_nodes1, XavierInitializer(self.sigma), optimizer)
        self.activation1 = Tanh()
        self.FC2 = FC(self.n_nodes1, self.n_nodes2, XavierInitializer(self.sigma), optimizer)
        self.activation2 = Tanh()
        self.FC3 = FC(self.n_nodes2, self.n_output, XavierInitializer(self.sigma), optimizer)
        self.activation3 = Softmax()

        for i in range(self.epoch):
            get_mini_batch = GetMiniBatch(X, y, batch_size=self.batch_size)
            for index, (X_batch, y_batch) in enumerate(get_mini_batch):
                # Forward
                Z3 = self._feed_forward(X_batch)

                # Backprop
                self._backpropagation(Z3, y_batch)

            self.train_loss.append(self._loss_function(self.predict_proba(X), y))
            if X_val is not None and y_val is not None:
                self.val_loss.append(self._loss_function(self.predict_proba(X_val), y_val))

            if self.verbose:
                print(f'epoch: {i}, loss: {self.train_loss[-1]}')

In [65]:
scratch_model_3 = Model3()
scratch_model_3.fit(X_train, y_train_one_hot, X_val, y_val_one_hot)

epoch: 0, loss: 0.2301262029303276
epoch: 1, loss: 0.2301271367793693
epoch: 2, loss: 0.2301273087525046
epoch: 3, loss: 0.2301272721256824
epoch: 4, loss: 0.2301271802079799
epoch: 5, loss: 0.2301270130612046
epoch: 6, loss: 0.23012666208175805
epoch: 7, loss: 0.23012570257492856
epoch: 8, loss: 0.23012134278267224
epoch: 9, loss: 0.230037044552603
epoch: 10, loss: 0.17436836494453914
epoch: 11, loss: 0.1597825454640234
epoch: 12, loss: 0.11662698389643039
epoch: 13, loss: 0.0785070195644493
epoch: 14, loss: 0.05351389451465963
epoch: 15, loss: 0.0408307089162808
epoch: 16, loss: 0.03586514924011039
epoch: 17, loss: 0.031843088247308415
epoch: 18, loss: 0.029138915162482903
epoch: 19, loss: 0.027014474344283026


In [66]:
y_pred = scratch_model_3.predict(X_val)

In [67]:
accuracy_score(y_pred, y_val)

0.9154166666666667

##AdaGrad

In [72]:
class Model4(ScratchDeepNeuralNetrowkClassifier):
    def fit(self, X, y, X_val=None, y_val=None):
        self.sigma      = 0.01
        self.lr         = 0.01
        self.n_features = X.shape[1] # number of features
        self.n_nodes1   = 400
        self.n_nodes2   = 200
        self.n_output   = y.shape[1] # number of output classes (number of nodes in the 3rd layer)

        self.train_loss = []
        self.val_loss = []

        # Initialize
        optimizer = AdaGrad(self.lr)
        self.FC1 = FC(self.n_features, self.n_nodes1, SimpleInitializer(self.sigma), optimizer)
        self.activation1 = ReLu()
        self.FC2 = FC(self.n_nodes1, self.n_nodes2, SimpleInitializer(self.sigma), optimizer)
        self.activation2 = ReLu()
        self.FC3 = FC(self.n_nodes2, self.n_output, SimpleInitializer(self.sigma), optimizer)
        self.activation3 = Softmax()

        for i in range(self.epoch):
            get_mini_batch = GetMiniBatch(X, y, batch_size=self.batch_size)
            for index, (X_batch, y_batch) in enumerate(get_mini_batch):
                # Forward
                Z3 = self._feed_forward(X_batch)

                # Backprop
                self._backpropagation(Z3, y_batch)

            self.train_loss.append(self._loss_function(self.predict_proba(X), y))
            if X_val is not None and y_val is not None:
                self.val_loss.append(self._loss_function(self.predict_proba(X_val), y_val))

            if self.verbose:
                print(f'epoch: {i}, loss: {self.train_loss[-1]}')

In [73]:
scratch_model_4 = Model4()
scratch_model_4.fit(X_train, y_train_one_hot, X_val, y_val_one_hot)

epoch: 0, loss: 0.07967009044415699
epoch: 1, loss: 0.04255954382292831
epoch: 2, loss: 0.03391847934328141
epoch: 3, loss: 0.028812227874073126
epoch: 4, loss: 0.024678401079650435
epoch: 5, loss: 0.021238665029150634
epoch: 6, loss: 0.018480415987126887
epoch: 7, loss: 0.016255077315290026
epoch: 8, loss: 0.014410275508767938
epoch: 9, loss: 0.01289089654572756
epoch: 10, loss: 0.011592085948214351
epoch: 11, loss: 0.01049111519894418
epoch: 12, loss: 0.00951967136177652
epoch: 13, loss: 0.008674772317749452
epoch: 14, loss: 0.007933935855088203
epoch: 15, loss: 0.007281655159436782
epoch: 16, loss: 0.006695754264384957
epoch: 17, loss: 0.006170667294685989
epoch: 18, loss: 0.005685922581067491
epoch: 19, loss: 0.005255216938358371


In [74]:
y_pred = scratch_model_4.predict(X_val)

In [75]:
accuracy_score(y_pred, y_val)

0.9710833333333333