### 下ごしらえ

In [1]:
import numpy as np
from keras.datasets import mnist
from sklearn.model_selection import train_test_split
import logging
from datetime import datetime
import time

Using TensorFlow backend.


In [2]:
(X_train, y_train), (X_test, y_test) = mnist.load_data()

# 平坦化
X_train = X_train.reshape(-1, 784)
X_test = X_test.reshape(-1, 784)

# 前処理
X_train = X_train.astype(np.float)
X_test = X_test.astype(np.float)
X_train /= 255
X_test /= 255

# 学習データをスプリット
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2)

## GetMiniBatchクラス

In [3]:
import numpy as np

class GetMiniBatch:
    """
    ミニバッチを取得するイテレータ

    Parameters
    ----------
    X : 次の形のndarray, shape (n_samples, n_features)
      学習データ
    y : 次の形のndarray, shape (n_samples, 1)
      正解値
    batch_size : int
      バッチサイズ
    seed : int
      NumPyの乱数のシード
    """
    def __init__(self, X, y, batch_size = 10, seed=0):
        self.batch_size = batch_size
        np.random.seed(seed)
        shuffle_index = np.random.permutation(np.arange(X.shape[0]))
        self.X = X[shuffle_index]
        self.y = y[shuffle_index]
        self._stop = np.ceil(X.shape[0]/self.batch_size).astype(np.int)

    def __len__(self):
        return self._stop

    def __getitem__(self,item):
        p0 = item*self.batch_size
        p1 = item*self.batch_size + self.batch_size
        return self.X[p0:p1], self.y[p0:p1]        

    def __iter__(self):
        self._counter = 0
        return self

    def __next__(self):
        if self._counter >= self._stop:
            raise StopIteration()
        p0 = self._counter*self.batch_size
        p1 = self._counter*self.batch_size + self.batch_size
        self._counter += 1
        return self.X[p0:p1], self.y[p0:p1]

## 【問題1】全結合層のクラス化


In [4]:
class FC:
    """
    ノード数n_nodes1からn_nodes2への全結合層
    Parameters
    ----------
    n_nodes1 : int
      前の層のノード数
    n_nodes2 : int
      後の層のノード数
    initializer : 初期化方法のインスタンス
    optimizer : 最適化手法のインスタンス
    """

    def __init__(self, n_nodes1, n_nodes2, initializer, optimizer):
        self.optimizer = optimizer
        self.initializer = initializer
        self.n_nodes1 = n_nodes1
        self.n_nodes1 = n_nodes2
        self.W = initializer.W(n_nodes1, n_nodes2)
        self.B = initializer.B(n_nodes2)
        self.Z = None
        self.dA = None 
        

    def forward(self, Z):
        """
        フォワード
        Parameters
        ----------
        Z : 次の形のndarray, shape (batch_size, n_nodes1)
            入力
            
        Returns
        ----------
        A : 次の形のndarray, shape (batch_size, n_nodes2)
            出力
        """        
        
        # backwardで使うためインスタンス変数化
        self.Z = Z
        
        A = (Z @ self.W) + self.B
        
        return A
      

    def backward(self, dA):
        """
        バックワード
        Parameters
        ----------
        dA : 次の形のndarray, shape (batch_size, n_nodes2)
            後ろから流れてきた勾配
        Returns
        ----------
        dZ : 次の形のndarray, shape (batch_size, n_nodes1)
            前に流す勾配
        """
        
        # 重みインスタンス変数化
        self.dA = dA
        
        dZ = (dA @ self.W.T)
        
        # self.W self.Bの更新
        self = self.optimizer.update(self)
        
        return dZ

## 【問題2】初期化方法のクラス化

In [5]:
import numpy as np

class SimpleInitializer:
    """
    ガウス分布によるシンプルな初期化
    Parameters
    ----------
    sigma : float
      ガウス分布の標準偏差
    """
    
    def __init__(self, sigma):
        self.sigma = sigma
        
        
    def W(self, n_nodes1, n_nodes2):
        """
        重みの初期化
        Parameters
        ----------
        n_nodes1 : int
          前の層のノード数
        n_nodes2 : int
          後の層のノード数

        Returns
        ----------
        W : 次の形のndarray, shape (n_nodes1, n_nodes2)
          重さ
        """
        
        W = self.sigma * np.random.randn(n_nodes1, n_nodes2)

        return W


    def B(self, n_nodes2):
        """
        バイアスの初期化
        Parameters
        ----------
        n_nodes2 : int
          後の層のノード数

        Returns
        ----------
        B :次の形のndarray, shape (n_nodes2,)
          バイアス
        """
        
        B = self.sigma * np.random.randn(n_nodes2)
        
        return B

## 【問題3】最適化手法のクラス化

In [6]:
import numpy as np

