In [None]:
import numpy as np

## 합성곱 및 풀링계층 구현

In [None]:
# im2col -> image to column

def im2col(input_data, filter_h, filter_w, stride=1, pad=0):
  """Unfold every filter window of a 4-D batch into one row of a 2-D matrix.

  Args:
    input_data: Array of shape (N, C, H, W) = (images, channels, height, width).
    filter_h: Filter height.
    filter_w: Filter width.
    stride: Step between neighbouring windows.
    pad: Number of zeros added on each side of the height and width axes.

  Returns:
    2-D array of shape (N * out_h * out_w, C * filter_h * filter_w).
  """
  N, C, H, W = input_data.shape
  out_h = (H + 2*pad - filter_h) // stride + 1
  out_w = (W + 2*pad - filter_w) // stride + 1

  # Zero-pad only the two spatial axes.
  img = np.pad(input_data, [(0, 0), (0, 0), (pad, pad), (pad, pad)], 'constant')
  col = np.zeros((N, C, filter_h, filter_w, out_h, out_w))

  # For each offset (y, x) inside the filter, copy the strided grid of pixels
  # that this offset touches across all window positions.
  for y in range(filter_h):
    y_max = y + stride*out_h
    for x in range(filter_w):
      x_max = x + stride*out_w
      col[:, :, y, x, :, :] = img[:, :, y:y_max:stride, x:x_max:stride]

  # Rows are ordered by (image, out_y, out_x); columns by (channel, fy, fx).
  return col.transpose(0, 4, 5, 1, 2, 3).reshape(N*out_h*out_w, -1)


# col2im -> column to image
def col2im(col, input_shape, filter_h, filter_w, stride=1, pad=0):
  """Fold an im2col-style matrix back into a 4-D image batch.

  Values belonging to overlapping windows are summed, which is what the
  backward pass of a convolution/pooling layer requires.

  Args:
    col: 2-D array laid out as produced by im2col.
    input_shape: Shape (N, C, H, W) of the original image batch.
    filter_h: Filter height.
    filter_w: Filter width.
    stride: Step between neighbouring windows.
    pad: Zero padding that was applied on each side of height and width.

  Returns:
    Array of shape (N, C, H, W).
  """
  N, C, H, W = input_shape
  out_h = (H + 2*pad - filter_h) // stride + 1
  out_w = (W + 2*pad - filter_w) // stride + 1
  col = col.reshape(N, out_h, out_w, C, filter_h, filter_w).transpose(0, 3, 4, 5, 1, 2)

  # The extra (stride - 1) rows/columns keep the strided slices below in
  # bounds when the windows do not tile the padded image exactly.
  img = np.zeros((N, C, H + 2*pad + stride - 1, W + 2*pad + stride - 1))
  for y in range(filter_h):
    y_max = y + stride*out_h
    for x in range(filter_w):
      x_max = x + stride*out_w
      img[:, :, y:y_max:stride, x:x_max:stride] += col[:, :, y, x, :, :]

  # Strip the padding again.
  return img[:, :, pad:H + pad, pad:W + pad]

### 1. 합성곱 구현

In [None]:
class Convolution:
  """Convolution layer implemented as an im2col matrix product."""

  def __init__(self, W, b, stride=1, pad=0):
    """Store the layer parameters.

    Args:
      W: Filter weights of shape (FN, C, FH, FW).
      b: Bias added to each of the FN output channels.
      stride: Step between neighbouring filter positions.
      pad: Zero padding on each side of the input height and width.
    """
    self.W = W
    self.b = b
    self.stride = stride
    self.pad = pad

    # Intermediate values cached by forward() for use in backward().
    self.x = None
    self.col = None
    self.col_W = None

    # Parameter gradients, filled in by backward().
    self.dW = None
    self.db = None

  def forward(self, x):
    """Convolve x of shape (N, C, H, W); return (N, FN, out_h, out_w)."""
    FN, C, FH, FW = self.W.shape  # filter count, channels, filter height, filter width
    N, C, H, W = x.shape
    out_h = (H + 2*self.pad - FH) // self.stride + 1
    out_w = (W + 2*self.pad - FW) // self.stride + 1

    col = im2col(x, FH, FW, self.stride, self.pad)
    col_W = self.W.reshape(FN, -1).T  # one column per filter
    out = np.dot(col, col_W) + self.b

    # backward() needs the input and both unfolded matrices.
    self.x = x
    self.col = col
    self.col_W = col_W

    # (N*out_h*out_w, FN) -> (N, FN, out_h, out_w)
    return out.reshape(N, out_h, out_w, -1).transpose(0, 3, 1, 2)

  def backward(self, dout):
    """Back-propagate dout of shape (N, FN, out_h, out_w).

    Stores the parameter gradients in self.dW / self.db and returns the
    gradient with respect to the input, shaped like the forward input.
    forward() must have been called first.
    """
    FN, C, FH, FW = self.W.shape
    dout = dout.transpose(0, 2, 3, 1).reshape(-1, FN)

    self.db = np.sum(dout, axis=0)
    self.dW = np.dot(self.col.T, dout)
    self.dW = self.dW.transpose(1, 0).reshape(FN, C, FH, FW)

    dcol = np.dot(dout, self.col_W.T)
    return col2im(dcol, self.x.shape, FH, FW, self.stride, self.pad)

