# ディープラーニングをスクラッチする課題

# 【問題1】全結合層のクラス化

In [71]:
import numpy as np

class FC:
    """
    ノード数n_nodes1からn_nodes2への全結合層
    Parameters
    ----------
    n_nodes1 : int
      前の層のノード数
    n_nodes2 : int
      後の層のノード数
    initializer : 初期化方法のインスタンス
    optimizer : 最適化手法のインスタンス
    """
    def __init__(self, n_nodes1, n_nodes2, initializer, optimizer):
        self.optimizer = optimizer
        # 初期化
        # initializerのメソッドを使い、self.Wとself.Bを初期化する
        self.W = initializer.W(n_nodes1, n_nodes2)
        self.B = initializer.B(n_nodes2)
        self.Z = 0
        self.dA = 0
        
    def forward(self, X):
        """
        フォワード
        Parameters
        ----------
        X : 次の形のndarray, shape (batch_size, n_nodes1)
            入力
        Returns
        ----------
        A : 次の形のndarray, shape (batch_size, n_nodes2)
            出力
        """ 
        A = np.dot(X, self.W) + self.B
        self.Z = X
        return A
    
    def backward(self, dA):
        """
        バックワード
        Parameters
        ----------
        dA : 次の形のndarray, shape (batch_size, n_nodes2)
            後ろから流れてきた勾配
        Returns
        ----------
        dZ : 次の形のndarray, shape (batch_size, n_nodes1)
            前に流す勾配
        """
        self.dA = dA 
        dZ = np.dot(dA, self.W.T) 
        # 更新
        self = self.optimizer.update(self)
        
        return dZ


# 【問題2】初期化方法のクラス化

In [7]:
class SimpleInitializer:
    """
    ガウス分布によるシンプルな初期化
    Parameters
    ----------
    sigma : float
      ガウス分布の標準偏差
    """
    
    def __init__(self, sigma=0.01):
        self.sigma = sigma
        
    def W(self, n_nodes1, n_nodes2):
        """
        重みの初期化
        Parameters
        ----------
        n_nodes1 : int
          前の層のノード数
        n_nodes2 : int
          後の層のノード数

        Returns
        ----------
        W : 次の形のndarray, shape(n_nodes1, n_nodes2)
        """
        W = self.sigma * np.random.randn(n_nodes1, n_nodes2)
        return W
    
    def B(self, n_nodes2):
        """
        バイアスの初期化
        Parameters
        ----------
        n_nodes2 : int
          後の層のノード数

        Returns
        ----------
        B : 次の形のndarray, shape(1, nodes2)
        """
        B = self.sigma * np.random.randn(1, n_nodes2)
        return B

# 【問題3】最適化手法のクラス化

In [148]:
class SGD:
    """
    確率的勾配降下法
    Parameters
    ----------
    lr : 学習率
    """
    def __init__(self, lr):
        self.lr = lr
        
    def update(self, layer):
        """
        ある層の重みやバイアスの更新
        Parameters
        ----------
        layer : 更新前の層のインスタンス

        Returns
        ----------
        layer : 更新後の層のインスタンス
        """
        layer.W = layer.W - self.lr * np.dot(layer.Z.T, layer.dA) / len(layer.dA)
        layer.B = layer.B - self.lr * np.mean(layer.dA, axis=0)
        return layer

# 【問題4】活性化関数のクラス化

In [153]:
class sigmoid:
    """
    シグモイド関数
    """
    
    def __init__(self):
        self.Z = 0
    
    def forward(self, A):
        """
        フォワード
        Parameters
        ----------
        A : 次の形のndarray, shape (batch_size, n_nodes)
            入力
        Returns
        ----------
        Z : 次の形のndarray, shape (batch_size, n_nodes)
            出力
        """ 
        Z = 1 / (1 + np.exp(-A))
        self.Z = Z
        return Z
    
    def backward(self, dZ):
        """
        バックワード
        Parameters
        ----------
        dZ : 次の形のndarray, shape (batch_size, n_nodes)
            後ろから流れてきた勾配
        Returns
        ----------
        dA : 次の形のndarray, shape (batch_size, n_nodes)
            前に流す勾配
        """
        dA = dZ  *  (1 - self.Z) * self.Z 
        return dA

class sigmoid:
    """
    シグモイド関数
    """
    
    def __init__(self):
        self.Z = 0
    
    def forward(self, A):
        """
        フォワード
        Parameters
        ----------
        A : 次の形のndarray, shape (batch_size, n_nodes)
            入力
        Returns
        ----------
        Z : 次の形のndarray, shape (batch_size, n_nodes)
            出力
        """ 
        Z = 1 / (1 + np.exp(-A))
        self.Z = Z
        return Z
    
    def backward(self, dZ):
        """
        バックワード
        Parameters
        ----------
        dZ : 次の形のndarray, shape (batch_size, n_nodes)
            後ろから流れてきた勾配
        Returns
        ----------
        dA : 次の形のndarray, shape (batch_size, n_nodes)
            前に流す勾配
        """
        dA = dZ  *  (1 - self.Z) * self.Z 
        return dA
    
