# Sprint 深層学習スクラッチ 畳み込みニューラルネットワーク1

In [1]:
import numpy as np

#### データ、クラスの読み込み

In [2]:
x = np.array([1,2,3,4])
w = np.array([3, 5, 7])
b = np.array([1])
delta_a = np.array([10, 20])

In [3]:
class SGD:
    """
    確率的勾配降下法
    Parameters
    ----------
    lr : 学習率
    """
    def __init__(self, lr):
        self.lr = lr
    def update(self, layer):
        """
        ある層の重みやバイアスの更新
        Parameters
        ----------
        layer : 更新前の層のインスタンス

        Returns
        ----------
        layer : 更新後の層のインスタンス
        """
        
        layer.W = layer.W -  self.lr * layer.LW
        
        layer.B = layer.B - self.lr*layer.LB
        
        return layer

### 【問題1】チャンネル数を1に限定した1次元畳み込み層クラスの作成

In [4]:
class CNN_FC:
    """
    ノード数n_nodes1からn_nodes2への全結合層
    Parameters
    ----------
    n_nodes1 : int
      前の層のノード数
    n_nodes2 : int
      後の層のノード数
    initializer : 初期化方法のインスタンス
    optimizer : 最適化手法のインスタンス
    """
    def __init__(self, w, b , optimizer,stride, padding):
        self.optimizer = optimizer
        # 初期化
        # initializerのメソッドを使い、self.Wとself.Bを初期化する
        #init = initializer
        #self.W = init.W(num_filter)
        #self.B = init.B(num_bias)
        self.W = w
        self.B = b
        self.stride = stride
        self.padding = padding
    
    def forward(self, X):
        """
        フォワード
        Parameters
        ----------
        X : 次の形のndarray, shape (batch_size, n_nodes1)
            入力
        Returns
        ----------
        A : 次の形のndarray, shape (batch_size, n_nodes2)
            出力
        """
        self.A = X
        a = np.zeros(len(X)-2)
        for i in range(len(X) - 2):
            sig = 0
            for j in range(len(self.W)):
                sig += X[i+j] * self.W[j]
            sig += self.B
            a[i] = sig

        
        return a

    
    def backward(self, dA):
        """
        バックワード
        Parameters
        ----------
        dA : 次の形のndarray, shape (batch_size, n_nodes2)
            後ろから流れてきた勾配
        Returns
        ----------
        dZ : 次の形のndarray, shape (batch_size, n_nodes1)
            前に流す勾配
        """
        n_out = N_OUT(self.stride, self.padding, self.A, self.W)

        self.LB = np.sum(dA)
        
        #LWの計算
        self.LW = np.zeros_like(self.W)
        for s in range(len(self.W)):
            sigma = 0
            for i in range(len(self.W)-1):
                sigma += dA[i] * self.A[i+s]
            self.LW[s] = sigma
        
        #dZの計算
        self.dZ = np.zeros_like(self.A)
        for j in range(len(self.A)):
            sigma = 0
            for s in range(len(self.W)):
                if j - s < 0 or j - s > n_out-1:
                    pass
                else:
                    sigma += dA[j-s] * self.W[s]
            self.dZ[j] = sigma
        
        # 更新
        self = self.optimizer.update(self)
        return self.dZ

### 【問題2】1次元畳み込み後の出力サイズの計算

In [5]:
def N_OUT(stride, padding, X,  W):
    if X.ndim == 1:
        return int((X.shape[0] + (2*padding) - len(W) / stride) + 1)
    elif X.ndim == 3:
        return int((X.shape[2] + (2*padding) - len(W) / stride) + 1 )

### 【問題3】小さな配列での1次元畳み込み層の実験

In [6]:
cnn = CNN_FC(w, b, SGD(0.1), 1, 0)
print(cnn.forward(x))
print(cnn.backward(delta_a))

[35. 50.]
[ 30 110 170 140]


### 【問題4】チャンネル数を限定しない1次元畳み込み層クラスの作成

