# Sprint22 RNNスクラッチ

In [31]:
import numpy as np

class Activation():
    """Element-wise activation function selected by name.

    Supported names are ``'sigmoid'``, ``'tanh'`` and ``'relu'``.
    ``forward`` applies the function; ``backward`` multiplies the upstream
    gradient by the function's derivative evaluated at the pre-activation.
    """

    _SUPPORTED = ('sigmoid', 'tanh', 'relu')

    def __init__(self, name='tanh'):
        self.name = name

    def _require_supported(self):
        """Raise ``ValueError`` when ``self.name`` is not a known activation."""
        if self.name not in self._SUPPORTED:
            raise ValueError(
                "unknown activation %r; expected one of %r"
                % (self.name, self._SUPPORTED)
            )

    def forward(self, X):
        """Return the activation of ``X`` (a NumPy array), element-wise.

        Raises:
            ValueError: if ``self.name`` is not a supported activation.
        """
        self._require_supported()
        # The forward method for each activation is named after it.
        return getattr(self, self.name)(X)

    def backward(self, X, dy):
        """Return ``dy`` scaled by the activation's derivative at ``X``.

        ``X`` is the pre-activation that was passed to ``forward`` and ``dy``
        is the gradient flowing back from the next layer.

        Raises:
            ValueError: if ``self.name`` is not a supported activation.
        """
        self._require_supported()
        # Derivative methods follow the ``d_<name>`` naming scheme.
        return getattr(self, 'd_' + self.name)(X) * dy

    def sigmoid(self, X):
        # Equivalent to 1 / (1 + exp(-X)) but expressed through tanh so that
        # large negative inputs do not overflow inside exp().
        return 0.5 * (1.0 + np.tanh(0.5 * X))

    def d_sigmoid(self, X):
        s = self.sigmoid(X)  # evaluate once and reuse
        return s * (1 - s)

    def tanh(self, X):
        # np.tanh saturates cleanly at +/-1; the explicit exp() ratio
        # evaluates to inf/inf = nan once |X| is large enough to overflow.
        return np.tanh(X)

    def d_tanh(self, X):
        return 1 - self.tanh(X)**2

    def relu(self, X):
        # The mask of positive entries is kept on the instance.
        self.mask = X > 0
        return X * self.mask

    def d_relu(self, X):
        # 1 where the input is strictly positive, 0 elsewhere.
        return 1 * (X > 0)

In [4]:
class FC:
    """Fully connected (affine) layer computing ``X @ W + b``.

    Args:
        units: number of output features.
        input_dim: number of input features.
        initializer: ``'xavier'`` or ``'he'``; selects the standard deviation
            used to scale the random initial parameters.

    ``W`` has shape ``(input_dim, units)`` and ``b`` has shape ``(1, units)``.
    ``dW`` and ``db`` are filled in by ``backward``.
    """

    def __init__(self, units, input_dim, initializer='xavier'):
        self.units = units
        self.input_dim = input_dim
        self.initializer = initializer
        self.W = None
        self.b = None
        self.dW = None
        self.db = None
        # The initialisation method is named initialize_params; calling any
        # other name here raises AttributeError at construction time.
        self.initialize_params()

    def sigma(self, initializer, layer_in, layer_out):
        """Return the standard deviation for the chosen initialisation scheme.

        Raises:
            ValueError: if ``initializer`` is neither ``'xavier'`` nor ``'he'``.
        """
        if initializer == 'xavier':
            return np.sqrt(2 / (layer_in + layer_out))
        if initializer == 'he':
            return np.sqrt(2 / layer_in)
        raise ValueError(
            "unknown initializer %r; expected 'xavier' or 'he'" % (initializer,)
        )

    def initialize_params(self):
        """Draw ``W`` and ``b`` from a normal distribution scaled by ``sigma``."""
        scale = self.sigma(self.initializer, self.input_dim, self.units)
        self.W = np.random.randn(self.input_dim, self.units) * scale
        self.b = np.random.randn(1, self.units) * scale

    def forward(self, X):
        """Return ``X @ W + b`` for ``X`` of shape ``(n_samples, input_dim)``."""
        return np.dot(X, self.W) + self.b

    def backward(self, X, delta):
        """Store batch-averaged parameter gradients and return the input gradient.

        Args:
            X: the input that was given to ``forward``.
            delta: gradient of the loss with respect to this layer's output,
                shape ``(n_samples, units)``.

        Returns:
            Gradient with respect to ``X`` (not averaged over the batch).
        """
        n_samples = delta.shape[0]
        self.dW = np.dot(X.T, delta) / n_samples
        # Column-wise sum over the batch, kept 2-D to match the shape of b.
        self.db = delta.sum(axis=0, keepdims=True) / n_samples
        return np.dot(delta, self.W.T)

    def update(self, lr):
        """Take one gradient-descent step with learning rate ``lr``."""
        self.W -= lr * self.dW
        self.b -= lr * self.db

