# リカレントニューラルネットワーク

## 【事前準備】

In [31]:
# scikit-learnの「set_config(display="diagram")」を使用するため、scikitlearnを最新verに更新
# !pip install scikit-learn==0.23.2 --target drive/My\ Drive/MyModule
# !pip install scikit-learn==0.23.2
# !pip install h5py==2.10
# !pip install keras==2.3.1
# !pip install keras-applications==1.0.7
# !pip install tensorflow==1.14
# !pip install -q -U albumentations   # データ拡張用ライブラリ
!pip list
## Google Drive上にインストールしたモジュールのインポート##
import sys
sys.path.append('/content/drive/My Drive/MyModule')

Package                       Version
----------------------------- --------------
absl-py                       0.12.0
alabaster                     0.7.12
albumentations                0.1.12
altair                        4.1.0
appdirs                       1.4.4
argcomplete                   1.12.3
argon2-cffi                   21.1.0
arviz                         0.11.2
astor                         0.8.1
astropy                       4.3.1
astunparse                    1.6.3
atari-py                      0.2.9
atomicwrites                  1.4.0
attrs                         21.2.0
audioread                     2.1.9
autograd                      1.3
Babel                         2.9.1
backcall                      0.2.0
beautifulsoup4                4.6.3
bleach                        4.0.0
blis                          0.4.1
bokeh                         2.3.3
Bottleneck                    1.3.2
branca                        0.4.2
bs4                           0.0.1
CacheControl

In [32]:
## モジュールのインポート ##
# 一般
import os
import glob
import re
import xml.etree.ElementTree as ET
import random
import math
import collections
# データ分析
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.style
%matplotlib inline
matplotlib.style.use('ggplot')
import seaborn as sns
from matplotlib.colors import ListedColormap
import matplotlib.patches as mpatches
# scikit-learn
from sklearn.model_selection import train_test_split
from sklearn.model_selection import KFold
from sklearn.model_selection import StratifiedKFold
from sklearn import metrics
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score
from sklearn.metrics import roc_auc_score
from sklearn.metrics import confusion_matrix
from sklearn.metrics import mean_squared_error
from sklearn.metrics import mean_absolute_error
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import LabelEncoder
from sklearn.pipeline import Pipeline
from sklearn.pipeline import make_pipeline
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import LogisticRegression
from sklearn.linear_model import SGDRegressor
from sklearn.linear_model import SGDClassifier
from sklearn.svm import SVR
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeRegressor
from sklearn.tree import DecisionTreeClassifier, export_graphviz
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.datasets import load_iris
from sklearn.datasets import make_blobs
# 決定木可視化のためのツール
import graphviz
import pydotplus
# from IPython.display import Image
# from sklearn.externals.six import StringIO
# TensorFlow
import tensorflow as tf
# Keras
# from keras.datasets import mnist
# SciPy
from scipy.sparse import csr_matrix, csc_matrix, coo_matrix, lil_matrix
# 画像データ編集
from PIL import Image
import cv2
import albumentations as A
# 自然言語処理
from gensim.models import Word2Vec
# その他
from google.colab import drive #GoogleDriveモジュール
# drive.mount('/content/drive') #GoogleDriveのマウント
# os.chdir('/content/drive/My Drive/DIVE INTO CODE/Sprint/Sprint17/ObjectDetection-master')

## 【問題1】SimpleRNNのフォワードプロパゲーション実装
SimpleRNNのクラスSimpleRNNを作成してください。基本構造はFCクラスと同じになります。


フォワードプロパゲーションの数式は以下のようになります。ndarrayのshapeがどうなるかを併記しています。


バッチサイズをbatch_size、入力の特徴量数をn_features、RNNのノード数をn_nodesとして表記します。活性化関数はtanhとして進めますが、これまでのニューラルネットワーク同様にReLUなどに置き換えられます。


$$
a_t = x_{t}\cdot W_{x} + h_{t-1}\cdot W_{h} + B\\
h_t = tanh(a_t)
$$


a
t
 : 時刻tの活性化関数を通す前の状態 (batch_size, n_nodes)


h
t
 : 時刻tの状態・出力 (batch_size, n_nodes)


x
t
 : 時刻tの入力 (batch_size, n_features)


W
x
 : 入力に対する重み (n_features, n_nodes)


h
t
−
1
 : 時刻t-1の状態（前の時刻から伝わる順伝播） (batch_size, n_nodes)


W
h
 : 状態に対する重み。 (n_nodes, n_nodes)


