In [1]:
import numpy as np
import pandas as pd
import csv
import matplotlib.pyplot as plt

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
from typing import List
# 1. 데이터를 불러오는 함수
def load_data(src : str) -> List[pd.DataFrame]:

  data : pd.DataFrame = pd.read_csv(src)
  x : pd.DataFrame = data.iloc[:, :-1]
  y : pd.DataFrame = data.iloc[:, -1]

  return x, y

In [4]:
from typing import Union
# 2. 가중치와 편향을 초기화하는 함수
# 초기화에 관한 부분에 대한 reference : https://reniew.github.io/13/
def initialize_parameters(x : np.ndarray or pd.DataFrame, d_out:int,
                          init:str = "xavier_normal",
                          use_bias:bool=True) -> Union[np.array, List[np.array]]:

    d_in : int = x.shape[1] # 입력 변수의 차원

    # 편향을 사용하는 경우
    if use_bias:
      # 초기화를 수월하게 진행할 수 있도록
      # 가중치의 크기에 편향의 크기를 넣어서 계산한다.
      w_size : tuple = (d_in+1, d_out)
    # 편향을 사용하지 않는 경우
    else:
      w_size : tuple = (d_in, d_out)

    # he 초기화의 경우
    if init == "he":
      # 분산의 값이 (2/입력)의 root 값
      v : float = (2/d_in)**0.5
      w : np.array = np.random.normal(0, v, w_size)
    # Xavier 초기화 중 정규 분포를 통한 초기화인 경우
    elif init == "xavier_normal":
      v : float = (2/(d_in+d_out))**0.5
      w : np.array = np.random.normal(0, v, w_size)
    # Xavier 초기화 중 균일 분포를 통한 초기화인 경우
    elif init == "xavier_uniform":
      u : float = (6/(d_in+d_out))**0.5
      w : np.array = np.random.uniform(-u, u, w_size)
    else:
      raise ValueError(f"There is no implementation for [{init}] initialization")

    # 편향을 사용하는 경우
    if use_bias:
      # 가중치와 편향을 반환
      return w[:-1], w[-1]
    # 편향을 사용하지 않는 경우
    else:
      # 가중치만 반환
      return w, np.zeros(d_out)

In [5]:
# 3. 데이터의 row를 셔플하는 함수
def data_shuffle(x: pd.DataFrame, y: pd.DataFrame) -> pd.DataFrame:

  idx = np.random.permutation(x.index)
  x = x.reindex(idx)
  y = y.reindex(idx)
  return x, y

In [6]:
from typing import List
# 4. 학습 데이터와 테스트 데이터를 분리하는 함수
def train_test_split(x : pd.DataFrame, y:pd.DataFrame,
                     test_size : float = 0.2) -> List[pd.DataFrame]:
  
  # 데이터의 row 수
  n : int = x.shape[0]

  test_x : pd.DataFrame = x.iloc[:int(n*test_size)]
  test_y : pd.DataFrame = y.iloc[:int(n*test_size)]
  train_x : pd.DataFrame = x.iloc[int(n*test_size):]
  train_y : pd.DataFrame = y.iloc[int(n*test_size):]
  return train_x, train_y, test_x, test_y

