Sprintの目的
スクラッチを通してCNNの基礎を理解する

どのように学ぶか
スクラッチで1次元用畳み込みニューラルネットワークを実装した後、学習と検証を行なっていきます。



2.1次元の畳み込みニューラルネットワークスクラッチ

畳み込みニューラルネットワーク（CNN） のクラスをスクラッチで作成していきます。NumPyなど最低限のライブラリのみを使いアルゴリズムを実装していきます。


このSprintでは1次元の 畳み込み層 を作成し、畳み込みの基礎を理解することを目指します。次のSprintでは2次元畳み込み層とプーリング層を作成することで、一般的に画像に対して利用されるCNNを完成させます。


クラスの名前はScratch1dCNNClassifierとしてください。クラスの構造などは前のSprintで作成したScratchDeepNeuralNetrowkClassifierを参考にしてください。


1次元畳み込み層とは
CNNでは画像に対しての2次元畳み込み層が定番ですが、ここでは理解しやすくするためにまずは1次元畳み込み層を実装します。1次元畳み込みは実用上は自然言語や波形データなどの 系列データ で使われることが多いです。


畳み込みは任意の次元に対して考えることができ、立体データに対しての3次元畳み込みまではフレームワークで一般的に用意されています。


データセットの用意
検証には引き続きMNISTデータセットを使用します。1次元畳み込みでは全結合のニューラルネットワークと同様に平滑化されたものを入力します。

【問題1】チャンネル数を1に限定した1次元畳み込み層クラスの作成
チャンネル数を1に限定した1次元畳み込み層のクラスSimpleConv1dを作成してください。基本構造は前のSprintで作成した全結合層のFCクラスと同じになります。なお、重みの初期化に関するクラスは必要に応じて作り変えてください。Xavierの初期値などを使う点は全結合層と同様です。


ここでは パディング は考えず、ストライド も1に固定します。また、複数のデータを同時に処理することも考えなくて良く、バッチサイズは1のみに対応してください。この部分の拡張はアドバンス課題とします。


フォワードプロパゲーションの数式は以下のようになります。


$$
a_i = \sum_{s=0}^{F-1}x_{(i+s)}w_s+b
$$

$a_i$ : 出力される配列のi番目の値


$F$ : フィルタのサイズ


$x_{(i+s)}$ : 入力の配列の(i+s)番目の値


$w_s$ : 重みの配列のs番目の値


$b$ : バイアス項


すべてスカラーです。


次に更新式です。ここがAdaGradなどに置き換えられる点は全結合層と同様です。

$$
w_s^{\prime} = w_s - \alpha \frac{\partial L}{\partial w_s} \\
b^{\prime} = b - \alpha \frac{\partial L}{\partial b}
$$

$\alpha$ : 学習率


$\frac{\partial L}{\partial w_s}$ : $w_s$ に関する損失 $L$ の勾配


$\frac{\partial L}{\partial b}$ : $b$ に関する損失 $L$ の勾配


勾配 $\frac{\partial L}{\partial w_s}$ や $\frac{\partial L}{\partial b}$ を求めるためのバックプロパゲーションの数式が以下です。

$$
\frac{\partial L}{\partial w_s} = \sum_{i=0}^{N_{out}-1} \frac{\partial L}{\partial a_i}x_{(i+s)}\\
\frac{\partial L}{\partial b} = \sum_{i=0}^{N_{out}-1} \frac{\partial L}{\partial a_i}
$$

$\frac{\partial L}{\partial a_i}$ : 勾配の配列のi番目の値


$N_{out}$ : 出力のサイズ


前の層に流す誤差の数式は以下です。

$$
\frac{\partial L}{\partial x_j} = \sum_{s=0}^{F-1} \frac{\partial L}{\partial a_{(j-s)}}w_s
$$

$\frac{\partial L}{\partial x_j}$ : 前の層に流す誤差の配列のj番目の値


ただし、 $j-s<0$ または $j-s>N_{out}-1$ のとき $\frac{\partial L}{\partial a_{(j-s)}} =0$ です。


全結合層との大きな違いは、重みが複数の特徴量に対して共有されていることです。この場合は共有されている分の誤差をすべて足すことで勾配を求めます。計算グラフ上での分岐はバックプロパゲーションの際に誤差の足し算をすれば良いことになります。

