# RNN

## リカレントニューラルネットワークスクラッチ
リカレントニューラルネットワーク（RNN） のクラスをスクラッチで作成していきます。NumPyなど最低限のライブラリのみを使いアルゴリズムを実装していきます。


フォワードプロパゲーションの実装を必須課題とし、バックプロパゲーションの実装はアドバンス課題とします。


クラスの名前はScratchSimpleRNNClassifierとしてください。クラスの構造などは以前のSprintで作成したScratchDeepNeuralNetrowkClassifierを参考にしてください。

### 【問題1】SimpleRNNのフォワードプロパゲーション実装

SimpleRNNのクラスSimpleRNNを作成してください。基本構造はFCクラスと同じになります。


フォワードプロパゲーションの数式は以下のようになります。ndarrayのshapeがどうなるかを併記しています。


バッチサイズをbatch_size、入力の特徴量数をn_features、RNNのノード数をn_nodesとして表記します。活性化関数はtanhとして進めますが、これまでのニューラルネットワーク同様にReLUなどに置き換えられます。

$$
a_t = x_{t}\cdot W_{x} + h_{t-1}\cdot W_{h} + B\\
h_t = tanh(a_t)
$$


at : 時刻tの活性化関数を通す前の状態 (batch_size, n_nodes)


ht : 時刻tの状態・出力 (batch_size, n_nodes)


xt : 時刻tの入力 (batch_size, n_features)


Wx : 入力に対する重み (n_features, n_nodes)


ht-1 : 時刻t-1の状態（前の時刻から伝わる順伝播） (batch_size, n_nodes)


Wh : 状態に対する重み。 (n_nodes, n_nodes)


B : バイアス項 (n_nodes,)

初期状態 h0は全て0とすることが多いですが、任意の値を与えることも可能です。


上記の処理を系列数n_sequences回繰り返すことになります。RNN全体への入力xは(batch_size, n_sequences, n_features)のような配列で渡されることになり、そこから各時刻の配列を取り出していきます。


分類問題であれば、それぞれの時刻のhに対して全結合層とソフトマックス関数（またはシグモイド関数）を使用します。タスクによっては最後の時刻のhだけを使用することもあります。

In [1]:
import numpy as np

In [2]:
class SimpleRNN():
    """
    ノード数n_nodes1からn_nodes2への全結合層
    Parameters
    ----------
    n_nodes1 : int
      前の層のノード数
    n_nodes2 : int
      後の層のノード数
    initializer : 初期化方法のインスタンス
    optimizer : 最適化手法のインスタンス
    """
    def __init__(self, batch_size, initializer, optimizer):
        self.optimizer = optimizer
        # 初期化
        # 【問題1】
        self.w_x = np.array([[1, 3, 5, 7], [3, 5, 7, 8]])/100 # (n_features, n_nodes)
        self.w_h = np.array([[1, 3, 5, 7], [2, 4, 6, 8], [3, 5, 7, 8], [4, 6, 8, 10]])/100 # (n_nodes, n_nodes)
        self.b = np.array([1, 1, 1, 1]) # (n_nodes,)
        
        #n_nodes = self.w_x.shape[1] # 4
        #self.h = np.zeros((batch_size, n_nodes)) # (batch_size, n_nodes)
        
        # initializerのメソッドを使い、self.Wとself.Bを初期化する
        #self.W = initializer.W(n_nodes1, n_nodes2)
        #self.B = initializer.B(n_nodes2)
    
    def forward(self, x, h_prev):
        """
        フォワード
        Parameters
        ----------
        X : 次の形のndarray, shape (batch_size, n_nodes1)
            入力
        Returns
        ----------
        A : 次の形のndarray, shape (batch_size, n_nodes2)
            出力
        """
        #【問題２】
        self.x = x
        
        #入力xと前の層の重みht−１に重みをかけて足す
        A = self.x@self.w_x + h_prev@self.w_h + self.b.T#.reshape(-1,1)
        h_next = np.tanh(A)# 次の計算で使う（コンテクスト）
        
        self.A = A
        self.H = h_next
        
        return h_next
    
    # 【問題３】
    def backward(self, dH):
        """
        バックワード
        Parameters
        ----------
        dA : 次の形のndarray, shape (batch_size, n_nodes2)
            後ろから流れてきた勾配
        Returns
        ----------
        dZ : 次の形のndarray, shape (batch_size, n_nodes1)
            前に流す勾配
        """
        # 更新
        self, dX, dH_prev = self.optimizer.update(self, dH)
        
        return (dX, dH_prev)

