# Sprint12課題 深層学習スクラッチ畳み込みニューラルネットワーク1

## 【問題1】チャンネル数を1に限定した1次元畳み込み層クラスの作成
チャンネル数を1に限定した1次元畳み込み層のクラスSimpleConv1dを作成してください。基本構造はsprint11で作成したFCクラスと同じになります。なお、重みの初期化に関するクラスは必要に応じて作り変えてください。Xavierの初期値などを使う点は全結合層と同様です。

ここではパディングは考えず、ストライドも1に固定します。また、複数のデータを同時に処理することも考えなくて良く、バッチサイズは1のみに対応してください。この部分の拡張はアドバンス課題とします。

フォワードプロパゲーションの数式は以下のようになります。

In [10]:
#データセットの用意
from keras.datasets import mnist
import numpy as np
(X_train, y_train), (X_test, y_test) = mnist.load_data()


Using TensorFlow backend.


In [11]:
#データセットの用意
from keras.datasets import mnist
import numpy as np
(X_train, y_train), (X_test, y_test) = mnist.load_data()
#reshape
test = X_train[0]
print(test.shape)
test = test.reshape(-1, 1)
print(test.shape)

(28, 28)
(784, 1)


In [12]:
#reshape
X_train = X_train.reshape(-1, X_train.shape[1]*X_train.shape[2])
X_test = X_test.reshape(-1, X_test.shape[1]*X_test.shape[2])
#前処理
X_train = X_train.astype(np.float)
X_test = X_test.astype(np.float)
X_train /= 255
X_test /= 255
print(X_train.max())
print(X_train.min())

1.0
0.0


In [95]:
import numpy as np
class XavierInitializer():
        def __init__(self):
                self.sigma = None
        def coef(self, n_nodes1,n_nodes2):
                self.sigma = 1 / np.power(n_nodes1, 1/2)
                W = np.random.normal(loc=0.0, scale=self.sigma, size=(n_nodes1,n_nodes2))
                B = np.random.normal(loc=0.0, scale=self.sigma,  size=(1, n_nodes2))
                return W, B               
        
class SimpleConv1d():
        """
        ノード数n_nodes1からn_nodes2への全結合層
        Parameters
        ----------
        n_nodes1 : int
          前の層のノード数
        n_nodes2 : int
          後の層のノード数
        initializer : 初期化方法のインスタンス
        optimizer : 最適化手法のインスタンス
        """
        def __init__(self, n_nodes1,n_nodes2, initializer, optimizer):
                self.W, self.B = initializer.coef(n_nodes1, n_nodes2)
                self.optimizer = optimizer
                
                self.dB = None
                self.dW = None
                
        def forward(self, X):
                self.pre_Z = np.copy(X)
                A = X @ self.W + self.B
                return A
        
        def backward(self, dA):
                self.dB = dA
                d_pre_Z = dA @ self.W.T
                self.dW = np.mean(self.pre_Z.T, axis=1).reshape(-1, 1) @ dA
                
                self = self.optimizer.update(self)
                return d_pre_Z
            
class StochasticGradientDescent():
        def __init__(self, lr):
                self.lr = lr
        
        def update(self, layer):
                layer.W = layer.W - self.lr * layer.dW
                layer.B = layer.B - self.lr * layer.dB
                return layer
                
class TanH():
        def __init__(self):
                self.A = None
        def forward(self, A):
                self.A = np.copy(A)
                return  1 - 2 / (1+ np.exp(2 * A))
        def backward(self, dZ):
                return dZ * (1- np.power(self.forward(np.mean(self.A, axis=0).reshape(1, -1)), 2))

class Softmax():
        def __init__(self):
                pass
        def forward(self, A):
                e_A = np.exp(A - np.max(A, axis=1, keepdims=True))
                return e_A/np.sum(e_A, axis=1, keepdims=True)
        
        def backward(self, Z, Y):
                return np.mean(Z - Y, axis=0).reshape(1, -1)
        
        def cross_entropy(self, Z, Y):
                return -Y * np.log(Z).sum()/Y.shape[0]

