# 畳み込みニューラルネットワークを用いた画像認識
---

## 目的
畳み込みニューラルネットワーク (Convolutional Neural Network; CNN) を用いてCIFAR10データセットに対する物体認識を行う．


## 対応するチャプター
* 8.1.3: バッチアルゴリズムとミニバッチアルゴリズム
* 8.3.1: 確率的勾配降下法
* 9.1: 畳み込み処理
* 9.3: プーリング


## モジュールのインポート
プログラムの実行に必要なモジュールをインポートします．
`pickle`はPythonのリストや辞書などのオブジェクトを保存・読み込みを行うためのライブラリです．今回はCIFAR-10データセットを読み込むために使用します・

In [None]:
import numpy as np
import pickle
from time import time

## データセットの読み込み
実験に使用するCIFAR-10データセットを読み込みます．

まず，CIFAR-10データセットをダウンロードします．

In [None]:
# CIFAR-10データセットのダウンロード
!wget https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz -O cifar-10-python.tar.gz
!tar zxvf cifar-10-python.tar.gz

次に，ダウンロードしたデータセットを読み込みます．

In [None]:
def unpickle(file):
    with open(file, 'rb') as fo:
        dict = pickle.load(fo, encoding='bytes')
    return dict

def load_cifar10_train():
    data = np.empty((0, 3 * 32 * 32))
    label = np.empty(0)
    
    for i in range(1, 6):
        tmp_data = unpickle("cifar-10-batches-py/data_batch_%d" % i)
        data = np.vstack((data, tmp_data[b'data']))
        label = np.hstack((label, tmp_data[b'labels']))
    data = data.reshape(-1, 3, 32, 32).astype(np.float32)
    label = label.astype(np.int64)
    return data, label

def load_cifar10_test():
    tmp_data = unpickle("cifar-10-batches-py/test_batch")
    data = tmp_data[b'data']
    label = np.array(tmp_data[b'labels'])
    data = data.reshape(-1, 3, 32, 32).astype(np.float32)
    label = label.astype(np.int64)
    return data, label

x_train, y_train = load_cifar10_train()
x_test, y_test = load_cifar10_test()

# 正規化（0~1）
x_train /= 255.
x_test /= 255.

print(x_train.shape, y_train.shape)
print(x_test.shape, y_test.shape)

## ネットワークモデルの定義
次に，CNNを定義します．

まずはじめに，ネットワークの定義に必要な関数を定義します．

In [None]:
def relu(x):
    return np.maximum(0, x)

def relu_grad(x):
    grad = np.zeros(x.shape)
    grad[x >= 0] = 1
    return grad

def softmax(x):
    if x.ndim == 2:
        x = x.T
        x = x - np.max(x, axis=0)
        y = np.exp(x) / np.sum(np.exp(x), axis=0)
        return y.T 

    x = x - np.max(x)
    return np.exp(x) / np.sum(np.exp(x))

def cross_entropy(y, t):
    if y.ndim == 1:
        t = t.reshape(1, t.size)
        y = y.reshape(1, y.size)

    if t.size == y.size:
        t = t.argmax(axis=1)

    batch_size = y.shape[0]
    return -np.sum(np.log(y[np.arange(batch_size), t] + 1e-7)) / batch_size

def softmax_cross_entropy(x, t):
    y = softmax(x)
    return cross_entropy(y, t)

def multiclass_classification_accuracy(pred, true):
    clf_res = np.argmax(pred, axis=1)
    return np.sum(clf_res == true).astype(np.float32)

`im2col`およびその逆の変換の`col2im`も定義を行います．

In [None]:
def im2col(input_image, kernel_h, kernel_w, stride=1, padding=0):
    n, c, h, w = input_image.shape
    
    dst_h = (h + 2 * padding - kernel_h) // stride + 1
    dst_w = (w + 2 * padding - kernel_w) // stride + 1
    
    image = np.pad(input_image, [(0,0), (0,0), (padding, padding), (padding, padding)], 'constant')
    col = np.zeros((n, c, kernel_h, kernel_w, dst_h, dst_w))
    
    for y in range(kernel_h):
        y_max = y + stride * dst_h
        for x in range(kernel_w):
            x_max = x + stride * dst_w
            col[:, :, y, x, :, :] = image[:, :, y:y_max:stride, x:x_max:stride]
    
    col = col.transpose(0, 4, 5, 1, 2, 3).reshape(n * dst_h * dst_w, -1)
    return col

