# 1次元の畳み込みニューラルネットワークスクラッチ

In [1]:
import numpy as np

## 初期化方法のクラス化


In [2]:
class SimpleInitializer:
    """
    ガウス分布によるシンプルな初期化
    Parameters
    ----------
    sigma : float
      ガウス分布の標準偏差
    """
    def __init__(self, sigma = 0.01):
        self.sigma = sigma
    
    def W(self, n_nodes1, n_nodes2):
        """
        重みの初期化
        バイアスの初期化
        Parameters
        ----------
        n_nodes1 : int
          前の層のノード数
        n_nodes2 : int
          後の層のノード数

        Returns
        ----------
        W :
        """
        W = self.sigma * np.random.randn(n_nodes1, n_nodes2)
        
        return W
    
    def B(self,n_nodes2):
        """
        バイアスの初期化
        Parameters
        ----------
        n_nodes2 : int
          後の層のノード数

        Returns
        ----------
        B :
        """
        B = self.sigma * np.random.randn(n_nodes2)

        return B

## 最適化手法のクラス化


In [3]:
class SGD:
    """
    確率的勾配降下法
    Parameters
    ----------
    lr : 学習率
    """
    def __init__(self, lr=0.01):
        self.lr = lr
    
    def update(self, layer):
        """
        ある層の重みやバイアスの更新
        Parameters
        ----------
        layer : 更新前の層のインスタンス

        Returns
        ----------
        layer : 更新後の層のインスタンス
        """
        print(layer.W)
        layer.W = layer.W - self.lr * layer.L_w
        layer.B = layer.B - self.lr * layer.L_b
        
        return layer

## 活性化関数のクラス化

In [4]:
class Relu:
    
    def __init__(self):
        self.mask = None
    
    def forward(self,X):
        self.mask = (X <= 0)
        out = X.copy()
        out[self.mask] = 0
        return out
        
    def backward(self, dout):
        dout[self.mask] = 0
        dx = dout
        return dx


In [5]:
class Softmax:

    def __init__(self):
        self.loss = None #損失
        self.y = None #softmax output
    
    def forward(self, A):
        self.y = np.exp(A) /np.reshape(np.sum(np.exp(A), axis=1),(-1,1))
        return self.y

    def backward(self, z, y):
        self.dA = z - y
        self.loss = -1 * (np.mean(np.sum(y*np.log(z+1e-5),axis=1),axis =0 ))

        return self.dA

## 全結合層のクラス化


In [6]:
class FC:
    """
    ノード数n_nodes1からn_nodes2への全結合層
    Parameters
    ----------
    n_nodes1 : int
      前の層のノード数
    n_nodes2 : int
      後の層のノード数
    initializer : 初期化方法のインスタンス
    optimizer : 最適化手法のインスタンス
    """
    def __init__(self, n_nodes1, n_nodes2, initializer, optimizer=SGD()):
        self.optimizer = optimizer
        # 初期化
        self.W = initializer.W(n_nodes1, n_nodes2)
        self.B = initializer.B(n_nodes2)
        
        
    def forward(self, X):
        """
        フォワード
        Parameters
        ----------
        X : 次の形のndarray, shape (batch_size, n_nodes1)
            入力
        Returns
        ----------
        A : 次の形のndarray, shape (batch_size, n_nodes2)
            出力
        """
        self.X = X
        A = self.X@self.W + self.B
       
        return A
    def backward(self, da):
        """
        バックワード
        Parameters
        ----------
        dA : 次の形のndarray, shape (batch_size, n_nodes2)
            後ろから流れてきた勾配
        Returns
        ----------
        dZ : 次の形のndarray, shape (batch_size, n_nodes1)
            前に流す勾配
        """
        
        self.L_b = np.sum(da, axis = 0)
        self.L_w = self.X.T@da
        dz = da@self.W.T
        
        # 更新
        self = self.optimizer.update(self)
        return dz



## 【問題1】チャンネル数を1に限定した1次元畳み込み層クラスの作成

In [7]:
class SimpleConv1d:
    
    def __init__(self,
                 w, 
                 b,
                 optimizer, 
                 stride=1, 
                 pad = 0):
        
        self.optimizer = optimizer
                
        self.W = w
        self.B = b
        self.stride = stride
        self.pad = pad
        
    def forward(self, X):
        self.X = X
        self.n_out = N_out(self.X,
                      self.pad,
                      self.W,
                      self.stride)
        
        a = np.zeros(self.n_out)
        for i in range(self.n_out):#2
            a_i = 0
            for s in range(len(self.W)):#3
                a_i += self.X[i+s] * self.W[s]
            a_i += self.B
            a[i] = a_i

        return a#(2)
            
    def backward(self, L_a):
        self.L_a = L_a
        for s in range(len(self.W)):#3
            self.L_w = 0
            self.L_b = 0
            for i in range(len(self.W)-1):#3-1
                self.L_w += self.L_a[i] * self.X[i+s]
                self.L_b += self.L_a[i]

        L_x = []
        for j in range(len(self.X)):#4
            x_j = 0
            for s in range(len(self.W)):#3
                if (j-s)<0 or (j-s)>(self.n_out-1):
                    x_j += 0
                else:
                    x_j += self.L_a[j-s] * self.W[s]
            
            L_x.append(x_j)
                
        # 更新
        self = self.optimizer.update(self)
        return L_x#(4)

