# Sprint 深層学習スクラッチ 畳み込みニューラルネットワーク1

In [1]:
import numpy as np

## 1.このSprintについて
### Sprintの目的
- スクラッチを通してCNNの基礎を理解する

### どのように学ぶか
スクラッチで1次元用畳み込みニューラルネットワークを実装した後、学習と検証を行なっていきます。

## 2.1次元の畳み込みニューラルネットワークスクラッチ

**畳み込みニューラルネットワーク（CNN）** のクラスをスクラッチで作成していきます。NumPyなど最低限のライブラリのみを使いアルゴリズムを実装していきます。

このSprintでは1次元の **畳み込み層** を作成し、畳み込みの基礎を理解することを目指します。次のSprintでは2次元畳み込み層とプーリング層を作成することで、一般的に画像に対して利用されるCNNを完成させます。

クラスの名前はScratch1dCNNClassifierとしてください。クラスの構造などは前のSprintで作成したScratchDeepNeuralNetrowkClassifierを参考にしてください。

### 1次元畳み込み層とは
CNNでは画像に対しての2次元畳み込み層が定番ですが、ここでは理解しやすくするためにまずは1次元畳み込み層を実装します。1次元畳み込みは実用上は自然言語や波形データなどの 系列データ で使われることが多いです。

畳み込みは任意の次元に対して考えることができ、立体データに対しての3次元畳み込みまではフレームワークで一般的に用意されています。

### データセットの用意
検証には引き続きMNISTデータセットを使用します。1次元畳み込みでは全結合のニューラルネットワークと同様に平滑化されたものを入力します。

In [2]:
# MNISTデータ　ロード
from tensorflow.keras.datasets import mnist
(X_train, y_train), (X_test, y_test) = mnist.load_data()
# 平滑化
X_train = X_train.reshape(-1, 784)
X_test = X_test.reshape(-1, 784)
# 前処理
X_train = X_train.astype(np.float)
X_test = X_test.astype(np.float)
X_train /= 255
X_test /= 255
print(X_train.max()) # 1.0
print(X_train.min()) # 0.0

1.0
0.0


### 【問題1】チャンネル数を1に限定した1次元畳み込み層クラスの作成
チャンネル数を1に限定した1次元畳み込み層のクラスSimpleConv1dを作成してください。基本構造は前のSprintで作成した全結合層のFCクラスと同じになります。なお、重みの初期化に関するクラスは必要に応じて作り変えてください。Xavierの初期値などを使う点は全結合層と同様です。

ここでは **パディング** は考えず、**ストライド** も1に固定します。また、複数のデータを同時に処理することも考えなくて良く、バッチサイズは1のみに対応してください。この部分の拡張はアドバンス課題とします。

フォワードプロパゲーションの数式は以下のようになります。
$$
a_i = \sum_{s=0}^{F-1}x_{(i+s)}w_s+b
$$

$a_i$  : 出力される配列のi番目の値

$F$ : フィルタのサイズ

$x_(i+s)$ : 入力の配列の(i+s)番目の値

$w_s$ : 重みの配列のs番目の値

$b$ : バイアス項

全てスカラーです。

次に更新式です。ここがAdaGradなどに置き換えられる点は全結合層と同様です。

$\alpha$ : 学習率

$\frac{\partial L}{\partial w_s}$ : $w_s$ に関する損失 $L$ の勾配

$\frac{\partial L}{\partial b}$ : $b$ に関する損失 $L$ の勾配

勾配 $\frac{\partial L}{\partial w_s}$ や $\frac{\partial L}{\partial b}$ を求めるためのバックプロパゲーションの数式が以下です。

$$
\frac{\partial L}{\partial w_s} = \sum_{i=0}^{N_{out}-1} \frac{\partial L}{\partial a_i}x_{(i+s)}\\
\frac{\partial L}{\partial b} = \sum_{i=0}^{N_{out}-1} \frac{\partial L}{\partial a_i}
$$

$\frac{\partial L}{\partial a_i}$ : 勾配の配列のi番目の値

$N_{out}$ : 出力のサイズ

前の層に流す誤差の数式は以下です。

$$
\frac{\partial L}{\partial x_j} = \sum_{s=0}^{F-1} \frac{\partial L}{\partial a_{(j-s)}}w_s
$$

$\frac{\partial L}{\partial x_j}$ : 前の層に流す誤差の配列のj番目の値

ただし、 $j-s<0$ または $j-s>N_{out}-1$ のとき $\frac{\partial L}{\partial a_{(j-s)}} =0$ です。

全結合層との大きな違いは、重みが複数の特徴量に対して共有されていることです。この場合は共有されている分の誤差を全て足すことで勾配を求めます。計算グラフ上での分岐はバックプロパゲーションの際に誤差の足し算をすれば良いことになります。

