<a href="https://colab.research.google.com/github/nan-park/cp1_project/blob/main/main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **0과 1을 분류하는 인공신경망 프로그래밍**

# **Import**

In [1]:
# (체크) vscode에 주석 달았던 것들 나중에 colab에도 옮겨놓기

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import csv

# **Data Load**

In [3]:
from google.colab import drive  # google drive mount
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
# 데이터 불러오기
def data_load():
  df = pd.read_csv('/content/drive/MyDrive/binary_dataset.csv')
  return df
# df = data_load()

In [5]:
# df.head()

# **Data Split**

In [6]:
# 학습/테스트 데이터 뒤섞기
def df_shuffle(df):
  return df.sample(frac=1).reset_index(drop=True)

# 특성, 타겟 나누기
def divide_xy(df):
  target = 'y'
  features = list(df.columns)
  features.remove(target)

  X = df[features]
  y = np.array(df[target]).reshape(-1, 1)
  return X, y

# 학습/테스트 데이터 분리하기
def train_test_divide(X, y, test_size=0.2):  # X: pandas dataframe, y: numpy array
  length = len(y)
  test_index = int(length * test_size)

  X_test = X[:test_index]
  y_test = y[:test_index]

  X_train = X[test_index:]  
  y_train = y[test_index:]

  return X_train, y_train, X_test, y_test

In [7]:
# 위의 함수 통합
def train_test_split(df, shuffle=True, test_size=0.2):
  if shuffle:
    df = df_shuffle(df)
  
  X, y = divide_xy(df)
  return train_test_divide(X, y, test_size=test_size)

# X_train, y_train, X_test, y_test = train_test_split(df)

In [8]:
# 미니배치 설정
def split_mini_batch(X, y, n): # train, test 들어올 예정
  # 4개씩 미니배치 설정. 나머지는 버리기
  length = len(y)
  num = length // n # 미니배치 개수
  X_batch_list = []
  y_batch_list = []
  for i in range(num):
    i = i * n
    # 비복원 추출. 데이터가 적기 때문에 겹치지 않는 게 나을 듯.
    X_batch_list.append(X[i:i+n]) # index: 0~4, 4~8, 8~12, ...
    y_batch_list.append(y[i:i+n])
  return X_batch_list, y_batch_list

# X_batch_list, y_batch_list = set_mini_batch(X_train, y_train)

# **Weight and Bias Initialization**

In [9]:
# Xavier 초기화(활성화함수가 시그모이드일 때 잘 동작), 편향 포함한 후에 분리하기
# 이전 층 노드 개수가 n, 현재 층 노드 개수 m일 때, 표준편차가 2/루트(n+m)인 정규분포로 초기화
def initialize_parameter(n, m):
  init = np.random.normal(0, 2/((n+m)**2), (n+1, m))
  W = init[:-1, :]
  b = init[-1, :]
  return W, b

# initialize_parameter(4, 5)

In [10]:
def sigmoid(x):
    return 1 / (1 +np.exp(-x))

def classification(x):
  if x < 0.5:
    return 0
  else:
    return 1

# **Sequential Layers**

In [11]:
class Layer():  # 입력층, 은닉층, 출력층
  def __init__(self, node_num, activation='linear'):
    self.node_num = node_num  # 레이어의 노드 개수
    self.activation = activation  # 활성화 함수
    self.prev = None  # 이전 층
    self.next = None  # 다음 층

  def set_weights(self):  # 가중치 행렬, 편향 초기화
    if self.prev is not None:
      prev_node_num = self.prev.node_num
      self.W, self.b = initialize_parameter(prev_node_num, self.node_num)
  
  # input 행렬 X
  @property
  def X(self):
    return self._X
  @X.setter
  def X(self, value):
    self._X = value

class Dense(Layer): # 은닉층, 출력층
  def __init__(self, node_num, activation='linear'):
    # super().__init__(self, node_num)
    self.node_num = node_num
    self.activation = activation
    self.prev = None
    self.next = None
  
  def output(self):
    answer = np.dot(self._X, self.W) + self.b
    if self.activation == 'linear': # 활성화함수 없음(선형함수)
      return answer
    elif self.activation == 'sigmoid': # 활성화함수 : 시그모이드
      answer = sigmoid(answer)
      return answer

