# Sprint11　Simpleconv1d

# 【問題1】チャンネル数を1に限定した1次元畳み込み層クラスの作成
チャンネル数を1に限定した1次元畳み込み層のクラスSimpleConv1dを作成してください。基本構造は前のSprintで作成した全結合層のFCクラスと同じになります。なお、重みの初期化に関するクラスは必要に応じて作り変えてください。Xavierの初期値などを使う点は全結合層と同様です。


ここでは パディング は考えず、ストライド も1に固定します。また、複数のデータを同時に処理することも考えなくて良く、バッチサイズは1のみに対応してください。この部分の拡張はアドバンス課題とします。

In [1]:
import numpy as np

class Scratch1dCNNClassifier:
    '''
    NN畳み込み層
    '''
    def __init__(self, W, B, stride=1, pad=0):
        '''
        初期化
        ---
        引数

        W: 重み
        B: バイアス
        stride: ストライド数 
        pad: パディング数 #パディングとは特徴マップの縁を0で埋める処理
        ---
        '''
        self.W = W
        self.B = B
        self.stride = stride
        self.pad = pad


    def forward(self, X):
        self.X = X

        if W.ndim == 1:
            FH = 1
        else:
            FH = W.shape[1] #フィルターの高さ
        FW = self.W.shape[0] #フィルターの幅

        if X.ndim == 1:
            XH = 1
        else:
            XH = X.shape[1] #Xの高さ
        XW = X.shape[0] #Xの幅

        F = W.size

        #畳み込み演算
        self.out = np.array([])
        for i in range(FH):
            for j in range(FW - self.stride):
                self.out = np.append(self.out, np.dot(X[j : FW+j ], self.W.T) + self.B)

        return self.out


X = np.array([1,2,3,4])
W = np.array([3, 5, 7])
B = np.array([1])

a = Scratch1dCNNClassifier(W, B)
out = a.forward(X)

# 【問題2】1次元畳み込み後の出力サイズの計算
畳み込みを行うと特徴量の数が変化します。どのように変化するかは以下の数式から求められます。パディングやストライドも含めています。この計算を行う関数を作成してください。

In [2]:
def Conv_output_cal(N_in, pad, F, stride):
    return (N_in + 2 * pad - F) / stride + 1

N_in = X.shape[0]
pad = 1
F = W.shape[0]
stride = 1

Conv_output_cal(N_in, pad, F, stride)

4.0

# 【問題3】小さな配列での1次元畳み込み層の実験
次に示す小さな配列でフォワードプロパゲーションとバックプロパゲーションが正しく行えているか確認してください。