def col2im(col, input_shape, kernel_h, kernel_w, stride=1, padding=0):
    n, c, h, w = input_shape
    out_h = (h + 2 * padding - kernel_h) // stride + 1
    out_w = (w + 2 * padding - kernel_w) // stride + 1
    col = col.reshape(n, out_h, out_w, c, kernel_h, kernel_w).transpose(0, 3, 4, 5, 1, 2)

    img = np.zeros((n, c, h + 2 * padding + stride - 1, w + 2 * padding + stride - 1))
    for y in range(kernel_h):
        y_max = y + stride * out_h
        for x in range(kernel_w):
            x_max = x + stride*out_w
            img[:, :, y:y_max:stride, x:x_max:stride] += col[:, :, y, x, :, :]

    return img[:, :, padding:h + padding, padding:w + padding]

畳み込みおよびプーリングの処理は煩雑になってしまうため，関数として定義します．

In [None]:
def conv(x, w, b, stride=1, padding=0):
    FN, C, FH, FW = w.shape
    N, C, H, W = x.shape

    out_h = 1 + int((H + 2 * padding - FH) / stride)
    out_w = 1 + int((W + 2 * padding - FW) / stride)

    col = im2col(x, FH, FW, stride, padding)
    col_w = w.reshape(FN, -1).T

    out = np.dot(col, col_w) + b
    out = out.reshape(N, out_h, out_w, -1).transpose(0, 3, 1, 2)
    
    return out, col, col_w

def conv_grad(dout, x, col, col_w, w, b, stride=1, padding=0):
    FN, C, FH, FW = w.shape
    dout = dout.transpose(0, 2, 3, 1).reshape(-1, FN)
    
    grad_b = np.sum(dout, axis=0)
    grad_w = np.dot(col.T, dout)
    grad_w = grad_w.transpose(1, 0).reshape(FN, C, FH, FW)
    
    dcol = np.dot(dout, col_w.T)
    dx = col2im(dcol, x.shape, FH, FW, stride, padding)

    return dx, grad_w, grad_b
    
def maxpool(x, pool_size=2, stride=2, padding=0):
    N, C, H, W = x.shape
    out_h = int(1 + (H - pool_size) / stride)
    out_w = int(1 + (W - pool_size) / stride)
    
    col = im2col(x, pool_size, pool_size, stride, padding)
    col = col.reshape(-1, pool_size * pool_size)
    
    arg_max = np.argmax(col, axis=1)
    out = np.max(col, axis=1)
    out = out.reshape(N, out_h, out_w, C).transpose(0, 3, 1, 2)

    return out, arg_max

def maxpool_grad(dout, x, arg_max, p_size=2, stride=2, padding=0):
    dout = dout.transpose(0, 2, 3, 1)
    pool_size = p_size * p_size

    dmax = np.zeros((dout.size, pool_size))
    dmax[np.arange(arg_max.size), arg_max.flatten()] = dout.flatten()
    dmax = dmax.reshape(dout.shape + (pool_size,)) 

    dcol = dmax.reshape(dmax.shape[0] * dmax.shape[1] * dmax.shape[2], -1)
    dx = col2im(dcol, x.shape, p_size, p_size, stride, padding)

    return dx

次に，上で定義した関数を用いてネットワークを定義します．
ここでは，畳み込み層，中間層，出力層から構成されるCNNとします．

入力画像のチャンネル数と，畳み込みのカーネルサイズ，畳み込みのカーネル数を引数として指定します．
さらに，中間層，出力層のユニット数は引数として与え，それぞれ`hidden_size`, `output_size`とします．
そして，`__init__`関数を用いて，ネットワークのパラメータを初期化します．
`w1`, `w2`, `w3`は各層の重みで，`b1`, `b2`, `b3`はバイアスを表しています．
重みは`randn`関数で，標準正規分布に従った乱数で生成した値を保有する配列を生成します．
バイアスは`zeros`関数を用いて，要素が全て0の配列を生成します．

そして，`forward`関数で，データを入力して結果を出力するための演算を定義します．

次に，`backward`関数ではパラメータの更新量を計算します．
まず，ネットワークの出力結果と教師ラベルから，誤差`dy`を算出します．
この時，教師ラベルをone-hotベクトルへ変換し，各ユニットの出力との差を取ることで，`dy`を計算しています．
その後，連鎖律に基づいて，出力層から順番に勾配を計算していきます．
このとき，パラメータの更新量を`self.grads`へ保存しておきます．

最後に`update_parameters`関数で，更新量をもとにパラメータの更新を行います．