B
 : バイアス項 (n_nodes,)

初期状態 
h
0
 はすべて0とすることが多いですが、任意の値を与えることも可能です。


上記の処理を系列数n_sequences回繰り返すことになります。RNN全体への入力 
x
 は(batch_size, n_sequences, n_features)のような配列で渡されることになり、そこから各時刻の配列を取り出していきます。


分類問題であれば、それぞれの時刻のhに対して全結合層とソフトマックス関数（またはシグモイド関数）を使用します。タスクによっては最後の時刻のhだけを使用することもあります。

### ●ScratchSimpleRNNClassifierクラス

In [54]:
class ScratchSimpleRNNClassifier():
    """
    ディープニューラルネットワーク分類器
    Parameters
    ----------
    Attributes
    ----------
    """
    def __init__(self, layer_list, epoch, batch_size, early_stop=None, random_state=None, verbose=True):
        self.verbose = verbose
        self.flag = 0
        self.layer_list = layer_list    # 各層のリスト（fitメソッドにて各層のクラスをインスタンス化）
        self.layer_num = len(layer_list)   # 層の数
        self.epoch = epoch  # エポック
        self.batch_size = batch_size    # バッチサイズ
        self.early_stop = early_stop    # 早期打ち切りのイテレーション回数
        self.random_state = random_state
        np.random.seed(random_state)   #乱数シードを設定
        self.enc = OneHotEncoder(handle_unknown='ignore', sparse=False)   # OneHotエンコーダ
        self.loss = np.array([])    # 損失関数（学習データ）
        self.val_loss = np.array([])    # 損失関数（検証データ）

    def fit(self, X, y, X_val=None, y_val=None):
        """
        ニューラルネットワーク分類器を学習する。
        Parameters
        ----------
        X : 次の形のndarray, shape (n_samples, n_features)
            訓練データの特徴量
        y : 次の形のndarray, shape (n_samples, )
            訓練データの正解値
        X_val : 次の形のndarray, shape (n_samples, n_features)
            検証データの特徴量
        y_val : 次の形のndarray, shape (n_samples, )
            検証データの正解値
        """
        n_samples = X.shape[0]    # サンプル数
        counter = 0   # 早期終了用のカウンタ

        # 学習に使用する各種変数を初期化
        self._init_variables(X)

        # 学習データの目的変数をOneHotエンコーディング
        Y = self.enc.fit_transform(y[:, np.newaxis])
        # ミニバッチ学習用のクラス作成
        get_mini_batch = GetMiniBatch(X, Y, batch_size=self.batch_size, seed=self.random_state)

        # 検証データも与えられている場合
        if ((X_val is not None) and (y_val is not None)):
            # OneHotエンコーディング
            Y_val = enc.transform(y_val[:, np.newaxis])

         
        # エポックの回数ループ
        for j in range(self.epoch):
            if self.verbose:   print("■エポック{}回目".format(j))   # デバッグ情報出力
            get_mini_batch.__iter__()  # ミニバッチのイテレータをリセット

            # 全バッチデータを学習完了するまでループ（＝イテレーションの回数）
            for i, (X_mini, Y_mini) in enumerate(get_mini_batch):
                if self.verbose:   print("■イテレーション{}回目".format(i))   # デバッグ情報出力
                # 入力層から出力層までの順伝播処理および逆伝播処理
                Zout = self._exec_propagation(X_mini, Y_mini)
                # 損失関数
                self.loss = np.append(self.loss, self._calcurate_loss(Y_mini, Zout))

                # 検証データも与えられている場合は検証データに対しても同様の処理を行う
                if ((X_val is not None) and (y_val is not None)):
                    # 入力層から出力層までの順伝播処理
                    Zout_val = self._exec_forward_propagation(X_val)
                    # 損失関数
                    self.val_loss = np.append(self.val_loss, self._calcurate_loss(Y_val, Zout_val))

                # 早期終了のイテレーション回数が設定されている場合、カウンタをインクリメント
                if (self.early_stop is not None):
                    counter += 1
                    # カウンタが設定値に到達したら学習を強制終了
                    if (counter >= self.early_stop):
                        break

            # カウンタが設定値に到達したら学習を強制終了
            if (self.early_stop is not None):
                if (counter >= self.early_stop):
                    break

        if self.verbose:   print("■学習完了")
        return

    def predict(self, X):
        """
        DNN分類器を使い推定する。
        Parameters
        ----------
        X : 次の形のndarray, shape (n_samples, n_features)
            特徴量データ
        Returns
        -------
        y_pred    次の形のndarray, shape (n_samples, )
            ラベルの推定値
        """
        Z = self._exec_forward_propagation(X)   # 順伝播処理
        y_pred = np.argmax(Z, axis=1)   
        return  y_pred

    def predict_proba(self, X):
        """
        DNN分類器を使い推定値の確率を出力する。
        Parameters
        ----------
        X : 次の形のndarray, shape (n_samples, n_features)
            特徴量データ
        Returns
        -------
        y_pred_proba    次の形のndarray, shape (n_samples, n_output)
            ラベルの各クラスごとの確率の推定値
        """
        y_pred_proba = self._exec_forward_propagation(X)   # 順伝播処理
        return  y_pred_proba

    def plot_learning_curve(self):
        """
        学習曲線をプロットする関数
        Parameters
        ----------
        None
        Returns
        -------
        None
        """
        # 学習データと検証データのそれぞれの損失関数の値をグラフに描画
        fig, ax = plt.subplots(figsize=(15, 10), dpi=50)
        ax.set_title("Learning Curve")
        ax.set_xlabel("Iteration Number")
        ax.set_ylabel("Loss")
        ax.plot(self.loss, color = "blue", label="train")
        ax.plot(self.val_loss, color = "red", label="validation")
        ax.legend(loc='best') # 凡例を最適位置に表示
        plt.show()
        return

    def _init_variables(self, X):
        """
        学習に使用する各種変数を初期化する
        Parameters
        ----------
        X : 次の形のndarray, shape (n_samples, n_features)
            特徴量データ
        Returns
        -------
        None
        """
        if self.verbose:   print("■_init_variablesメソッド開始")  # デバッグ情報出力
        n_samples = X.shape[0]    # サンプル数

        self.iter = int(n_samples / self.batch_size)   # イテレーション数
        self.layer = [LayerClass(**keywards) for LayerClass, keywards in self.layer_list]    # 各層のインスタンスのリスト

        if self.verbose:   print("■_init_variablesメソッド終了")  # デバッグ情報出力
        return

    def _exec_propagation(self, X, Y):
        """
        順伝播処理および逆伝播処理を実行する（学習データ用）
        Parameters
        ----------
        X : 次の形のndarray, shape (batch_size, n_sequence, n_features)
            特徴量データ
        Y : 次の形のndarray, shape (batch_size, n_output)
            訓練データの正解値(OneHotEncoding済み)
        Returns
        -------
        """
        # デバッグ情報出力
        if self.verbose:   print("■_exec_propagationメソッド開始")

        batch_size = X.shape[0]
        n_output = X.shape[0]

        # 各レイヤの出力値の初期化
        Z1 = np.zeros((batch_size, self.layer[0].n_nodes_current))

        ## 順伝播処理 ##
        # Xを(batch_size, n_sequence, n_features)⇒(n_sequence, batch_size, n_features)に軸変換
        X_transposed = np.transpose(X, (1, 0, 2))
        if self.verbose:  print("X_transposed.shape: {}".format(X_transposed.shape))    # デバッグ情報出力

        # 各時刻のXを順番に順伝播
        for X_snapshot in X_transposed:
            A1 = self.layer[0].forward(X_snapshot, Zout)   # SimpleRNNLayer
            Zout = self.layer[1].forward(A1)               # SigmoidLayer or TanhLayer or ReLU
        
        ## 逆伝播処理 ##
        # [TBD]

        if self.verbose:   print("■_exec_propagationメソッド終了")    # デバッグ情報出力
        return  Zout

    def _exec_forward_propagation(self, X):
        """
        入力層から出力層まで順伝播処理を実行する（検証データ用）
        Parameters
        ----------
        X : 次の形のndarray, shape (batch_size, n_features)
            特徴量データ
        Returns
        -------
        Zout : 次の形のndarray, shape (batch_size, n_output)
            出力層の出力
        """
        # デバッグ情報出力
        if self.verbose:   print("■_exec_forward_propagationメソッド開始")

        batch_size = X.shape[0]
        n_output = X.shape[0]

        # 各レイヤの出力値の初期化
        Z1 = np.zeros((batch_size, self.layer[0].n_nodes_current))

        ## 順伝播処理 ##
        # Xを(batch_size, n_sequence, n_features)⇒(n_sequence, batch_size, n_features)に軸変換
        X_transposed = np.transpose(X, (1, 0, 2))
        if self.verbose:  print("X_transposed.shape: {}".format(X_transposed.shape))    # デバッグ情報出力

        # 各時刻のXを順番に順伝播
        for X_snapshot in X_transposed:
            A1 = self.layer[0].forward(X_snapshot, Z1)   # SimpleRNNLayer
            Z1 = self.layer[1].forward(A1)               # SigmoidLayer or TanhLayer or ReLU

        Zout = Z1

        if self.verbose:   print("■_exec_forward_propagationメソッド終了")    # デバッグ情報出力
        return Zout

    def _calcurate_loss(self, Y, Z):
        """
        損失関数（交差エントロピー誤差: L = -1/(batch_size)*ΣΣy_jk*log(Z_jk)）の計算
        Parameters
        ----------
        Y : 次の形のndarray, shape (batch_size, n_output)
            正解ラベルデータ（OneHotEncoding済み）
        Z : 次の形のndarray, shape (batch_size, n_output)
            出力層での出力値
        Returns
        ----------
        loss : numpy.float
          損失関数(交差エントロピー誤差)
        """
        batch_size = Y.shape[0]
        n_output = Y.shape[1]
        sigma = 0

        sigma = (Y * np.log(Z)).sum()   # ΣΣy_jk*log(Z_jk)の計算
        loss = - (1 / batch_size) * sigma
        return  loss