In [1]:
## ライブラリのimport
import numpy as np
import math
from keras.datasets import mnist
from sklearn.preprocessing import OneHotEncoder
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
%matplotlib inline
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split

# 学習に使用するその他クラスの定義

これまでのsprintで定義してきたものを流用します。

In [2]:
#活性化関数
class Sigmoid:
    
    def forward(self, A):
        self.A = A
        return self.sigmoid(A)
    
    def backward(self, dZ):
        _sig = self.sigmoid(self.A)
        return dZ * (1 - _sig)*_sig
    
    def sigmoid(self, X):
        return 1 / (1 + np.exp(-X))

#活性化関数
class Tanh:
    
    def forward(self, A):
        self.A = A
        return np.tanh(A)
    
    def backward(self, dZ):
        return dZ * (1 - (np.tanh(self.A))**2)

#活性化関数
class Softmax:
    
    def forward(self, X):
        self.Z = np.exp(X) / np.sum(np.exp(X), axis=1).reshape(-1,1)
        return self.Z
    
    def backward(self, Y):
        self.loss = self.loss_func(Y)
        return self.Z - Y
    
    def loss_func(self, Y, Z=None):
        if Z is None:
            Z = self.Z
        return (-1)*np.average(np.sum(Y*np.log(Z), axis=1))

#活性化関数
class ReLU:
    
    def forward(self, A):
        self.A = A
        return np.clip(A, 0, None)
    
    def backward(self, dZ):
        return dZ * np.clip(np.sign(self.A), 0, None)

#全結合層
class FC:

    def __init__(self, n_nodes1, n_nodes2, initializer, optimizer):
        self.optimizer = optimizer
        self.W = initializer.W(n_nodes1, n_nodes2)
        self.B = initializer.B(n_nodes2)
        
    def forward(self, X):
        self.X = X
        A = X@self.W + self.B
        return A
    
    def backward(self, dA):
        dZ = dA@self.W.T
        self.dB = np.sum(dA, axis=0)
        self.dW = self.X.T@dA
        self.optimizer.update(self)
        return dZ

#Xavier(ザビエル)の初期値
# ニューラルネットワークにおける重みの初期値は、学習の精度、速度に大きく影響する非常に重要な問題であり、勾配消失、アクティベーションの偏りなどが起こらないようにしなければいけない。
# そこで提案されたのが「Xaiverの初期値」である。
# Xavierの初期値によるアクティベーションは、広がった分布となり、高い表現力を有する。
# 活性化関数としてシグモイド関数などを用いるときはおすすめである。
# 「Xavierの初期値」は層のノードの数によって、作用させる係数を
# 変化させます。

# たとえば、前層から渡されるノード数がn個であるときには、標準偏差√nで割ってやります。
# つまり以下の値を作用させます。

# 1n‾√
# 1n
# つまり、初期値のバラツキについては、各層ごとにノードの数で均一化しているイメージになるかと思います。
class XavierInitializer:
    
    def W(self, n_nodes1, n_nodes2):
        self.sigma = math.sqrt(1 / n_nodes1)
        W = self.sigma * np.random.randn(n_nodes1, n_nodes2)
        return W
    
    def B(self, n_nodes2):
        B = self.sigma * np.random.randn(n_nodes2)
        return B


# Heの初期値
# Xavierの初期値と共によく使われる初期値として「Heの初期値」があります。
# 作用させる値はXavierと似ていますが、標準偏差√(n/2)で割ってやります。
# つまり以下の値を作用させます。

# 2n‾‾√
# 2n
# これまで、活性化関数はSigmoid関数を用いていました、Heの初期値は
# 活性化関数は、ReLU関数と一緒に使用します。

# ReLU関数は今まで触れておりませんでしたが、以下のようなシンプルな関数です。

# f(x)=max(0,x)
# f(x)=max(0,x)
# つまり、xが0以下の値のときは0, xが正の値の場合はそのままxの値をとる関数です。

class HeInitializer():
    
    def W(self, n_nodes1, n_nodes2):
        self.sigma = math.sqrt(2 / n_nodes1)
        W = self.sigma * np.random.randn(n_nodes1, n_nodes2)
        return W
    
    def B(self, n_nodes2):
        B = self.sigma * np.random.randn(n_nodes2)
        return B

#最適化手法
class SGD:

    def __init__(self, lr):
        self.lr = lr
    
    def update(self, layer):
        layer.W -= self.lr * layer.dW
        layer.B -= self.lr * layer.dB
        return

