### 前準備

In [45]:
import numpy as np
from IPython.display import display
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score
import matplotlib.pyplot as plt
from keras.datasets import mnist
from sklearn.model_selection import train_test_split

#
# 活性化関数 activator
#

# 全結合層 (Fully Connected Layer, Affine層, Dense層)
class FC:
    def __init__(self, n_nodes1, n_nodes2, initializer, optimizer):
        self.optimizer = optimizer
        self.W = initializer.W(n_nodes1, n_nodes2) # 行列 shape(n_nodes1, n_nodes2)
        self.B = initializer.B(n_nodes2)           # ベクトル shape(1, n_nodes2)
        self.Z = None
        self.hW = 0
        self.hB = 0

    def forward(self, X):
        self.Z = X  # backwardで使う
        return X @ self.W + self.B

    def backward(self, d_a):
        self.dB = np.sum(d_a, axis=0) # なぜsumになるんだ
        self.dW = self.Z.T @ d_a      # おそらく微分値がこうなるんだと思う
        self.dZ = d_a @ self.W.T      # おそらく微分値がこうなるんだと思う
        self.optimizer.update(self)   # 自身のW/Bをoptimizerにより更新
        return self.dZ

class Sigmoid:
    def forward(self, A):
        self.A = A
        return 1 / (1 + np.exp(-A))

    def backward(self, dZ):
        A = self.A
        c = 1 / (1 + np.exp(-A))
        d = (1 / (1 + np.exp(-A)))**2
        return dZ * (c - d)

class Tanh:
    def forward(self, A):
        self.A = A
        return np.tanh(A)

    def backward(self, dZ):
        return dZ * (1 - np.tanh(self.A)**2)

class ReLU:
    def forward(self, A):
        self.A = A
        return np.maximum(0, A) # 0とxを比較して大きい方の数値を返す

    def backward(self, dZ):
        return dZ * np.where(self.A > 0, 1, 0)

# 更新アルゴリズム optimizer
class SGD:
    def __init__(self, lr):
        self.lr = lr

    def update(self, layer):
        layer.W -= self.lr * layer.dW
        layer.B -= self.lr * layer.dB

class AdaGrad:
    def __init__(self, lr):
        self.lr = lr

    def update(self, layer):
        layer.hW += layer.dW * layer.dW # layer側に代入している理由: AdaGradはほかのFCオブジェクトで使い回されるため、FC側に保持する必要がある
        layer.hB += layer.dB * layer.dB
        layer.W -= self.lr * layer.dW / (np.sqrt(layer.hW) + 1e-7)
        layer.B -= self.lr * layer.dB / (np.sqrt(layer.hB) + 1e-7) # Note. / len(layer.Z)

# 初期値 initializer
class SimpleInitializer:
    def __init__(self, sigma):
        self.sigma = sigma

    def W(self, n_nodes1, n_nodes2, b_size):
        return self.sigma * np.random.randn(n_nodes1, n_nodes2, b_size)

    def B(self, n_nodes2):
        return self.sigma * np.random.randn(1, n_nodes2)

class XavierInitializer:
    # sigmaはinterfaceを合わせるためだけに実装
    def __init__(self, sigma):
        self.s = None

    def W(self, n_nodes1, n_nodes2):
        self.s = np.sqrt(1 / n_nodes1)
        return self.s * np.random.randn(n_nodes1, n_nodes2)

    def B(self, n_nodes2):
        return self.s * np.random.randn(1, n_nodes2)

class HeInitializer:
    # sigmaはinterfaceを合わせるためだけに実装
    def __init__(self, sigma):
        self.s = None

    def W(self, n_nodes1, n_nodes2):
        # display("n_nodes1: ", n_nodes1)
        # display("n_nodes2: ", n_nodes2)

        self.s = np.sqrt(2 / n_nodes1)
        # display("self.s: ", self.s)
        w = self.s * np.random.randn(n_nodes1, n_nodes2)
        # display("w: ", w)
        return w

    def B(self, n_nodes2):
        return self.s * np.random.randn(1, n_nodes2)