## 【問題2】1次元畳み込み後の出力サイズの計算


In [8]:
def N_out(X, P, F, S):
    if X.ndim == 1:
        n_out = ((X.shape[0] + 2*P - len(F)) / S) + 1
    elif X.ndim > 1:
        n_out = ((X.shape[-1] + 2*P - F.shape[-1]) / S) + 1
    
    return int(n_out)

## 【問題3】小さな配列での1次元畳み込み層の実験


In [9]:
x = np.array([1,2,3,4])
w = np.array([3, 5, 7])
b = np.array([1])
delta_a = np.array([10, 20])

In [10]:
N_out(x,0,w,1)

2

In [11]:
sc = SimpleConv1d(w,b,SGD(0.01))
sc.forward(x)

array([35., 50.])

In [12]:
print(sc.backward(delta_a))

[30, 110, 170, 140]


## 【問題4】チャンネル数を限定しない1次元畳み込み層クラスの作成


In [94]:
class Conv1d_nChanel:
    
    def __init__(self,
                 w, 
                 b,
                 optimizer=SGD(0.01), 
                 stride=1, 
                 pad = 0):
        
        self.optimizer = optimizer
                
        self.W = w
        self.B = np.reshape(b,(-1,1))
        self.stride = stride
        self.pad = pad
        
    def forward(self, X):
        self.X = X
        out_cha, in_cha, fil_size = self.W.shape#3,2,3
        feature_size = self.X.shape[1]
        
        self.n_out = N_out(self.X,
                      self.pad,
                      self.W,
                      self.stride)
        a = np.zeros((out_cha,self.n_out))#3,2
        a_i = np.zeros(self.n_out)
        for i in range(out_cha):#3
            for s in range(self.n_out):#2
                temp = np.sum(self.X[:,s:(fil_size+s)]*self.W[i],axis=1)#
                temp = np.sum(temp,axis=0) 
                a_i[s] = temp
            a[i] = a_i
        
        a += self.B

        return a#3,2
            
    def backward(self, L_a):
        self.L_a = L_a
        out_cha, in_cha, fil_size = self.W.shape#3,2,3
        feature_size = self.X.shape[1]
        
        self.L_b = np.sum(self.L_a, axis=1)
        
        self.L_w = np.zeros_like(self.W)
        for g in range(out_cha):
            for h in range(in_cha):
                for i in range(fil_size):
                    for j in range(fil_size -1):
                        self.L_w[g, h, i] += L_a[g, j]*self.X[h, j+i]
                        
        L_x = np.zeros_like(self.X)
        for g in range(out_cha):
            for h in range(in_cha):
                for j in range(feature_size):
                    x_j=0
                    for s in range(fil_size):
                        if j-s < 0 or j-s > self.n_out -1:
                            x_j += 0
                        else:
                            x_j += L_a[g, j-s] * self.W[g, h, s]
                    L_x[h, j] += x_j
    
        # 更新
        self = self.optimizer.update(self)
        return L_x

In [95]:
x = np.array([[1, 2, 3, 4], 
              [2, 3, 4, 5]]) # shape(2, 4)で、（入力チャンネル数、特徴量数）である。
w = np.ones((3, 2, 3)) # (出力チャンネル数、入力チャンネル数、フィルタサイズ)である。
b = np.array([1, 2, 3]) # （出力チャンネル数）
delta_a2 = np.array([[21, 42], [32, 62], [5, 4]])

In [96]:
cc = Conv1d_nChanel(w,b,SGD(0.01))
cc.forward(x)

array([[16., 22.],
       [17., 23.],
       [18., 24.]])

In [97]:
cc.backward(delta_a2)

array([[ 58, 166, 166, 108],
       [ 58, 166, 166, 108]])

# 検証

## 【問題8】学習と推定