#最適化手法
class AdaGrad:
    
    def __init__(self, lr):
        self.lr = lr
        self.HW = 1
        self.HB = 1
    
    def update(self, layer):
        self.HW += layer.dW**2
        self.HB += layer.dB**2
        layer.W -= self.lr * np.sqrt(1/self.HW) * layer.dW
        layer.B -= self.lr * np.sqrt(1/self.HB) * layer.dB


#ミニバッチ（学習させるときにデータを分けて行う手法）
# numpyのshuffleとpermutationの違い
# numpy Python
# python - shuffle vs permute numpy - Stack Overflow

# numpyにはshuffle(x)とpermutation(x)というほぼ同じ機能の関数があります．
# どちらも，配列をランダムに並び替えますが，違いが2つあります．

# ひとつは，shuffle(x)は配列をin-placeで並び替えるが，permutation(x)は並び替えた配列のコピーを生成するという点です．つまり：
class GetMiniBatch:

    def __init__(self, X, y, batch_size = 20, seed=0):
        self.batch_size = batch_size
        np.random.seed(seed)
        shuffle_index = np.random.permutation(np.arange(X.shape[0]))
        self._X = X[shuffle_index]
        self._y = y[shuffle_index]
        self._stop = np.ceil(X.shape[0]/self.batch_size).astype(np.int)
        
    def __len__(self):
        return self._stop
    
    def __getitem__(self,item):
        p0 = item*self.batch_size
        p1 = item*self.batch_size + self.batch_size
        return self._X[p0:p1], self._y[p0:p1] 
    
    def __iter__(self):
        self._counter = 0
        return self
    
    def __next__(self):
        if self._counter >= self._stop:
            raise StopIteration()
        p0 = self._counter*self.batch_size
        p1 = self._counter*self.batch_size + self.batch_size
        self._counter += 1
        return self._X[p0:p1], self._y[p0:p1]

#重みとバイアスの初期化
# 可変長引数
# Python の関数では引数の個数をいくつでも
# 受け取ることができる関数を定義することができます。
# 可変長の引数を定義するためには可変長引数の前に
# アスタリスク * を付けます。
class SimpleInitializer:

    def __init__(self, sigma):
        self.sigma = sigma
        
    def W(self, *shape):
        W = self.sigma * np.random.randn(*shape) 
        #引数*shapeは、可変長引数である。よって、Wは、W１、W2、W3・・・とか名前つけなくても、自動でつけられる
        return W
    
    def B(self, *shape):
        B = self.sigma * np.random.randn(*shape)
        return B

# #重みとバイアスの初期化
# class SimpleInitializer:

#     def __init__(self, sigma):
#         self.sigma = sigma
        
#     def W(self, shape):
#         W = self.sigma * np.random.randn(shape)
#         return W
    
#     def B(self, shape):
#         B = self.sigma * np.random.randn(shape)
#         return B





# class functions():
#     # 中間層の活性化関数
#     # シグモイド関数（ロジスティック関数）
#     def sigmoid(self, x):
#         return 1/(1 + np.exp(-x))

#     # ReLU関数
#     def relu(self, x):
#         return np.maximum(0, x)

#     # ステップ関数（閾値0）
#     def step_function(self, x):
#         return np.where( x > 0, 1, 0) 

#     # 出力層の活性化関数
#     # ソフトマックス関数
#     def softmax(self, x):
#         if x.ndim == 2:
#             x = x.T
#             x = x - np.max(x, axis=0)
#             y = np.exp(x) / np.sum(np.exp(x), axis=0)
#             return y.T

#         x = x - np.max(x) # オーバーフロー対策
#         return np.exp(x) / np.sum(np.exp(x))

#     # ソフトマックスとクロスエントロピーの複合関数
#     def softmax_with_loss(self, d, x):
#         y = softmax(x)
#         return cross_entropy_error(d, y)

#     # 誤差関数
#     # 最小二乗法
#     def least_square(self, d, y):
#         return np.sum(np.square(d - y)) / 2

#     # クロスエントロピー
#     def cross_entropy_error(self, d, y):
#         if y.ndim == 1:
#             d = d.reshape(1, d.size)
#             y = y.reshape(1, y.size)

#         # 教師データがone-hot-vectorの場合、正解ラベルのインデックスに変換
#         if d.size == y.size:
#             d = d.argmax(axis=1)