class Tanh:
    """
    ハイパボリックタンジェント関数
    """
    
    def __init__(self):
        self.Z = 0
    
    def forward(self, A):
        """
        フォワード
        Parameters
        ----------
        A : 次の形のndarray, shape (batch_size, n_nodes)
            入力
        Returns
        ----------
        Z : 次の形のndarray, shape (batch_size, n_nodes)
            出力
        """ 
        Z = np.tanh(A)
        self.Z = Z
        return Z
    
    def backward(self, dZ):
        """
        バックワード
        Parameters
        ----------
        dZ : 次の形のndarray, shape (batch_size, n_nodes)
            後ろから流れてきた勾配
        Returns
        ----------
        dA : 次の形のndarray, shape (batch_size, n_nodes)
            前に流す勾配
        """
        dA = dZ  *  (1 - self.Z**2)
        return dA

class Softmax:
    """
    ソフトマックス関数
    """
    
    def __init__(self):
        self.Z = 0
    
    def forward(self, A):
        """
        フォワード
        Parameters
        ----------
        A : 次の形のndarray, shape (batch_size, n_nodes)
            入力
        Returns
        ----------
        Z : 次の形のndarray, shape (batch_size, n_nodes)
            出力
        """ 
        
        c = np.max(A)
        A = A - c
        ex = np.exp(A)
        Z = ex / (np.sum(ex, axis=1))[:, np.newaxis]
        self.Z = Z
        return Z
    
    def backward(self, y):
        """
        バックワード
        Parameters
        ----------
        y : 次の形のndarray, shape (batch_size, n_class)
            正解ラベル
        Returns
        ----------
        dA : 次の形のndarray, shape (batch_size, n_class)
            前に流す勾配
        """
        
        dA = y - self.Z
        
        return dA


In [12]:
class GetMiniBatch:
    """
    ミニバッチを取得するイテレータ

    Parameters
    ----------
    X : 次の形のndarray, shape (n_samples, n_features)
      学習データ
    y : 次の形のndarray, shape (n_samples, 1)
      正解値
    batch_size : int
      バッチサイズ
    seed : int
      NumPyの乱数のシード
    """
    def __init__(self, X, y, batch_size = 10, seed=0):
        self.batch_size = batch_size
        np.random.seed(seed)
        shuffle_index = np.random.permutation(np.arange(X.shape[0]))
        self.X = X[shuffle_index]
        self.y = y[shuffle_index]
        self._stop = np.ceil(X.shape[0]/self.batch_size).astype(np.int)

    def __len__(self):
        return self._stop
    
    #Pythonの特殊メソッドのひとつで、オブジェクトに角括弧でアクセスしたときの挙動を定義できる。
    def __getitem__(self,item):
        p0 = item*self.batch_size
        p1 = item*self.batch_size + self.batch_size
        return self.X[p0:p1], self.y[p0:p1]        

    def __iter__(self):
        self._counter = 0
        return self

    def __next__(self):
        if self._counter >= self._stop:
            raise StopIteration()
        p0 = self._counter*self.batch_size
        p1 = self._counter*self.batch_size + self.batch_size
        self._counter += 1
        return self.X[p0:p1], self.y[p0:p1]

In [152]:
import numpy as np
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt
%matplotlib inline