# ミニバッチ
class GetMiniBatch:
    """
    ミニバッチを取得するイテレータ
    Parameters
    ----------
    X : 次の形のndarray, shape (n_samples, n_features)
      訓練データ
    y : 次の形のndarray, shape (n_samples, 1)
      正解値
    batch_size : int
      バッチサイズ
    seed : int
      NumPyの乱数のシード
    """
    def __init__(self, X, y, batch_size = 20, seed=0):
        self.batch_size = batch_size
        np.random.seed(seed)
        shuffle_index = np.random.permutation(np.arange(X.shape[0]))
        self._X = X[shuffle_index]
        self._y = y[shuffle_index]
        self._stop = np.ceil(X.shape[0]/self.batch_size).astype(np.int)
    def __len__(self):
        return self._stop
    def __getitem__(self,item):
        p0 = item*self.batch_size
        p1 = item*self.batch_size + self.batch_size
        return self._X[p0:p1], self._y[p0:p1]
    def __iter__(self):
        self._counter = 0
        return self
    def __next__(self):
        if self._counter >= self._stop:
            raise StopIteration()
        p0 = self._counter*self.batch_size
        p1 = self._counter*self.batch_size + self.batch_size
        self._counter += 1
        return self._X[p0:p1], self._y[p0:p1]

In [8]:
def im2col(input_data, filter_h, filter_w, stride=1, pad=0):
    """

    Parameters
    ----------
    input_data : (データ数, チャンネル, 高さ, 幅)の4次元配列からなる入力データ
    filter_h : フィルターの高さ
    filter_w : フィルターの幅
    stride : ストライド
    pad : パディング

    Returns
    -------
    col : 2次元配列
    """
    N, C, H, W = input_data.shape
    out_h = (H + 2*pad - filter_h)//stride + 1
    out_w = (W + 2*pad - filter_w)//stride + 1

    img = np.pad(input_data, [(0,0), (0,0), (pad, pad), (pad, pad)], 'constant')
    col = np.zeros((N, C, filter_h, filter_w, out_h, out_w))

    for y in range(filter_h):
        y_max = y + stride*out_h
        for x in range(filter_w):
            x_max = x + stride*out_w
            col[:, :, y, x, :, :] = img[:, :, y:y_max:stride, x:x_max:stride]

    col = col.transpose(0, 4, 5, 1, 2, 3).reshape(N*out_h*out_w, -1)
    return col

def col2im(col, input_shape, filter_h, filter_w, stride=1, pad=0):
    """

    Parameters
    ----------
    col :
    input_shape : 入力データの形状（例：(10, 1, 28, 28)）
    filter_h :
    filter_w
    stride
    pad

    Returns
    -------

    """
    N, C, H, W = input_shape
    out_h = (H + 2*pad - filter_h)//stride + 1
    out_w = (W + 2*pad - filter_w)//stride + 1
    col = col.reshape(N, out_h, out_w, C, filter_h, filter_w).transpose(0, 3, 4, 5, 1, 2)

    img = np.zeros((N, C, H + 2*pad + stride - 1, W + 2*pad + stride - 1))
    for y in range(filter_h):
        y_max = y + stride*out_h
        for x in range(filter_w):
            x_max = x + stride*out_w
            img[:, :, y:y_max:stride, x:x_max:stride] += col[:, :, y, x, :, :]

    return img[:, :, pad:H + pad, pad:W + pad]