### 2. 풀링 계층

1. 입력 데이터를 전개한다.
2. 행별 최댓값을 구한다.
3. 적절한 모양으로 reshape을 해준다.

In [None]:
class Pooling:
  """Max-pooling layer implemented on top of im2col."""

  def __init__(self, pool_h, pool_w, stride=1, pad=0):
    """Store the pooling window size, stride and padding."""
    self.pool_h = pool_h
    self.pool_w = pool_w
    self.stride = stride
    self.pad = pad

    # Cached by forward() for use in backward().
    self.x = None
    self.arg_max = None

  def forward(self, x):
    """Max-pool x of shape (N, C, H, W); return (N, C, out_h, out_w)."""
    N, C, H, W = x.shape
    # Include the padding so the sizes agree with what im2col produces.
    # Note that padded zeros take part in the max.
    out_h = (H + 2*self.pad - self.pool_h) // self.stride + 1
    out_w = (W + 2*self.pad - self.pool_w) // self.stride + 1

    # Unfold: one row per (image, out_y, out_x, channel) window.
    col = im2col(x, self.pool_h, self.pool_w, self.stride, self.pad)
    col = col.reshape(-1, self.pool_h * self.pool_w)

    # Remember which element of every window was the maximum.
    self.x = x
    self.arg_max = np.argmax(col, axis=1)

    # Row-wise maximum, then back to image layout.
    out = np.max(col, axis=1)
    return out.reshape(N, out_h, out_w, C).transpose(0, 3, 1, 2)

  def backward(self, dout):
    """Route each upstream gradient to the position that held the maximum.

    forward() must have been called first.
    """
    dout = dout.transpose(0, 2, 3, 1)

    pool_size = self.pool_h * self.pool_w
    dmax = np.zeros((dout.size, pool_size))
    dmax[np.arange(self.arg_max.size), self.arg_max.flatten()] = dout.flatten()
    dmax = dmax.reshape(dout.shape + (pool_size,))

    dcol = dmax.reshape(dmax.shape[0] * dmax.shape[1] * dmax.shape[2], -1)
    return col2im(dcol, self.x.shape, self.pool_h, self.pool_w, self.stride, self.pad)

### 3. CNN 구현
  -  CNN 구성: Conv -> ReLU -> Pooling -> ... -> Affine -> ReLU -> Affine -> Softmax

In [None]:
def softmax(x):
    """Return the softmax of x.

    A 2-D input is normalised row by row (each row sums to 1). Any other
    input is normalised over all of its elements at once. The maximum is
    subtracted before exponentiating to avoid overflow.
    """
    if x.ndim == 2:
        shifted = x - np.max(x, axis=1, keepdims=True)
        exps = np.exp(shifted)
        return exps / np.sum(exps, axis=1, keepdims=True)

    shifted = x - np.max(x)  # overflow guard
    exps = np.exp(shifted)
    return exps / np.sum(exps)

def cross_entropy_error(y, t):
    """Return the mean cross-entropy between predictions y and targets t.

    Args:
        y: Predicted probabilities, shape (batch, classes) or (classes,).
        t: Targets, either one-hot with the same number of elements as y or
            integer class indices with one entry per sample.

    Returns:
        The cross-entropy averaged over the batch.
    """
    y = np.asarray(y)
    t = np.asarray(t)
    if y.ndim == 1:
        t = t.reshape(1, t.size)
        y = y.reshape(1, y.size)

    # One-hot targets are converted to class indices.
    if t.size == y.size:
        t = t.argmax(axis=1)

    batch_size = y.shape[0]
    picked = y[np.arange(batch_size), t]
    # The small constant keeps log() finite when a probability is exactly 0.
    return -np.sum(np.log(picked + 1e-7)) / batch_size