class Scratch1dCNNClassifier_test():
        def __init__(self, lr, batch_size, n_epochs):
                self.lr = lr
                self.batch_size = batch_size
                self.Cnv1 = None
                self.activation1 = None
                self.n_epochs = n_epochs
        
        def fit(self, X, y, X_val=None, y_val=None):
                optimizer = StochasticGradientDescent(self.lr)
                self.Cnv1 = SimpleConv1d(n_nodes1=784, n_nodes2=400, initializer=XavierInitializer(), optimizer=optimizer)
                self.activation1 = TanH()
                self.Cnv2 = SimpleConv1d(n_nodes1=400, n_nodes2=200, initializer=XavierInitializer(), optimizer=optimizer)
                self.activation2 = TanH()
                self.Cnv3 = SimpleConv1d(n_nodes1=200, n_nodes2=10, initializer=XavierInitializer(), optimizer=optimizer)
                self.activation3 = Softmax()
                
                mini_batch = GetMiniBatch(X, y, batch_size=self.batch_size)
                eye = np.eye(len(np.unique(y)))
                
                for epoch in range(self.n_epochs):
                        for mini_X, mini_y in mini_batch:
                                #forward
                                self._forward(mini_X)
                                self._backward(eye[mini_y.reshape(-1,)])
                
        
        def predict(self, X):
                X = np.array(X)
                self._forward(X)
                return np.argmax(self.Z3, axis=1)
                
        def _forward(self, X):
                A1 = self.Cnv1.forward(X)
                Z1 = self.activation1.forward(A1)
                A2 = self.Cnv2.forward(Z1)
                Z2 = self.activation2.forward(A2)
                A3 = self.Cnv3.forward(Z2)
                self.Z3 = self.activation3.forward(A3)
                
        def _backward(self, y):
                dA3 = self.activation3.backward(self.Z3, y)
                dZ2 = self.Cnv3.backward(dA3)
                dA2 = self.activation2.backward(dZ2)
                dZ1 = self.Cnv2.backward(dA2)
                dA1 = self.activation1.backward(dZ1)
                dZ0 = self.Cnv1.backward(dA1)  

class GetMiniBatch:
        """
        ミニバッチを取得するイテレータ

        Parameters
        ----------
        X : 次の形のndarray, shape (n_samples, n_features)
          学習データ
        y : 次の形のndarray, shape (n_samples, 1)
          正解値
        batch_size : int
          バッチサイズ
        seed : int
          NumPyの乱数のシード
        """
        def __init__(self, X, y, batch_size=10, seed=0):
                self.batch_size = batch_size
                np.random.seed(seed)
                shuffle_index = np.random.permutation(np.arange(X.shape[0]))
                self.X = X[shuffle_index]
                self.y = y[shuffle_index]
                self._stop = np.ceil(X.shape[0]/self.batch_size).astype(np.int)

        def __len__(self):
                return self._stop

        def __getitem__(self,item):
                p0 = item*self.batch_size
                p1 = item*self.batch_size + self.batch_size
                return self.X[p0:p1], self.y[p0:p1]        

        def __iter__(self):
                self._counter = 0
                return self

        def __next__(self):
                if self._counter >= self._stop:
                        raise StopIteration()
                p0 = self._counter*self.batch_size
                p1 = self._counter*self.batch_size + self.batch_size
                self._counter += 1
                return self.X[p0:p1], self.y[p0:p1]

In [96]:
clf = Scratch1dCNNClassifier_test(lr=0.01, batch_size=20, n_epochs=10)
clf.fit(X_train, y_train)
pred = clf.predict(X_test)
print(y_test)
print(pred)
print(y_test == pred)
print((y_test == pred).sum()/len(y_test))

[7 2 1 ... 4 5 6]
[7 2 1 ... 4 5 6]
[ True  True  True ...  True  True  True]