#         batch_size = y.shape[0]
#         return -np.sum(np.log(y[np.arange(batch_size), d] + 1e-7)) / batch_size



#     # 活性化関数の導関数
#     # シグモイド関数（ロジスティック関数）の導関数
#     def d_sigmoid(self, x):
#         dx = (1.0 - self.sigmoid(x)) * self.sigmoid(x)
#         return dx

#     # ReLU関数の導関数
#     def d_relu(self, x):
#         return np.where( x > 0, 1, 0)

#     # ステップ関数の導関数
#     def d_step_function(self, x):
#         return 0

#     # 最小二乗法の導関数
#     def d_least_square(d, y):
#         return y - d


#     # ソフトマックスとクロスエントロピーの複合導関数
#     def d_softmax_with_loss(self, d, y):
#         batch_size = d.shape[0]
#         if d.size == y.size: # 教師データがone-hot-vectorの場合
#             dx = (y - d) / batch_size
#         else:
#             dx = y.copy()
#             dx[np.arange(batch_size), d] -= 1
#             dx = dx / batch_size
#         return dx

#     # シグモイドとクロスエントロピーの複合導関数
#     def d_sigmoid_with_loss(self, d, y):
#         return y - d

#     # 数値微分
#     def numerical_gradient(self, f, x):
#         h = 1e-4
#         grad = np.zeros_like(x)

#         for idx in range(x.size):
#             tmp_val = x[idx]
#             # f(x + h)の計算
#             x[idx] = tmp_val + h
#             fxh1 = f(x)

#             # f(x - h)の計算
#             x[idx] = tmp_val - h
#             fxh2 = f(x)

#             grad[idx] = (fxh1 - fxh2) / (2 * h)
#             # 値を元に戻す
#             x[idx] = tmp_val

#         return grad

In [3]:
class SimpleConv1d():
    """畳み込みクラス
    """
    def forward(self, x, w, b):
        """順伝播
        Parameters
        -----------
        x : 入力配列
        w : 重み
        b : バイアス
        """
        
        # 返り値入力配列
        a = []
        # 1づつずらしながら畳み込み計算
        for i in range(len(w) - 1):
            a.append((x[i:i+len(w)] @ w) + b[0])
            
        return np.array(a)
    
    def backward(self, x, w, da):
        """逆伝播
        x : 入力配列
        w : 重み
        da : 逆伝播の値　⇨順伝播の値でないのか？
        """
        # バイアスの勾配
        db = np.sum(da)
        
        # 重みの勾配
        dw = []
        for i in range(len(w)):
            dw.append(da @ x[i:i+len(da)])
        dw = np.array(dw)
        
        # 逆伝播の値
        dx = []
        # 逆畳込み計算用配列
        new_w = np.insert(w[::-1], 0, 0) #w[::-1]:範囲を全指定にしてマイナスステップにすると、要素がリバースする。
        #a = [1, 2, 3, 4, 5, 6, 7, 8, 9]  a[::-1] [9, 8, 7, 6, 5, 4, 3, 2, 1]
        new_w = np.append(new_w, 0)
        for i in range(len(new_w)-1):
            dx.append(new_w[i:i+len(da)] @ da)
        dx = np.array(dx[::-1])
        return db, dw, dx

【問題2】1次元畳み込み後の出力サイズの計算
畳み込みを行うと特徴量の数が変化します。どのように変化するかは以下の数式から求められます。パディングやストライドも含めています。この計算を行う関数を作成してください。

$$
N_{out} =  \frac{N_{in}+2P-F}{S} + 1\\
$$

$N_{out}$ : 出力のサイズ（特徴量の数）


$N_{in}$ : 入力のサイズ（特徴量の数）


$P$ : ある方向へのパディングの数


$F$ : フィルタのサイズ


$S$ : ストライドのサイズ



In [4]:
def output_size_calculation(n_in, F, P=0, S=1):
    """出力サイズ計算
    n_in : 入力サイズ
    F : フィルターサイズ
    P : パッディング数
    S : ストライド数
    """
    # 出力サイズの計算
    n_out = int((n_in + 2*P - F) / S + 1)
    
    return n_out

【問題3】小さな配列での1次元畳み込み層の実験
次に示す小さな配列でフォワードプロパゲーションとバックプロパゲーションが正しく行えているか確認してください。

$$
 ---------------
$$
入力x、重みw、バイアスbを次のようにします。