In [3]:
class SimpleConv1d:
    """
    チャンネル数を1に限定した1次元畳み込み層
    Parameters
    ----------
    initializer : 初期化方法のインスタンス
    optimizer : 最適化手法のインスタンス
    input_channels : int
      入力のチャンネル数
    filter _sizes : int
      フィルタのサイズ
    stride_sizes : int
      スライドのサイズ
    output_channels : int
      出力のチャンネル数
    """
    # def __init__(self, n_nodes1, n_nodes2, initializer, optimizer):
    def __init__(self, input_channels, filter_sizes, stride_sizes, initializer, optimizer):
        self.input_channels = input_channels
        self.filter_sizes = filter_sizes
        self.stride_sizes = stride_sizes
        self.optimizer = optimizer
        # 初期化
        # initializerのメソッドを使い、self.Wとself.Bを初期化する
        # self.W = initializer.W(input_channels, output_channels, strides)
        # self.B = initializer.B(output_channels)
        if initializer is None:
            self.W = np.array([3, 5, 7])
            self.B = np.array([1])
    
    def forward(self, X):
        """
        フォワード
        Parameters
        ----------
        X : 次の形のndarray, shape (input_channels, n_features)
            入力
        Returns
        ----------
        A : 次の形のndarray, shape (output_channels, filter_sizes-1)
            出力
        """        
        # A = np.dot(X, self.W) + self.B
        if self.stride_sizes == 1:
            A = np.convolve(X, self.W[::-1], mode='valid') + self.B
        return A
    def backward(self, dA, X):
        """
        バックワード
        Parameters
        ----------
        dA : 次の形のndarray, shape (batch_size, n_nodes2)
            後ろから流れてきた勾配
        Z : 次の形のndarray, shape (batch_size, n_nodes1)
            一層前の活性化関数の出力結果
        Returns
        ----------
        dZ : 次の形のndarray, shape (batch_size, n_nodes1)
            前に流す勾配
        """
        # dZを計算
        # dZ = np.dot(dA, self.W.T)
        
        # 更新
        #  self = self.optimizer.update(self, dA, Z)
        # return dZ
        self = self.optimizer.update(self, dA, X)

- optimizer_class

In [4]:
class SGD:
    """
    確率的勾配降下法
    Parameters
    ----------
    lr : 学習率
    """
    def __init__(self, lr=0.1):
        self.lr = lr
    def update(self, layer, dA, X):
        """
        ある層の重みやバイアスの更新
        Parameters
        ----------
        layer : 更新前の層のインスタンス
        dA : 後ろから流れてきた勾配
        """
        if len(dA.shape) == 1:
            # 各勾配の計算
            # dB = dA.sum(axis=0)
            dB = np.convolve(dA, np.array([1]*dA.shape[0]), mode='valid')
            print(dB.shape, dB)
            
            dW = np.convolve(X, dA[::-1], mode='valid')
            print(dW.shape, dW)
            self.dW = dW
            
            dX = np.zeros(X.shape[0])
            diff = X.shape[0] - dA.shape[0]
            if diff > 0:
                dA_trans = np.concatenate([np.array([0]*diff), dA, np.array([0]*diff)], axis=0)
                # 重みの数
                w_num = layer.W.shape[0]
#                 print(diff)
#                 print(w_num)
#                 print(dA_trans)
                for i in range(X.shape[0]):
                    dX[i] = np.dot(dA_trans[i: w_num+i], layer.W[::-1])
                print(dX.shape, dX)
            
            # バイアス、重みの更新
            # layer.B -= dB * self.lr
            layer.B = layer.B - dB * self.lr
            # layer.W -= dW * self.lr
            layer.W = layer.W - dW * self.lr

## 【問題2】1次元畳み込み後の出力サイズの計算
畳み込みを行うと特徴量の数が変化します。どのように変化するかは以下の数式から求められます。パディングやストライドも含めています。この計算を行う関数を作成してください。
$$
N_{out} =  \frac{N_{in}+2P-F}{S} + 1\\
$$

$N_{out}$ : 出力のサイズ（特徴量の数）

$N_{in}$ : 入力のサイズ（特徴量の数）

$P$ : ある方向へのパディングの数

$F$ : フィルタのサイズ

$S$ : ストライドのサイズ

In [5]:
def calc_output_nums(input_nums, paddings, filter_sizes, stride_sizes):
    output_nums = (input_nums + 2*paddings - filter_sizes) // stride_sizes + 1
    return output_nums

## 【問題3】小さな配列での1次元畳み込み層の実験
次に示す小さな配列でフォワードプロパゲーションとバックプロパゲーションが正しく行えているか確認してください。

入力x、重みw、バイアスbを次のようにします。

In [6]:
# x = np.array([1,2,3,4])
# w = np.array([3, 5, 7])
# b = np.array([1])

In [7]:
initializer=None
optimizer=SGD()
input_channels =1
filter_sizes=3
stride_sizes = 1
simple_conv1d = SimpleConv1d(input_channels, filter_sizes, stride_sizes, initializer, optimizer)

In [8]:
simple_conv1d.W, simple_conv1d.B

(array([3, 5, 7]), array([1]))