### ●SimpleRNNLayerクラス

In [34]:
class SimpleRNNLayer:
    """
    今タイムステップの入力値と前タイムステップの出力値の両方を入力とする、RNNレイヤー
    Parameters
    ----------
    n_nodes_prev : int
      前の層のノード数
    n_nodes_current : int
      後の層のノード数
    initializer : 初期化方法のインスタンス
    optimizer : 最適化手法のインスタンス
    """
    def __init__(self, n_nodes_prev, n_nodes_current, initializer, optimizer):
        self.n_nodes_prev = n_nodes_prev
        self.n_nodes_current = n_nodes_current
        self.initializer = initializer  # WとBの初期化に使用
        self.optimizer = optimizer  # WとBの更新時に使用
        # initializerのメソッドを使い、self.Wとself.Bを初期化する
        # 重み ※FC層と異なり、RNN層の重みWには前タイムステップの出力値に対する重みも含まれる
        self.W =self.initializer.init_W((n_nodes_prev + n_nodes_current), n_nodes_current) 
        # バイアス
        self.B =self.initializer.init_B(n_nodes_prev, n_nodes_current)
        self.W_log = np.array([])
        self.B_log = np.array([])
        self.flag = 1

    def forward(self, Zprev, Alast):
        """
        フォワード
        Parameters
        ----------
        Zprev : 次の形のndarray, shape (batch_size, n_nodes_prev)
            入力
        Zlast : 次の形のndarray, shape (batch_size, n_nodes_current)
            当層での前回の出力A
        Returns
        ----------
        A : 次の形のndarray, shape (batch_size, n_nodes_current)
            出力
        """
        # A = (Zprev @ self.W) + self.B.reshape(1, self.B.shape[0])
        ZA = np.concatenate([Zprev, Alast], axis=1) # 前層からの入力Zprevと前回の出力Alastを横方向に結合
        A = (ZA @ self.W) + self.B.reshape(1, self.B.shape[0])
        return A

    def backward(self, dA, Zprev):
        """
        バックワード
        Parameters
        ----------
        dA : 次の形のndarray, shape (batch_size, n_nodes_current)
            後ろから流れてきた勾配
        Zprev : 次の形のndarray, shape (batch_size, prev_node_num)
            前層の出力Z（当層が第2層の場合、Zprev=Z1）
        Returns
        ----------
        dZprev : 次の形のndarray, shape (batch_size, prev_node_num)
            前に流す勾配
        """
        dW = Zprev.T @ dA
        dB = dA.sum(axis=0)
        dZprev = dA @ self.W.T

        # 更新
        self.W = self.optimizer.update(self.W, dW, "W")
        self.B = self.optimizer.update(self.B, dB, "B")

        # パラメータの一部のみログに記録
        if (self.flag == 1):
            self.W_log = self.W[0, :2].reshape(1, -1)
            self.B_log = self.B[:2].reshape(1, -1)
            self.flag = 0
        else:
            self.W_log = np.append(self.W_log, self.W[0, :2].reshape(1, -1), axis=0)
            self.B_log = np.append(self.B_log, self.B[:2].reshape(1, -1), axis=0)

        return dZprev