x = np.array([1,2,3,4])

w = np.array([3, 5, 7])

b = np.array([1])

$$
 ---------------
$$

フォワードプロパゲーションをすると出力は次のようになります。


a = np.array([35, 50])

次にバックプロパゲーションを考えます。誤差は次のようであったとします。



delta_a = np.array([10, 20])

バックプロパゲーションをすると次のような値になります。

delta_b = np.array([30])

delta_w = np.array([50, 80, 110])

delta_x = np.array([30, 110, 170, 140])


$$
 ---------------
$$




実装上の工夫
畳み込みを実装する場合は、まずはfor文を重ねていく形で構いません。しかし、できるだけ計算は効率化させたいため、以下の式を一度に計算する方法を考えることにします。

$$
a_i = \sum_{s=0}^{F-1}x_{(i+s)}w_s+b
$$

バイアス項は単純な足し算のため、重みの部分を見ます。

$$
\sum_{s=0}^{F-1}x_{(i+s)}w_s
$$

これは、xの一部を取り出した配列とwの配列の内積です。具体的な状況を考えると、以下のようなコードで計算できます。この例では流れを分かりやすくするために、各要素同士でアダマール積を計算してから合計を計算しています。これは結果的に内積と同様です。


$$
 ---------------
$$

x = np.array([1, 2, 3, 4])

w = np.array([3, 5, 7])

a = np.empty((2, 3))

indexes0 = np.array([0, 1, 2]).astype(np.int)

indexes1 = np.array([1, 2, 3]).astype(np.int)

a[0] = x[indexes0]*w # x[indexes0]は([1, 2, 3])である

a[1] = x[indexes1]*w # x[indexes1]は([2, 3, 4])である

a = a.sum(axis=1)

$$
 ---------------
$$
ndarrayは配列を使ったインデックス指定ができることを利用した方法です。


また、二次元配列を使えば一次元配列から二次元配列が取り出せます。


$$
 ---------------
$$
x = np.array([1, 2, 3, 4])

indexes = np.array([[0, 1, 2], [1, 2, 3]]).astype(np.int)

print(x[indexes]) # ([[1, 2, 3], [2, 3, 4]])
$$
 ---------------
$$

このこととブロードキャストなどをうまく組み合わせることで、一度にまとめて計算することも可能です。


畳み込みの計算方法に正解はないので、自分なりに効率化していってください。


《参考》


以下のページのInteger array indexingの部分がこの方法についての記述です。


Indexing — NumPy v1.17 Manual



In [5]:
# 問題3　小さな配列での1次元畳み込み層の実験
# テストデータ
x = np.array([1,2,3,4])
w = np.array([3, 5, 7])
b = np.array([1])
da = np.array([10, 20])

In [6]:
# インスタンス化
simple_conv_1d = SimpleConv1d()

In [7]:
# 順伝播
simple_conv_1d.forward(x, w, b)

array([35, 50])

In [8]:
# 逆伝播
db, dw, dx = simple_conv_1d.backward(x, w, da)
print(db)
print(dw)
print(dx)

30
[ 50  80 110]
[ 30 110 170 140]


【問題4】チャンネル数を限定しない1次元畳み込み層クラスの作成
チャンネル数を1に限定しない1次元畳み込み層のクラスConv1dを作成してください。


例えば以下のようなx, w, bがあった場合は、


x = np.array([[1, 2, 3, 4], [2, 3, 4, 5]]) # shape(2, 4)で、（入力チャンネル数、特徴量数）である。
w = np.ones((3, 2, 3)) # 例の簡略化のため全て1とする。(出力チャンネル数、入力チャンネル数、フィルタサイズ)である。
b = np.array([1, 2, 3]) # （出力チャンネル数）

出力は次のようになります。

a = np.array([[16, 22], [17, 23], [18, 24]]) # shape(3, 2)で、（出力チャンネル数、特徴量数）である。

入力が2チャンネル、出力が3チャンネルの例です。計算グラフを書いた上で、バックプロパゲーションも手計算で考えてみましょう。計算グラフの中には和と積しか登場しないので、微分を新たに考える必要はありません。


《補足》


チャンネル数を加える場合、配列をどういう順番にするかという問題があります。(バッチサイズ、チャンネル数、特徴量数)または(バッチサイズ、特徴量数、チャンネル数)が一般的で、ライブラリによって順番は異なっています。（切り替えて使用できるものもあります）