In [7]:
x = np.array([[1, 2, 3, 4], [2, 3, 4, 5]])
w = np.ones((3, 2, 3))
b = np.array([1, 2, 3])
delta_a2 = np.array([[52, 56], [32, 35], [9, 11]])

In [8]:
class CNN2dim_FC:
    """
    ノード数n_nodes1からn_nodes2への全結合層
    Parameters
    ----------
    n_nodes1 : int
      前の層のノード数
    n_nodes2 : int
      後の層のノード数
    initializer : 初期化方法のインスタンス
    optimizer : 最適化手法のインスタンス
    """
    def __init__(self, w, b , optimizer,stride, padding):
        self.optimizer = optimizer
        # 初期化
        # initializerのメソッドを使い、self.Wとself.Bを初期化する
        #init = initializer
        #self.W = init.W(num_filter)
        #self.B = init.B(num_bias)
        self.W = w
        self.B = b
        self.stride = stride
        self.padding = padding
    
    def forward(self, X):
        """
        フォワード
        Parameters
        ----------
        X : 次の形のndarray, shape (batch_size, n_nodes1)
            入力
        Returns
        ----------
        A : 次の形のndarray, shape (batch_size, n_nodes2)
            出力
        """
        self.A = X
        output_size, chanel_size, filter_size = self.W.shape
        feature_size = self.A.shape[1]

        a = np.zeros([output_size, feature_size - 2])
        for output in range(output_size):
            for j in range(filter_size - 1):
                sig = 0
                for chanel in range(chanel_size):
                    for i in range(filter_size):
                        sig += X[chanel, i+j] * self.W[output, chanel, j]
                a[output, j] = sig + b[output]
        
        return a

    def backward(self, dA):
        """
        バックワード
        Parameters
        ----------
        dA : 次の形のndarray, shape (batch_size, n_nodes2)
            後ろから流れてきた勾配
        Returns
        ----------
        dZ : 次の形のndarray, shape (batch_size, n_nodes1)
            前に流す勾配
        """
        self.n_out = N_OUT(self.stride, self.padding, self.W, self.A)
        self.LB = np.sum(dA, axis=1)
        
        output_size, chanel_size, filter_size = self.W.shape
        feature_size = self.A.shape[1]
        
        #LWの計算
        self.LW = np.zeros_like(self.W)
        for output in range(output_size):
            for chanel in range(chanel_size):
                for i in range(filter_size):
                    for j in range(filter_size -1):
                        self.LW[output, chanel, i] += dA[output, j]*self.A[chanel, j+i]
        
        #dZの計算
        dZ = np.zeros_like(self.A)
        for output in range(output_size):
            for chanel in range(chanel_size):
                for j in range(feature_size):
                    sigma=0
                    for s in range(filter_size):
                        if j - s < 0 or j - s > self.n_out -1:
                            pass
                        else:
                            sigma += dA[output,  j-s] * self.W[output, chanel, s]
                    dZ[chanel, j] += sigma
        
        # 更新
        self = self.optimizer.update(self)
        return dZ

### 二次元　実行

In [9]:
cnn_2 = CNN2dim_FC(w, b, SGD(0.1), 1, 0)
print(cnn_2.forward(x))
cnn_2.backward(delta_a2)

[[16. 22.]
 [17. 23.]
 [18. 24.]]


array([[ 93, 195, 195, 102],
       [ 93, 195, 195, 102]])

### 【問題8】学習と推定

In [10]:
from keras.datasets import mnist
from sklearn.preprocessing import OneHotEncoder
(X_train, t_train), (X_test, t_test) = mnist.load_data()

  from ._conv import register_converters as _register_converters
Using TensorFlow backend.


In [11]:
X_train  = X_train.reshape(-1, 784)
X_test = X_test.reshape(-1, 784)
X_train = X_train.astype(np.float)
X_test = X_test.astype(np.float)
X_train /= 255
X_test /= 255
enc = OneHotEncoder(handle_unknown="ignore", sparse=False)
t_train_one_hot = enc.fit_transform(t_train[:, np.newaxis])
t_test_one_hot = enc.fit_transform(t_test[:,  np.newaxis])