In [490]:
import numpy as np
class XavierInitializer():
        def __init__(self):
                self.sigma = None
        def coef(self, n_nodes1,n_nodes2):
                self.sigma = 1 / np.power(n_nodes1, 1/2)
                W = np.random.normal(loc=0.0, scale=self.sigma, size=(n_nodes1,n_nodes2))
                B = np.random.normal(loc=0.0, scale=self.sigma,  size=(1, n_nodes2))
                return W, B               
        
class SimpleConv1d():
        """
        ノード数n_nodes1からn_nodes2への全結合層
        Parameters
        ----------
        n_nodes1 : int
          前の層のノード数
        n_nodes2 : int
          後の層のノード数
        initializer : 初期化方法のインスタンス
        optimizer : 最適化手法のインスタンス
        """
        def __init__(self, n_nodes1, n_nodes2, initializer, optimizer):
                self.W, self.B = initializer.coef(n_nodes1=n_nodes1, n_nodes2=n_nodes2)
                self.optimizer = optimizer
                
                self.dB = None
                self.dW = None
                
        def forward(self, X):
                self.pre_Z = np.copy(X)
                A = X @ self.W + self.B
                return A
        
        def backward(self, dA):
                self.dB = dA
                d_pre_Z = dA @ self.W.T
                print("d_pre_Z", d_pre_Z.shape)
                print("dA", dA.shape)
                print("self.W", self.W.shape)
                print("self.pre_Z", self.pre_Z.shape)
                print("pre_Z", self.pre_Z.reshape(self.pre_Z.shape[0] * self.pre_Z.shape[1], -1))
                print("dA", dA.reshape(dA.shape[0] * dA.shape[1], -1))
                self.dW = (self.pre_Z.reshape(self.pre_Z.shape[0] * self.pre_Z.shape[1], -1)).T @ (dA.reshape(dA.shape[0] * dA.shape[1], -1))
                
                self = self.optimizer.update(self)
                return d_pre_Z
            
class StochasticGradientDescent():
        def __init__(self, lr):
                self.lr = lr
        
        def update(self, layer):
                layer.W = layer.W - self.lr * layer.dW
                layer.B = layer.B - self.lr * layer.dB
                return layer
                
class TanH():
        def __init__(self):
                self.A = None
        def forward(self, A):
                self.A = np.copy(A)
                return  1 - 2 / (1+ np.exp(2 * A))
        def backward(self, dZ):
                return dZ * (1- np.power(self.forward(self.A), 2))

class Softmax():
        def __init__(self):
                pass
        def forward(self, A):
                e_A = np.exp(A - np.max(A, axis=1, keepdims=True))
                return e_A/np.sum(e_A, axis=1, keepdims=True)
        
        def backward(self, Z, Y):
                #np.mean(Z - Y, axis=0).reshape(1, -1)
                return Z - Y
        
        def cross_entropy(self, Z, Y):
                return -Y * np.log(Z).sum()/Y.shape[0]