### ●FullyConnectedLayerクラス

In [35]:
class FullyConnectedLayer:
    """
    ノード数n_nodes_prevからn_nodes_currentへの全結合層
    Parameters
    ----------
    n_nodes_prev : int
      前の層のノード数
    n_nodes_current : int
      後の層のノード数
    initializer : 初期化方法のインスタンス
    optimizer : 最適化手法のインスタンス
    """
    def __init__(self, n_nodes_prev, n_nodes_current, initializer, optimizer):
        self.n_nodes_prev = n_nodes_prev
        self.n_nodes_current = n_nodes_current
        self.initializer = initializer  # WとBの初期化に使用
        self.optimizer = optimizer  # WとBの更新時に使用
        # initializerのメソッドを使い、self.Wとself.Bを初期化する
        self.W =self.initializer.init_W(n_nodes_prev, n_nodes_current) # 重み
        self.B =self.initializer.init_B(n_nodes_prev, n_nodes_current) # バイアス
        self.W_log = np.array([])
        self.B_log = np.array([])
        self.flag = 1

    def forward(self, Zprev):
        """
        フォワード
        Parameters
        ----------
        Zprev : 次の形のndarray, shape (batch_size, n_nodes_prev)
            入力
        Returns
        ----------
        A : 次の形のndarray, shape (batch_size, n_nodes_current)
            出力
        """
        A = (Zprev @ self.W) + self.B.reshape(1, self.B.shape[0])
        return A

    def backward(self, dA, Zprev):
        """
        バックワード
        Parameters
        ----------
        dA : 次の形のndarray, shape (batch_size, n_nodes_current)
            後ろから流れてきた勾配
        Zprev : 次の形のndarray, shape (batch_size, prev_node_num)
            前層の出力Z（当層が第2層の場合、Zprev=Z1）
        Returns
        ----------
        dZprev : 次の形のndarray, shape (batch_size, prev_node_num)
            前に流す勾配
        """
        dW = Zprev.T @ dA
        dB = dA.sum(axis=0)
        dZprev = dA @ self.W.T

        # 更新
        self.W = self.optimizer.update(self.W, dW, "W")
        self.B = self.optimizer.update(self.B, dB, "B")

        # パラメータの一部のみログに記録
        if (self.flag == 1):
            self.W_log = self.W[0, :2].reshape(1, -1)
            self.B_log = self.B[:2].reshape(1, -1)
            self.flag = 0
        else:
            self.W_log = np.append(self.W_log, self.W[0, :2].reshape(1, -1), axis=0)
            self.B_log = np.append(self.B_log, self.B[:2].reshape(1, -1), axis=0)

        return dZprev