# 活性化関数 activator
class Convolution:
    def __init__(self, W, b, stride=1, pad=0):
        self.W = W              # 重み
        self.b = b              # バイアス
        self.stride = stride    # ストライド
        self.pad = pad          # ?

        # 中間データ（backward時に使用）
        self.x = None
        self.col = None
        self.col_W = None

        # 重み・バイアスパラメータの勾配
        self.dW = None
        self.db = None

    def forward(self, x):
        FN, C, FH, FW = self.W.shape
        N, C, H, W = x.shape
        out_h = 1 + int((H + 2*self.pad - FH) / self.stride)
        out_w = 1 + int((W + 2*self.pad - FW) / self.stride)

        col = im2col(x, FH, FW, self.stride, self.pad)
        col_W = self.W.reshape(FN, -1).T

        out = np.dot(col, col_W) + self.b
        out = out.reshape(N, out_h, out_w, -1).transpose(0, 3, 1, 2)

        self.x = x
        self.col = col
        self.col_W = col_W

        return out

    def backward(self, dout):
        FN, C, FH, FW = self.W.shape
        dout = dout.transpose(0,2,3,1).reshape(-1, FN)

        self.db = np.sum(dout, axis=0)
        self.dW = np.dot(self.col.T, dout)
        self.dW = self.dW.transpose(1, 0).reshape(FN, C, FH, FW)

        dcol = np.dot(dout, self.col_W.T)
        dx = col2im(dcol, self.x.shape, FH, FW, self.stride, self.pad)

        return dx

# 活性化関数 activator
class Pooling:
    def __init__(self, pool_h, pool_w, stride=2, pad=0):
        self.pool_h = pool_h
        self.pool_w = pool_w
        self.stride = stride
        self.pad = pad

        self.x = None
        self.arg_max = None

    def forward(self, x):
        N, C, H, W = x.shape
        out_h = int(1 + (H - self.pool_h) / self.stride)
        out_w = int(1 + (W - self.pool_w) / self.stride)

        col = im2col(x, self.pool_h, self.pool_w, self.stride, self.pad)
        col = col.reshape(-1, self.pool_h*self.pool_w)

        arg_max = np.argmax(col, axis=1)
        out = np.max(col, axis=1)
        out = out.reshape(N, out_h, out_w, C).transpose(0, 3, 1, 2)

        self.x = x
        self.arg_max = arg_max

        return out

    def backward(self, dout):
        dout = dout.transpose(0, 2, 3, 1)

        pool_size = self.pool_h * self.pool_w
        dmax = np.zeros((dout.size, pool_size))
        dmax[np.arange(self.arg_max.size), self.arg_max.flatten()] = dout.flatten()
        dmax = dmax.reshape(dout.shape + (pool_size,))

        dcol = dmax.reshape(dmax.shape[0] * dmax.shape[1] * dmax.shape[2], -1)
        dx = col2im(dcol, self.x.shape, self.pool_h, self.pool_w, self.stride, self.pad)

        return dx

### 【問題1】チャンネル数を1に限定した1次元畳み込み層クラスの作成

In [9]:
# 活性化関数 activator
# パディングは考えず、ストライドも1固定
# 重みが複数の特徴量に対して共有されている
class SimpleConv1d:
    def forward(self, x, w, b):
        # padding = None
        stride = 1
        a = []
        for i in range(len(w) - 1):
            tmp_x = x[i:i+len(w)] # これスカラじゃないの？
            a.append(tmp_x @ w + b[0])
        return np.array(a)

    # 共有されている誤差をすべて足すことで勾配を求める?
    def backward(self, x, w, da):
        db = np.sum(da)
        dw = []
        for i in range(len(w)):
            dw.append(da @ x[i:i+len(da)])
        dw = np.array(dw)
        dx = []
        new_w = np.insert(w[::-1], 0, 0)
        new_w = np.append(new_w, 0)
        for i in range(len(new_w)-1):
            dx.append(new_w[i:i+len(da)] @ da)
        dx = np.array(dx[::-1])
        return db, dw, dx

### 【問題2】1次元畳み込み後の出力サイズの計算