In [3]:
class Scratch1dCNNClassifier:
    '''
    NN畳み込み層
    '''
    def __init__(self, W, B, lr, stride, pad):
        '''
        初期化
        ---
        引数

        W: 重み
        B: バイアス
        lr = 学習率
        stride: ストライド数 
        pad: パディング数 #パディングとは特徴マップの縁を0で埋める処理
        ---
        '''
        self.W = W
        self.B = B
        self.lr = lr
        self.stride = stride
        self.pad = pad
        
        # 中間データ（backward時に使用）
        self.X = None   
        self.col = None
        self.col_W = None
        
        # 重み・バイアスパラメータの勾配
        self.dW = []
        self.db = None


    def forward(self, X):
        self.X = X

        if W.ndim == 1:
            FH = 1
        else:
            FH = W.shape[1] #フィルターの高さ
        FW = self.W.shape[0] #フィルターの幅

        if X.ndim == 1:
            XH = 1
        else:
            XH = X.shape[1] #Xの高さ
        XW = X.shape[0] #Xの幅


        #畳み込み演算
        self.out = np.array([])
        for i in range(FH):
            for j in range(FW - self.stride):
                self.out = np.append(self.out, np.dot(X[j : FW+j ], self.W.T) + self.B)

        return self




    def backward(self , dA):
        X_index = np.arange(dA.shape[0]) #インデックス値
        n_iter = int(self.X.shape[0] - dA.shape[0] // self.stride + 1)
        
        #バイアス勾配計算
        self.dB = np.sum(dA)

        #重み勾配計算
        for k in range(n_iter):
            self.dW.append( np.dot(self.X[X_index + k].T, dA) )

        #出力値計算
        dZ = np.zeros(self.X.shape[0])

        for i in range(dA.shape[0]):
            dZ[i : i + self.W.shape[0] ] += self.W * dA[i]

        #重み、バイアス更新
        self.dW = np.array(self.dW)
        self.W = self.W - self.lr * self.dW
        self.B = self.B - self.lr * self.dW
        
        return dZ


X = np.array([1,2,3,4])
W = np.array([3, 5, 7])
B = np.array([1])
lr = 0.01
stride = 1
pad = 0

b = Scratch1dCNNClassifier(W, B, lr, stride, pad).forward(X)
dA = np.array([10, 20]) #流れてきた誤差
dZ = b.backward(dA)

print(dZ)

[ 30. 110. 170. 140.]


# 【問題4】チャンネル数を限定しない1次元畳み込み層クラスの作成
チャンネル数を1に限定しない1次元畳み込み層のクラスConv1dを作成してください。

In [4]:
class Conv1d:
    '''
    NN畳み込み層
    '''
    def __init__(self, W, B, lr, stride, pad):
        '''
        初期化
        ---
        引数

        W: 重み
        B: バイアス
        lr = 学習率
        stride: ストライド数 
        pad: パディング数 #パディングとは特徴マップの縁を0で埋める処理
        ---
        '''
        self.W = W
        self.B = B
        self.lr = lr
        self.stride = stride
        self.pad = pad
        
        # 中間データ（backward時に使用）
        self.X = None   
        self.col = None
        self.col_W = None
        
        # 重み・バイアスパラメータの勾配
        self.dW = []
        self.db = None


    def forward(self, X):
        self.X = X

        if self.W.ndim == 1:
            FH = 1
        else:
            FH = 1 #フィルターの高さ
        FW = self.W.shape[2] #フィルターの幅
        C = self.W.shape[1] #フィルター出力チャンネル数
        FN = self.W.shape[0] #フィルター入力チャンネル数

        if X.ndim == 1:
            XH = 1
        else:
            XH = 1
        XH = 1 #Xの高さ
        XW = X.shape[1] #Xの幅
        XC = X.shape[0] #Xの入力チャンネル数


        #畳み込み演算
        self.out = []
        for i in range(FN):
            temp_out_2 = []
            for k in range(C):
                temp_out_1 = []
                for j in range(FW - self.stride):
                    temp_out_1.append( np.dot( X[k ,  j : FW + j ], self.W[i, k].T ) )
                temp_out_2.append(sum(temp_out_1) + self.B[i])
            self.out.append(temp_out_2)

        return self.out


X = np.array([[1, 2, 3, 4], [2, 3, 4, 5]]) # shape(2, 4)で、（入力チャンネル数、特徴量数）である。
W = np.ones((3, 2, 3)) # 例の簡略化のため全て1とする。(出力チャンネル数、入力チャンネル数、フィルタサイズ)である。
B = np.array([1, 2, 3]) # （出力チャンネル数）
lr = 0.01
stride = 1
pad = 0

c = Conv1d(W, B, lr, stride, pad)
c_out = c.forward(X)
print(c_out)

[[16.0, 22.0], [17.0, 23.0], [18.0, 24.0]]


# 【問題8】学習と推定
これまで使ってきたニューラルネットワークの全結合層の一部をConv1dに置き換えてMNISTを学習・推定し、Accuracyを計算してください。


出力層だけは全結合層をそのまま使ってください。ただし、チャンネルが複数ある状態では全結合層への入力は行えません。その段階でのチャンネルは1になるようにするか、 平滑化 を行なってください。


画像に対しての1次元畳み込みは実用上は行わないことのため、精度は問いません。

In [5]:
from keras.datasets import mnist
import numpy as np

(X_train, y_train), (X_test, y_test) = mnist.load_data()

X_train = X_train.reshape(-1, 784)
X_test = X_test.reshape(-1, 784)


from sklearn.model_selection import train_test_split

X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2)


