In [105]:
import numpy as np

In [106]:
from tensorflow.keras import datasets

(X_tn0, y_tn0), (X_te0, y_te0) = datasets.mnist.load_data()

In [107]:
input = X_tn0[0]
input.shape

(28, 28)

In [108]:
target = np.array(y_tn0[0]).reshape(1, -1)
target

array([[5]], dtype=uint8)

In [109]:
# 비용 함수 클래스, 비용 함수의 종류에 따라 다른 클래스 내 함수를 사용한다.
class cost_function:
  # 예측값
  predict = []
  # 타겟값
  target = []
  # 비용 함수값
  error_cost = []

  # 오차 제곱합
  def errer_squared_sum(self, predict, target):
    self.predict = predict
    self.target = target

    self.error_cost = np.sum(0.5*((predict - target)**2))
    return self.error_cost

  # 오차 제곱합 미분 함수
  def diff_error_squared_sum(self, predict, target):
    self.predict = predict
    self.target = target

    return self.predict - self.target

In [110]:
# 활성화 함수
class activation_function:
  # 시그모이드 함수
  def sigmoid(self, x):
    return 1 / (1+np.e**-x)
  
  # 시그모이드 함수의 미분 함수
  def sigmoid_diff(self, x):  
    return self.sigmoid(x) * (1 - self.sigmoid(x))

In [111]:
class PADDING:

  def padding(self, input, padding_size):
    """
    input : 입력 데이터
    padding_size : padding 연산 후의 데이터 크기
    """
    if(padding_size == 0):
      return input

    padding_matrix = np.zeros((input.shape[0] + padding_size, input.shape[1] + padding_size))

    for i in range(input.shape[0]):
      for j in range(input.shape[1]):
        padding_matrix[i + int(padding_size / 2)][j + int(padding_size / 2)] = input[i][j]
    
    return padding_matrix


In [112]:
class POOLING:
  pad = PADDING()

  pool_result = []

  def max_pooling(self, input, pooling_size):
    """
    input : 입력 데이터
    pooling_size : pooling size
    """
    self.pool_result = []
    
    pooling_matrix = self.pad.padding(input, input.shape[0] % pooling_size)

    for col in range(0, pooling_matrix.shape[0], pooling_size):
      for row in range(0, pooling_matrix.shape[1], pooling_size):
        pool_arr = []
        for pooling_col in range(pooling_size):
          for pooling_row in range(pooling_size):        
            pool_arr.append(pooling_matrix[pooling_col + col, pooling_row + row])
        self.pool_result.append(max(pool_arr))
    
    # 연산 결과를 크기에 맞게 바꿔준다.
    return np.array(self.pool_result).reshape(int(pooling_matrix.shape[0] / pooling_size), -1)



In [113]:
class MLP:
  #입력값
  input_data = []

  #가중치
  weight = []

  #편향값
  bias = []
    
  # 각 층별 노드들의 출력
  node_output = []

  # 타겟, 목푯값
  target = []

  # 각 층의 델타 값의 저장
  delta = []

  # 비용 함수
  cost = cost_function()

  # 활성화 함수
  activation = activation_function()

  # 가중치 업데이트 크기
  weight_update = []
  
  #순전파 계산
  def forward_cal(self, input, node_count):

    # 가중치의 임의 생성, node_count 개수만큼의 은닉(또는 출력) 노드가 존재한다.
    # (n,1) 개의 입력 값과 m 개의 노드 연결 (m, n) 크기의 가중치가 존재해야 한다.
    weight = np.random.rand(node_count, input.shape[0])

    #편향값의 임의 생성, 동일한 편향 값을 사용한다.
    bias = np.random.rand(1)

    #가중치와 노드 출력의 행렬곱연산, 편향값 덧셈
    hidden_input = weight @ input + bias
            
    #노드 입력과 활성화 함수 연산을 통한 노드 출력 계산
    # cnn 에선 0과 1 사이의 결과를 출력해야 하기 때문에 시그모이드 활성화 함수를 사용용
    output = self.activation.sigmoid(hidden_input)
            
    #노드 출력의 저장(델타 값의 계산을 위해)
    self.node_output.append(output)

    # 가중치 값 저장
    self.weight.append(weight)

    # 편향 값 저장
    self.bias.append(bias)

    return output

  def cost_function(self, target):
    self.target = target
    return self.cost.errer_squared_sum(self.predict, target)

  # 비용 함수에 대한 delta 값 계산
  def cal_delta_result(self, predict, target):  
    #출력층 노드의 변화량에 대한 오차 함수의 변화량 계산
    delta = (self.cost.diff_error_squared_sum(predict, target) * self.activation.sigmoid_diff(predict))

    self.delta.append(delta)

  # 은닉층에 대한 delta 값 계산, 가중치 변화량이 누적된다.
  def cal_delta_hidden(self, node_input):
    # 이전층의 delta 값을 구하기 위해 
    delta = self.weight[len(self.weight) - len(self.delta)].T @ self.delta[(len(self.delta) - 1)]
    delta = delta * self.activation.sigmoid_diff(node_input)

    print(delta.shape)

    self.delta.append(delta)

  #델타(노드 변화량에 대한 비용 함수의 변화량) 계산
  def cal_delta(self):

    #출력층 노드의 변화량에 대한 오차 함수의 변화량 계산
    delta = (self.cost.diff_error_squared_sum() * self.activation.sigmoid_diff(self.predict))
    self.delta.append(delta)

    #이전층 노드의 delta 값 계산과 함께, 가중치 변화량에 대한 비용 함수의 변화량을 계산한다.
    for i in range(len(self.node_output) - 2, -1, -1):

      weight_update = (delta @ self.node_output[i].T).T
      self.weight_update.append(weight_update)
      delta = self.weight[i]@delta
      delta = delta * self.activation.sigmoid_diff(self.node_output[i])
      self.delta.append(delta)
  
  # 가중치 업데이트 크기 계산 함수
  def update_weight(self, learning_rate):
    # 거꾸로 계산된 가중치 업데이트 배열을 뒤집어준다.
    self.weight_update = self.weight_update[::-1]
    for i in range(len(self.weight)):
      weight = self.weight[i]
      weight_update = self.weight_update[i]

      self.weight[i] = weight - weight_update * learning_rate