In [12]:
X_train = X_train.reshape(60000, 1, 784)
X_test = X_test.reshape(-1, 1, 784)

In [13]:
w = np.ones((3, 1, 3))
b = np.array([1, 2, 3])

In [14]:
class CNN_mnist_FC:
    """
    畳み込み層
    Parameters
    ----------
    w:畳み込み層の重み　w.shape  (出力チャネル、入力チャネル、フィルターサイズ)
    b:畳み込み層のバイアス　b.shape (出力チャネル, )
    stride:ストライド数
    padding:パディング数
    optimizer : 最適化手法のインスタンス
    """
    def __init__(self, w, b , optimizer,stride, padding):
        self.optimizer = optimizer
        self.W = w
        self.B = b
        self.stride = stride
        self.padding = padding
    
    def forward(self, X):
        """
        フォワード
        Parameters
        ----------
        X : 次の形のndarray, shape (batch_size, n_nodes1)
            入力
        Returns
        ----------
        A : 次の形のndarray, shape (batch_size, n_nodes2)
            出力
        """
        self.A = X
        output_size, chanel_size, filter_size = self.W.shape
        feature_size = self.A.shape[2]
        sample_size = self.A.shape[0]

        a = np.zeros([sample_size, output_size, feature_size-2])
        for samples in range(sample_size):
            for output in range(output_size):
                for j in range(filter_size - 1):
                    sig = 0
                    for chanel in range(chanel_size):
                        for i in range(filter_size):
                            sig += X[samples, chanel, i+j] * self.W[output, chanel, j]
                    a[samples, output, j] = sig + b[output]
        
        return a

    
    def backward(self, dA):
        """
        バックワード
        Parameters
        ----------
        dA : 次の形のndarray, shape (batch_size, n_nodes2)
            後ろから流れてきた勾配
        Returns
        ----------
        dZ : 次の形のndarray, shape (batch_size, n_nodes1)
            前に流す勾配
        """
        self.n_out = N_OUT(self.stride, self.padding, self.W, self.A)
        
        output_size, chanel_size, filter_size = self.W.shape
        feature_size = self.A.shape[2]
        sample_size = self.A.shape[0]
        
        #LBの計算
        self.LB = dA.sum(axis=0)
        self.LB = self.LB.sum(axis=1)
        
        #LWの計算
        self.LW = np.zeros_like(self.W)
        for samples in range(sample_size):
            for output in range(output_size):
                for chanel in range(chanel_size):
                    for i in range(filter_size):
                        for j in range(filter_size -1):
                            self.LW[output, chanel, i] += dA[samples, output, j]*self.A[samples, chanel, j+i]
                    
        #dZの計算
        dZ = np.zeros_like(self.A)
        for samples in range(sample_size):
            for output in range(output_size):
                for chanel in range(chanel_size):
                    for j in range(feature_size):
                        sigma=0
                        for s in range(filter_size):
                            if j - s < 0 or j - s > self.n_out -1:
                                pass
                            else:
                                sigma += dA[samples, output,  j-s] * self.W[output, chanel, s]
                        dZ[samples, chanel, j] += sigma
        
        # 更新
        self = self.optimizer.update(self)
        return dZ

### クラス

In [16]:
class Relu:
    def forward(self, X):
        self.A = X
        return np.maximum(0, X)
    
    def backward(self, Z):
        
        return Z * np.maximum(np.sign(self.A), 0)

