<a href="https://colab.research.google.com/github/jiaiaraki/test/blob/main/20201125_myOwnCNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### numpyだけでCNN実装
https://qiita.com/ta-ka/items/1c588dd0559d1aad9921

**<font color = red size = 8>！overflow！</font>**

#### CNN
- 畳込み層
- プーリング層

#### 学習
- 重みの更新
- 誤差逆伝播

#### pythonでの実装
- 畳込み層の実装
- プーリング層の実装

#### MNISTデータセットでの実験
- 学習
- 結果

In [None]:
## activator.py

import numpy as np

class Activator:
  class linear(object):
    def __init__(self):
      'linear class'

    def activate(self, x):
      return x

    def derivate(self, x):
      return np.ones_like(x)


  class sigmoid(object):
    def __init__(self):
      'sigmoid class'

    def activate(self, x):
      return 1.0 / (1.0 + np.exp(-x))

    def derivate(self, x):
      return x * (1.0 - x)


  class softmax(object):
    def __init__(self):
      'softmax class'
      
    def activate(self, x):
      exp = np.exp(x)
      return exp / exp.sum(axis = 0)

    def derivate(self, x):
      return x * (1.0 - x)


  class relu(object):
    def __init__(self):
      'relu class'

    def activate(self, x):
      return np.maximum(x, 0.0)

    def derivate(self, x):
      return np.vectorize(lambda _x: 0.0 if _x < 0.0 else 1.0)(x)

In [None]:
## cnn.py

import numpy as np
from matplotlib import pyplot

class CNN(object):
  def __init__(self, conv1, pool1, conv2, pool2, neural, error):
    self.conv1 = conv1
    self.pool1 = pool1
    self.conv2 = conv2
    self.pool2 = pool2
    self.neural = neural
    self.error = error

  def train(self, X, T, epsilon, lam, gamma, s_batch, epochs):
    n_data = X.shape[0]
    self.__set_loss(epochs)
    for epo in range(epochs):
      perm = np.random.permutation(n_data)
      for i in range(0, n_data, s_batch):
        x, t = X[perm[i:i+s_batch]], T[perm[i:i+s_batch]].T

        # forward
        cv1, pl1, cv2, pl2, u, z, y = self.__forward(x, s_batch)

        # backward
        output_delta, hidden_delta, input_delta = self.neural.backward(t, y, z, u, self.error)
        pl2_delta, cv2_delta, pl1_delta, cv1_delta = self.__backward(input_delta, pl2, cv2, pl1, cv1)

        # update weight
        self.neural.update_weight(output_delta, hidden_delta, z, u, epsilon, lam)
        self.conv2.update_weight(cv2_delta, epsilon)
        self.conv1.update_weight(cv1_delta, epsilon)

        # accumulate loss
        self.__accumulate_loss(y, t, n_data, epo)

      # update learning rate
      if (epo + 1) % 10 == 0: epsilon = self.__update_epsilon(epsilon, gamma)
      print('epoch: {0}, loss: {1}'.format(epo, self.__loss[epo]))

  def predict(self, X):
    return self.__forward(X, X.shape[0])[6]

  def accuracy(self, Y, T):
    return (Y.argmax(axis = 0) == T.argmax(axis = 1)).sum() * 1.0 / Y.shape[1]

  def save_lossfig(self, fn = 'loss.png'):
    pyplot.plot(np.arange(self.__loss.size), self.__loss)
    pyplot.savefig(fn)

  def __forward(self, X, s_batch):
    cv1 = self.conv1.forward(X)
    pl1 = self.pool1.forward(cv1)
    cv2 = self.conv2.forward(pl1)
    pl2 = self.pool2.forward(cv2)
    u = pl2.reshape(s_batch, -1).T
    z, y = self.neural.forward(u)
    return cv1, pl1, cv2, pl2, u, z, y

  def __backward(self, input_delta, pl2, cv2, pl1, cv1):
    pl2_delta = input_delta.reshape(pl2.shape)
    cv2_delta = self.pool2.backward(cv2, pl2_delta, self.conv2.activator)
    pl1_delta = self.conv2.backward(cv2_delta, pl1.shape)
    cv1_delta = self.pool1.backward(cv1, pl1_delta, self.conv1.activator)
    return pl2_delta, cv2_delta, pl1_delta, cv1_delta

  def __update_epsilon(self, epsilon, gamma):
    return gamma * epsilon

  def __set_loss(self, epochs):
    self.__loss = np.zeros(epochs)

  def __accumulate_loss(self, y, t, n_data, epo):
    self.__loss[epo] += self.error.delta(y, t) / n_data