In [18]:
# n_out: 出力の特徴量の数
# n_in:  入力の特徴量の数
# p:     ある方向へのパディングの数
# f:     フィルタのサイズ
# s:     すとらいどのサイズ
def output_size_calculation(n_in, f, p=0, s=1):
    return int((n_in + 2*p - f) / s + 1)

### 【問題3】小さな配列での1次元畳み込み層の実験


In [22]:
x = np.array([1,2,3,4])
w = np.array([3, 5, 7])
b = np.array([1])
d_a = np.array([10, 20])

conv = SimpleConv1d()
print(conv.forward(x, w, b))
db, dw, dx = conv.backward(x, w, d_a)
print(db)
print(dw)
print(dx)

[35 50]
30
[ 50  80 110]
[ 30 110 170 140]


### 【問題4】チャンネル数を限定しない1次元畳み込み層クラスの作成

In [91]:
class Conv1d:
    def __init__(self, b_size, initializer, optimizer, n_in_channels=1, n_out_channels=1, pa=0):
        self.b_size = b_size
        self.optimizer = optimizer
        self.pa = pa
        self.W = initializer.W(n_out_channels, n_in_channels, b_size)
        self.B = initializer.B(n_out_channels)
        self.n_in_channels = n_in_channels
        self.n_out_channels = n_out_channels
        self.n_out = None

    def forward(self, X):
        self.n_in = X.shape[-1]
        self.n_out = output_size_calculation(self.n_in, self.b_size, self.pa)
        X = X.reshape(self.n_in_channels, self.n_in)
        self.X = np.pad(X, ((0,0), ((self.b_size-1), 0)))
        self.X1 = np.zeros((self.n_in_channels, self.b_size, self.n_in+(self.b_size-1)))
        for i in range(self.b_size):
            self.X1[:, i] = np.roll(self.X, -i, axis=-1)
        A = np.sum(self.X1[:, :, self.b_size-1-self.pa:self.n_in+self.pa]*self.W[:, :, :, np.newaxis], axis=(1, 2)) + self.B.reshape(-1,1)
        return A

    def backward(self, dA):
        self.dW = np.sum(np.dot(dA, self.X1[:, :, self.b_size-1-self.pa:self.n_in+self.pa, np.newaxis]), axis=-1)
        self.dB = np.sum(dA, axis=1)
        self.dA = np.pad(dA, ((0,0), (0, (self.b_size-1))))
        self.dA1 = np.zeros((self.n_out_channels, self.b_size, self.dA.shape[-1]))
        for i in range(self.b_size):
            self.dA1[:, i] = np.roll(self.dA, i, axis=-1)
        dX = np.sum(self.W@self.dA1, axis=0)
        self.optimizer.update(self)
        return dX

x = np.array([[1, 2, 3, 4], [2, 3, 4, 5]])
conv = Conv1d(b_size=3, initializer=SimpleInitializer(0.01), optimizer=SGD(0.01), n_in_channels=2, n_out_channels=3, pa=0)
conv.W = np.ones((3, 2, 3), dtype=float)
conv.B = np.array([1, 2, 3], dtype=float)
result = conv.forward(x)
print(result)

[[16. 22.]
 [17. 23.]
 [18. 24.]]


### 【問題8】学習と推定

In [113]:
class Softmax:

    def forward(self, X):
        self.Z = np.exp(X) / np.sum(np.exp(X), axis=1).reshape(-1,1)
        return self.Z

    def backward(self, Y):
        self.loss = self.loss_func(Y)
        return self.Z - Y

    def loss_func(self, Y, Z=None):
        if Z is None:
            Z = self.Z
        return (-1)*np.average(np.sum(Y*np.log(Z), axis=1))