### **【問題1】SimpleRNNのフォワードプロパゲーション実装**

In [53]:
class SimpleRNN:
    """Forward-only simple RNN layer whose parameters are supplied by the caller.

    The recurrence is ``h_t = activation(x_t @ W_x + h_{t-1} @ W_h + b)`` with
    the hidden state before the first time step taken to be zero.

    Args:
        nodes: size of the hidden state.
        input_dim: stored on the instance; not used by ``forward``.
        activation: name passed to ``Activation`` (``'tanh'`` by default).
    """

    def __init__(self, nodes, input_dim, activation='tanh'):
        self.nodes = nodes
        self.input_dim = input_dim
        self.activation = activation
        self.activate_function = Activation(self.activation)
        self.W_x = None
        self.W_h = None
        self.b = None
        self.X = None
        self.Z = None
        self.X_post = None

    def forward(self, W_x, W_h, b, X):
        """Run the recurrence over every time step and return the last state.

        Args:
            W_x: input-to-hidden weights, shape ``(n_features, nodes)``.
            W_h: hidden-to-hidden weights, shape ``(nodes, nodes)``.
            b: bias, broadcastable to ``(batch_size, nodes)``.
            X: input of shape ``(batch_size, n_sequences, n_features)``.

        Returns:
            Hidden state after the final time step, shape
            ``(batch_size, nodes)``.
        """
        self.W_x = W_x
        self.W_h = W_h
        self.b = b
        self.X = X
        batch_size, n_sequences, _ = X.shape
        Z = np.zeros((batch_size, n_sequences, self.nodes))
        X_post = np.zeros(Z.shape)

        # A zero previous state at t == 0 makes the recurrent term vanish,
        # so the first step needs no special case.
        h_prev = np.zeros((batch_size, self.nodes))
        for t in range(n_sequences):
            Z[:, t, :] = (
                np.dot(X[:, t, :], self.W_x) + np.dot(h_prev, self.W_h) + self.b
            )
            X_post[:, t, :] = self.activate_function.forward(Z[:, t, :])
            h_prev = X_post[:, t, :]

        # Keep the pre-activations and hidden states so they can be inspected
        # after the call; these attributes are declared in __init__.
        self.Z = Z
        self.X_post = X_post
        return X_post[:, -1, :]

In [27]:
def sigma(layer_in, layer_out):
    """Return sqrt(2 / (layer_in + layer_out)), the Xavier-style scale factor."""
    fan_total = layer_in + layer_out
    return np.sqrt(2 / fan_total)

sigma(layer_in=3, layer_out=4)

0.5345224838248488

In [23]:
# Scratch check: draw a (1, 4) standard-normal sample, i.e. the unscaled part
# of a bias initialised as
#   np.random.randn(1, units) * self.sigma(self.initializer, self.input_dim, self.units)
np.random.randn(1, 4)

array([[ 1.59052423,  0.75578933, -1.1275052 , -0.55735179]])

In [41]:
def test(w_x, w_h, bias, X):
    """Compute the input-only pre-activation ``X[:, t, :] @ w_x + bias`` per step.

    This is a scratch check of the non-recurrent part of the RNN forward
    pass: ``w_h`` is accepted so the call mirrors the full forward signature,
    but it is not used here.

    Args:
        w_x: input weights, shape ``(n_features, n_nodes)``.
        w_h: unused.
        bias: bias broadcastable to ``(batch_size, n_nodes)``.
        X: input of shape ``(batch_size, n_sequences, n_features)``.

    Returns:
        Array of shape ``(batch_size, n_sequences, n_nodes)``.
    """
    batch_size, n_sequences, _ = X.shape
    # Take the output width from the weights instead of hard-coding 4.
    n_nodes = w_x.shape[1]
    Z = np.zeros((batch_size, n_sequences, n_nodes))
    for t in range(n_sequences):
        Z[:, t, :] = np.dot(X[:, t, :], w_x) + bias
    return Z