In [None]:
## convolution.py

import numpy as np

class Convolution(object):
  def __init__(self, m, k, kh, kw, act):
    self.kh = kh
    self.kw = kw
    self.weight = np.random.normal(0.0, np.sqrt(2.0 / (m * kh * kw)), (m, k, kh, kw))
    self.activator = act

  def forward(self, X):
    return self.activator.activate(self.__forward(X))

  def backward(self, delta, shape):
    s_batch, k, h, w = delta.shape
    delta_patch = np.tensordot(delta.reshape(s_batch, k, h * w), self.weight, (1, 0))
    return self.__patch2im(delta_patch, h, w, shape)

  def update_weight(self, delta, epsilon):
    s_batch, k, h, w = delta.shape
    self.weight -= epsilon * self.__grad(delta, s_batch, k, h, w)

  def __forward(self, X):
    s_batch, k, xh, xw = X.shape
    m = self.weight.shape[0]
    oh, ow = xh - int(self.kh / 2) * 2, xw - int(self.kw / 2) * 2
    self.__patch = self.__im2patch(X, s_batch, k, oh, ow)
    return np.tensordot(self.__patch, self.weight, ((2, 3, 4), (1, 2, 3))).swapaxes(1, 2).reshape(s_batch, m, oh, ow)

  def __im2patch(self, X, s_batch, k, oh, ow):
    patch = np.zeros((s_batch, oh * ow, k, self.kh, self.kw))
    for j in range(oh):
      for i in range(ow):
        patch[:, j * ow + i, :, :, :] = X[:, :, j:j+self.kh, i:i+self.kw]
    return patch

  def __patch2im(self, patch, h, w, shape):
    im = np.zeros(shape)
    for j in range(h):
      for i in range(w):
        im[:, :, j:j+self.kh, i:i+self.kw] += patch[:, j * w + i]
    return im

  def __grad(self, delta, s_batch, k, h, w):
    return np.tensordot(delta.reshape(s_batch, k, h * w), self.__patch, ((0, 2), (0, 1))) / s_batch

In [None]:
## error.py

import numpy as np

class Error:
  class squared(object):
    def __init__(self):
      'squared class'

    def delta(self, y, t):
      return (y - t).T.dot(y - t).sum() / 2.0

    def derivated_delta(self, y, t):
      return y - t


  class cross_entropy(object):
    def __init__(self):
      'cross entropy'

    def delta(self, y, t):
      return (-t * np.log(y)).sum()

    def derivated_delta(self, y, t):
      return (y - t) / (y * (1.0 - y))

In [None]:
## nn.py

import numpy as np

class NN(object):
  def __init__(self, n_input, n_hidden, n_output, input_act, hidden_act, output_act):
    self.n_input = n_input
    self.n_hidden = n_hidden
    self.n_output = n_output
    self.hidden_weight = np.random.randn(n_hidden, n_input + 1) * 0.01
    self.output_weight = np.random.randn(n_output, n_hidden + 1) * 0.01
    self.input_act = input_act
    self.hidden_act = hidden_act
    self.output_act = output_act

  def forward(self, x):
    z = self.__forward(x, self.hidden_weight, self.hidden_act)
    y = self.__forward(z, self.output_weight, self.output_act)
    return z, y

  def backward(self, t, y, z, u, error):
    output_delta = y - t
    # error.derivated_delta(t, y) * self.output_act.derivate(y)
    hidden_delta = self.__delta(self.output_weight, output_delta, z, self.hidden_act)
    input_delta = self.__delta(self.hidden_weight, hidden_delta, u, self.input_act)
    return output_delta, hidden_delta, input_delta

  def update_weight(self, output_delta, hidden_delta, z, u, epsilon, lam):
    s_batch = z.shape[1]
    reg_term = np.hstack((np.zeros((self.n_output, 1)), self.output_weight[:, 1:]))
    self.output_weight -= epsilon * (self.__grad(z, output_delta, s_batch) + lam * reg_term)
    self.hidden_weight -= epsilon * self.__grad(u, hidden_delta, s_batch)

  def __forward(self, x, weight, act):
    return act.activate(weight.dot(np.vstack((np.ones((x.shape[1])), x))))

  def __delta(self, weight, delta, x, act):
    return weight[:, 1:].T.dot(delta) * act.derivate(x)

  def __grad(self, x, delta, s_batch):
    return delta.dot(np.vstack((np.ones((1, x.shape[1])), x)).T) / s_batch