In [None]:
class Relu:
  """ReLU activation layer."""

  def __init__(self):
    # Boolean mask of the inputs that were <= 0 in the last forward() call.
    self.mask = None

  def forward(self, x):
    """Return a copy of x with every non-positive element set to 0."""
    self.mask = (x <= 0)
    out = x.copy()
    out[self.mask] = 0
    return out

  def backward(self, out):
    """Return the upstream gradient with blocked positions zeroed.

    `out` is the gradient arriving from the next layer. A copy is modified
    so the caller's array is left untouched.
    """
    dx = out.copy()
    dx[self.mask] = 0
    return dx

class Affine:
  """Fully connected layer: out = x . W + b."""

  def __init__(self, W, b):
    """Store the weight matrix W and the bias b."""
    self.W = W
    self.b = b
    self.x = None
    # Shape of the last forward() input, restored by backward().
    self.original_x_shape = None
    self.dW = None
    self.db = None

  def forward(self, x):
    """Apply the layer.

    Inputs with more than two dimensions (for example the (N, C, H, W)
    output of a pooling layer) are flattened to (N, -1) first.
    """
    self.original_x_shape = x.shape
    if x.ndim > 2:
      x = x.reshape(x.shape[0], -1)
    self.x = x
    return np.dot(self.x, self.W) + self.b

  def backward(self, out):
    """Back-propagate the upstream gradient `out`.

    Stores the parameter gradients in self.dW / self.db and returns the
    input gradient reshaped to the shape of the forward() input.
    """
    dx = np.dot(out, self.W.T)
    self.dW = np.dot(self.x.T, out)
    self.db = np.sum(out, axis=0)
    return dx.reshape(self.original_x_shape)

class SoftmaxWithLoss:
  """Softmax activation combined with a cross-entropy loss."""

  def __init__(self):
    self.loss = None  # loss value of the last forward() call
    self.y = None     # softmax output of the last forward() call
    self.t = None     # targets of the last forward() call

  def forward(self, x, t):
    """Return the cross-entropy loss of softmax(x) against the targets t."""
    self.t = t
    self.y = softmax(x)
    self.loss = cross_entropy_error(self.y, self.t)
    return self.loss

  def backward(self, dout=1):
    """Return the gradient of the loss with respect to the forward() input.

    Handles both one-hot targets (same number of elements as the softmax
    output) and integer class-index targets.
    """
    batch_size = self.t.shape[0]
    if self.t.size == self.y.size:
      # One-hot targets.
      dx = (self.y - self.t) / batch_size
    else:
      # Class-index targets: subtract 1 at the correct class of each sample.
      dx = self.y.copy()
      dx[np.arange(batch_size), self.t] -= 1
      dx = dx / batch_size
    return dx * dout