### ●SigmoidLayerクラス

In [36]:
class SigmoidLayer:
    """
    シグモイド関数の層
    Parameters
    ----------
    """
    def __init__(self, verbose):
        self.verbose = verbose
        
    def forward(self, A):
        """
        順伝播処理
        Parameters
        ----------
        Returns
        ----------
        """
        if self.verbose:   print("■シグモイド関数（順伝播）実行")   # デバッグ情報出力
        Z = self._sigmoid_func(A)
        return Z

    def backward(self, A, dZ):
        """
        逆伝播処理
        Parameters
        ----------
        Returns
        ----------
        """
        if self.verbose:   print("■シグモイド関数（逆伝播）実行")   # デバッグ情報出力
        dA = dZ * (1 - self._sigmoid_func(A)) * self._sigmoid_func(A)
        return dA

    def _sigmoid_func(self, A):
        """
        シグモイド関数(1/(1+exp(-A)))の演算処理
        Parameters
        ----------
        Returns
        -------
        """
        Z = 1/(1 + np.exp(-A))
        return Z

### ●TanhLayerクラス

In [37]:
class TanhLayer:
    """
    ハイパボリックタンジェント関数の層
    Parameters
    ----------
    """
    def __init__(self, verbose):
        self.verbose = verbose
        
    def forward(self, A):
        """
        順伝播処理((exp(A)-exp(-A))/(exp(A)+exp(-A)))
        Parameters
        ----------
        Returns
        ----------
        """
        if self.verbose:   print("■ハイパボリックtan関数（順伝播）実行")   # デバッグ情報出力
        Z = self._tanh_func(A)
        return Z

    def backward(self, A, dZ):
        """
        逆伝播処理
        Parameters
        ----------
        Returns
        ----------
        """
        if self.verbose:   print("■ハイパボリックtan関数（逆伝播）実行")   # デバッグ情報出力
        dA = dZ * (1 - self._tanh_func(A)* self._tanh_func(A)) 
        return dA

    def _tanh_func(self, A):
        """
        ハイパボリックtan関数((exp(A)-exp(-A))/(exp(A)+exp(-A)))の演算処理
        Parameters
        ----------
        Returns
        -------
        """
        Z = (np.exp(A) - np.exp(-A)) / (np.exp(A) + np.exp(-A))
        return Z

