In [1]:
import numpy as np

In [2]:
# [Problem 1] Classification of fully connected layers

In [20]:
class GetMiniBatch:
    """
    Parameters
    ----------
    X : 次の形のndarray, shape (n_samples, n_features)
      訓練データ
    y : 次の形のndarray, shape (n_samples, 1)
      正解値
    batch_size : int
      バッチサイズ
    seed : int
      NumPyの乱数のシード
    """
    def __init__(self, X, y, batch_size = 20, seed=0):
        self.batch_size = batch_size
        np.random.seed(seed)
        shuffle_index = np.random.permutation(np.arange(X.shape[0]))
        self._X = X[shuffle_index]
        self._y = y[shuffle_index]
        self._stop = np.ceil(X.shape[0]/self.batch_size).astype(int)

    def __len__(self):
        return self._stop

    def __getitem__(self,item):
        p0 = item*self.batch_size
        p1 = item*self.batch_size + self.batch_size
        return self._X[p0:p1], self._y[p0:p1]

    def __iter__(self):
        self._counter = 0
        return self

    def __next__(self):
        if self._counter >= self._stop:
            raise StopIteration()
        p0 = self._counter*self.batch_size
        p1 = self._counter*self.batch_size + self.batch_size
        self._counter += 1
        return self._X[p0:p1], self._y[p0:p1]

In [21]:
class FC:
    """
    ノード数n_nodes1からn_nodes2への全結合層
    Parameters
    ----------
    n_nodes1 : int
      前の層のノード数
    n_nodes2 : int
      後の層のノード数
    initializer : 初期化方法のインスタンス
    optimizer : 最適化手法のインスタンス
    """
    def __init__(self, n_nodes1, n_nodes2, initializer, optimizer):
        
        self.optimizer = optimizer
        self.W, self.B = initializer.W(n_nodes1, n_nodes2), initializer.B(n_nodes2)
        
        
    def forward(self, X):
        """
        フォワード
        Parameters
        ----------
        X : 次の形のndarray, shape (batch_size, n_nodes1)
            入力
        Returns
        ----------
        A : 次の形のndarray, shape (batch_size, n_nodes2)
            出力
        """
        A = X @ self.W + self.B
        
        return A
        
    def backward(self, dA):
        """
        バックワード
        Parameters
        ----------
        dA : 次の形のndarray, shape (batch_size, n_nodes2)
            後ろから流れてきた勾配
        Returns
        ----------
        dZ : 次の形のndarray, shape (batch_size, n_nodes1)
            前に流す勾配
        """
        dZ = dA @ self.W.T
        dB = np.sum(dA, axis=0)
        self.W, self.B = self.optimizer.update(self.W, self.B, dZ, dB)
        
        return dZ

In [22]:
# initializer

class SimpleInitializer:
    """
    ガウス分布によるシンプルな初期化
    Parameters
    ----------
    sigma : float
      ガウス分布の標準偏差
    """
    def __init__(self, sigma):
        self.sigma = sigma
        
    def W(self, n_nodes1, n_nodes2):
        """
        重みの初期化
        Parameters
        ----------
        n_nodes1 : int
          前の層のノード数
        n_nodes2 : int
          後の層のノード数

        Returns
        ----------
        W :
        """
        W = self.sigma * np.random.randn(n_nodes1, n_nodes2)
        return W
        
    def B(self, n_nodes2):
        """
        バイアスの初期化
        Parameters
        ----------
        n_nodes2 : int
          後の層のノード数

        Returns
        ----------
        B :
        """
        B = self.sigma * np.random.randn(1, n_nodes2)
        
        return B

In [23]:
# optimizer 

In [42]:
class SGD:
    """
    確率的勾配降下法
    Parameters
    ----------
    lr : 学習率
    """
    def __init__(self, lr):
        self.lr = lr
    def update(self, W, B, dZ, dB):
        """
        ある層の重みやバイアスの更新
        Parameters
        ----------
        layer : 更新前の層のインスタンス
        """
        W -= self.lr * dZ
        B -= self.lr * dB
        return W, B

In [43]:
class activation_func:

    def __init__(self, function_type):
        
        self.function_type=function_type
        
    def forward(self, X):
        
        if self.function_type=="tanh":
            A = self.tanh_function(X)
        elif self.function_type=="softmax":
            A = self.softmax(X)

        return A
        
    def backward(self, X, y=None):

        if self.function_type=="tanh":
            A = self.grad_tanh(X)
        elif self.function_type=="softmax":
            A = self.grad_softmax(X, y)
        return A

    def tanh_function(self, A):
        return (np.exp(A)-np.exp(-A))/(np.exp(A)+np.exp(-A))
    
    def softmax(self, A):
        return np.exp(A)/np.sum(np.exp(A), axis=0)

    def grad_tanh(self, A):
         return (1 - self.tanh_function(self.A)**2)

    def grad_softmax(self, Z3, y):
        return (Z3 - y)

    