In [7]:
from typing import Iterator
# 5. mini-batch 생성기
def mini_batch_generator(x : pd.DataFrame, y: pd.DataFrame,
                         batch_size : int = 4) -> Iterator[pd.DataFrame]:
  
  n : int = x.shape[0]

  x, y = data_shuffle(x, y)

  for j in range(n//batch_size): # 데이터 크기를 배치 사이즈로 나눈만큼 반복한다.
    yield x.iloc[j*batch_size:j*batch_size+batch_size], \
           y.iloc[j*batch_size:j*batch_size+batch_size]

In [8]:
# 6. sigmoid 함수
def sigmoid(x : np.ndarray or pd.DataFrame):
  return 1./(1.+np.exp(-x))

# 7. Binary Cross entropy
def binaryCrossEntropy(y_true : np.ndarray or pd.DataFrame,
                       y_pred : np.ndarray or pd.DataFrame,
                       c : float = 1e-6) -> float:

  y_true = np.squeeze(y_true)
  y_pred = np.squeeze(y_pred)
  # y의 실제값과 y의 예측값의 shape가 같지 않다면 ValueError
  if y_true.shape != y_pred.shape:
    raise ValueError("Shape of true value of y and prediction of y must be equal.")

  return - np.mean(y_true * np.log(y_pred+c))

# 8. 정확도 연산 기능
def get_accuracy(y_true : np.ndarray or pd.DataFrame,
                 y_pred : np.ndarray or pd.DataFrame):
  
  y_true = np.squeeze(y_true)
  y_pred = np.squeeze(y_pred)
  # y의 실제값과 y의 예측값의 shape가 같지 않다면 ValueError
  if y_true.shape != y_pred.shape:
    raise ValueError("Shape of true value of y and prediction of y must be equal.")

  return np.mean(y_pred == y_true)



In [9]:
# 8. 훈련 데이터를 통해 정확도를 계산하는 함수
def compute_network_train_acc(x : np.ndarray or pd.DataFrame,
                              y : np.ndarray or pd.DataFrame,
                              w : np.ndarray, b : np.ndarray,
                              batch_size : int = 4) -> float:
    acc = []
    # generator로부터 x와 y 값을 받아오는 generator
    for batch_x, batch_y in mini_batch_generator(x, y, batch_size=batch_size):
      # 행렬 연산
      logit = sigmoid(np.matmul(batch_x, w)+ b)
      # logit 값이 0.5보다 크다면 class 1 아니라면 0
      y_pred = (logit >= 0.5).applymap(int)
      acc.append(get_accuracy(batch_y, y_pred))
    return np.mean(acc)

In [23]:
# 시그모이드를 미분한 함수
def sigmoid_derivative(x: np.ndarray or pd.DataFrame) -> Union[np.ndarray, pd.DataFrame]:

  s = sigmoid(x)
  return s * (1-s) 

# 역전파를 수행하는 함수
# 역전파를 위한 미분 계산 : https://smwgood.tistory.com/6
def backprop(x :np.ndarray or pd.DataFrame,
             y : np.ndarray or pd.DataFrame,
             logit : np.ndarray or pd.DataFrame,
             w : np.ndarray, b : np.ndarray,
             learning_rate :float = 1e-4) -> None:

  # pandas 형식을 numpy array로 형식을 바꿔주고 차원 축을 동일하게 고정해둔다.
  x = np.asarray(x)
  y = np.asarray(np.squeeze(y))[:, np.newaxis]
  logit = np.asarray(np.squeeze(logit))[:, np.newaxis]

  # sigmoid를 activation으로 사용하는 cross entropy의 미분 값 계산
  target_error : np.ndarray = y - logit

  # 이전 항으로 미분 값 이전을 위한 체인 룰 계산
  target_error_derivative : np.ndarray = target_error * sigmoid_derivative(logit)

  # 가중치 업데이트
  w_delta = learning_rate * np.mean(x.T.dot(target_error_derivative), axis=1)
  w += w_delta[:, np.newaxis]

  if len(b.shape) == 1:
    b_delta = learning_rate * np.mean(target_error_derivative, axis=0)
  else:
    b_delta = learning_rate * np.mean(target_error_derivative, axis=1)[:, np.newaxis]

  # 편향 업데이트
  b += b_delta


In [13]:
# 9. 훈련 데이터를 통해 신경망 연산을 수행하는 함수
def compute_network_train(x : np.ndarray or pd.DataFrame,
                          y : np.ndarray or pd.DataFrame,
                          w : np.ndarray, b : np.ndarray,
                          batch_size : int = 4, epochs : int = 1) -> None:

  # 손실값을 연산하는 함수
  for epoch in range(epochs):
    errors : list = []
    for batch_x, batch_y in mini_batch_generator(x, y, batch_size = batch_size):
      logit = sigmoid(np.matmul(batch_x, w) + b)
      error = binaryCrossEntropy(batch_y, logit)
      backprop(batch_x, batch_y, logit, w, b) # 역전파
      errors.append(error)
    errors = np.mean(errors)

    acc = compute_network_train_acc(x,y,w,b,batch_size)

    print(f"[Epoch {epoch+1}] TrainData - Loss = {np.round(errors, 4)}, Accuracy = {np.round(acc,4)}")    

In [14]:
# 10. 테스트 데이터를 통해 정확도를 연산하는 함수
def compute_network_test(x : np.ndarray or pd.DataFrame,
                         y : np.ndarray or pd.DataFrame,
                         w : np.ndarray, b : np.ndarray):

  logit = sigmoid(np.matmul(x, w)+ b)
  y_pred = (logit >= 0.5).applymap(int)
  return get_accuracy(y, y_pred)

In [24]:
def main() -> None:
  # file 위치
  DATA_FILE_PATH = "/content/drive/MyDrive/codestates/cp1/binary_dataset.csv"

  # (1) file load
  x, y = load_data(DATA_FILE_PATH)
  # (2) 파라미터와 편향 생성
  w, b = initialize_parameters(x, 1, init="xavier_normal")
  # (3) 데이터 셔플링
  x, y = data_shuffle(x,y)
  # (4) 훈련 데이터와 테스트 데이터 분리
  train_x, train_y, test_x, test_y = train_test_split(x, y)
  # (5) 훈련 데이터를 통한 손실값 및 정확도 계산
  compute_network_train(train_x, train_y, w, b, epochs=10)
  # (6) 테스트 데이터 정확도 연산
  test_acc = compute_network_test(test_x, test_y, w, b)
  print(f"TestData - Accuracy = {np.round(test_acc,4)}") 

In [26]:
main()

[Epoch 1] TrainData - Loss = 4.3108, Accuracy = 0.6875
[Epoch 2] TrainData - Loss = 4.2989, Accuracy = 0.6875
[Epoch 3] TrainData - Loss = 4.1862, Accuracy = 0.6875
[Epoch 4] TrainData - Loss = 4.1419, Accuracy = 0.6875
[Epoch 5] TrainData - Loss = 4.026, Accuracy = 0.6875
[Epoch 6] TrainData - Loss = 3.9074, Accuracy = 0.6875
[Epoch 7] TrainData - Loss = 3.7885, Accuracy = 0.6875
[Epoch 8] TrainData - Loss = 3.6416, Accuracy = 0.6875
[Epoch 9] TrainData - Loss = 3.5642, Accuracy = 0.75
[Epoch 10] TrainData - Loss = 3.4868, Accuracy = 0.75
TestData - Accuracy = 0.75