### ●ReLULayerクラス

In [38]:
class ReLULayer:
    """
    ReLU関数の層
    Parameters
    ----------
    """
    def __init__(self, verbose):
        self.verbose = verbose
        
    def forward(self, A):
        """
        順伝播処理
            # df(x)/dx = { x (x > 0)
            #            { 0 (x <= 0)
        Parameters
        ----------
        Returns
        ----------
        """
        if self.verbose:   print("■ReLU関数（順伝播）実行")   # デバッグ情報出力
        # Z = np.where(A > 0, A, 0)
        self.mask = (A <= 0)
        Z = A.copy()
        Z[self.mask] = 0
        return Z

    def backward(self, A, dZ):
        """
        逆伝播処理
            # df(x)/dx = { 1 (x > 0)
            #            { 0 (x <= 0)
        Parameters
        ----------
        Returns
        ----------
        """
        if self.verbose:   print("■ReLU関数（逆伝播）実行")   # デバッグ情報出力
        # dA = np.where(A > 0, 1, 0)
        dZ[self.mask] = 0
        dA = dZ
        return dA

### ●SoftmaxLayerクラス

In [39]:
class SoftmaxLayer:
    """
    ソフトマックス関数の層
    Parameters
    ----------
    """
    def __init__(self, verbose):
        self.verbose = verbose

    def forward(self, A):
        """
        順伝播処理(Z_k = (exp(A_k)/(Σexp(A_i))
        Parameters
        ----------
        Returns
        ----------
        """
        if self.verbose:   print("■ソフトマックス関数実行")   # デバッグ情報出力
        batch_size = A.shape[0]   # バッチサイズ
        n_nodes_current = A.shape[1]    # 当層のノード数（＝目的変数のクラス数）
        Z = np.zeros((batch_size, n_nodes_current))

        sigma = np.exp(A).sum(axis=1)   # "Σexp(A_i) (i=1～10)"の部分の計算
        Z = (np.exp(A)) / sigma.reshape(-1, 1)
        return Z

    def backward(self, Z, Y):
        """
        逆伝播処理
        Parameters
        ----------
          Z : 順伝播処理の出力
          Y : 目的変数（OneHotEncoding済み）
        Returns
        ----------
          dA
        """
        if self.verbose:   print("■ソフトマックス関数（逆伝播）実行")   # デバッグ情報出力
        batch_size = Z.shape[0]   # バッチサイズ
        dA = (1 / batch_size) * (Z - Y)
        return dA

### ●SimpleInitializerクラス

In [40]:
class SimpleInitializer:
    """
    ガウス分布によるシンプルな初期化
    Parameters
    ----------
    sigma : float
      ガウス分布の標準偏差
    """
    def __init__(self, sigma):
        self.sigma = sigma

    def init_W(self, n_nodes_prev, n_nodes_current):
        """
        重みの初期化
        Parameters
        ----------
        n_nodes_prev : int
          前の層のノード数
        n_nodes_current : int
          後の層のノード数
        Returns
        ----------
        W :
        """
        W = self.sigma * np.random.randn(n_nodes_prev, n_nodes_current)
        
        return W

    def init_B(self, n_nodes_prev, n_nodes_current):
        """
        バイアスの初期化
        Parameters
        ----------
        n_nodes_current : int
          後の層のノード数
        Returns
        ----------
        B :
        """
        B = self.sigma * np.random.randn(n_nodes_current)
        return B

### ●XavierInitializerクラス

In [41]:
class XavierInitializer:
    """
    「Xavierの初期値」による初期化（シグモイド関数・ハイパボリックタンジェント関数の層で使用）
    Parameters
    ----------
    sigma : float
      ガウス分布の標準偏差
    """
    def __init__(self):
        pass
        
    def init_W(self, n_nodes_prev, n_nodes_current):
        """
        重みの初期化
        Parameters
        ----------
        n_nodes_prev : int
          前の層のノード数
        n_nodes_current : int
          後の層のノード数
        Returns
        ----------
        W :
        """
        W = (1 / np.sqrt(n_nodes_prev)) * np.random.randn(n_nodes_prev, n_nodes_current)
        return W

    def init_B(self, n_nodes_prev, n_nodes_current):
        """
        バイアスの初期化
        Parameters
        ----------
        n_nodes_prev : int
          前の層のノード数
        n_nodes_current : int
          後の層のノード数
        Returns
        ----------
        B :
        """
        B = (1 / np.sqrt(n_nodes_prev)) * np.random.randn(n_nodes_current)
        return B