In [44]:
class ScratchSimpleDeepNeuralNetrowkClassifier():
    def __init__(self,
                 batch_size = 20, #number of data
                 n_features = 784, #input size
                 n_nodes1 = 400, # hidden size of first layer 
                 n_nodes2 = 200, # hidden size of second layer 
                 n_output = 10, # 0-9 total 10 numbers
                 sigma = 0.02, # parameter initialize 
                 lr = 0.01, #learning rate 
                 epoch = 10, #iteration 
                 verbose=True):
        
        self.verbose = verbose
        self.batch_size = batch_size
        self.n_features = n_features
        self.n_nodes1 = n_nodes1
        self.n_nodes2 = n_nodes2 
        self.n_output = n_output
        self.sigma = sigma
        self.lr = lr
        self.epoch = epoch
        self.loss_train = []
        self.loss_val = []

        #neural net initilize
        optimizer = SGD(self.lr)
        self.FC1 = FC(self.n_features, self.n_nodes1, SimpleInitializer(self.sigma), optimizer)
        self.activation1 = activation_func(function_type='tanh')
        self.FC2 = FC(self.n_nodes1, self.n_nodes2, SimpleInitializer(self.sigma), optimizer)
        self.activation2 = activation_func(function_type='tanh')
        self.FC3 = FC(self.n_nodes2, self.n_output, SimpleInitializer(self.sigma), optimizer)
        self.activation3 = activation_func(function_type='softmax')

    def fit(self, X, y, X_val=None, y_val=None):
        
        for e in range(self.epoch):
            get_mini_batch = GetMiniBatch(X, y, batch_size=self.batch_size)
            for mini_X_train, mini_y_train in get_mini_batch:
                Z3 = self.forward(mini_X_train)
                self.backward(Z3, mini_y_train)
                
            self.forward(X)
            self.loss_train.append(self.cross_entropy_error(y, self.Z3))
            
            if X_val is not None:
                self.forward(X_val)
                self.loss_val.append(self.cross_entropy_error(y_val, self.Z3))
                
            if self.verbose:
                if X_val is None:
                    print(self.loss_train)
                else:
                    print(f"Epoch {e}:", "Train loss:", self.loss_train[-1], "Val loss:", self.loss_val[-1])

    
    def forward(self, X):
        
        A1 = self.FC1.forward(X)
        Z1 = self.activation1.forward(A1)
        A2 = self.FC2.forward(Z1)
        Z2 = self.activation2.forward(A2)
        A3 = self.FC3.forward(Z2)
        Z3 = self.activation3.forward(A3)
        return Z3

    def backward(self, Z3, Y):

        dA3 = self.activation3.backward(Z3, Y)
        dZ2 = self.FC3.backward(dA3)
        dA2 = self.activation2.backward(dZ2)
        dZ1 = self.FC2.backward(dA2)
        dA1 = self.activation1.backward(dZ1)
        dZ0 = self.FC1.backward(dA1) #

    
    def cross_entropy_error(self, y, Z):
        L = - np.sum(y * np.log(Z+1e-7)) / len(y)
        return L
        
    def predict(self, X):
        self.forward(X)
        return np.argmax(self.Z3, axis=1)

In [45]:
#test

In [15]:
from sklearn.preprocessing import OneHotEncoder
import keras




In [16]:
#### Running Sctratch ###
(X_train, y_train), (X_test, y_test) = keras.datasets.mnist.load_data()
X_train = X_train.reshape(-1, 784)
X_test = X_test.reshape(-1, 784)

X_train = X_train / 255.0
X_test = X_test / 255.0

X_train = X_train[:500, :] #because of computation
X_test = X_test[:100, :]
y_train = y_train[:500]
y_test = y_test[:100]

In [17]:
enc = OneHotEncoder(handle_unknown='ignore')
y_train_one_hot = enc.fit_transform(y_train[:, np.newaxis])
y_test_one_hot = enc.transform(y_test[:, np.newaxis])

In [46]:
model_nn = ScratchSimpleDeepNeuralNetrowkClassifier(batch_size=8, epoch=50)

In [47]:
model_nn.fit(X_train, y_train_one_hot.toarray(), X_test, y_test_one_hot.toarray())

ValueError: operands could not be broadcast together with shapes (200,10) (8,200) (200,10) 