class Scratch1dCNNClassifier():
        def __init__(self, lr, batch_size, n_epochs, filter_size=[5, 3]):
                self.lr = lr
                self.batch_size = batch_size
                self.Cnv1 = None
                self.activation1 = None
                self.n_epochs = n_epochs
                self.filter_size = filter_size
                self.n_features = None
                self.stride_size = 1
                self.Z1 = None
                self.output_size = None

        
        def fit(self, X, y, X_val=None, y_val=None):
                optimizer = StochasticGradientDescent(self.lr)
                self.Cnv1 = SimpleConv1d(n_nodes1=self.filter_size[0], n_nodes2=self.filter_size[1], initializer=XavierInitializer(), optimizer=optimizer)
                self.activation1 = TanH()
                self.Cnv2 = SimpleConv1d(n_nodes1=self.filter_size[1], n_nodes2=1, initializer=XavierInitializer(), optimizer=optimizer)
                self.activation2 = Softmax()
                
                eye = np.eye(len(np.unique(y)))
                self.n_features = X.shape[1]
                
                self._forward(X)
                self._backward(y)
                """
                for epoch in range(self.n_epochs):
                        for mini_X, mini_y in mini_batch:
                                #forward
                                self._forward(mini_X)
                                self._backward(eye[mini_y.reshape(-1,)])
                """
        
        def predict(self, X):
                X = np.array(X)
                self._forward(X)

                return np.argmax(self.Z2, axis=1)
        
        def _gen_window_indices(self, filter_size):
                indices = np.empty((self.output_size, filter_size))#2, 1, 3
                for slide in range(self.output_size):
                        indices[slide] = np.arange(slide, filter_size + slide)
                return indices.astype(np.int64)
                
        def _get_output_size(self, X, input_size, padding_size, filter_size, stride_size):
                    self.output_size = int((input_size - filter_size + 1) / self.stride_size) #scaler
                    return
        
        def _forward(self, X):
                #A1 = np.empty((self.output_size, ))
                #slideごとにX上にできる窓を作る
                #X_windows = self._gen_windows(X)#shape(output_size, batch_size, filter_size)
                
                #filter size分のindexをoutput_size分作ってshape(output_size, filter_size)に
                #X = [a, b, c, d, e, f, g], filter_size= 3なら [0,1,2], [1,2,3], [2,3,4],[3,4,5], [4,5,6], [5,6,7]を作る
                
                #slide数(output_size)
                self._get_output_size(
                        X=X,
                        input_size=self.n_features,
                        padding_size=0,
                        filter_size=self.filter_size[0],
                        stride_size=1
                )
                
                window_indices_1 = self._gen_window_indices(self.filter_size[0])#shape(output_size, filter_size)
                A1 = np.empty((self.output_size, self.batch_size, self.filter_size[1]))
                
                A2 = np.empty((self.output_size, self.batch_size, 1))   
                for slide, window in enumerate(window_indices_1):
                        A1[slide] = self.Cnv1.forward(X[:, window])
                
                Z1 = self.activation1.forward(A1)
                self._get_output_size(
                        X=X,
                        input_size=self.n_features,
                        padding_size=0,
                        filter_size=self.filter_size[0],
                        stride_size=1
                )
                window_indices_2 = self._gen_window_indices(self.filter_size[1])
                for slide, window in enumerate(window_indices_1):
                        A1[slide] = self.Cnv1.forward(X[:, window])
                A2 = self.Cnv2.forward(Z1)
                self.Z2 = self.activation2.forward(A2)
        
        def _backward(self, y):
                dA2 = self.activation2.backward(self.Z2, y)
                print("dA2", dA2.shape)
                dZ1 = self.Cnv2.backward(dA2)  
                dA1 = self.activation1.backward(dZ1)

class GetMiniBatch:
        """
        ミニバッチを取得するイテレータ

        Parameters
        ----------
        X : 次の形のndarray, shape (n_samples, n_features)
          学習データ
        y : 次の形のndarray, shape (n_samples, 1)
          正解値
        batch_size : int
          バッチサイズ
        seed : int
          NumPyの乱数のシード
        """
        def __init__(self, X, y, batch_size=10, seed=0):
                self.batch_size = batch_size
                np.random.seed(seed)
                shuffle_index = np.random.permutation(np.arange(X.shape[0]))
                self.X = X[shuffle_index]
                self.y = y[shuffle_index]
                self._stop = np.ceil(X.shape[0]/self.batch_size).astype(np.int)

        def __len__(self):
                return self._stop

        def __getitem__(self,item):
                p0 = item*self.batch_size
                p1 = item*self.batch_size + self.batch_size
                return self.X[p0:p1], self.y[p0:p1]        

        def __iter__(self):
                self._counter = 0
                return self

        def __next__(self):
                if self._counter >= self._stop:
                        raise StopIteration()
                p0 = self._counter*self.batch_size
                p1 = self._counter*self.batch_size + self.batch_size
                self._counter += 1
                return self.X[p0:p1], self.y[p0:p1]

In [492]:
cnn = Scratch1dCNNClassifier(lr=0.01, batch_size=2, n_epochs=10, filter_size=[5,3])
cnn.fit(X, y)
cnn.predict( np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]))