class Input(Layer): # 입력층
  def __init__(self, node_num, activation='linear'):
    self.node_num = node_num
    self.activation = activation
    self.prev = None
    self.next = None
  # 입력층의 경우 input을 그대로 출력한다
  def output(self): 
    return self._X

In [12]:
# Sequential([])에 layer 쌓고 서로 연결되도록 하기. 가중치 초기화 가능해야 함
class Sequential():
  def __init__(self, layer_list): # Layer들을 서로 링크드리스트로 연결. 처음과 끝 지정.
    # layer가 없는 경우
    if len(layer_list)==0:
      self.head = None
      self.tail = None
    # layer가 1개인 경우
    elif len(layer_list)==1:
      self.head = layer_list[0]
      self.tail = layer_list[0]
    else: # layer가 2개 이상인 경우
      self.head = layer_list[0]
      iterator = self.head
      for layer in layer_list[1:-1]:
        layer.prev= iterator
        iterator.next = layer
        iterator = layer
      iterator.next = layer_list[-1]
      self.tail = layer_list[-1]
      self.tail.prev = iterator

    # 가중치, 편향 초기화
    iterator = self.head
    while iterator:
      iterator.set_weights()
      iterator = iterator.next

  # input 행렬 X
  @property
  def input(self):
    return self._input
  @input.setter
  def input(self, value):
    self._input = value
    self.head.X = value # 처음 층 input에도 들어가도록 하기

  def output(self):
    iterator = self.head  # Input
    while iterator.next:
      iterator.next.X = iterator.output()
      iterator = iterator.next
    return iterator.output()
    

In [13]:
# model = Sequential([Input(8), Dense(16), Dense(32), Dense(1, activation='sigmoid')])

# **Predict and Evaluate**

In [14]:
def predict(X_batch, y_batch, model):
    model.input = X_batch
    y_pred_prob = model.output()
    return y_pred_prob


def evaluate(X_batch_list, y_batch_list, model):
    def accuracy(y_pred, y_batch):
        return sum(y_pred == y_batch)[0] / y_pred.shape[0]

    def cross_entropy(y_pred_prob, y_batch):
        delta = 1e-7
        return -np.sum(y_batch * np.log(y_pred_prob + delta)) # / y_batch.shape[0]  # (체크)

    def classification(x):
        # if x < 0.5:
        #     return 0
        # else:
        #     return 1
        # return round(x, 0)
        return 0 if x < 0.5 else 1

    accuracy_list = []  # 미니배치의 정확도 리스트
    cross_entropy_list = [] # 미니배치의 손실함수 리스트
    classify = np.vectorize(classification) # 분류 작업의 함수화
    for i in range(len(X_batch_list)):  # 미니배치마다 따로 수행
        X_batch = X_batch_list[i]
        y_batch = y_batch_list[i]
        y_pred_prob = predict(X_batch, y_batch, model)  # 예측 확률(0~1)
        y_pred = classify(y_pred_prob)  # 예측(0, 1)

        accuracy_list.append(accuracy(y_pred, y_batch)) # 정확도
        cross_entropy_list.append(cross_entropy(y_pred_prob, y_batch))  # 손실함수

    accuracy = np.mean(accuracy_list)
    cross_entropy = np.sum(cross_entropy_list)
    return accuracy, cross_entropy


# **Main**

In [15]:
def main():
  # csv 데이터 불러오기
  df = data_load()

  # train, test 데이터로 나누기
  X_train, y_train, X_test, y_test = train_test_split(df)
  # 미니배치(4개 데이터)로 나누기
  X_batch_list, y_batch_list = split_mini_batch(X_train, y_train, 4) # train 데이터만

  # 모델 만들기(입력층, 은닉층, 출력층)
  model = Sequential([Input(8), Dense(32), Dense(8), Dense(1, activation='sigmoid')])

  # 정확도, 손실함수 측정하기
  accuracy, cross_entropy = evaluate(X_batch_list, y_batch_list, model)
  # 결과 출력
  print(f"[Epoch 1] TrainData - Loss = {round(cross_entropy, 3)}, Accuracy = {round(accuracy, 3)}")

main()

[Epoch 1] TrainData - Loss = 3.447, Accuracy = 0.312