### ●HeInitializerクラス

In [42]:
class HeInitializer:
    """
    「Heの初期値」による初期化（ReLU関数の層で使用）
    Parameters
    ----------
    sigma : float
      ガウス分布の標準偏差
    """
    def __init__(self):
        pass
        
    def init_W(self, n_nodes_prev, n_nodes_current):
        """
        重みの初期化
        Parameters
        ----------
        n_nodes_prev : int
          前の層のノード数
        n_nodes_current : int
          後の層のノード数
        Returns
        ----------
        W :
        """
        W = (np.sqrt(2 / n_nodes_prev)) * np.random.randn(n_nodes_prev, n_nodes_current)
        return W
        
    def init_B(self, n_nodes_prev, n_nodes_current):
        """
        バイアスの初期化
        Parameters
        ----------
        n_nodes_prev : int
          前の層のノード数
        n_nodes_current : int
          後の層のノード数
        Returns
        ----------
        B :
        """
        B = (np.sqrt(2 / n_nodes_prev)) * np.random.randn(n_nodes_current)
        return B

### ●SGDクラス

In [43]:
class SGD:
    """
    確率的勾配降下法
    Parameters
    ----------
    lr : 学習率
    """
    def __init__(self, lr):
        self.lr = lr

    def update(self, param, grad, param_name):
        """
        ある層の重みやバイアスの更新
        Parameters
        ----------
        param : 更新前の層のインスタンス (shape: (n_nodes_prev, n_nodes_current))
        grad : 勾配 (shape: (n_nodes_prev, n_nodes_current))
        param_name : パラメータの名称   ※SGDでは未使用
        """
        ret_param = param - self.lr * grad
        return ret_param

### ●AdaGradクラス

In [44]:
class AdaGrad:
    """
    AdaGradによるパラメータの更新
    Parameters
    ----------
    lr : 学習率
    """
    def __init__(self, lr):
        self.lr = lr
        self.flag = 1
        self.H = {}

    def update(self, param, grad, param_name):
        """
        ある層の重みやバイアスの更新
        Parameters
        ----------
        param : 更新前の層のインスタンス (shape: (n_nodes_prev, n_nodes_current))
        grad : 勾配 (shape: (n_nodes_prev, n_nodes_current))
        param_name : パラメータの名称
        """
        # 初回のみ
        if (param_name not in self.H):
            dic = {param_name : grad * grad + 1e-7}   # 0割防止のため1e-7を加算
        # 2回目以降
        else:
            dic = {param_name : (self.H[param_name] + grad * grad)}   # 前回までのself.H[param_name]にgrad * gradを加算
        self.H.update(dic)    # self.H[param_name]の値を更新
        ret_param = param -  self.lr * (1 / np.sqrt(self.H[param_name])) *grad
        # print("AdaGradの勾配の2乗和:")
        # print((1 / np.sqrt(self.H[param_name]))[0])
        return ret_param

### ●GetMiniBatchクラス

In [45]:
class GetMiniBatch:
    """
    ミニバッチを取得するイテレータ
    Parameters
    ----------
    X : 次の形のndarray, shape (n_samples, n_features)
      訓練データ
    y : 次の形のndarray, shape (n_samples, 1)
      正解値
    batch_size : int
      バッチサイズ
    seed : int
      NumPyの乱数のシード
    """
    def __init__(self, X, y, batch_size = 20, seed=0):
        self.batch_size = batch_size
        np.random.seed(seed)
        shuffle_index = np.random.permutation(np.arange(X.shape[0]))
        self._X = X[shuffle_index]
        self._y = y[shuffle_index]
        self._stop = np.ceil(X.shape[0]/self.batch_size).astype(np.int)
        self._counter = 0
    def __len__(self):
        return self._stop
    def __getitem__(self,item):
        p0 = item*self.batch_size
        p1 = item*self.batch_size + self.batch_size
        return self._X[p0:p1], self._y[p0:p1]        
    def __iter__(self):
        self._counter = 0
        return self
    def __next__(self):
        if self._counter >= self._stop:
            raise StopIteration()
            # print("Warning: バッチデータが最後に達しました！　データの先頭に戻ります!!!(self._counter = {})".format(self._counter))
            # self._counter = 0
        p0 = self._counter*self.batch_size
        p1 = self._counter*self.batch_size + self.batch_size
        self._counter += 1
        return self._X[p0:p1], self._y[p0:p1]