今回のスクラッチでは自身の実装上どちらが効率的かを考えて選んでください。上記の例ではバッチサイズは考えておらず、(チャンネル数、特徴量数)です。



In [9]:
class Conv1d:
    """畳み込みクラス
    """
    def __init__(self, b_size, initializer, optimizer, n_in_channels=1, n_out_channels=1, pa=0):
        """コンストラクタ
        b_size : フィルターサイズ
        initializer : 初期化クラス
        optimizer : 最適化手法クラス
        n_in_channels : 入力チャンネル数
        n_out_channels : 出力チャンネル数
        pa : パディング数
        """
        self.b_size = b_size
        self.optimizer = optimizer
        self.pa = pa
        # 重みの初期化
        self.W = initializer.W(n_out_channels, n_in_channels, b_size)
        # バイアスの初期化
        self.B = initializer.B(n_out_channels)
        self.n_in_channels = n_in_channels
        self.n_out_channels = n_out_channels
        self.n_out = None
        
    def forward(self, X):
        """順伝播
        X : 入力配列
        """
        # 入力配列の特徴量数
        self.n_in = X.shape[-1]
        # 出力サイズ
        self.n_out = output_size_calculation(self.n_in, self.b_size, self.pa)
        # 計算のために逆転させる
        X = X.reshape(self.n_in_channels, self.n_in)
        # 0埋め実施
        self.X = np.pad(X, ((0,0), ((self.b_size-1), 0)))
        # 出力配列（A）の計算のためゼロ配列X1を用意する
        self.X1 = np.zeros((self.n_in_channels, self.b_size, self.n_in+(self.b_size-1)))
        # 重みの長さでループ
        for i in range(self.b_size):
            # ずらしながら上書き
            self.X1[:, i] = np.roll(self.X, -i, axis=-1)
        # 重みとバイアスを考慮して計算
        A = np.sum(self.X1[:, :, self.b_size-1-self.pa:self.n_in+self.pa]*self.W[:, :, :, np.newaxis], axis=(1, 2)) + self.B.reshape(-1,1)
        return A
    
    def backward(self, dA):
        """逆伝播
        dA : 逆伝播してきた配列
        """
        # 重みの勾配
        self.dW = np.sum(np.dot(dA, self.X1[:, :, self.b_size-1-self.pa:self.n_in+self.pa, np.newaxis]), axis=-1)
        # バイアスの勾配
        self.dB = np.sum(dA, axis=1)
        # 逆伝播の値計算のためにdAを変形
        self.dA = np.pad(dA, ((0,0), (0, (self.b_size-1))))
        # 出力配列（dX）の計算のためゼロ配列dA1を用意する
        self.dA1 = np.zeros((self.n_out_channels, self.b_size, self.dA.shape[-1]))
        # 重みの長さでループ
        for i in range(self.b_size):
            self.dA1[:, i] = np.roll(self.dA, i, axis=-1)
        dX = np.sum(self.W@self.dA1, axis=0)
        # 重みとバイアスの更新
        self.optimizer.update(self)
        return dX

In [10]:
# インスタンス化
conc_1d = Conv1d(b_size=3, initializer=SimpleInitializer(0.01), optimizer=SGD(0.01), n_in_channels=2, n_out_channels=3, pa=0)

In [11]:
x = np.array([[1, 2, 3, 4], [2, 3, 4, 5]])
# コンストラクタ内で初期化しているけど確認のため再代入
conc_1d.W = np.ones((3, 2, 3), dtype=float)
conc_1d.B = np.array([1, 2, 3], dtype=float)

In [12]:
conc_1d.forward(x)

array([[16., 22.],
       [17., 23.],
       [18., 24.]])

【問題8】学習と推定
これまで使ってきたニューラルネットワークの全結合層の一部をConv1dに置き換えてMNISTを学習・推定し、Accuracyを計算してください。


出力層だけは全結合層をそのまま使ってください。ただし、チャンネルが複数ある状態では全結合層への入力は行えません。その段階でのチャンネルは1になるようにするか、 平滑化 を行なってください。


画像に対しての1次元畳み込みは実用上は行わないことのため、精度は問いません。

In [13]:
# データ読み込み
(X_train, y_train), (X_test, y_test) = mnist.load_data()

# 画像データ→行データに
X_train = X_train.reshape(-1, 784)
X_test = X_test.reshape(-1, 784)