class SGD:
    """
    確率的勾配降下法
    Parameters
    ----------
    lr : 学習率
    """
    
    def __init__(self, lr):
        self.lr = lr
        
        
    def update(self, layer):
        """
        ある層の重みやバイアスの更新
        Parameters
        ----------
        layer : 更新前の層のインスタンス

        Returns
        ----------
        layer : 更新後の層のインスタンス
        """
        
        # 重みとバイアスを更新
        layer.W -= self.lr * layer.Z.T @ layer.dA
        layer.B -= np.mean(layer.dA, axis=0)
        
        return layer

## 【問題4】活性化関数のクラス化
## 【問題5】ReLUクラスの作成

In [7]:
import numpy as np

class Sigmoid:
    """
    シグモイド関数汎用クラス
    
    """
      
    def forward(self, A):
        # 入力の最大値
        sigmoid_range = 34.538776394910684
        
        Z = 1 / (1 + np.exp(-np.clip(A, -sigmoid_range, sigmoid_range)))
        
        return Z
      

    def backward(self, Z):
        return self.forward(Z) * (1 - self.forward(Z))
        

In [8]:
import numpy as np

class Softmax:
    """
    ソフトマックス汎用クラス
    
    """
      
    def forward(self, X):
        max_X = np.max(X)
        exp_X = np.exp(X - max_X)
        sum_exp_X = np.sum(exp_X, axis=1).reshape(-1, 1)
        
        Z = exp_X / sum_exp_X
        
        return Z

In [9]:
import numpy as np

# TODO

class ReLU:
    """
    ReLU汎用クラス
    Parameters
    ----------
    lr : 学習率
    """

      

    def backward(Z):
        return self.forward(Z) * (1 - self.forward(Z))
        

## ScratchDeepNeuralNetworkClassifierクラス

In [20]:
import numpy as np
import logging                                                                     # ログ
from datetime import datetime                                        # 時間のやつ
from sklearn.preprocessing import OneHotEncoder       # ワンホットのやつ
from tqdm import tqdm                                                     # 進捗バーを出してくれるやつ


class ScratchDeepNeuralNetworkClassifier():
    """
    ニューラルネットワーク分類器

    Parameters
    ----------
    verbose : bool
        学習過程を出力する場合はTrue
    epochs : int
        エポック数（イテレーション数）
    batch_size : int
        バッチサイズ

    Attributes
    ----------
    self.loss_ : dict
        イテレーションごとのcostとaccuracy
    """

    def __init__(self, batch_size=10, n_epochs=50,  n_nodes=400, layer=4, verbose=True,
                            sigma=1e-2, lr=1e-2, activation=None):
        self.batch_size = batch_size     # バッチサイズ
        self.n_epochs = n_epochs         # エポック数 
        self.n_input = n_nodes              # 初回のノード数
        self.layer = layer                          # 層の数
        self.verbose = verbose               # 学習過程出力フラグ
        self.loss_ = []                              # 学習用データの学習過程格納用
        self.loss_val_ = []                       # 検証用データの学習過程格納用
        self.FCs = []                                  # FCインスタンス格納用
        
        # 初期化・最適化クラスインスタンス作成
        self.initializer = SimpleInitializer(sigma)
        self.optimizer = SGD(lr)
        # ワンホットライブラリのインスタンス作成
        self.enc = OneHotEncoder(handle_unknown='ignore', sparse=False)
        
        # アクティベーションをインスタンス化(デフォルトReLU)
        #self.activation = ReLU()
        # 引数で指定された場合は他の活性化関数クラスのインスタンス作成
        if activation == 'sigmoid':
            self.activation = Sigmoid()
        elif activation == 'tanh':
            self.activation = Tanh()
        # 最後はsoftmaxで
        self.activation_final = Softmax()
        # ログレベルを DEBUG に変更
        time_stamp = datetime.now().strftime('%Y%m%d')
        logging.basicConfig(filename='../log/' + time_stamp + '.log', level=logging.DEBUG)
    
    
    def fit(self, X, y, X_val=None, y_val=None):
        """
        ニューラルネットワーク分類器を学習する。

        Parameters
        ----------
        X : 次の形のndarray, shape (n_samples, n_features)
            学習用データの特徴量
        y : 次の形のndarray, shape (n_samples, )
            学習用データの正解値
        X_val : 次の形のndarray, shape (n_samples, n_features)
            検証用データの特徴量
        y_val : 次の形のndarray, shape (n_samples, )
            検証用データの正解値
        """
        
        # ワンホット化
        y = self.enc.fit_transform(y[:, np.newaxis])
        
        # 検証用データがある場合
        if X_val is not None:
            y_val= self.enc.fit_transform(y_val[:, np.newaxis])
        
        # 学習用データから特徴量とクラス数を取得
        n_features = X.shape[1]
        n_output = y.shape[1]
        
        # 初期化
        n_nodes1 = self.n_input
        
        # 各層のFCインスタンス作成
        for i in range(self.layer):
            if i == 0:                              # 入力層
                fc = FC(n_features, n_nodes1, self.initializer, self.optimizer)
            elif i == (self.layer - 1):     # 出力層
                fc = FC(n_nodes2, n_output, self.initializer, self.optimizer)
            else:
                # TODO: 多分よロしくない
                n_nodes2 = int(n_nodes1 / 2)
                fc = FC(n_nodes1, n_nodes2, self.initializer, self.optimizer)
                n_nodes1 = n_nodes2
                
            self.FCs.append(fc)          # 格納


        # エポックごとに進捗率を計測
        for e in tqdm(range(self.n_epochs)):
            # ミニバッチ化
            get_mini_batch = GetMiniBatch(X, y, batch_size=self.batch_size)
            # ロス格納用
            loss = []
            
            # Xのn_samples / batch_size数分ループ処理
            for mini_X_train, mini_y_train in get_mini_batch:
                # forward propagation
                for i in range(self.layer):
                    if i == 0:                              # 入力層
                        A = self.FCs[i].forward(mini_X_train)
                        Z = self.activation.forward(A)
                    elif i == (self.layer - 1):     # 出力層
                        A = self.FCs[i].forward(Z)
                        Z = self.activation_final.forward(A)
                    else:
                        A = self.FCs[i].forward(Z)
                        Z = self.activation.forward(A)                
                
                # back propagation
                for i in range(self.layer):
                    n_FC = self.layer - i - 1      # FCインスタンス指定用
                    if i == 0:                               # 入力層
                        dA = Z - mini_y_train     # TODO:ソフトマックス関数のバックプロパゲーションに交差エントロピー誤差の計算も含む実装
                        dZ = self.FCs[n_FC].backward(A)
                        loss.append(self._cross_entropy(Z, mini_y_train))
                    else:                                     # 出力層
                        dA = self.activation.backward(dZ)
                        dZ = self.FCs[n_FC].backward(dA)
            

            # 誤差を格納
            self.loss_.append(np.sum(loss))
            
            # 検証用データがある場合
            if X_val is not None:
                pass
                
                loss_val = cross_entropy(y_val, Z3)

                self.loss_val_.append(loss_val)
            
            if self.verbose:
                logging.info('forward propagation %sエポック目 sum: %s shape: %s', e+1, np.sum(A), A.shape)
                logging.info('forward propagation %sエポック目 sum: %s shape: %s', e+1, np.sum(Z), Z.shape)
                logging.info('forward propagation %sエポック目 sum: %s shape: %s', e+1, np.sum(dA), dA.shape)
                logging.info('forward propagation %sエポック目 sum: %s shape: %s', e+1, np.sum(dZ), dZ.shape)
                    
        return self

    
    def _cross_entropy(self, y_pred, y_true):
        return (-1) * np.sum(y_true * np.log(y_pred))
    
    
    def predict(self, X):
        """
        ニューラルネットワーク分類器を使い推定する。

        Parameters
        ----------
        X : 次の形のndarray, shape (n_samples, n_features)
            学習データ

        Returns
        -------
        y_pred :  次の形のndarray, shape (n_samples, 1)
            推定結果
        """
        # フォワードプロパゲーション
        pass
        
        # 一番確率が高いラベルを予測値に
        y_pred = np.argmax(Z3, axis=1)
        
        return y_pred