# ## ミニバッチデータを取得するサンプルコード ##
# get_mini_batch = GetMiniBatch(X_train1, y_train1, batch_size=20, seed=0)
# # get_mini_batch = GetMiniBatch(X_test1, y_test1, batch_size=20, seed=0)
# print("len(get_mini_batch):")
# print(len(get_mini_batch)) # 2400
# print("get_mini_batch[5]:")
# print(get_mini_batch[5]) # 5番目のミニバッチが取得できる
# # for i, (mini_X_train, mini_y_train) in enumerate(get_mini_batch):
# #     # このfor文内でミニバッチが使える
# #     # print("■{}番目のmini_X_train".format(i))
# #     # print(mini_X_train[0])
# #     pass

# get_mini_batch.__iter__()
# for i in range(2):
#     mini_X_train, mini_y_train = get_mini_batch.__next__()
#     print(mini_X_train, mini_y_train)

## 【問題2】小さな配列でのフォワードプロパゲーションの実験
小さな配列でフォワードプロパゲーションを考えてみます。


入力x、初期状態h、重みw_xとw_h、バイアスbを次のようにします。


ここで配列xの軸はバッチサイズ、系列数、特徴量数の順番です。

In [46]:
x = np.array([[[1, 2], [2, 3], [3, 4]]])/100 # (batch_size, n_sequences, n_feature)
print(x.shape)
w_x = np.array([[1, 3, 5, 7], [3, 5, 7, 8]])/100 # (n_features, n_nodes)
print(w_x.shape)
w_h = np.array([[1, 3, 5, 7], [2, 4, 6, 8], [3, 5, 7, 8], [4, 6, 8, 10]])/100 # (n_nodes, n_nodes)
print(w_h.shape)
batch_size = x.shape[0] # 1
n_sequences = x.shape[1] # 3
n_features = x.shape[2] # 2
n_nodes = w_x.shape[1] # 4
h = np.zeros((batch_size, n_nodes)) # (batch_size, n_nodes)
b = np.array([1, 1, 1, 1]) # (n_nodes,)

(1, 3, 2)
(2, 4)
(4, 4)


フォワードプロパゲーションの出力が次のようになることを作成したコードで確認してください。

In [47]:
h = np.array([[0.79494228, 0.81839002, 0.83939649, 0.85584174]]) # (batch_size, n_nodes)

### 【問題2 解答】↓↓↓

In [55]:
# %%time
epoch = 1   # エポック
lr=1    # 学習率
# batch_size = 20   # バッチサイズ
early_stop = None   # 早期終了する場合のイテレーション回数（デバッグ用）
verbose = False  # デバッグ情報出力のスイッチ

             # LayerClass,          {keywards}
layer_list = [(SimpleRNNLayer,      {"n_nodes_prev":n_features, "n_nodes_current":n_nodes, "initializer":SimpleInitializer(sigma=0.01), "optimizer":SGD(lr)}),
              (TanhLayer,           {"verbose":verbose})]

# ScratchSimpleRNNClassifierをインスタンス化
clf = ScratchSimpleRNNClassifier(    #推定器
        layer_list=layer_list, 
        epoch=epoch,
        batch_size=batch_size, 
        early_stop=early_stop,
        random_state=0, verbose = verbose)

clf._init_variables(x)  # 各レイヤのインスタンス化等
clf.layer[0].W = np.concatenate([w_x, w_h], axis=0)   # 重み
clf.layer[0].B = b  # バイアス

h = clf._exec_forward_propagation(x)  # 順伝播処理実行
print(h)

[[0.79494228 0.81839002 0.83939649 0.85584174]]


## 【問題3】（アドバンス課題）バックプロパゲーションの実装
バックプロパゲーションを実装してください。


RNNの内部は全結合層を組み合わせた形になっているので、更新式は全結合層などと同様です。

$$
W_x^{\prime} = W_x - \alpha \frac{\partial L}{\partial W_x} \\
W_h^{\prime} = W_h - \alpha \frac{\partial L}{\partial W_h} \\
B^{\prime} = B - \alpha \frac{\partial L}{\partial B}
$$