from sklearn.preprocessing import OneHotEncoder

enc = OneHotEncoder(handle_unknown='ignore', sparse=False)
y_train_one_hot = enc.fit_transform(y_train[:, np.newaxis])
y_val_one_hot = enc.transform(y_val[:, np.newaxis])

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz


In [6]:
class FC():
    """
    ノード数n_nodes1からn_nodes2への全結合層
    Parameters
    ----------
    n_nodes1 : int
      前の層のノード数
    n_nodes2 : int
      後の層のノード数
    initializer : 初期化方法のインスタンス
    optimizer : 最適化手法のインスタンス
    """
    def __init__(self, input, output, initializer, optimizer, activation):
        self.optimizer = optimizer
        self.input = input
        self.output = output
        self.initializer = initializer

         # 初期化
        # initializerのメソッドを使い、self.Wとself.Bを初期化する       
        self.W = self.initializer.W(self.input, self.output, activation)
        self.B = self.initializer.B(self.output)
        self.W_Adagrad = Adagrad()
        self.B_Adagrad = Adagrad()


    def forward(self, X):
        """
        フォワード
        Parameters
        ----------
        X : 次の形のndarray, shape (batch_size, n_nodes1)
            入力
        Returns
        ----------
        A : 次の形のndarray, shape (batch_size, n_nodes2)
            出力
        """
        self.X = X

        A = np.dot(X, self.W ) + self.B

        return A



    def backward(self, dA):
        """
        バックワード
        Parameters
        ----------
        dA : 次の形のndarray, shape (batch_size, n_nodes2)
            後ろから流れてきた勾配
        Returns
        ----------
        dZ : 次の形のndarray, shape (batch_size, n_nodes1)
            前に流す勾配
        """
        
        Z = np.dot(dA, self.W.T)
        self.dA = dA

        # 更新
        self = self.optimizer.update(self)

        return Z