class ScratchDeepNeuralNetrowkClassifier:
    """
    """
    
    def __init__(self, n_nodes1, n_nodes2, n_output, sigma=0.01, lr=0.01, batch_size=50, epoch=10, verbose=True, metrics="acc"):
        self.sigma = sigma
        self.lr = lr
        self.n_features = 0
        self.n_nodes1 = n_nodes1
        self.n_nodes2 = n_nodes2
        self.n_output = n_output
        self.batch_size = batch_size
        self.epoch = epoch
        self.metrics = metrics
        self.verbose = verbose
        self.train_loss = np.zeros(epoch)
        self.val_loss = np.zeros(epoch)
    
    def fit(self, X, y, X_val=None, y_val=None):
        """
        ニューラルネットワーク分類器を学習する。

        Parameters
        ----------
        X : 次の形のndarray, shape (n_samples, n_features)
            学習用データの特徴量
        y : 次の形のndarray, shape (n_samples, )
            学習用データの正解値
        X_val : 次の形のndarray, shape (n_samples, n_features)
            検証用データの特徴量
        y_val : 次の形のndarray, shape (n_samples, )
            検証用データの正解値
        """
        
        # self.sigma : ガウス分布の標準偏差
        # self.lr : 学習率
        # self.n_nodes1 : 1層目のノード数
        # self.n_nodes2 : 2層目のノード数
        # self.n_output : 出力層のノード数
        self.n_features = X.shape[1]

        optimizer = SGD(self.lr)
        self.FC1 = FC(self.n_features, self.n_nodes1, SimpleInitializer(self.sigma), optimizer)
        self.activation1 = sigmoid()
        self.FC2 = FC(self.n_nodes1, self.n_nodes2, SimpleInitializer(self.sigma), optimizer)
        self.activation2 = sigmoid()
        self.FC3 = FC(self.n_nodes2, self.n_output, SimpleInitializer(self.sigma), optimizer)
        self.activation3 = Softmax()
    
        for i in range(self.epoch):

            #バッチ作成
            get_mini_batch = GetMiniBatch(X, y, batch_size=self.batch_size, seed=0)

            for mini_X_train, mini_y_train in get_mini_batch:
                
                #FP
                A1 = self.FC1.forward(mini_X_train)
                Z1 = self.activation1.forward(A1)
                A2 = self.FC2.forward(Z1)
                Z2 = self.activation2.forward(A2)
                A3 = self.FC3.forward(Z2)
                Z3 = self.activation3.forward(A3)

                #BP
                dA3 = self.activation3.backward(mini_y_train) # 交差エントロピー誤差とソフトマックスを合わせている
                dZ2 = self.FC3.backward(dA3)
                dA2 = self.activation2.backward(dZ2)
                dZ1 = self.FC2.backward(dA2)
                dA1 = self.activation1.backward(dZ1)
                dZ0 = self.FC1.backward(dA1) # dZ0は使用しない

            #評価値の表示
            train_pred = self.FP(X)
            self.train_loss[i] = self._cross_entropy_loss(train_pred, y)
            
            if np.any(X_val):
                val_pred = self.FP(X_val)
                self.val_loss[i] = self._cross_entropy_loss(val_pred, y_val)
                met = self.accuracy(np.argmax(y_val, axis=1), np.argmax(val_pred, axis=1))
                      
                if self.verbose:
                    print("epoch:{0} train_loss: {1} val_loss: {2} {3}: {4}".format(i+1, self.train_loss[i], self.val_loss[i], self.metrics, met))
                    
            else:
                if self.verbose:
                      print("epoch:{0} loss: {1}".format(i+1, self.train_loss[i]))
                            
        
    def FP(self, X):
        
        #FP
        A1 = self.FC1.forward(X)
        Z1 = self.activation1.forward(A1)
        A2 = self.FC2.forward(Z1)
        Z2 = self.activation2.forward(A2)
        A3 = self.FC3.forward(Z2)
        Z3 = self.activation3.forward(A3)

        return Z3 

    def _cross_entropy_loss(self,z, y):

        return - sum(sum(y * np.log(z))) / len(y)
    
    def accuracy(self, y, y_pred):
        # accuracyを計算して返す
        return accuracy_score(y, y_pred)

In [140]:
from keras.datasets import mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()

In [141]:
#フラットにする
X_train = x_train.reshape(-1, 784)
X_test = x_test.reshape(-1, 784)

In [142]:
#スケール合わせ
X_train = X_train.astype(np.float)
X_test = X_test.astype(np.float)
X_train /= 255.0
X_test /= 255.0

In [143]:
#onehot
from sklearn.preprocessing import OneHotEncoder
enc = OneHotEncoder(handle_unknown='ignore', sparse=False)
y_train_one_hot = enc.fit_transform(y_train[:, np.newaxis])
y_test_one_hot = enc.transform(y_test[:, np.newaxis])
print(y_train.shape) # (60000,)
print(y_train_one_hot.shape) # (60000, 10)
print(y_train_one_hot.dtype) # float64

(60000,)
(60000, 10)
float64


In [144]:
from sklearn.model_selection import train_test_split
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train_one_hot, test_size=0.2)
print(X_train.shape) # (48000, 784)
print(X_val.shape) # (12000, 784)

(48000, 784)
(12000, 784)


In [159]:
n_nodes1 = 400
n_nodes2 = 200
n_output = y_train_one_hot.shape[1]

model = ScratchDeepNeuralNetrowkClassifier( n_nodes1, n_nodes2, n_output, sigma=0.01, lr=0.001, batch_size=50, epoch=1, metrics="acc")

In [160]:
model.fit(X_train, y_train, X_val, y_val)

epoch:1 train_loss: 61.20113471646312 val_loss: 61.24853035625702 acc: 0.08966666666666667