In [None]:
class CNN:
    
    def __init__(self, n_channels=3, filter_size=3, num_kernel=64, hidden_size=128, output_size=10, w_std=0.01):
        
        # convolutional layer
        self.w1 = w_std * np.random.randn(num_kernel, n_channels, filter_size, filter_size)
        self.b1 = np.zeros(num_kernel)
        # hidden layer
        pooled_feature_size = int(num_kernel * (32 / 2) * (32 / 2))
        self.w2 = w_std * np.random.randn(pooled_feature_size, hidden_size)
        self.b2 = np.zeros(hidden_size)
        # output layer
        self.w3 = w_std * np.random.randn(hidden_size, output_size)
        self.b3 = np.zeros(output_size)
        # dict. for gradients
        self.grads = {}
        
    def forward(self, x):
        self.h1, self.h1_col, self.h1_col_w = conv(x, self.w1, self.b1, stride=1, padding=1)
        self.h2 = relu(self.h1)
        self.h3, self.h3_argmax = maxpool(self.h2, pool_size=2, stride=2, padding=0)
        self.h4 = np.dot(self.h3.reshape(self.h2.shape[0], -1), self.w2) + self.b2
        self.h5 = relu(self.h4)
        self.h6 = np.dot(self.h5, self.w3) + self.b3
        return self.h6    
        
    def backward(self, x, t):
        batch_size = x.shape[0]
        
        # forward  #####
        _ = self.forward(x)
        y = softmax(self.h6)
        
        # backward #####
        self.grads = {}
        
        t = np.identity(10)[t]
        
        dy = (y - t) / batch_size
        
        # output layer
        d_h5 = np.dot(dy, self.w3.T)
        self.grads['w3'] = np.dot(self.h5.T, dy)
        self.grads['b3'] = np.sum(dy, axis=0)
        
        # relu
        d_h4 = relu_grad(self.h4) * d_h5
        
        # hidden layer
        d_h3 = np.dot(d_h4, self.w2.T)
        self.grads['w2'] = np.dot(self.h3.T, d_h4)
        self.grads['b2'] = np.sum(d_h4, axis=0)
        
        # maxpool
        d_h3 = d_h3.reshape(self.h3.shape)
        d_h2 = maxpool_grad(d_h3, self.h2, self.h3_argmax, p_size=2, stride=2, padding=0)
        
        # relu
        d_h1 = relu_grad(self.h1) * d_h2
        
        # convolution
        _, self.grads['w1'], self.grads['b1'] = conv_grad(d_h1, x, self.h1_col, self.h1_col_w, self.w1, self.b2, stride=1, padding=1)
        
    def update_parameters(self, lr=0.1): 
        self.w1 -= lr * self.grads['w1']
        self.b1 -= lr * self.grads['b1']
        self.w2 -= lr * self.grads['w2'].reshape(self.w2.shape)
        self.b2 -= lr * self.grads['b2']
        self.w3 -= lr * self.grads['w3']
        self.b3 -= lr * self.grads['b3']

## ネットワークの作成と学習の準備

読み込んだCIFAR10データセットと作成したネットワークを用いて，学習を行います．

1回の誤差を算出するデータ数（ミニバッチサイズ）を100，学習エポック数を10とします．

学習データは毎回ランダムに決定するため，numpyの`permutation`という関数を利用します．
各更新において，学習用データと教師データをそれぞれ`x_batch`と`y_batch`とします．
学習モデルに`x_batch`を与えて，`h`を取得します．
取得した`h`は精度および誤差を算出するための関数へと入力され，値を保存します．
そして，誤差を`backward`関数で逆伝播し，`update_parameters`でネットワークの更新を行います．

In [None]:
model = CNN(n_channels=3, filter_size=3, num_kernel=64, hidden_size=256, output_size=10)

## 学習
学習したネットワークを用いて，テストデータに対する認識律の確認を行います．

In [None]:
num_train_data = x_train.shape[0]
batch_size = 100
epoch_num = 10

iteration = 1
start = time()
for epoch in range(1, epoch_num + 1):
    sum_accuracy = 0.0
    sum_loss= 0.0
    
    perm = np.random.permutation(num_train_data)
    for i in range(0, num_train_data, batch_size):
        x_batch = x_train[perm[i:i+batch_size]]
        y_batch = y_train[perm[i:i+batch_size]]
        
        h = model.forward(x_batch)
        sum_accuracy += multiclass_classification_accuracy(h, y_batch)
        loss = softmax_cross_entropy(h, y_batch)
        sum_loss += loss
        
        model.backward(x_batch, y_batch)
        model.update_parameters(lr=0.1)
        
        if iteration % 100 == 0:
            print("iteration: {}, loss: {}".format(iteration, loss))
        
        iteration += 1

    print("epoch: {}, mean loss: {}, mean accuracy: {}, elapsed time: {}".format(epoch,
                                                                                 sum_loss / num_train_data,
                                                                                 sum_accuracy / num_train_data,
                                                                                 time() - start))

## テスト
学習したネットワークを用いて，テストデータに対する認識率の確認を行います．

In [None]:
count = 0
num_test_data = x_test.shape[0]

for i in range(num_test_data):
    x = np.array([x_test[i]], dtype=np.float32)
    t = y_test[i]
    y = model.forward(x)
    pred = np.argmax(y.flatten())
    
    if pred == t:
        count += 1

print("test accuracy: {}".format(count / num_test_data))