In [3]:
class Tanh():
    """
    活性化関数（シグモイド関数）のクラス

    Attributes
    ----------
    """
    def forward(self, A):
        """
        順伝播用

        Parameters
        ----------
        X : 次の形のndarray, shape (n_samples, n_features)
            訓練データ
        """
        H = np.tanh(A)
        
        return H
    

    def backward(self, X, dZ):
        """
        逆伝播用

        Parameters
        ----------
        X : 次の形のndarray, shape (n_samples, n_features)
            訓練データ
        """
        dA = dZ*(1 - np.tanh(X)**2)
        
        return dA

In [4]:
class SimpleInitializer():
    """
    ガウス分布によるシンプルな初期化
    Parameters
    ----------
    sigma : float
      ガウス分布の標準偏差
    """
    def __init__(self, sigma):
        self.sigma = sigma
        
    def W(self, n_nodes1, n_nodes2):
        """
        重みの初期化
        Parameters
        ----------
        n_nodes1 : int
          前の層のノード数
        n_nodes2 : int
          後の層のノード数

        Returns
        ----------
        W :
        """
        W = self.sigma * np.random.randn(n_nodes1, n_nodes2)
        
        return W
    
    def B(self, n_nodes2):
        """
        バイアスの初期化
        Parameters
        ----------
        n_nodes2 : int
          後の層のノード数

        Returns
        ----------
        B :
        """
        #【問題2】
        B = self.sigma * np.random.randn(n_nodes2,)
        
        return B

In [5]:
class SGD():
    """
    確率的勾配降下法
    Parameters
    ----------
    lr : 学習率
    """
    def __init__(self, lr):
        self.lr = lr
        
    def update(self, layer, dH):#Z3, Y, 
        """
        ある層の重みやバイアスの更新
        Parameters
        ----------
        layer : 更新前の層のインスタンス
        """
        #【問題3】
        
        dAt = dH*(1 - np.tanh(layer.A)**2)
        
        dWx = layer.x.reshape(-1,1)@dAt
        dWh = layer.H.T@dAt
        
        layer.w_x = layer.w_x - self.lr*(dWx)
        layer.w_h = layer.w_h - self.lr*(dWh)
        layer.b = layer.b - self.lr*(dAt)
        
        dX = dAt@layer.w_x.T
        dH_prev = dAt@layer.w_h.T
        
        return (layer, dX, dH_prev)

In [6]:
class Softmax():
    """
    活性化関数（シグモイド関数）のクラス

    Attributes
    ----------
    """
    def forward(self, X):
        """
        準伝播用

        Parameters
        ----------
        X : 次の形のndarray, shape (n_samples, n_features)
            訓練データ
        """
        Z = np.exp(X)/(np.sum(np.exp(X), axis=1).reshape(-1,1))
        
        return Z
    

    def backward(self, X, y):
        """
        逆伝播用
        Parameters
        ----------
        X : 次の形のndarray, shape (n_samples, n_features)
            訓練データ
        """
        dA =X - y
        
        # 目的関数（損失関数）　交差エントロピー誤差
        nb = y.shape[0]#バッチサイズ
        L = -(1/nb)*(np.sum(y*np.log(X)))
        
        return (dA, L)