In [None]:
## pooling.py

import numpy as np

class Pooling(object):
  def __init__(self, kh, kw, s):
    self.kh = kh
    self.kw = kw
    self.s = s

  def forward(self, X):
    s_batch, k, h, w = X.shape
    oh, ow = int((h - self.kh) / self.s) + 1, int((w - self.kw) / self.s) + 1
    val, self.__ind = self.__max(X, s_batch, k, oh, ow)
    return val

  def backward(self, X, delta, act):
    s_batch, k, h, w = X.shape
    oh, ow = delta.shape[2:]
    rh, rw = int(h / oh), int(w / ow)
    ind = np.arange(s_batch * k * oh * ow) * rh * rw + self.__ind.flatten()
    return self.__backward(delta, ind, s_batch, k, h, w, oh, ow) * act.derivate(X)

  def __max(self, X, s_batch, k, oh, ow):
    patch = self.__im2patch(X, s_batch, k, oh, ow)
    return map(lambda _f: _f(patch, axis = 3).reshape(s_batch, k, oh, ow), [np.max, np.argmax])

  def __im2patch(self, X, s_batch, k, oh, ow):
    patch = np.zeros((s_batch, oh * ow, k, self.kh, self.kw))
    for j in range(oh):
      for i in range(ow):
        _j, _i = j * self.s, i * self.s
        patch[:, j * ow + i, :, :, :] = X[:, :, _j:_j+self.kh, _i:_i+self.kw]
    return patch.swapaxes(1, 2).reshape(s_batch, k, oh * ow, -1)

  def __backward(self, delta, ind, s_batch, k, h, w, oh, ow):
    _delta = np.zeros(s_batch * k * h * w)
    _delta[ind] = delta.flatten()
    return _delta.reshape(s_batch, k, oh, ow, self.kh, self.kw).swapaxes(3, 4).reshape(s_batch, k, h, w)

In [None]:
## main.py

import numpy as np
import os

def read_data(fn):
  ml = np.loadtxt(fn, delimiter = ',')
  X, t = np.hsplit(ml, [-1])
  return X / X.max(), t.astype('int')

def create_label(t, n_data, n_class):
  T = np.zeros((n_data, n_class))
  T[np.arange(n_data), t[:, 0]] = 1.0
  return T

In [None]:
print('make train data...')
fn_train = 'https://pjreddie.com/media/files/mnist_train.csv'
X1, t1 = read_data(fn_train)
n_data1, n_input1 = X1.shape
n_class1 = np.unique(t1).size
T1 = create_label(t1, n_data1, n_class1)

i1 = np.random.permutation(n_data1)[:len(X1)]
X_train = X1[i1, :].reshape(len(X1), 1, 28, 28)
T_train = T1[i1, :]

make train data...


In [None]:
print('make test data...')
fn_test = 'https://pjreddie.com/media/files/mnist_test.csv'
X2, t2 = read_data(fn_test)
n_data2, n_input2 = X2.shape
n_class2 = np.unique(t2).size
T2 = create_label(t2, n_data2, n_class2)

i2 = np.random.permutation(n_data2)[:len(X2)]
X_test = X2[i2, :].reshape(len(X2), 1, 28, 28)
T_test = T2[i2, :]

make test data...


In [None]:
print('initialize...')
linear, sigmoid, softmax, relu = Activator.linear(), Activator.sigmoid(), Activator.softmax(), Activator.relu()
conv1, conv2 = Convolution(20, 1, 5, 5, relu), Convolution(50, 20, 5, 5, relu)
pool1, pool2 = Pooling(2, 2, 2), Pooling(2, 2, 2)
neural = NN(800, 500, 10, linear, sigmoid, softmax)
error = Error.cross_entropy()
cnn = CNN(conv1, pool1, conv2, pool2, neural, error)

initialize...


In [None]:
print('train...')
cnn.train(X_train, T_train, epsilon = 0.005, lam = 0.0001, gamma = 0.9, s_batch = 5, epochs = 50)

train...


  return umr_sum(a, axis, dtype, out, keepdims, initial, where)
  outputs = ufunc(*inputs)


KeyboardInterrupt: ignored

In [None]:
print('predict...')
Y_test = cnn.predict(X_test)
accuracy = cnn.accuracy(Y_test, T_test)
print('accuracy: {0}'.format(accuracy))

In [None]:
print('save figure of loss...')
cnn.save_lossfig()