In [130]:
# バッチサイズを考慮する
class Conv1d_nChanel2:
    
    def __init__(self,
                 w, 
                 b,
                 optimizer=SGD(0.01), 
                 stride=1, 
                 pad = 0):
        
        self.optimizer = optimizer
                
        self.W = w
        self.B = b####np.reshape(b,(-1,1))####(-1,1)
        self.stride = stride
        self.pad = pad
        
    def forward(self,X):
        self.X = X
        out_cha, in_cha, fil_size = self.W.shape#3,2,3
        feature_size = self.X.shape[2]
        sample_size = self.X.shape[0]
        
        self.n_out = N_out(self.X,
                      self.pad,
                      self.W,
                      self.stride)
        
        a = np.zeros((sample_size,out_cha,self.n_out))
        a_i = np.zeros(self.n_out)
        for h in range(sample_size):
            X = self.X[h]
            for i in range(out_cha):
                for s in range(self.n_out):
                    temp = np.sum(X[:,s:(fil_size+s)]*self.W[i],axis=1)#
                    temp = np.sum(temp,axis=0) 
                    a_i[s] = temp 
                a[h,i,:] = a_i
            a[h] += np.reshape(self.B,(-1,1))####

        return a
            
    def backward(self, L_a):
        self.L_a = L_a
        out_cha, in_cha, fil_size = self.W.shape#3,2,3
        feature_size = self.X.shape[2]
        sample_size = self.X.shape[0]
        
        self.L_b = np.sum(self.L_a, axis=0)
        self.L_b = np.sum(self.L_b, axis=1)#####
        self.L_w = np.zeros_like(self.W)
        for f in range(sample_size):
            for g in range(out_cha):
                for h in range(in_cha):
                    for i in range(fil_size):
                        for j in range(fil_size -1):
                            self.L_w[g, h, i] += L_a[f, g, j]*self.X[f, h, j+i]

        L_x = np.zeros_like(self.X)
        for f in range(sample_size):
            for g in range(out_cha):
                for h in range(in_cha):
                    for j in range(feature_size):
                        x_j=0
                        for s in range(fil_size):
                            if j-s < 0 or j-s > self.n_out-1:
                                x_j += 0
                            else:
                                x_j += L_a[f, g, j-s] * self.W[g, h, s]
                        L_x[f, h, j] += x_j

        # 更新
        self = self.optimizer.update(self)
        return L_x

## 学習

In [131]:
from sklearn.metrics import accuracy_score
from keras.datasets import mnist
from sklearn.preprocessing import OneHotEncoder
(X_train, y_train), (X_test, y_test) = mnist.load_data()

In [132]:
X_train  = X_train.reshape(-1, 784)
X_test = X_test.reshape(-1, 784)
X_train = X_train.astype(np.float)
X_test = X_test.astype(np.float)
X_train /= 255
X_test /= 255
enc = OneHotEncoder(handle_unknown="ignore", sparse=False)
y_train_one_hot = enc.fit_transform(y_train[:, np.newaxis])
y_test_one_hot = enc.fit_transform(y_test[:,  np.newaxis])

In [133]:
X_train = X_train.reshape(60000, 1, 784)
X_test = X_test.reshape(-1, 1, 784)

In [134]:
w = np.ones((3, 1, 3))
b = np.array([1, 1, 1])

In [135]:
cc2 = Conv1d_nChanel2(w,b)

In [136]:
a = cc2.forward(X_train)

In [137]:
a.shape

(60000, 3, 782)

In [138]:
relu = Relu()

In [139]:
a_relu = relu.forward(a)

In [140]:
a_relu.shape

(60000, 3, 782)

In [141]:
a_flatten = a_relu.reshape(a_relu.shape[0], -1)

In [142]:
a_flatten.shape

(60000, 2346)

In [143]:
FC1 = FC(2346, 10, SimpleInitializer(), SGD())

In [144]:
a_FC1 = FC1.forward(a_flatten)

In [145]:
softmax = Softmax()

In [146]:
a_softmax = softmax.forward(a_FC1)

In [147]:
a_delta = softmax.backward(a_softmax, y_train_one_hot)

In [148]:
a_delta.shape

(60000, 10)

In [149]:
d_L_a = FC1.backward(a_delta)

In [150]:
d_L_a.shape

(60000, 2346)

In [151]:
d_L_a_reshape = d_L_a.reshape(a_relu.shape)

In [152]:
a_relu.shape

(60000, 3, 782)

In [153]:
d_L_a_relu = relu.backward(d_L_a_reshape)

In [154]:
d_L_a_relu.shape

(60000, 3, 782)

In [155]:
L_x = cc2.backward(d_L_a_relu)

In [156]:
cc2.L_b.shape

(3,)

In [157]:
cc2.B

array([-63.40827951, -39.28621084, -81.31902672])

## 推定

In [158]:
X_test = X_test.reshape(-1, 1, 784)

In [159]:
ya = cc2.forward(X_test)

In [160]:
ya = relu.forward(ya)

In [161]:
ya  = ya.reshape(ya.shape[0], -1)

In [162]:
ya = FC1.forward(ya)

In [163]:
d = np.max(ya, axis=1)

In [164]:
for i in range(ya.shape[0]):
    ya[i] = np.exp(ya[i] - d[i])

In [165]:
ya = softmax.forward(ya)

In [166]:
y = np.argmax(ya, axis=1)

In [167]:
print(accuracy_score(y_test, y))

0.0974