In [7]:
class ScratchSimpleRNNClassifier():
    """
    RNNのクラス
    Parameters
    ----------
    self.n_features 
    
    self.n_nodes1
    
    self.sigma
    
    """
    def __init__(self, lr, sigma = 0.01):
        self.lr = lr
        self.sigma = sigma
    
    def fit(self, x, epoch):
        #initializer = SimpleInitializer(n_nodes)
        initializer = SimpleInitializer(1)
        optimizer = SGD(self.lr)
        batch_size = x.shape[0] # 1

        
        self.RNN = SimpleRNN(batch_size, initializer, optimizer)
        self.activation = Softmax()
        
        
        n_nodes = 4#self.w_x.shape[1] # 4
        h = np.zeros((batch_size, n_nodes)) # (batch_size, n_nodes)
        
        
        for i in range(epoch):

            for b in range(batch_size):
                # フォワード処理
                H1 = self.RNN.forward(x[0, 0], h)
                y1= self.activation.forward(H1)
                H2 = self.RNN.forward(x[0, 1], H1)
                y2= self.activation.forward(H2)
                self.H3 = self.RNN.forward(x[0, 2], H2)
                y3= self.activation.forward(H3)

                # 問題３
                # バックワード処理
                self.dX2, dH2 = self.RNN.backward(H3)
                self.dX1, dH1= self.RNN.backward(dH2)
                self.dX0, self.dH0 = self.RNN.backward(dH1)
                
        
#    def predict(self,X):
#
#        # 最初のZはXなので代入
#        Z = X
#
#        # フォワード処理
#        for l in range(self.layer):
#            A = self.FC_list[l].forward(Z)
#            Z = self.activation_list[l].forward(A)
#            
#        y = self.activation_list[-1].forward(A)
#        pred = np.argmax(y, axis=1)
#        
#        #A1 = self.FC_list[0].forward(X)
#        #Z1 = self.activation_list[0].forward(A1)
#        #A2 = self.FC_list[1].forward(Z1)
#        #Z2 = self.activation_list[1].forward(A2)
#        #A3 = self.FC_list[2].forward(Z2)
#        #y = self.activation_list[2].forward(A3)
#        #pred = np.argmax(y, axis=1)
#        
#        return pred

### 【問題2】小さな配列でのフォワードプロパゲーションの実験

小さな配列でフォワードプロパゲーションを考えてみます。


入力x、初期状態h、重みw_xとw_h、バイアスbを次のようにします。


ここで配列xの軸はバッチサイズ、系列数、特徴量数の順番です。

In [9]:
x = np.array([[[1, 2], [2, 3], [3, 4]]])/100 # (batch_size, n_sequences, n_features)
#w_x = np.array([[1, 3, 5, 7], [3, 5, 7, 8]])/100 # (n_features, n_nodes)
#w_h = np.array([[1, 3, 5, 7], [2, 4, 6, 8], [3, 5, 7, 8], [4, 6, 8, 10]])/100 # (n_nodes, n_nodes)
batch_size = x.shape[0] # 1
n_sequences = x.shape[1] # 3
#n_features = x.shape[2] # 2
#n_nodes = w_x.shape[1] # 4
n_nodes = 4
h = np.zeros((batch_size, n_nodes)) # (batch_size, n_nodes)
#b = np.array([1, 1, 1, 1]) # (n_nodes,)

In [10]:
#RNN層
lr = 0.001
#initializer = SimpleInitializer(n_nodes)
initializer = SimpleInitializer(1)
optimizer = SGD(lr)

batch_size = 1#x.shape[0] # 1

RNN = SimpleRNN(batch_size, initializer, optimizer)

In [11]:
# 活性化関数
#activation1 = Tanh()
#activation2 = Tanh()
#activation3 = Tanh()
activation = Softmax()

In [12]:
# フォワードプロパゲーション

H1 = RNN.forward(x[0,0], h)
y1= activation.forward(H1)
H2 = RNN.forward(x[0,1], H1)
y2= activation.forward(H2)
H3 = RNN.forward(x[0,2], H2)
y3= activation.forward(H3)

print(H3)

# DIVERに記載された正解
# h = np.array([[0.79494228, 0.81839002, 0.83939649, 0.85584174]]) # (batch_size, n_nodes)

[[0.79494228 0.81839002 0.83939649 0.85584174]]


In [13]:
# クラスから実行
RNN_class = ScratchSimpleRNNClassifier(lr=0.001, sigma = 0.01)
RNN_class.fit(x, 1)
print("h", RNN_class.H3)
#print("Z1", RNN_class.Z1)

h [[0.79494228 0.81839002 0.83939649 0.85584174]]