# 正規化
X_train = X_train.astype(np.float)
X_test = X_test.astype(np.float)
X_train /= 255
X_test /= 255

# onehotベクトル化
enc = OneHotEncoder(handle_unknown='ignore', sparse=False)
y_train_one_hot = enc.fit_transform(y_train[:, np.newaxis])
y_test_one_hot = enc.transform(y_test[:, np.newaxis])

# 訓練データと評価データに
X_train_, X_val, y_train_, y_val = train_test_split(X_train, y_train_one_hot, test_size=0.2)

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz


In [14]:
class Conv1d_Arbitrary_Strides:
    """畳み込みクラス
    """
    def __init__(self, b_size, initializer, optimizer, n_in_channels=1, n_out_channels=1, pa=0, stride=1):
        """コンストラクタ
        b_size : フィルターサイズ
        initializer : 初期化クラス
        optimizer : 最適化手法クラス
        n_in_channels : 入力チャンネル数
        n_out_channels : 出力チャンネル数
        pa : パディング数
        stride : ストライド数
        """
        self.b_size = b_size
        self.optimizer = optimizer
        self.pa = pa
        self.stride = stride
        # 重みの初期化
        self.W = initializer.W(n_out_channels, n_in_channels, b_size)
        # バイアスの初期化
        self.B = initializer.B(n_out_channels)
        self.n_in_channels = n_in_channels
        self.n_out_channels = n_out_channels
        self.n_out = None
        
    def forward(self, X):
        """順伝播
        X : 入力配列
        """
        # バッチ数
        self.n_samples = X.shape[0]
        # 入力配列の特徴量数
        self.n_in = X.shape[-1]
        # 出力サイズ
        self.n_out = output_size_calculation(self.n_in, self.b_size, self.pa, self.stride)
        # 計算のためにX変形
        X = X.reshape(self.n_samples, self.n_in_channels, self.n_in)
        # 0埋め実施
        self.X = np.pad(X, ((0,0), (0,0), ((self.b_size-1), 0)))
        # 出力配列（A）の計算のためゼロ配列X1を用意する
        self.X1 = np.zeros((self.n_samples, self.n_in_channels, self.b_size, self.n_in+(self.b_size-1)))
        # 重みの長さでループ
        for i in range(self.b_size):
            # ずらしながら上書き
            self.X1[:, :, i] = np.roll(self.X, -i, axis=-1)
        # 重みとバイアスを考慮して計算
        A = np.sum(self.X1[:, np.newaxis, :, :, self.b_size-1-self.pa:self.n_in+self.pa:self.stride]*self.W[:, :, :, np.newaxis], axis=(2, 3)) + self.B.reshape(-1,1)
        return A
    
    def backward(self, dA):
        """逆伝播
        dA : 逆伝播してきた配列
        """
        # 重みの勾配
        self.dW = np.sum(dA[:, :, np.newaxis, np.newaxis]*self.X1[:, np.newaxis, :, :, self.b_size-1-self.pa:self.n_in+self.pa:self.stride], axis=(0, -1))
        # バイアスの勾配
        self.dB = np.sum(dA, axis=(0, -1))
        # 逆伝播の値計算のためにdAを変形
        self.dA = np.pad(dA, ((0,0), (0,0), (0, (self.b_size-1))))
        # 出力配列（dX）の計算のためゼロ配列dA1を用意する
        self.dA1 = np.zeros((self.n_samples, self.n_out_channels, self.b_size, self.dA.shape[-1]))
        # 重みの長さでループ
        for i in range(self.b_size):
            self.dA1[:, :, i] = np.roll(self.dA, i, axis=-1)
        dX = np.sum(self.W[:, :, :, np.newaxis]*self.dA1[:, :, np.newaxis], axis=(1,3))
        # 重みとバイアスの更新
        self.optimizer.update(self)
        return dX