w_x = np.array([[1, 3, 5, 7], [3, 5, 7, 8]])/100
w_h = np.array([[1, 3, 5, 7], [2, 4, 6, 8], [3, 5, 7, 8], [4, 6, 8, 10]])/100
bias = np.array([1, 1, 1, 1])
X = np.array([[[1, 2], [2, 3], [3, 4]]])/100

test(w_x, w_h, bias, X)

array([[[1.0007, 1.0013, 1.0019, 1.0023],
        [1.0011, 1.0021, 1.0031, 1.0038],
        [1.0015, 1.0029, 1.0043, 1.0053]]])

In [19]:
def forward(X, n_nodes=4):
    """Return a zero-filled buffer shaped like the RNN pre-activations for ``X``.

    Args:
        X: 3-D array of shape ``(batch_size, n_sequences, n_features)``.
        n_nodes: size of the last axis of the result (defaults to 4).

    Returns:
        Zeros of shape ``(batch_size, n_sequences, n_nodes)``.
    """
    # Unpacking into three names also rejects inputs that are not 3-D.
    batch_size, n_sequences, _ = X.shape
    return np.zeros((batch_size, n_sequences, n_nodes))

x = np.array([[[1, 2], [2, 3], [3, 4]]]) / 100

forward(x)

array([[[0., 0., 0., 0.],
        [0., 0., 0., 0.],
        [0., 0., 0., 0.]]])

### **【問題2】小さな配列でのフォワードプロパゲーションの実験**

In [54]:
# Toy inputs and parameters for the forward-propagation experiment.
x = np.array([[[1, 2], [2, 3], [3, 4]]]) / 100  # (batch_size, n_sequences, n_features)
w_x = np.array([[1, 3, 5, 7], [3, 5, 7, 8]]) / 100  # (n_features, n_nodes)
w_h = np.array([
    [1, 3, 5, 7],
    [2, 4, 6, 8],
    [3, 5, 7, 8],
    [4, 6, 8, 10],
]) / 100  # (n_nodes, n_nodes)
batch_size, n_sequences, n_features = x.shape  # 1, 3, 2
n_nodes = w_x.shape[1]  # 4
h = np.zeros((batch_size, n_nodes))  # (batch_size, n_nodes)
b = np.ones(n_nodes, dtype=int)  # (n_nodes,)

In [45]:
# Show the slice of x that is fed to the network at each of the three time steps.
for step in range(3):
    print(x[:, step, :])

[[0.01 0.02]]
[[0.02 0.03]]
[[0.03 0.04]]


In [55]:
# SimpleRNN's constructor takes the hidden size as `nodes`; passing `units`
# raises TypeError. input_dim is given as the per-step feature count.
rnn = SimpleRNN(nodes=n_nodes, input_dim=n_features)
h = rnn.forward(w_x, w_h, b, x)
h

array([[0.79494228, 0.81839002, 0.83939649, 0.85584174]])

### **【問題3】（アドバンス課題）バックプロパゲーションの実装**