In [9]:
X = np.array([1,2,3,4])

フォワードプロパゲーションをすると出力は次のようになります。

In [10]:
# a = np.array([35, 50])

In [11]:
# フォワードプロぱゲーション
simple_conv1d.forward(X)

array([35, 50])

次にバックプロパゲーションを考えます。誤差は次のようであったとします。

In [12]:
# delta_a = np.array([10, 20])

バックプロパゲーションをすると次のような値になります。

In [13]:
# delta_b = np.array([30])
# delta_w = np.array([50, 80, 110])
# delta_x = np.array([30, 110, 170, 140])

In [14]:
# バックプロパゲーション 
dA = np.array([10, 20])
simple_conv1d.backward(dA, X)

(1,) [30]
(3,) [ 50  80 110]
(4,) [ 30. 110. 170. 140.]


## 実装上の工夫
畳み込みを実装する場合は、まずはfor文を重ねていく形で構いません。しかし、できるだけ計算は効率化させたいため、以下の式を一度に計算する方法を考えることにします。
$$
a_i = \sum_{s=0}^{F-1}x_{(i+s)}w_s+b
$$
バイアス項は単純な足し算のため、重みの部分を見ます。
$$
\sum_{s=0}^{F-1}x_{(i+s)}w_s
$$
これは、xの一部を取り出した配列とwの配列の内積です。具体的な状況を考えると、以下のようなコードで計算できます。この例では流れを分かりやすくするために、各要素同士でアダマール積を計算してから合計を計算しています。これは結果的に内積と同様です。

In [15]:
# x = np.array([1, 2, 3, 4])
# w = np.array([3, 5, 7])
# a = np.empty((2, 3))
# indexes0 = np.array([0, 1, 2]).astype(np.int)
# indexes1 = np.array([1, 2, 3]).astype(np.int)
# a[0] = x[indexes0]*w # x[indexes0]は([1, 2, 3])である
# a[1] = x[indexes1]*w # x[indexes1]は([2, 3, 4])である
# a = a.sum(axis=1) # array([34., 49.,])

ndarrayは配列を使ったインデックス指定ができることを利用した方法です。

また、二次元配列を使えば一次元配列から二次元配列が取り出せます。

In [16]:
# x = np.array([1, 2, 3, 4])
# indexes = np.array([[0, 1, 2], [1, 2, 3]]).astype(np.int)
# print(x[indexes]) # ([[1, 2, 3], [2, 3, 4]])

このこととブロードキャストなどをうまく組み合わせることで、一度にまとめて計算することも可能です。

畳み込みの計算方法に正解はないので、自分なりに効率化していってください。

**《参考》**

以下のページのInteger array indexingの部分がこの方法についての記述です。