class Conv1d2:
    def __init__(self, b_size, initializer, optimizer, n_in_channels=1, n_out_channels=1, pa=0, stride=1):
        self.b_size = b_size
        self.optimizer = optimizer
        self.pa = pa
        self.stride = stride
        self.W = initializer.W(n_out_channels, n_in_channels, b_size)
        self.B = initializer.B(n_out_channels)
        self.n_in_channels = n_in_channels
        self.n_out_channels = n_out_channels
        self.n_out = None

    def forward(self, X):
        self.n_samples = X.shape[0]
        self.n_in = X.shape[-1]
        self.n_out = output_size_calculation(self.n_in, self.b_size, self.pa, self.stride)
        X = X.reshape(self.n_samples, self.n_in_channels, self.n_in)
        self.X = np.pad(X, ((0,0), (0,0), ((self.b_size-1), 0)))
        self.X1 = np.zeros((self.n_samples, self.n_in_channels, self.b_size, self.n_in+(self.b_size-1)))
        for i in range(self.b_size):
            self.X1[:, :, i] = np.roll(self.X, -i, axis=-1)
        A = np.sum(self.X1[:, np.newaxis, :, :, self.b_size-1-self.pa:self.n_in+self.pa:self.stride]*self.W[:, :, :, np.newaxis], axis=(2, 3)) + self.B.reshape(-1,1)
        return A

    def backward(self, dA):
        self.dW = np.sum(dA[:, :, np.newaxis, np.newaxis]*self.X1[:, np.newaxis, :, :, self.b_size-1-self.pa:self.n_in+self.pa:self.stride], axis=(0, -1))
        self.dB = np.sum(dA, axis=(0, -1))
        self.dA = np.pad(dA, ((0,0), (0,0), (0, (self.b_size-1))))
        self.dA1 = np.zeros((self.n_samples, self.n_out_channels, self.b_size, self.dA.shape[-1]))
        for i in range(self.b_size):
            self.dA1[:, :, i] = np.roll(self.dA, i, axis=-1)
        dX = np.sum(self.W[:, :, :, np.newaxis]*self.dA1[:, :, np.newaxis], axis=(1,3))
        self.optimizer.update(self)
        return dX

# スクラッチクラス
class ScratchCNNClassifier:

    def __init__(self, epoch=10, lr=0.01, batch_size=20, n_features=784, n_nodes1=400, n_nodes2=200, n_output=10, Activater=Tanh, Optimizer=AdaGrad):
        self.sigma = 0.02
        self.epoch = epoch
        self.lr = lr
        self.batch_size = batch_size
        self.n_features = n_features
        self.n_nodes2 = n_nodes2
        self.n_output = n_output
        self.Activater = Activater
        if Activater == Sigmoid or Activater == Tanh:
            self.Initializer = XavierInitializer
        elif Activater == ReLU:
            self.Initializer = HeInitializer
        self.Optimizer = Optimizer

    def fit(self, X, y, X_val=None, y_val=None):
        self.val_enable = False
        if X_val is not None:
            self.val_enable = True
        self.Conv1d = Conv1d2(b_size=7, initializer=SimpleInitializer(0.01), optimizer=self.Optimizer(self.lr), n_in_channels=1, n_out_channels=1, pa=3, stride=2)
        self.Conv1d.n_out = output_size_calculation(X.shape[-1], self.Conv1d.b_size, self.Conv1d.pa, self.Conv1d.stride)
        self.activation1 = self.Activater()
        self.FC2 = FC(1*self.Conv1d.n_out, self.n_nodes2, self.Initializer(self.sigma), self.Optimizer(self.lr))
        self.activation2 = self.Activater()
        self.FC3 = FC(self.n_nodes2, self.n_output, self.Initializer(self.sigma), self.Optimizer(self.lr))
        self.activation3 = Softmax()

        self.loss = []
        self.loss_epoch = [self.activation3.loss_func(y, self.forward_propagation(X))]

        for i in range(self.epoch):
            print(f"epoch: {i}")
            get_mini_batch = GetMiniBatch(X, y, batch_size=self.batch_size)
            self.iter = len(get_mini_batch)
            for mini_X, mini_y in get_mini_batch:
                self.forward_propagation(mini_X)
                self.back_propagation(mini_X, mini_y)
                self.loss.append(self.activation3.loss)
            self.loss_epoch.append(self.activation3.loss_func(y, self.forward_propagation(X)))

    def predict(self, X):
        return np.argmax(self.forward_propagation(X), axis=1)

    def forward_propagation(self, X):
        A1 = self.Conv1d.forward(X)
        A1 = A1.reshape(A1.shape[0], A1.shape[-1])
        Z1 = self.activation1.forward(A1)
        A2 = self.FC2.forward(Z1)
        Z2 = self.activation2.forward(A2)
        A3 = self.FC3.forward(Z2)
        Z3 = self.activation3.forward(A3)
        return Z3

    def back_propagation(self, X, y_true):
        dA3 = self.activation3.backward(y_true)
        dZ2 = self.FC3.backward(dA3)
        dA2 = self.activation2.backward(dZ2)
        dZ1 = self.FC2.backward(dA2)
        dA1 = self.activation1.backward(dZ1)
        dA1 = dA1[:, np.newaxis]
        dZ0 = self.Conv1d.backward(dA1)