In [56]:
class SimpleRNN2:
    """Simple RNN layer with forward propagation and backpropagation through time.

    The recurrence is ``h_t = activation(x_t @ W_x + h_{t-1} @ W_h + b)`` with
    the hidden state before the first time step taken to be zero.

    Args:
        nodes: size of the hidden state.
        input_dim: number of input features per time step; used as the first
            dimension of ``W_x`` when parameters are initialised.
        activation: name passed to ``Activation``.
        initializer: ``'xavier'`` or ``'he'``; selects the scale of the random
            initial parameters.

    Parameters are created on the first ``forward`` call unless ``W_x``,
    ``W_h`` and ``b`` have all been assigned beforehand.
    """

    def __init__(self, nodes, input_dim, activation='tanh', initializer='xavier'):
        self.nodes = nodes
        self.input_dim = input_dim
        self.activation = activation
        # Stored so initialize_params() can choose the weight scale.
        self.initializer = initializer
        self.activate_function = Activation(activation)
        self.W_x = None
        self.W_h = None
        self.b = None
        self.dW_x = None
        self.dW_h = None
        self.db = None
        self.X = None
        self.Z = None
        self.X_post = None

    def sigma(self, initializer, layer_in, layer_out):
        """Return the standard deviation for the chosen initialisation scheme.

        Raises:
            ValueError: if ``initializer`` is neither ``'xavier'`` nor ``'he'``.
        """
        if initializer == 'xavier':
            return np.sqrt(2 / (layer_in + layer_out))
        if initializer == 'he':
            return np.sqrt(2 / layer_in)
        raise ValueError(
            "unknown initializer %r; expected 'xavier' or 'he'" % (initializer,)
        )

    def initialize_params(self):
        """Draw ``W_x``, ``W_h`` and ``b`` from scaled normal distributions."""
        input_scale = self.sigma(self.initializer, self.input_dim, self.nodes)
        hidden_scale = self.sigma(self.initializer, self.nodes, self.nodes)
        self.W_x = np.random.randn(self.input_dim, self.nodes) * input_scale
        self.W_h = np.random.randn(self.nodes, self.nodes) * hidden_scale
        self.b = np.random.randn(1, self.nodes) * input_scale

    def forward(self, X):
        """Run the recurrence over every time step and return the last state.

        Args:
            X: input of shape ``(batch_size, n_sequences, n_features)``.

        Returns:
            Hidden state after the final time step, shape
            ``(batch_size, nodes)``.
        """
        # Use the layer's own parameters; create them if none were assigned.
        if self.W_x is None or self.W_h is None or self.b is None:
            self.initialize_params()
        self.X = X
        batch_size, n_sequences, _ = X.shape
        Z = np.zeros((batch_size, n_sequences, self.nodes))
        X_post = np.zeros(Z.shape)

        # A zero previous state at t == 0 makes the recurrent term vanish.
        h_prev = np.zeros((batch_size, self.nodes))
        for t in range(n_sequences):
            Z[:, t, :] = (
                np.dot(X[:, t, :], self.W_x) + np.dot(h_prev, self.W_h) + self.b
            )
            X_post[:, t, :] = self.activate_function.forward(Z[:, t, :])
            h_prev = X_post[:, t, :]

        # Cached for backward().
        self.Z = Z
        self.X_post = X_post
        # The layer's output is the final hidden state, not the final input.
        return X_post[:, -1, :]

    def backward(self, _dX_post):
        """Backpropagate through time and store the parameter gradients.

        Args:
            _dX_post: gradient of the loss with respect to the hidden states.
                A 2-D array ``(batch_size, nodes)`` is treated as the gradient
                of the final hidden state only (matching what ``forward``
                returns); a 3-D array ``(batch_size, n_sequences, nodes)``
                supplies a gradient for every time step.

        Returns:
            Gradient with respect to the input, same shape as ``X``.

        Raises:
            RuntimeError: if ``forward`` has not been called yet.
        """
        if self.X is None or self.Z is None:
            raise RuntimeError("forward() must be called before backward()")

        batch_size, n_sequences, _ = self.X.shape
        _dX_post = np.asarray(_dX_post)

        dX_post = np.zeros((batch_size, n_sequences, self.nodes))
        if _dX_post.ndim == 2:
            # Only the last time step receives gradient from the next layer.
            dX_post[:, -1, :] = _dX_post
        else:
            dX_post[:] = _dX_post

        self.dW_x = np.zeros(np.shape(self.W_x))
        self.dW_h = np.zeros(np.shape(self.W_h))
        # Reset the bias *gradient*; the bias itself must be left untouched.
        self.db = np.zeros(np.shape(self.b))
        delta = np.zeros(dX_post.shape)
        dX = np.zeros(self.X.shape)

        # Walk backwards in time. dh_next is the gradient reaching h_t from
        # step t + 1 through the recurrent connection (zero past the end).
        dh_next = np.zeros((batch_size, self.nodes))
        for t in range(n_sequences - 1, -1, -1):
            delta[:, t, :] = self.activate_function.backward(
                self.Z[:, t, :], dX_post[:, t, :] + dh_next
            )
            dX[:, t, :] = np.dot(delta[:, t, :], self.W_x.T)
            # Z_{t+1} contains h_t @ W_h, so the gradient w.r.t. h_t uses W_h.T.
            dh_next = np.dot(delta[:, t, :], self.W_h.T)

        # Accumulate batch-averaged parameter gradients over all time steps.
        for t in range(n_sequences):
            if t != 0:
                self.dW_h += np.dot(self.X_post[:, t-1, :].T, delta[:, t, :]) / batch_size
            self.dW_x += np.dot(self.X[:, t, :].T, delta[:, t, :]) / batch_size
            self.db += (delta[:, t, :].sum(axis=0) / batch_size).reshape(self.db.shape)

        return dX

    def update(self, lr):
        """Take one gradient-descent step with learning rate ``lr``."""
        self.W_x -= lr * self.dW_x
        self.W_h -= lr * self.dW_h
        self.b -= lr * self.db