dA2 (0, 2, 1)
d_pre_Z (0, 2, 3)
dA (0, 2, 1)
self.W (3, 1)
self.pre_Z (0, 2, 3)


ValueError: cannot reshape array of size 0 into shape (0,newaxis)

In [491]:

X = np.array([
    [1, 2, 3, 4]
])
"""
X = np.array([
    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
    [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
])
"""
y = np.array([
    [0],
    [1]
])
X_size = X.shape[1]
print(X)
print(X.shape)
print(X_size)

[[1 2 3 4]]
(1, 4)
4


In [191]:
y = [1]

In [192]:
b = 1

In [193]:
X[:, 0:2] 

array([[ 1,  2],
       [ 5,  6],
       [ 9, 10],
       [13, 14],
       [17, 18]])

In [194]:
w = np.array([
    [3],
    [5],
    [7]
])
f_size = w.shape[0]
print(w)
print(w.shape)
print(f_size)

[[3]
 [5]
 [7]]
(3, 1)
3


In [205]:
a = np.empty((X_size - f_size + 1, len(X), 1))

for s in range(X_size - f_size + 1):
        a[s] = X[:, s:(s + f_size)] @ w + b
a

array([[[ 35.],
        [ 95.],
        [155.],
        [215.],
        [275.]],

       [[ 50.],
        [110.],
        [170.],
        [230.],
        [290.]]])

## 【問題2】1次元畳み込み後の出力サイズの計算
畳み込みを行うと特徴量の数が変化します。どのように変化するかは以下の数式から求められます。パディングやストライドも含めています。この計算を行う関数を作成してください
$$
N_{out} =  \frac{N_{in}+2P-F}{S} + 1\\
$$
$N_{out}$: 出力のサイズ（特徴量の数）

$N_in$ : 入力のサイズ（特徴量の数）

$P$ : ある方向へのパディングの数

$F$ : フィルタのサイズ

$S$ : ストライドのサイズ

## 【問題3】小さな配列での1次元畳み込み層の実験
次に示す小さな配列でフォワードプロパゲーションとバックプロパゲーションが正しく行えているか確認してください。

入力x、重みw、バイアスbを次のようにします。

## 【問題4】チャンネル数を限定しない1次元畳み込み層クラスの作成

チャンネル数を1に限定しない1次元畳み込み層のクラスConv1dを作成してください。

紙やホワイトボードを使い計算グラフを書きながら考えてください。

## 【問題5】学習・推定
これまで使ってきたニューラルネットワークの全結合層の一部をConv1dに置き換えて学習と推定を行ってください。出力層だけは全結合層をそのまま使ってください。

チャンネルが複数ある状態では全結合層への入力は行えません。その段階でのチャンネルは1になるようにするか、平滑化を行います。平滑化はNumPyのreshapeが使用できます。

numpy.reshape — NumPy v1.15 Manual

画像に対しての1次元畳み込みは実用上は行わないことのため、精度は問いません。

## 【問題6】（アドバンス課題）パディングの実装
畳み込み層にパディングを加えてください。1次元配列の場合、前後にn個特徴量を増やせるようにしてください。

最も単純なパディングは全て0で埋めるゼロパディングであり、CNNでは一般的です。他に端の値を繰り返す方法などもあります。

フレームワークによっては、元の入力のサイズを保つようにという指定をすることができます。この機能も持たせておくと便利です。

なお、NumPyにはパディングの関数が存在します。

numpy.pad — NumPy v1.15 Manual

## 【問題7】（アドバンス課題）ミニバッチへの対応
ここまでの課題はバッチサイズ1で良いとしてきました。しかし、実際は全結合層同様にミニバッチ学習が行われます。Conv1dクラスを複数のデータが同時に計算できるように変更してください。

## 【問題8】（アドバンス課題）任意のストライド数
ストライドは1限定の実装をしてきましたが、任意のストライド数に対応できるようにしてください。