In [17]:
class FC:
    """
    ノード数n_nodes1からn_nodes2への全結合層
    Parameters
    ----------
    n_nodes1 : int
      前の層のノード数
    n_nodes2 : int
      後の層のノード数
    initializer : 初期化方法のインスタンス
    optimizer : 最適化手法のインスタンス
    """
    def __init__(self, n_nodes1, n_nodes2, initializer, optimizer):
        self.optimizer = optimizer
        # 初期化
        # initializerのメソッドを使い、self.Wとself.Bを初期化する
        init = initializer
        self.n_nodes1 = n_nodes1
        self.W = init.W(n_nodes1, n_nodes2)
        self.B = init.B(n_nodes2)
    

    
    def forward(self, X):
        """
        フォワード
        Parameters
        ----------
        X : 次の形のndarray, shape (batch_size, n_nodes1)
            入力
        Returns
        ----------
        A : 次の形のndarray, shape (batch_size, n_nodes2)
            出力
        """
        self.z = X
        self.a = X@self.W + self.B
        
        return self.a

    
    def backward(self, dA):
        """
        バックワード
        Parameters
        ----------
        dA : 次の形のndarray, shape (batch_size, n_nodes2)
            後ろから流れてきた勾配
        Returns
        ----------
        dZ : 次の形のndarray, shape (batch_size, n_nodes1)
            前に流す勾配
        """
        dZ = dA @ self.W.T
        self.LW = self.z.T @ dA
        self.LB = np.sum(dA, axis=0)
        
        
        # 更新
        self = self.optimizer.update(self)
        return dZ

In [18]:
class SimpleInitializer:
    """
    ガウス分布によるシンプルな初期化
    Parameters
    ----------
    sigma : float
      ガウス分布の標準偏差
    """
    def __init__(self, sigma):
        self.sigma = sigma
    def W(self, n_nodes1, n_nodes2):
        """
        重みの初期化
        Parameters
        ----------
        n_nodes1 : int
          前の層のノード数
        n_nodes2 : int
          後の層のノード数

        Returns
        ----------
        W :
        """
        W = self.sigma * np.random.randn(n_nodes1, n_nodes2)
        
        return W
    
    def B(self, n_nodes2):
        """
        バイアスの初期化
        Parameters
        ----------
        n_nodes2 : int
          後の層のノード数

        Returns
        ----------
        B :
        """
        B  = self.sigma * np.random.randn(n_nodes2)
        return B

In [19]:
class Softmax:
    def forward(self, A):
        exp_a = np.exp(A)
        softmax_result = np.empty((A.shape[0], A.shape[1]))
        exp_sum = np.sum(exp_a, axis=1)
        for i in range(A.shape[0]):
            softmax_result[i] = exp_a[i] / exp_sum[i]
            
        return softmax_result
    
    def backward(self, Z, Y):
        
        L_A = Z - Y
        self.cross_entropy = -np.average(np.sum(Y*np.log(Z), axis=1))
        
        
        return L_A

### 学習

In [20]:
cnn_mnist = CNN_mnist_FC(w, b, SGD(0.1), 1, 0)
A = cnn_mnist.forward(X_train)
relu = Relu()
A_relu = relu.forward(A)
A_flat = A_relu.reshape(A_relu.shape[0], -1)
FC_1 = FC(2346, 10, SimpleInitializer(0.1), SGD(0.1))
A_FC_1 = FC_1.forward(A_flat)
softmax = Softmax()
A_soft = softmax.forward(A_FC_1)
A_delta = softmax.backward(A_soft, t_train_one_hot)
delta_Z = FC_1.backward(A_delta)
delta_Z_reshape = delta_Z.reshape(A_relu.shape)
delta_Z_relu = relu.backward(delta_Z_reshape)
dZ = cnn_mnist.backward(delta_Z_relu)

### 推定

In [22]:
X_test = X_test.reshape(-1, 1, 784)
t_A = cnn_mnist.forward(X_test)
t_A = relu.forward(t_A)
t_A  = t_A.reshape(t_A.shape[0], -1)
t_A = FC_1.forward(t_A)
C = np.max(t_A, axis=1)
for i in range(t_A.shape[0]):
    t_A[i] = np.exp(t_A[i] - C[i])
t_A = softmax.forward(t_A)
y = np.argmax(t_A, axis=1)
from sklearn.metrics import accuracy_score
print(accuracy_score(t_test, y))

0.0974