In [None]:
class SimpleConvNet:
  """Small CNN: Conv -> ReLU -> Pool -> Affine -> ReLU -> Affine -> Softmax."""

  def __init__(self, input_dim=(1, 28, 28), conv_param=None,
               hidden_size=100, output_size=10, weight_init_std=0.01):
    """Create the parameters and layers.

    Args:
      input_dim: Input shape as (channels, height, width).
      conv_param: Dict with the convolution hyper-parameters "filter_num",
        "filter_size", "pad" and "stride". None selects
        {"filter_num": 30, "filter_size": 5, "pad": 0, "stride": 1}.
      hidden_size: Number of units in the hidden affine layer.
      output_size: Number of output classes.
      weight_init_std: Standard deviation of the initial random weights.
    """
    # Local import so the class does not rely on a file-level import.
    from collections import OrderedDict

    if conv_param is None:
      conv_param = {"filter_num": 30, "filter_size": 5, "pad": 0, "stride": 1}
    filter_num = conv_param["filter_num"]
    filter_size = conv_param["filter_size"]
    filter_pad = conv_param["pad"]
    filter_stride = conv_param["stride"]

    # Integer arithmetic so the affine input size matches what the
    # convolution and the 2x2/stride-2 pooling layers really produce.
    channels, in_h, in_w = input_dim
    conv_out_h = (in_h - filter_size + 2*filter_pad) // filter_stride + 1
    conv_out_w = (in_w - filter_size + 2*filter_pad) // filter_stride + 1
    pool_output_size = filter_num * (conv_out_h // 2) * (conv_out_w // 2)

    self.params = {}
    self.params["W1"] = weight_init_std * np.random.randn(filter_num, channels, filter_size, filter_size)
    self.params["b1"] = np.zeros(filter_num)
    self.params["W2"] = weight_init_std * np.random.randn(pool_output_size, hidden_size)
    self.params["b2"] = np.zeros(hidden_size)
    self.params["W3"] = weight_init_std * np.random.randn(hidden_size, output_size)
    self.params["b3"] = np.zeros(output_size)

    self.layers = OrderedDict()
    self.layers["Conv1"] = Convolution(self.params["W1"], self.params["b1"], filter_stride, filter_pad)
    self.layers["Relu1"] = Relu()
    self.layers["Pool1"] = Pooling(pool_h=2, pool_w=2, stride=2)
    self.layers["Affine1"] = Affine(self.params["W2"], self.params["b2"])
    self.layers["Relu2"] = Relu()
    self.layers["Affine2"] = Affine(self.params["W3"], self.params["b3"])
    self.last_layer = SoftmaxWithLoss()

  def predict(self, x):
    """Run x through every layer except the final softmax/loss layer."""
    for layer in self.layers.values():
      x = layer.forward(x)
    return x

  def loss(self, x, t):
    """Return the loss of the network on inputs x with targets t."""
    y = self.predict(x)
    return self.last_layer.forward(y, t)

  def accuracy(self, x, t, batch_size=100):
    """Return the fraction of samples in x whose predicted class matches t.

    t may hold class indices (1-D) or one-hot rows. The data is evaluated
    in chunks of batch_size samples.
    """
    if t.ndim != 1:
      t = np.argmax(t, axis=1)

    total = x.shape[0]
    if total == 0:
      return 0.0

    correct = 0
    for start in range(0, total, batch_size):
      stop = start + batch_size
      predicted = np.argmax(self.predict(x[start:stop]), axis=1)
      correct += np.sum(predicted == t[start:stop])
    return correct / total

  def gradient(self, x, t):
    """Return the gradients of the loss for all parameters via backprop."""
    # Forward pass (caches the intermediate values in each layer).
    self.loss(x, t)

    # Backward pass through the layers in reverse order.
    dout = self.last_layer.backward(1)
    for layer in reversed(list(self.layers.values())):
      dout = layer.backward(dout)

    grads = {}
    grads["W1"] = self.layers["Conv1"].dW
    grads["b1"] = self.layers["Conv1"].db
    grads["W2"] = self.layers["Affine1"].dW
    grads["b2"] = self.layers["Affine1"].db
    grads["W3"] = self.layers["Affine2"].dW
    grads["b3"] = self.layers["Affine2"].db
    return grads

## CNN with MNIST

### 1. Dataset

In [22]:
from torchvision import datasets
import numpy as np

In [52]:
def _mnist_to_arrays(dataset):
  """Collect the images (one flat row each) and labels of a dataset."""
  images = []
  labels = []
  # Assumes each item is an (image, label) pair, as the indexing below implies.
  for sample in dataset:
    images.append(np.array(sample[0]).reshape(-1))
    labels.append(sample[1])
  return np.array(images), np.array(labels)


def load_mnist(normalize=True, flatten=True):
  """Download (if needed) and return MNIST as NumPy arrays.

  Args:
    normalize: If True, convert the images to float32 and scale by 1/255.
    flatten: If True, images have shape (N, 784); otherwise (N, 1, 28, 28).

  Returns:
    Tuple (train_img, train_label, test_img, test_label).
  """
  # MNIST dataset
  mnist_train = datasets.MNIST(root="./data/", train=True, download=True)
  mnist_test = datasets.MNIST(root="./data/", train=False, download=True)
  print ("mnist_train:\n",mnist_train,"\n")
  print ("mnist_test:\n",mnist_test,"\n")
  print ("Done.")

  train_img, train_label = _mnist_to_arrays(mnist_train)
  test_img, test_label = _mnist_to_arrays(mnist_test)

  if normalize:
    train_img = train_img.astype(np.float32) / 255.0
    test_img = test_img.astype(np.float32) / 255.0

  if not flatten:
    train_img = train_img.reshape(-1, 1, 28, 28)
    test_img = test_img.reshape(-1, 1, 28, 28)

  return train_img, train_label, test_img, test_label
    

### 2. Trainer

In [None]:
# optimizer
class SGD:
  """Plain stochastic gradient descent."""

  def __init__(self, lr=0.01):
    # Learning rate applied to every gradient.
    self.lr = lr

  def update(self, params, grads):
    """Move each entry of params one step against its gradient, in place."""
    for name in params:
      step = self.lr * grads[name]
      params[name] -= step

class AdaGrad:
  """AdaGrad optimizer: scales each step by the accumulated squared gradients."""

  def __init__(self, lr=0.01):
    self.lr = lr
    # Per-parameter sum of squared gradients, created on the first update().
    self.h = None

  def update(self, params, grads):
    """Update every entry of params in place using its gradient in grads."""
    if self.h is None:
      self.h = {key: np.zeros_like(val) for key, val in params.items()}

    for key in params:
      self.h[key] += grads[key] * grads[key]
      # 1e-7 avoids division by zero while the accumulator is still 0.
      params[key] -= self.lr * grads[key] / (np.sqrt(self.h[key]) + 1e-7)

In [None]:
# Trainer

class Trainer:
  """Runs mini-batch training of a network and records loss and accuracy."""

  def __init__(self, network, x_train, t_train, x_test, t_test,
               epochs=20, mini_batch_size=100,
               optimizer="SGD", optimizer_param=None,
               eval_sample_num_per_epoch=None, verbose=True):
    """Set up the training run.

    Args:
      network: Object providing `params`, `gradient(x, t)`, `loss(x, t)`
        and `accuracy(x, t)`.
      x_train, t_train: Training inputs and targets.
      x_test, t_test: Test inputs and targets.
      epochs: Number of passes over the training data.
      mini_batch_size: Number of samples drawn per training step.
      optimizer: Optimizer name, "SGD" or "AdaGrad" (case-insensitive).
      optimizer_param: Keyword arguments for the optimizer; None selects
        {"lr": 0.01}.
      eval_sample_num_per_epoch: If given, only this many leading samples
        are used for the per-epoch accuracy evaluation.
      verbose: If True, print the loss of each step and epoch accuracies.
    """
    self.network = network
    self.verbose = verbose
    self.x_train = x_train
    self.t_train = t_train
    self.x_test = x_test
    self.t_test = t_test
    self.epochs = epochs
    self.mini_batch_size = mini_batch_size
    self.eval_sample_num_per_epoch = eval_sample_num_per_epoch

    if optimizer_param is None:
      optimizer_param = {"lr": 0.01}
    optimizer_dict = {"sgd": SGD, "adagrad": AdaGrad}
    self.optimizer = optimizer_dict[optimizer.lower()](**optimizer_param)

    self.train_size = x_train.shape[0]
    # Integer division keeps the epoch-boundary test in train_step() exact.
    self.iter_per_epoch = max(self.train_size // mini_batch_size, 1)
    self.max_iter = int(epochs * self.iter_per_epoch)
    self.current_iter = 0
    self.current_epoch = 0
    self.train_loss_list = []
    self.train_acc_list = []
    self.test_acc_list = []

  def train_step(self):
    """Run one mini-batch update; evaluate accuracy at epoch boundaries."""
    batch_mask = np.random.choice(self.train_size, self.mini_batch_size)
    x_batch = self.x_train[batch_mask]
    t_batch = self.t_train[batch_mask]

    grads = self.network.gradient(x_batch, t_batch)
    self.optimizer.update(self.network.params, grads)

    loss = self.network.loss(x_batch, t_batch)
    self.train_loss_list.append(loss)
    if self.verbose:
      print("Train loss: " + str(loss))

    if self.current_iter % self.iter_per_epoch == 0:
      self.current_epoch += 1

      x_train_sample, t_train_sample = self.x_train, self.t_train
      x_test_sample, t_test_sample = self.x_test, self.t_test
      if self.eval_sample_num_per_epoch is not None:
        t = self.eval_sample_num_per_epoch
        x_train_sample, t_train_sample = self.x_train[:t], self.t_train[:t]
        x_test_sample, t_test_sample = self.x_test[:t], self.t_test[:t]

      train_acc = self.network.accuracy(x_train_sample, t_train_sample)
      test_acc = self.network.accuracy(x_test_sample, t_test_sample)
      self.train_acc_list.append(train_acc)
      self.test_acc_list.append(test_acc)

      if self.verbose:
        print("=== Epoch: " + str(self.current_epoch) + " , Train acc" + str(train_acc) +  ", Test acc:" + str(test_acc) + "===")

    # Count every step, not only the ones that fall on an epoch boundary.
    self.current_iter += 1

  def train(self):
    """Run all training steps, then report accuracy on the full test set."""
    for _ in range(self.max_iter):
      self.train_step()

    test_acc = self.network.accuracy(self.x_test, self.t_test)
    if self.verbose:
      print("=== Final Accuracy: " + str(test_acc))