### 【問題3】（アドバンス課題）バックプロパゲーションの実装

バックプロパゲーションを実装してください。


RNNの内部は全結合層を組み合わせた形になっているので、更新式は全結合層などと同様です。

$$
W_x^{\prime} = W_x - \alpha \frac{\partial L}{\partial W_x} \\
W_h^{\prime} = W_h - \alpha \frac{\partial L}{\partial W_h} \\
B^{\prime} = B - \alpha \frac{\partial L}{\partial B}
$$

$$
α : 学習率
$$

$$
\frac{\partial L}{\partial W_x}: 
W
x
 に関する損失 
L
 の勾配
$$

$$
\frac{\partial L}{\partial W_h}: 
W
h
 に関する損失 
L
 の勾配
$$
 
$$
\frac{\partial L}{\partial B}:B
  に関する損失 
L
 の勾配
$$

勾配を求めるためのバックプロパゲーションの数式が以下です。

$$
\frac{\partial h_t}{\partial a_t} = \frac{\partial L}{\partial h_t} ×(1-tanh^2(a_t))
$$

$$
\frac{\partial L}{\partial B} = \frac{\partial h_t}{\partial a_t}
$$

$$
\frac{\partial L}{\partial W_x} = x_{t}^{T}\cdot \frac{\partial h_t}{\partial a_t}
$$

$$
\frac{\partial L}{\partial W_h} = h_{t-1}^{T}\cdot \frac{\partial h_t}{\partial a_t}
$$

$$
*\frac{\partial L}{\partial h_t}は前の時刻からの状態の誤差と出力の誤差の合計です。hは順伝播時に出力と次の層に伝わる状態双方に使われているからです。
$$


前の時刻や層に流す誤差の数式は以下です。

$$
\frac{\partial L}{\partial h_{t-1}} = \frac{\partial h_t}{\partial a_t}\cdot W_{h}^{T}
$$

$$
\frac{\partial L}{\partial x_{t}} = \frac{\partial h_t}{\partial a_t}\cdot W_{x}^{T}
$$

ScratchSimpleRNNClassifierクラスfit関数内に以下のコードを作成。

dX2, dH2 = self.RNN.backward(H3)

dX1, dH1= self.RNN.backward(dH2)

dX0, dH0 = self.RNN.backward(dH1)

SGD()クラス内にupdate関数を作成

In [14]:
def update(self, layer, dH):#Z3, Y, 
        """
        ある層の重みやバイアスの更新
        Parameters
        ----------
        layer : 更新前の層のインスタンス
        """
        #【問題3】
        
        dAt = dH*(1 - np.tanh(layer.A)**2)
        
        dWx = layer.x.reshape(-1,1)@dAt
        dWh = layer.H.T@dAt
        
        layer.w_x = layer.w_x - self.lr*(dWx)
        layer.w_h = layer.w_h - self.lr*(dWh)
        layer.b = layer.b - self.lr*(dAt)
        
        dX = dAt@layer.w_x.T
        dH_prev = dAt@layer.w_h.T
        
        return (layer, dX, dH_prev)

In [15]:
# バックワードプロパゲーション

dX2, dH2 = RNN.backward(H3)
dX1, dH1= RNN.backward(dH2)
dX0, dH0 = RNN.backward(dH1)

print("dX2", dX2)
print("dX1", dX1)
print("dX0", dX0)
print("dH0", dH0)

dX2 [[0.03945182 0.05795526]]
dX1 [[0.00280725 0.0039533 ]]
dX0 [[0.00019248 0.00027198]]
dH0 [[0.0001915  0.00023764 0.00027096 0.00032993]]


In [16]:
# クラスから実行
RNN_class = ScratchSimpleRNNClassifier(lr=0.001, sigma = 0.01)
RNN_class.fit(x, 1)
print("x2", RNN_class.dX2)
print("x1", RNN_class.dX1)
print("x0", RNN_class.dX0)
print("dH0", RNN_class.dH0)

x2 [[0.03945182 0.05795526]]
x1 [[0.00280725 0.0039533 ]]
x0 [[0.00019248 0.00027198]]
dH0 [[0.0001915  0.00023764 0.00027096 0.00032993]]