#
# 前処理
#

# データセット
(x_train, y_train), (x_test, y_test) = mnist.load_data() # shape(60000, 28, 28)

# 平滑化
x_train = x_train.reshape(-1, 784) # shape(60000, 784)
x_test = x_test.reshape(-1, 784)   # shape(60000, 784)
# スケール調整
x_train = x_train.astype(np.float)
x_test = x_test.astype(np.float)
x_train /= 255
x_test /= 255
# one-hot encoding
from sklearn.preprocessing import OneHotEncoder
enc = OneHotEncoder(handle_unknown='ignore', sparse=False)
y_train_one_hot = enc.fit_transform(y_train[:, np.newaxis])
y_test_one_hot = enc.transform(y_test[:, np.newaxis])

def result(model, x_train, y_train, x_test, y_test):
    # 学習結果
    # display(model.loss)

    # 予測
    pred_train = model.predict(x_train)
    pred_test = model.predict(x_test)

    # 評価
    print("train accuracy_score: \t", accuracy_score(y_train, pred_train))
    print("train precision_score: \t", precision_score(y_train, pred_train, average="micro"))
    print("train recall_score: \t", recall_score(y_train, pred_train, average="micro"))
    print("train f1_score: \t", f1_score(y_train, pred_train, average="micro"))
    print("train accuracy_score: \t", accuracy_score(y_test, pred_test))
    print("train precision_score: \t", precision_score(y_test, pred_test, average="micro"))
    print("train recall_score: \t", recall_score(y_test, pred_test, average="micro"))
    print("train f1_score: \t", f1_score(y_test, pred_test, average="micro"))


In [111]:
# 学習 AdaGrad, Xavier, Tanh
import time
start = time.time()

m = ScratchCNNClassifier(epoch=5, lr=0.01, batch_size=20, n_features=784, n_nodes1=400, n_nodes2=400, n_output=10, Activater=Tanh, Optimizer=SGD)
m.fit(x_train, y_train_one_hot, x_test, y_test_one_hot)

print ("elapsed_time: {} ".format(int(time.time() - start)) + "[sec]")

# y_pred = test3.predict(x_test)
# accuracy_score(y_test, y_pred)

epoch: 0
epoch: 1
epoch: 2
epoch: 3
epoch: 4
elapsed_time: 72 [sec]


In [114]:
result(m, x_train, y_train, x_test, y_test)



train accuracy_score: 	 0.9870833333333333
train precision_score: 	 0.9870833333333333
train recall_score: 	 0.9870833333333333
train f1_score: 	 0.9870833333333333
train accuracy_score: 	 0.9765
train precision_score: 	 0.9765
train recall_score: 	 0.9765
train f1_score: 	 0.9765