[Indexing — NumPy v1.17 Manual](https://docs.scipy.org/doc/numpy/reference/arrays.indexing.html)

### 【問題4】チャンネル数を限定しない1次元畳み込み層クラスの作成
チャンネル数を1に限定しない1次元畳み込み層のクラスConv1dを作成してください。

In [17]:
class Conv1d:
    """
    チャンネル数を限定しない1次元畳み込み層
    Parameters
    ----------
    initializer : 初期化方法のインスタンス
    optimizer : 最適化手法のインスタンス
    input_channels : int
      入力のチャンネル数
    output_channels : int
      出力のチャンネル数
    filter _sizes : int
      フィルタのサイズ
    stride_sizes : int
      スライドのサイズ
    paddings : int
      ある方向へのパディングする数
    """
    
    # def __init__(self, n_nodes1, n_nodes2, initializer, optimizer):
    def __init__(self, input_channels, output_channels, filter_sizes, stride_sizes, paddings, initializer, optimizer):
        self.input_channels = input_channels
        self.output_channels = output_channels
        self.filter_sizes = filter_sizes
        self.stride_sizes = stride_sizes
        self.paddings = paddings
        self.optimizer = optimizer
        # 初期化
        # initializerのメソッドを使い、self.Wとself.Bを初期化する
        # self.W = initializer.W(input_channels, output_channels, strides)
        # self.B = initializer.B(output_channels)
        if initializer is None:
            self.W = np.ones((self.output_channels, self.input_channels, self.filter_sizes))
            self.B = np.arange(1, self.output_channels+1)
            
    def calc_output_nums(self, input_nums, paddings, filter_sizes, stride_sizes):
        output_nums = (input_nums + 2*paddings - filter_sizes) // stride_sizes + 1
        return output_nums
    
    def forward(self, X):
        """
        フォワード
        Parameters
        ----------
        X : 次の形のndarray, shape (input_nums, n_features)
            入力
        Returns
        ----------
        A : 次の形のndarray, shape (output_channels, output_nums)
            出力
        """
        print('フォワードプロパゲーション')
        _, input_nums = X.shape
        # 出力する特徴量数を計算
        output_nums = self.calc_output_nums(input_nums, self.paddings, self.filter_sizes, self.stride_sizes)
        
        # インデックスを計算
        indexes = np.array([[i for i in range(j * self.stride_sizes , self.filter_sizes + j * self.stride_sizes)]for j in range(output_nums)])
        print(indexes)
        # パディングの処理、Xにゼロを追加する
        if self.paddings > 0:
            X = np.pad(X, [(0, 0), (self.paddings, self.paddings)], 'constant')
            print(X)
            
        # 畳み込みを実施
        A = (X[:,indexes] * self.W[:, :, np.newaxis]).sum(axis=1).sum(axis=2) + self.B[:, np.newaxis]
        return A
    
    def backward(self, dA, X):
        """
        バックワード
        Parameters
        ----------
        dA : 次の形のndarray, shape (self.output_channels, output_nums)
            後ろから流れてきた勾配
        X : 次の形のndarray, shape (self.input_channles, input_nums)
            入力データ
        Returns
        ----------
        dZ : 次の形のndarray, shape (batch_size, n_nodes1)
            前に流す勾配
        """
        # dZを計算
        # dZ = np.dot(dA, self.W.T)
        
        # 更新
        #  self = self.optimizer.update(self, dA, Z)
        # return dZ
        self = self.optimizer.update(self, dA, X)

In [18]:
class SGD_kai:
    """
    確率的勾配降下法
    Parameters
    ----------
    lr : 学習率
    """
    def __init__(self, lr=0.001):
        self.lr = lr
    def update(self, layer, dA, X):
        """
        ある層の重みやバイアスの更新
        Parameters
        ----------
        layer : 更新前の層のインスタンス
        dA : 次の形のndarray, shape (self.output_channels, output_nums)
            後ろから流れてきた勾配
        X : 次の形のndarray, shape (self.input_channles, input_nums)
            入力データ
        """
        print('バックプロパゲーション ')
        # 各勾配の計算
        axis = dA.ndim-1
        dB = dA.sum(axis=axis)
        print('dB', dB.shape, dB)
        
        # 3　はfilter_size, 2は出力のサイズ(N_out)
        N_out = dA.shape[-1]
        print(N_out)
        indexes = np.array([[i for i in range(j * layer.stride_sizes, (N_out+j) * layer.stride_sizes)] for j in range(layer.filter_sizes)])
        print(indexes)
        
        # パディングの処理？　行わないとXの形が変わっている
        if layer.paddings > 0:
            X = np.pad(X, [(0, 0), (layer.paddings, layer.paddings)], 'constant')
        
        dW = np.dot(dA, X[:, indexes.T])
        print('dW', dW.shape, dW)
        
        # 2はinput_channels, 4はfilter_sizes + N_out - 1
        dX = np.zeros((layer.input_channels, layer.filter_sizes + N_out - 1))
        # 重みを転置して、dAとの行列積をとり、再度転置
        calc = np.dot(layer.W.T, dA).T
        # 2はN_out
        for i in range(N_out):
            dX[:, i:i+layer.filter_sizes] += calc[i]
        print('dX', dX.shape, dX)

        
        '''
            dX = np.zeros(X.shape[0])
            diff = X.shape[0] - dA.shape[0]
            if diff > 0:
                dA_trans = np.concatenate([np.array([0]*diff), dA, np.array([0]*diff)], axis=0)
                # 重みの数
                w_num = layer.W.shape[0]
#                 print(diff)
#                 print(w_num)
#                 print(dA_trans)
                for i in range(X.shape[0]):
                    dX[i] = np.dot(dA_trans[i: w_num+i], layer.W[::-1])
                print(dX.shape, dX)
            
            # バイアス、重みの更新
            # layer.B -= dB * self.lr
            layer.B = layer.B - dB * self.lr
            # layer.W -= dW * self.lr
            layer.W = layer.W - dW * self.lr
        '''

In [19]:
input_channels = 2
output_channels = 3
filter_sizes = 3
stride_sizes = 1
paddings = 0
initializer = None 
optimizer = SGD_kai()
conv1d = Conv1d(input_channels, output_channels, filter_sizes, stride_sizes, paddings, initializer, optimizer)

- 初期化した重みの確認

In [20]:
conv1d.W, conv1d.B

(array([[[1., 1., 1.],
         [1., 1., 1.]],
 
        [[1., 1., 1.],
         [1., 1., 1.]],
 
        [[1., 1., 1.],
         [1., 1., 1.]]]),
 array([1, 2, 3]))

In [21]:
X = np.array([[1, 2, 3, 4], [2, 3, 4, 5]])
X.shape

(2, 4)

- パディングの処理

In [22]:
if paddings > 0:
    local_X = np.pad(X, [(0, 0), (paddings, paddings)], 'constant')
else:
    local_X = X
local_X

array([[1, 2, 3, 4],
       [2, 3, 4, 5]])

- 出力数の計算

In [23]:
def calc_output_nums(input_nums, paddings, filter_sizes, stride_sizes):
    output_nums = (input_nums + 2*paddings - filter_sizes) // stride_sizes + 1
    return output_nums

In [24]:
N_out = calc_output_nums(input_nums=4, paddings=paddings, filter_sizes=3, stride_sizes=1)
N_out

2

- 畳み込み計算するインデックスの取得

In [25]:
indexes = np.array([[i for i in range(j, filter_sizes+j)] for j in range(N_out)])
indexes

array([[0, 1, 2],
       [1, 2, 3]])

In [26]:
local_X[:,indexes], local_X[:,indexes].shape

(array([[[1, 2, 3],
         [2, 3, 4]],
 
        [[2, 3, 4],
         [3, 4, 5]]]),
 (2, 2, 3))

In [27]:
conv1d.W.shape

(3, 2, 3)

- 畳み込み計算の実行

In [28]:
(local_X[:,indexes] * conv1d.W[:, :, np.newaxis]), (local_X[:,indexes] * conv1d.W[:, :, np.newaxis]).shape

(array([[[[1., 2., 3.],
          [2., 3., 4.]],
 
         [[2., 3., 4.],
          [3., 4., 5.]]],
 
 
        [[[1., 2., 3.],
          [2., 3., 4.]],
 
         [[2., 3., 4.],
          [3., 4., 5.]]],
 
 
        [[[1., 2., 3.],
          [2., 3., 4.]],
 
         [[2., 3., 4.],
          [3., 4., 5.]]]]),
 (3, 2, 2, 3))

- 畳み込み計算で残したい軸以外を合計

In [29]:
(local_X[:,indexes] * conv1d.W[:, :, np.newaxis]).sum(axis=1).sum(axis=2) + conv1d.B[:, np.newaxis]

array([[16., 22.],
       [17., 23.],
       [18., 24.]])

In [30]:
# クラスで実行確認
A = conv1d.forward(X)
A

フォワードプロパゲーション
[[0 1 2]
 [1 2 3]]


array([[16., 22.],
       [17., 23.],
       [18., 24.]])

- dAはとりあえず上記で計算したAを使う

In [31]:
dA = conv1d.forward(X)
dA, dA.shape

フォワードプロパゲーション
[[0 1 2]
 [1 2 3]]


(array([[16., 22.],
        [17., 23.],
        [18., 24.]]),
 (3, 2))

- dBを計算

In [32]:
# dAの最後の軸に沿って合計する
axis = conv1d.forward(X).ndim-1
dB = conv1d.forward(X).sum(axis=axis)
dB

フォワードプロパゲーション
[[0 1 2]
 [1 2 3]]
フォワードプロパゲーション
[[0 1 2]
 [1 2 3]]


array([38., 40., 42.])

- dWを計算

In [55]:
local_X, local_X.shape

(array([[1, 2, 3, 4],
        [2, 3, 4, 5]]),
 (2, 4))

In [56]:
# 3　はfilter_size, 2は出力のサイズ(N_out)
indexes = np.array([[i for i in range(j, N_out+j)] for j in range(filter_sizes)])
indexes, indexes.shape

(array([[0, 1],
        [1, 2],
        [2, 3]]),
 (3, 2))

In [57]:
indexes.T, indexes.T.shape

(array([[0, 1, 2],
        [1, 2, 3]]),
 (2, 3))

In [58]:
local_X[:, indexes.T], (local_X[:, indexes.T]).shape

(array([[[1, 2, 3],
         [2, 3, 4]],
 
        [[2, 3, 4],
         [3, 4, 5]]]),
 (2, 2, 3))

In [59]:
# dW　の計算
np.dot(dA, local_X[:, indexes.T]), np.dot(dA, local_X[:, indexes.T]).shape

(array([[[ 60.,  98., 136.],
         [ 98., 136., 174.]],
 
        [[ 63., 103., 143.],
         [103., 143., 183.]],
 
        [[ 66., 108., 150.],
         [108., 150., 192.]]]),
 (3, 2, 3))

In [60]:
conv1d.backward(dA, X)

バックプロパゲーション 
dB (3,) [38. 40. 42.]
2
[[0 1]
 [1 2]
 [2 3]]
dW (3, 2, 3) [[[ 60.  98. 136.]
  [ 98. 136. 174.]]

 [[ 63. 103. 143.]
  [103. 143. 183.]]

 [[ 66. 108. 150.]
  [108. 150. 192.]]]
dX (2, 4) [[ 51. 120. 120.  69.]
 [ 51. 120. 120.  69.]]


- dXを計算

In [39]:
filters = np.arange(output_channels*input_channels*filter_sizes).reshape(output_channels, input_channels, filter_sizes)
filters, filters.shape

(array([[[ 0,  1,  2],
         [ 3,  4,  5]],
 
        [[ 6,  7,  8],
         [ 9, 10, 11]],
 
        [[12, 13, 14],
         [15, 16, 17]]]),
 (3, 2, 3))

In [40]:
dA, dA.shape

(array([[16., 22.],
        [17., 23.],
        [18., 24.]]),
 (3, 2))

In [41]:
filters.T

array([[[ 0,  6, 12],
        [ 3,  9, 15]],

       [[ 1,  7, 13],
        [ 4, 10, 16]],

       [[ 2,  8, 14],
        [ 5, 11, 17]]])

In [42]:
calc = np.dot(filters.T, dA).T
calc

array([[[318., 369., 420.],
        [471., 522., 573.]],

       [[426., 495., 564.],
        [633., 702., 771.]]])

In [43]:
# 2はinput_channels, 4はfilter_sizes + N_out - 1
dX = np.zeros((input_channels, filter_sizes+N_out-1))
# 重みを転置して、dAとの行列積をとり、再度転置
calc = np.dot(filters.T, dA).T
# 2はN_out
for i in range(N_out):
    dX[:, i:i+filter_sizes] += calc[i]
dX

array([[ 318.,  795.,  915.,  564.],
       [ 471., 1155., 1275.,  771.]])

In [44]:
# 2はinput_channels, 4はfilter_sizes + N_out - 1
dX = np.zeros((input_channels, filter_sizes+N_out-1))
# 重みを転置して、dAとの行列積をとり、再度転置
calc = np.dot(conv1d.W.T, dA).T
# 2はN_out
for i in range(N_out):
    dX[:, i:i+filter_sizes] += calc[i]
dX

array([[ 51., 120., 120.,  69.],
       [ 51., 120., 120.,  69.]])

In [45]:
w = np.array([3, 5, 7])
dA_ = np.array([10, 20])
w * dA_[:, np.newaxis]

array([[ 30,  50,  70],
       [ 60, 100, 140]])

In [46]:
tmp = w[:, np.newaxis] @ dA_[np.newaxis]
tmp.T

array([[ 30,  50,  70],
       [ 60, 100, 140]])

In [47]:
x = np.array([1, 2, 3, 4])
w = np.array([3, 5, 7])
a = np.empty((2, 3))
indexes0 = np.array([0, 1, 2]).astype(np.int)
indexes1 = np.array([1, 2, 3]).astype(np.int)
a[0] = x[indexes0]*w # x[indexes0]は([1, 2, 3])である
a[1] = x[indexes1]*w # x[indexes1]は([2, 3, 4])である
# a = a.sum(axis=1)

In [48]:
a

array([[ 3., 10., 21.],
       [ 6., 15., 28.]])

In [49]:
conv1d.W * (X[:, 0:3][np.newaxis], X[:, 1:4][np.newaxis])

array([[[[1., 2., 3.],
         [2., 3., 4.]],

        [[1., 2., 3.],
         [2., 3., 4.]],

        [[1., 2., 3.],
         [2., 3., 4.]]],


       [[[2., 3., 4.],
         [3., 4., 5.]],

        [[2., 3., 4.],
         [3., 4., 5.]],

        [[2., 3., 4.],
         [3., 4., 5.]]]])

In [50]:
tmp = (conv1d.W * (X[:, 0:3][np.newaxis], X[:, 1:4][np.newaxis])).sum(axis=0).sum(axis=2)
tmp

array([[15., 21.],
       [15., 21.],
       [15., 21.]])

In [51]:
tmp2 = (filters * (X[:, 0:3][np.newaxis], X[:, 1:4][np.newaxis])).sum(axis=0).sum(axis=2)
tmp2

array([[ 19,  88],
       [109, 214],
       [199, 340]])

In [52]:
from copy import deepcopy
import numpy as np
class ScratchDeepNeuralNetworkClassifier():
    """
    多層なニューラルネットワーク分類器
    Parameters
    ----------
    Attributes
    ----------
    """
    def __init__(self, verbose=False, random_state=None, activation='relu', epoch=10, batch_size=20, lr=0.0001, hidden_layer_sizes=(400, 200,), initialize=None, sigma=None, optimize='adagrad'):
        self.verbose = verbose
        # 学習率
        self.lr = lr
        # 層のノード数のタプル
        self.hidden_layer_sizes = hidden_layer_sizes
        # バッチサイズ
        self.batch_size = batch_size
        # epochの回数
        self.epoch = epoch
        # 活性化関数の名前　'sigmoid', 'tanh', 'relu'
        self.activation = activation
        # 乱数の設定
        self.random_state = random_state
        # 初期値の設定方法 'gauss', 'xavier', 'he'
        self.initialize = initialize
        # 初期値をガウス分布で定める際の標準偏差
        self.sigma = sigma
        # 最適化手法 'sgd', 'adagrad'
        self.optimize = optimize
        
        # 出力クラス数
        self.n_output = None
        # 出力カテゴリの配列
        self.categories_ = None
        # 重みの情報リスト
        self.coefs_ = None
        self.intercepts_ = None
        # epoch毎に損失を記録
        self.losses = np.zeros(self.epoch)
        self.val_losses = None
        # fitの時にvalデータがあるフラグ
        self.val_flag = None    

    def fit(self, X_train, y_train, X_val=None, y_val=None):
        """
        ニューラルネットワーク分類器を学習する。
        
        Parameters
        ----------
        X_train : 次の形のndarray, shape (n_samples, n_features)
            訓練データの特徴量
        y_train : 次の形のndarray, shape (n_samples, )
            訓練データの正解値
        X_val : 次の形のndarray, shape (n_samples, n_features)
            検証データの特徴量
        y_val : 次の形のndarray, shape (n_samples, )
            検証データの正解値
        """
        # random_stateが設定されている場合はseed設定
        if type(self.random_state) == int:
            np.random.seed(self.random_state)
        
        # 変数情報
        self.n_samples, self.n_features = X_train.shape
        # valデータの確認
        if type(X_val) == np.ndarray and type(y_val) == np.ndarray:
            if X_val.shape == (y_val.shape[0], self.n_features):
                self.val_flag = True
                self.val_losses = np.zeros(self.epoch)
        
        # 目的変数をone_hot_encoding
        from sklearn.preprocessing import OneHotEncoder
        enc = OneHotEncoder(handle_unknown='ignore', sparse=False)
        y_train_one_hot = enc.fit_transform(y_train[:, np.newaxis])
        if self.val_flag:
            y_val_one_hot = enc.transform(y_val[:, np.newaxis])
        
        # 出力カテゴリの配列
        self.categories_ = enc.categories_[0]
        # 出力クラス数を特定
        self.n_output = len(self.categories_)
        
        # 初期化のインスタンス作成
        # initializeで指定がない時、sigmoid, tanhではxavier, reluではhe
        if self.initialize is None:
            if self.activation == 'relu':
                self.initialize = 'he'
            elif self.activation == 'sigmoid' or self.activation == 'tanh':
                self.initialize = 'xavier'
        # self.initializeに応じてinitializerのインスタンスを作成
        if self.initialize == 'he':
            initializer = HeInitializer()
        elif self.initialize == 'xavier':
            initializer = XavierInitializer()
        elif self.initialize == 'gauss':
            if self.sigma is None:
                self.sigma = 0.01
            initializer = SimpleInitializer(self.sigma)
        
        # 最適化のインスタンス作成
        if self.optimize == 'adagrad':
            optimizer = AdaGrad(self.lr)
        elif self.optimize == 'sgd':
            optimizer = SGD(self.lr)
        
        # 重みを初期化し、インスタンスをリストへ
        fc_lst = []
        row = self.n_features
        for n_nodes in list(self.hidden_layer_sizes) + [self.n_output]:
            fc = FC(row, n_nodes, initializer, optimizer)
            
            # self.optimize　が 'adagrad' の際、インスタンスへH_W, H_Bを持たせる
            if self.optimize == 'adagrad':
                fc.adagrad_initialize(row, n_nodes)
            
            fc_lst.append(fc)
            row = n_nodes
            
        # 活性化関数のインスタンス
        if self.activation == 'relu':
            self.act = ReLU()
        elif self.activation == 'sigmoid':
            self.act = Sigmoid()
        elif self.activation == 'tanh':
            self.act = Tanh()
        else:
            print('not proper activation name')
        
        # epoch　でループ
        for n_epoch in range(self.epoch):
            # 各々のバッチの損失の記録
            batches_losses = np.zeros(self.n_samples // self.batch_size, dtype=np.float64)
            if self.val_flag:
                batches_val_losses = np.zeros(self.n_samples // self.batch_size, dtype=np.float64)
            
            # バッチを取り出し、バッチでループ
            get_mini_batch = GetMiniBatch(X_train, y_train_one_hot, batch_size=self.batch_size)
            for mini_X_train, mini_y_train in get_mini_batch:
                
                # forward propagation
                # 全結合層の結果をリストで管理、ex [0, A_1, A_2]
                A_lst = [0]
                # 、活性化の結果をリストで管理、ex [X, Z_1, Z_2]
                Z_lst = [mini_X_train]
                Z = mini_X_train
                if self.val_flag:
                    Z_val = X_val
                
                # 各層をループ
                for i in range(len(self.hidden_layer_sizes)):
                    
                    A = fc_lst[i].forward(Z)
                    # actでZを計算
                    Z = self.act.forward(A)
                    
                    if self.val_flag:
                        A_val = fc_lst[i].forward(Z_val)
                        Z_val = self.act.forward(A_val)
                    
                    # A, Z　をdeepcopyしてリストへ保管　（deepcopy必要？）
                    A_lst.append(deepcopy(A))
                    Z_lst.append(deepcopy(Z))
                
                # 最後の出力層、forward_propagation
                # 最終層の活性化関数のインスタンス
                self.last_act = Softmax(mini_y_train)
                A = fc_lst[i+1].forward(Z)
                Z = self.last_act.forward(A)
                
                if self.val_flag:
                    last_act_val = Softmax(y_val_one_hot)
                    A_val = fc_lst[i+1].forward(Z_val)
                    Z_val = last_act_val.forward(A_val) 

                # 最後の出力層、back_propagation
                # 最終層のAをもとに、dA　と　損失を求める（損失は記録）
                dA, batches_losses[get_mini_batch._counter - 1] = self.last_act.backward(A)
                if self.val_flag:
                    _, batches_val_losses[get_mini_batch._counter - 1] = last_act_val.backward(A_val)
                
                # back propagation
                # fc, A, Zを逆順で取り出す
                for fc, A, Z in zip(fc_lst[::-1], A_lst[::-1], Z_lst[::-1]):
                    # dZ の勾配を求める、関数内でW、Bを更新
                    dZ = fc.backward(dA, Z)
                    # dAを求める、dZ　と　活性化関数のアダマール積、最後のdAはゼロになる
                    dA = dZ * self.act.backward(A)
  
            #　バッチのループが終わったら損失を合計し、記録
            self.losses[n_epoch] = batches_losses.sum()
            if self.val_flag:
                self.val_losses[n_epoch] = batches_val_losses.sum()
        
            #verboseをTrueにした際は学習過程などを出力する
            if self.verbose is True:
                print('epoch : {} finished'.format(n_epoch))
                print('train_loss : {}'.format(self.losses[n_epoch]))
                if self.val_flag:
                    print('val_loss : {}'.format(self.val_losses[n_epoch]))
        
        # 学習が終わったら重み、バイアスを変数へ記録
        self.coefs_ = [fc.W for fc in fc_lst]
        self.intercepts_ = [fc.B for fc in fc_lst]
        
    def predict(self, X):
        """
        ニューラルネットワーク分類器を使い推定する。

        Parameters
        ----------
        X : 次の形のndarray, shape (n_samples, n_features)
            サンプル

        Returns
        -------
            次の形のndarray, shape (n_samples, 1)
            推定結果
        """
        # 活性化関数の出力結果をpredで管理
        pred = X

        # 各層をループ
        for n_layer in range(len(self.hidden_layer_sizes)):
            A = np.dot(pred, self.coefs_[n_layer]) + self.intercepts_[n_layer]
            pred = self.act.forward(A)

        # 最後の出力層
        A = np.dot(pred, self.coefs_[-1]) + self.intercepts_[-1]
        # 最後の活性化関数
        pred = self.last_act.forward(A)

        # 列、横方向に最大のインデックスを取得し、出力カテゴリのself.categories_の値を返す
        return self.categories_[np.argmax(pred, axis=1)]

In [53]:
class SimpleInitializer:
    """
    ガウス分布によるシンプルな初期化
    Parameters
    ----------
    sigma : float
      ガウス分布の標準偏差
    """
    def __init__(self, sigma=0.01):
        self.sigma = sigma
    def W(self, input_channels, output_channels, strides):
        """
        重みの初期化
        Parameters
        ----------
        n_nodes1 : int
          前の層のノード数
        n_nodes2 : int
          後の層のノード数
        Returns
        ----------
        W :
        """
        W = self.sigma * np.random.randn(output_channels, input_channels, self.n_features-1)
        return W
    def B(self, output_channels):
        """
        バイアスの初期化
        Parameters
        ----------
        output_channels : int
          出力のチャンネル数
        Returns
        ----------
        B :
        """
        B = self.sigma * np.random.randn(output_channels)
        return B

In [54]:
class FC:
    """
    ノード数n_nodes1からn_nodes2への全結合層
    Parameters
    ----------
    input_channel : int
      前の層のノード数
    n_nodes2 : int
      後の層のノード数
    initializer : 初期化方法のインスタンス
    optimizer : 最適化手法のインスタンス
    """
    def __init__(self, n_nodes1, n_nodes2, initializer, optimizer):
        self.optimizer = optimizer
        # 初期化
        # initializerのメソッドを使い、self.Wとself.Bを初期化する
        self.W = initializer.W(n_nodes1, n_nodes2)
        self.B = initializer.B(n_nodes2)
    
    # optimazer がadagradの際に呼び出す
    def adagrad_initialize(self, n_nodes1, n_nodes2):
        self.H_W = np.ones(n_nodes1 * n_nodes2).reshape(n_nodes1, -1)
        self.H_B = np.ones(n_nodes2)

    def forward(self, X):
        """
        フォワード
        Parameters
        ----------
        X : 次の形のndarray, shape (batch_size, n_nodes1)
            入力
        Returns
        ----------
        A : 次の形のndarray, shape (batch_size, n_nodes2)
            出力
        """        
        A = np.dot(X, self.W) + self.B
        return A
    def backward(self, dA, Z):
        """
        バックワード
        Parameters
        ----------
        dA : 次の形のndarray, shape (batch_size, n_nodes2)
            後ろから流れてきた勾配
        Z : 次の形のndarray, shape (batch_size, n_nodes1)
            一層前の活性化関数の出力結果
        Returns
        ----------
        dZ : 次の形のndarray, shape (batch_size, n_nodes1)
            前に流す勾配
        """
        # dZを計算
        dZ = np.dot(dA, self.W.T)
        # 更新
        self = self.optimizer.update(self, dA, Z)
        return dZ