In [21]:
sdnnc = ScratchDeepNeuralNetworkClassifier(n_epochs=1, activation='sigmoid')
sdnnc.fit(X_train, y_train)
print(sdnnc.loss_)

100%|██████████| 1/1 [00:12<00:00, 12.43s/it]

[110524.31400166763]





### 寄り道

In [12]:
# ログを出す
import logging
from datetime import datetime

# タイムスタンプ作成
time_stamp = datetime.now().strftime('%Y%m%d')

# ログレベルを DEBUG に変更
logging.basicConfig(filename='../log/' + time_stamp + '.log', level=logging.DEBUG)

logging.info('info %s %s', 'hoge', 'fuga')

In [13]:
def sigmoid(X):
    sigmoid_range = 34.538776394910684     # 入力の最大値
    return 1 / (1 + np.exp(-np.clip(X, -sigmoid_range, sigmoid_range)))

X = np.array([1,5,6,1,3])

start = time.time()
X = sigmoid(X)
process_time = time.time() - start
logging.info('sigmoid %s', process_time)

def sigmoid_derivative(X):
    return sigmoid(X) * (1 - sigmoid(X))

start = time.time()
X = sigmoid(X)
process_time = time.time() - start
logging.info('sigmoid_derivative %s', process_time)

X = np.array([[1,5,6,1,3],[2,4,5,1,2]])

def softmax(X):
    max_X = np.max(X)
    exp_X = np.exp(X - max_X)
    sum_exp_X = np.sum(exp_X, axis=1).reshape(-1, 1)
    return exp_X / sum_exp_X

start = time.time()
X = softmax(X)
process_time = time.time() - start
logging.info('softmax %s', process_time)

X = np.array([1,5,6,1,3])
y = np.array([4,5,2,1,3])

def cross_entropy(y_true, y_pred):
    return (-1) * np.sum(y_true * np.log(y_pred))

start = time.time()
X = cross_entropy(X, y)
process_time = time.time() - start
logging.info('cross_entropy %s', process_time)