In [29]:
class Convolutional:
    '''
    NN畳み込み層
    '''
    def __init__(self, filter_size, input_shape, stride, pad, initializer, optimizer, activation):
        self.filter_size = filter_size
        self.input_shape = input_shape
        self.input_C = input_shape[0]
        self.input_W = input_shape[1]
        self.input_H = input_shape[2]
        self.stride = stride
        self.pad = pad
        self.initializer = initializer
        self.optimizer = optimizer
        self.activation = activation

        # initializerのメソッドを使い、self.Wとself.Bを初期化する
        self.FN = 3
        self.FH = 1
        self.FW = filter_size #1次元データの為
        self.W = self.initializer.W(self.input_C, self.FW, activation)
        self.B = self.initializer.B(self.input_C) #仮（本来チャネル数）
        self.W_Adagrad = Adagrad()
        self.B_Adagrad = Adagrad()

        # 重み・バイアスパラメータの勾配
        self.dW = []
        self.db = None        



    def forward(self, X):
        self.X = X

        #畳み込み演算
        # A = np.array([])
        # for i in range(self.FH):
        #     for j in range(self.FW - self.stride):
        #         A = np.append(A, np.dot(X[ j : self.FW+j ].T, self.W.T) + self.B)

        # for i in range(self.FN):
        #     temp_out_2 = []

        self.OW = int( 1 + (self.input_W + 2 * pad - self.FW) / self.stride ) #ループ回数
        A = np.zeros((self.input_C, self.OW))
        

        for k in range(self.input_C):
            temp_out_1 = []

            for j in range(self.OW):
                rows = np.sum(np.dot( X[k ,  j : self.FW + j ].reshape(16,1), self.W[k, :].reshape(1,16) ) )
                temp_out_1.append(rows)

            cols = temp_out_1 + self.B[:,k]
            A[0] = cols

            #アウトプット：768

        return A


    def backward(self , dA):
        print(f'da   {dA.shape}')
        self.dA = dA

        X_index = np.arange(dA.shape[0]) #インデックス値
        n_iter = int(self.X.shape[0] - dA.shape[0] // self.stride + 1)
        
        #バイアス勾配計算
        self.dB = np.sum(dA, axis=1)

        #重み勾配計算
        for k in range(n_iter):
            self.dW.append( np.dot(self.X[X_index + k].T, dA) )

        #出力値計算
        dZ = np.zeros_like(self.X, dtype='float64')

        for k in range(self.input_C):
            for j in range(self.OW - self.FW):
                dZ[k , j : self.FW + j ] += self.W[k] * dA[k, j : self.FW + j ]

        
        # 更新
        self = self.optimizer.update(self)



        # #重み、バイアス更新
        # self.dW = np.array(self.dW)
        # self.W = self.W - self.lr * self.dW
        # self.B = self.B - self.lr * self.dW
        
        return dZ

In [8]:
class SimpleInitializer:
    """
    ガウス分布によるシンプルな初期化
    Parameters
    ----------
    sigma : float
      ガウス分布の標準偏差
    """
    def __init__(self, sigma):
        self.sigma = sigma

    def W(self, n_node1, n_node2, activation):
        """
        重みの初期化
        Parameters
        ----------
        n_nodes1 : int
          前の層のノード数
        n_nodes2 : int
          後の層のノード数
        Returns
        ----------
        W :
            個別の重み
        """
        if activation == 'Tanh' or 'sigmoid':
            return self.sigma * XavierInitializer().cal(n_node1, n_node2)
        else:
            return self.sigma * HeInitializer().cal(n_node1, n_node2)

    def B(self, n_node1):
        """
        バイアスの初期化
        Parameters
        ----------
        n_nodes2 : int
          後の層のノード数
        Returns
        ----------
        B :
            個別のバイアス
        """
        return np.random.randn(1, n_node1)

In [25]:
class SGD():
    """
    確率的勾配降下法
    Parameters
    ----------
    lr : 学習率
    """
    def __init__(self, lr):
        self.lr = lr

    def update(self, layer):
        """
        ある層の重みやバイアスの更新
        Parameters
        ----------
        layer : 更新前の層のインスタンス
        """
        #重み
        print(layer.X.shape, layer.dA.shape)
        W_grads = np.dot(layer.X.T, layer.dA) #XはAのこと
        h = layer.W_Adagrad.update(layer.W, W_grads)
        layer.W -= self.lr * W_grads / (np.sqrt(h[-1]) + 1e-7)

        #バイアス
        B_grads = np.sum(layer.dA, axis=0)
        h = layer.B_Adagrad.update(layer.B, B_grads)
        layer.B -= self.lr * B_grads / (np.sqrt(h[-1]) + 1e-7)

In [10]:
class Softmax:
    '''
    ソフトマックス関数の計算を行う
    '''
    def __init__(self):
        return
    
    def forward(self, A):
        self.Z = np.exp(A) / np.sum(np.exp(A), axis=1).reshape(-1, 1)
        return  self.Z

    def backward(self, Y):
        #ロス計算（クロスエントロピー）
        delta = 1e-7
        loss = -np.sum( Y * np.log(self.Z) + delta ) / len(Y)

        return self.Z - Y,  loss


class Tanh:
    '''
    ハイパボリックタンジェント関数の計算を行う
    '''
    def __init__(self):
        return
    
    def forward(self, A):
        self.A = A
        self.Z = np.tanh(A)
        
        return self.Z
    
    def backward(self, Z):
        self.Z = Z * (1 - np.tanh(self.A) ** 2)
        return self.Z

In [11]:
class ReLU:
    '''
    ReLu関数の計算を行う
    '''
    def __init__(self):# インスタンス変数　maskの初期化
        self.mask = None
        
    def forward(self,A):#信号の大きさをxとして引数として渡す
        self.mask = (A <=0 ) #  x <=0　の場合、True, それ以外はFalse を渡す
        out = A.copy() #  xの値（配列）をコピーする
        out[self.mask] = 0 # True の要素の値のみを０のに変換する
        
        return out
    
    def backward(self, Z):
        Z[self.mask] = 0 # x<=0がtrue のものは、逆伝播の微分のdoutも0で流す。それ以外はそのまま流す。
        dx = Z
        
        return dx

In [12]:
class XavierInitializer:
    '''
    シグモイド関数・ハイパボリックタンジェント関数を使用する際の重みの初期値
    '''
    def __init__(self):
        return
    
    def cal(self, node_input, node_output):
        return np.random.randn(node_input, node_output) / np.sqrt(node_input)
    
    
    
class HeInitializer:
    '''
    ReLU関数を使用する際の重みの初期値
    '''
    def __init__(self):
        return

    def cal(self, node_input, node_output):
        return np.random.randn(node_input, node_output) * np.sqrt(1 / node_input)
        
        #/ np.sqrt(node_input) * np.sqrt(2)

In [26]:
class Adagrad:
    '''
    学習率の最適化
    '''
    def __init__(self):
        self.h = None
        
    def update(self, params, grads):
        if self.h is None:
            self.h = np.zeros_like(params)

        self.h += grads * grads

        return self.h

In [14]:
class GetMiniBatch:
    """
    ミニバッチを取得するイテレータ
    Parameters
    ----------
    X : 次の形のndarray, shape (n_samples, n_features)
      訓練データ
    y : 次の形のndarray, shape (n_samples, 1)
      正解値
    batch_size : int
      バッチサイズ
    seed : int
      NumPyの乱数のシード
    """
    def __init__(self, X, y, batch_size = 20, seed=0):
        self.batch_size = batch_size
        np.random.seed(seed)
        shuffle_index = np.random.permutation(np.arange(X.shape[0]))
        self._X = X[shuffle_index]
        self._y = y[shuffle_index]
        self._stop = np.ceil(X.shape[0]/self.batch_size).astype(np.int)
    def __len__(self):
        return self._stop
    def __getitem__(self,item):
        p0 = item*self.batch_size
        p1 = item*self.batch_size + self.batch_size
        return self._X[p0:p1], self._y[p0:p1]        
    def __iter__(self):
        self._counter = 0
        return self
    def __next__(self):
        if self._counter >= self._stop:
            raise StopIteration()
        p0 = self._counter*self.batch_size
        p1 = self._counter*self.batch_size + self.batch_size
        self._counter += 1
        return self._X[p0:p1], self._y[p0:p1]

In [30]:
class ScratchDeepNeuralNetrowkClassifier():
    '''
    ベースのクラス
    ---

    '''
    def __init__(self, filter_size1, filter_size2, stride, pad, input_shape, sigma, lr, n_epoch, batch_size, activation, n_node1, n_note2, n_output, verbose=False):
        self.filter_size1 = filter_size1
        self.filter_size2 = filter_size2
        self.stride = stride
        self.pad = pad
        self.input_shape = input_shape
        self.sigma = sigma
        self.lr = lr
        self.n_epoch = n_epoch
        self.batch_size = batch_size
        self.activation = activation
        self.n_node1 = n_node1
        self.n_node2 = n_node2
        self.n_output = n_output
        self.verbose = verbose
    

    def fit(self, X, y, X_val=None, y_val=None):
        '''
        NN分類器を学習する
        ----------
        X : 次の形のndarray, shape (n_samples, n_features)
            訓練データの特徴量
        y : 次の形のndarray, shape (n_samples, )
            訓練データの正解値
        X_val : 次の形のndarray, shape (n_samples, n_features)
            検証データの特徴量
        y_val : 次の形のndarray, shape (n_samples, )
            検証データの正解値
        '''
        self.n_features = input_shape[1] - self.filter_size1 + self.stride #全結合層接続時のinput数
        self.loss_train = []
        self.loss_val = []


        optimizer = SGD(self.lr)

        #畳み込み層インスタンス化
        self.Conv1 = Convolutional(self.filter_size1, self.input_shape, self.stride, self.pad, SimpleInitializer(self.sigma), optimizer, self.activation)
        self.activation1 = ReLU()

        #全結合層インスタンス化
        self.FC1 = FC(self.n_features, self.n_node1, SimpleInitializer(self.sigma), optimizer, self.activation)
        self.activation2 = self.activation()
        self.FC2 = FC(self.n_node1, self.n_node2, SimpleInitializer(self.sigma), optimizer, self.activation)
        self.activation3 = self.activation()
        self.FC3 = FC(self.n_node2, self.n_output, SimpleInitializer(self.sigma), optimizer, self.activation)
        self.activation4 = Softmax()


        
        #エポックループ
        for epoch in range(n_epoch):
            self.forward_propagation(X)
            self.backward_propagation(y)

        self.forward_propagation(X)
        dA4, loss = self.activation4.backward(y)
        self.loss_train.append(loss)

        if X_val is not None:
            self.forward_propagation(X_val)
            dA3, loss = self.activation3.backward(y_val)
            self.loss_val.append(loss)


        if self.verbose:
            #verboseをTrueにした際は学習過程などを出力する
            print()



    def forward_propagation(self, X):
        A1 = self.Conv1.forward(X)
        Z1 = self.activation1.forward(A1)
        A2 = self.FC1.forward(Z1)
        Z2 = self.activation2.forward(A2)
        A3 = self.FC2.forward(Z2)
        Z3 = self.activation3.forward(A3)
        A4 = self.FC3.forward(Z3)
        Z4 = self.activation4.forward(A4)
        # print(vars(self.Conv1))



    def backward_propagation(self, y):
        dA4, _ = self.activation4.backward(y) # 交差エントロピー誤差とソフトマックスを合わせている
        dZ3 = self.FC3.backward(dA4)
        dA3 = self.activation3.backward(dZ3)
        dZ2 = self.FC2.backward(dA3)
        dA2 = self.activation2.backward(dZ2)
        dZ1 = self.FC1.backward(dA2)
        dA1 = self.activation1.backward(dZ1)
        dZ0 = self.Conv1.backward(dA1) # dZ0は使用しない




    def predict(self, X):
        '''
        ニューラルネットワーク分類器を使い推定する。
        Parameters
        ----------
        X : 次の形のndarray, shape (n_samples, n_features)
            サンプル
        Returns
        -------
        次の形のndarray, shape (n_samples, 1)
            推定結果
        '''

        self.forward_propagation(X)
        return np.argmax(self.activation3.__dict__['Z'], axis=1)


sigma= 0.01
lr = 0.01
n_node1 = 400
n_node2 = 200
n_output = 10
n_epoch = 1
batch_size = 20
activation = ReLU #Tanh or ReLU
stride = 1
pad = 0
filter_size1 = 16
filter_size2 = 32
input_shape = [3, 784, 1]

d = ScratchDeepNeuralNetrowkClassifier(filter_size1, filter_size2, stride, pad, input_shape, sigma, lr, n_epoch, batch_size, activation, n_node1, n_node2, n_output)
d.fit(X_train[:3], y_train_one_hot[:3], X_val[:3], y_val_one_hot[:3])

(3, 200) (3, 10)
(3, 400) (3, 200)
(3, 769) (3, 400)
da   (3, 769)
(3, 784) (3, 769)


ValueError: ignored