In [15]:
class ScratchCNNClassifier:
    
    def __init__(self, num_epoch=10, lr=0.01, batch_size=20, n_features=784, n_nodes1=400, n_nodes2=200, n_output=10, verbose=True, Activater=Tanh, Optimizer=AdaGrad):
        """コンストラクタ
        Parameters
        -----------
        num_epoch : 学習回数
        lr : 学習率
        batch_size : バッチサイズ
        n_features : 特徴量数
        n_nodes1 : 1層目のノード数
        n_nodes2 : 2層目のノード数
        n_output : 出力層の数
        verbose : 仮定出力するか否か
        Activater : 活性化関数
        Optimizer : 最適化手法
        """
        self.num_epoch = num_epoch
        self.lr = lr
        self.verbose = verbose  
        self.batch_size = batch_size 
        self.n_features = n_features 
        self.n_nodes2 = n_nodes2 
        self.n_output = n_output 
        self.Activater = Activater
        if Activater == Sigmoid or Activater == Tanh:
            self.Initializer = XavierInitializer
        elif Activater == ReLU:
            self.Initializer = HeInitializer
        self.Optimizer = Optimizer
    
    def fit(self, X, y, X_val=None, y_val=None):
        """学習
        Parameters
        ----------
        X : 訓練データの説明変数
        y : 訓練データの目的変数
        X_val : 評価データの説明変数
        y_val : 評価データの目的変数
        """        
        # レイヤー初期化
        self.Conv1d_Arbitrary_Strides = Conv1d_Arbitrary_Strides(b_size=7, initializer=SimpleInitializer(0.01), optimizer=self.Optimizer(self.lr), n_in_channels=1, n_out_channels=1, pa=3, stride=2)
        self.Conv1d_Arbitrary_Strides.n_out = output_size_calculation(X.shape[-1], self.Conv1d_Arbitrary_Strides.b_size, self.Conv1d_Arbitrary_Strides.pa, self.Conv1d_Arbitrary_Strides.stride)
        self.activation1 = self.Activater()
        self.FC2 = FC(1*self.Conv1d_Arbitrary_Strides.n_out, self.n_nodes2, self.Initializer(), self.Optimizer(self.lr))
        self.activation2 = self.Activater()
        self.FC3 = FC(self.n_nodes2, self.n_output, self.Initializer(), self.Optimizer(self.lr))
        self.activation3 = Softmax()
        
        # loss配列定義と初期値格納（loss:ミニバッチごとの損失格納  loss_epoch:ミニバッチ学習終了後の全体損失）
        self.loss = []
        self.loss_epoch = [self.activation3.loss_func(y, self.forward_propagation(X))]
        
        # 学習回数分ループ
        for _ in range(self.num_epoch):
            # ミニバッチイテレータ生成
            get_mini_batch = GetMiniBatch(X, y, batch_size=self.batch_size)
            # イテレータ呼び出し
            for mini_X, mini_y in get_mini_batch:
                # 順伝播
                self.forward_propagation(mini_X)
                # 逆伝播
                self.back_propagation(mini_X, mini_y)
                # 損失記録
                self.loss.append(self.activation3.loss)
            # 損失記録
            self.loss_epoch.append(self.activation3.loss_func(y, self.forward_propagation(X)))
    
    def predict(self, X):
        """予測値出力
        Parameters
        ----------
        X : 説明変数
        """
        return np.argmax(self.forward_propagation(X), axis=1)
    
    def forward_propagation(self, X):
        """順伝播
        Parameters
        ----------
        X : 訓練データの説明変数
        """
        A1 = self.Conv1d_Arbitrary_Strides.forward(X)
        A1 = A1.reshape(A1.shape[0], A1.shape[-1])
        Z1 = self.activation1.forward(A1)
        A2 = self.FC2.forward(Z1)
        Z2 = self.activation2.forward(A2)
        A3 = self.FC3.forward(Z2)
        Z3 = self.activation3.forward(A3)
        return Z3
        
    def back_propagation(self, X, y_true):
        """逆伝播
        Parameters
        ----------
        X : 訓練データの説明変数
        y_true : 正解データ
        """
        dA3 = self.activation3.backward(y_true) 
        dZ2 = self.FC3.backward(dA3)
        dA2 = self.activation2.backward(dZ2)
        dZ1 = self.FC2.backward(dA2)
        dA1 = self.activation1.backward(dZ1)
        dA1 = dA1[:, np.newaxis]
        dZ0 = self.Conv1d_Arbitrary_Strides.backward(dA1) 

In [16]:
cnn = ScratchCNNClassifier(num_epoch=20, lr=0.01, batch_size=20, n_features=784, n_nodes1=400, n_nodes2=400, n_output=10, verbose=True, Activater=Tanh, Optimizer=SGD)
cnn.fit(X_train_[:1000], y_train_[:1000])

In [17]:
y_pred = cnn.predict(X_test)
accuracy_score(y_test, y_pred)

0.8934