In [114]:
class CNN:

  # 필터의 개수만큼 가중치가 존재
  cnn_weight = []
  
  # 패딩
  pad = PADDING()

  # 풀링
  pool = POOLING()

  # MLP
  mlp = MLP()  

  # 데이터 크기
  data_size = 0

  # 레이어 구현
  layer_result = []

  # 현재 레이어 
  layer = 0

  # delta 값
  delta = []

  # target 값
  target = []
  
  def cal_cnn(self, input, target, filter_size, filter_count):
    # 각 필터별 연산 결과를 저장할 리스트
    filter_result_arr = []

    # 가중치 행렬 생성
    weight_arr = []

    # 타겟값 저장
    self.target = target

    # 입력 받은 필터의 개수만큼 반복
    for i in range(filter_count):

      # 필터 크기에 맞는 임의의 가중치 생성
      weight = np.random.random(filter_size * filter_size).reshape(filter_size, filter_size)
      
      weight_arr.append(weight)
      
      # 합성곱 연산 결과가 저장된다.
      result_arr = []

      # 합성곱 연산 수행
      for col in range(input.shape[0] - weight.shape[0] + 1):
        for row in range(input.shape[1] - weight.shape[1] + 1):
          result = []
          for w_col in range(weight.shape[0]):
            for w_row in range(weight.shape[1]):
              result.append(input[col + w_col, row + w_row ] * weight[w_col, w_row])
          result_arr.append(np.sum(result))

      # 연산 결과를 크기에 맞게 바꿔준다.
      result_arr = np.array(result_arr).reshape(input.shape[0] - weight.shape[0] + 1, -1)

      # 연산 결과의 저장 (numpy 형태)
      filter_result_arr.append(result_arr)
    
    # 가중치 행렬의 저장
    self.cnn_weight.append(weight_arr)

    # 층에다가 결과를 저장한다. 모든 필터의 결과가 저장
    self.layer_result.append(filter_result_arr)

    # 레이어의 추가
    self.layer = self.layer +1

  def same_padding_cnn(self, input, filter_size, filter_count):
    # 각 필터별 연산 결과를 저장할 리스트
    filter_result_arr = []
    
    # 데이터 크기 지정
    self.data_size = input.shape[0]

    # 입력 받은 필터의 개수만큼 반복
    for i in range(filter_count):

      # 필터 크기에 맞는 임의의 가중치 생성
      weight = np.random.random(filter_size * filter_size).reshape(filter_size, filter_size)
      
      self.cnn_weight.append(weight)
      
      # 합성곱 연산 결과가 저장된다.
      result_arr = []

      # 합성곱 연산 수행행
      for col in range(input.shape[0] - weight.shape[0] + 1):
        for row in range(input.shape[1] - weight.shape[1] + 1):
          result = []
          for w_col in range(weight.shape[0]):
            for w_row in range(weight.shape[1]):
              result.append(input[col + w_col, row + w_row ] * weight[w_col, w_row])
          result_arr.append(np.sum(result))

      # 연산 결과를 크기에 맞게 바꿔준다.
      result_arr = np.array(result_arr).reshape(input.shape[0] - weight.shape[0] + 1, -1)

      result_arr = self.pad.padding(result_arr, input.shape[0] - result_arr.shape[0])

      # 연산 결과의 저장
      filter_result_arr.append(result_arr)
    
    # 층에다가 결과를 저장한다.
    self.layer_result.append(filter_result_arr)

    # 레이어의 추가
    self.layer = self.layer +1

  def pooling(self, pooling_size):
    # 각 필터별 연산 결과를 저장할 리스트
    filter_result_arr = []

    # 이전 레이어의 필터 개수만큼 반복.
    for i in range(len(self.layer_result[self.layer - 1])):
      # numpy array 의 반환
      pooling_result = self.pool.max_pooling(self.layer_result[self.layer - 1][i], pooling_size)

      filter_result_arr.append(pooling_result)
    
    self.layer_result.append(filter_result_arr)

    # 레이어의 추가
    self.layer = self.layer +1

  # 두 번째 층 이후의 cnn 연산 함수, 입력값은 이전 층의 출력이 된다.
  def layer_cnn(self, filter_size, filter_count):
    # 각 필터별 연산 결과를 저장할 리스트
    filter_result_arr = []

    # 각 layer 별 가중치를 저장할 리스트
    weight_arr = []

    # 입력 받은 필터의 개수만큼 반복
    for i in range(filter_count):

      # 필터 크기에 맞는 임의의 가중치 생성
      weight = np.random.random(filter_size * filter_size).reshape(filter_size, filter_size)
      
      weight_arr.append(weight)
      
      # 합성곱 연산 결과가 저장된다.
      result_arr = []

      # 합성곱 연산 수행. 이전 레이어의 필터 출력 크기에 맞게 반복이 시행된다.
      for col in range(self.layer_result[self.layer - 1][0].shape[0] - weight.shape[0] + 1):
        for row in range(self.layer_result[self.layer - 1][0].shape[1] - weight.shape[1] + 1):
          result = []
          for w_col in range(weight.shape[0]):
            for w_row in range(weight.shape[1]):
              result.append(input[col + w_col, row + w_row ] * weight[w_col, w_row])
          result_arr.append(np.sum(result))
          
      # 연산 결과를 크기에 맞게 바꿔준다.
      result_arr = np.array(result_arr).reshape(self.layer_result[self.layer - 1][0].shape[0] - weight.shape[0] + 1, -1)

      result_arr = self.pad.padding(result_arr, self.layer_result[self.layer - 1][0].shape[0] - result_arr.shape[0])

      # 연산 결과의 저장
      filter_result_arr.append(result_arr)
    
    # 층에다가 결과를 저장한다.
    self.layer_result.append(filter_result_arr)

    # 가중치의 저장
    self.cnn_weight.append(weight_arr)

    # 레이어의 추가
    self.layer = self.layer +1

  def flatten(self):
    data = self.layer_result[self.layer - 1]
    
    data_arr = []

    # 필터의 개수만큼 반복
    for filter_count in range(len(data)):
      for filter_size_col in range(data[filter_count].shape[0]):
        for filter_size_row in range(data[filter_count].shape[1]):
          data_arr.append(data[filter_count][filter_size_col][filter_size_row])

    self.layer_result.append(np.array(data_arr).reshape(-1,1))

    self.layer = self.layer + 1

  # MLP 층의 연결, 순전파 연산

  def mlp_forward_cal(self, node_count):
    self.layer_result.append(self.mlp.forward_cal(self.layer_result[self.layer-1], node_count))

    self.cnn_weight.append(self.mlp.weight)

    self.layer = self.layer +1   

  def cal_delta_result_layer(self):

    self.mlp.cal_delta_result(self.layer_result[self.layer - 1], self.target)

  def cal_delta_hidden_layer(self):
    # 노드 입력값을 넘겨준다.
    self.delta.append(self.mlp.cal_delta_hidden(self.layer_result[self.layer - len(self.mlp.delta) - 1]))




In [115]:
cnn = CNN()

In [116]:
cnn.target = target

In [117]:
cnn.same_padding_cnn(input, 7, 64)

In [118]:
cnn.pooling(2)

In [119]:
cnn.layer_cnn(3, 128)

In [120]:
cnn.pooling(2)

In [121]:
cnn.layer_cnn(3, 256)

In [122]:
cnn.pooling(2)

In [123]:
cnn_result = cnn.flatten()

In [124]:
cnn.mlp_forward_cal(128)

In [125]:
cnn.mlp_forward_cal(64)

In [126]:
cnn.mlp_forward_cal(10)

In [127]:
cnn.cal_delta_result_layer()

In [128]:
cnn.cal_delta_hidden_layer()

(64, 1)


In [131]:
cnn.cal_delta_hidden_layer()

(128, 1)


In [132]:
cnn.cal_delta_hidden_layer()

(4096, 1)
