<a href="https://colab.research.google.com/github/kyungjaecheong/Binary_ANN_Programming/blob/main/CP1_DS_test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 📝 Codestates AI Bootcamp 15th<br>CP1 (Codestates Project 1) - DS track
---
### 🍷 작성자 : AIB 15기 정경재 (Kyung Jae, Cheong)
---

# 💾 1. 라이브러리 및 데이터 불러오기

## ⚙️ Library Import

In [None]:
# Library Import
import numpy as np
import pandas as pd
import csv
import matplotlib.pyplot as plt

# matplotlib minus표시 설정
plt.rcParams['axes.unicode_minus'] = False

## ⚙️ 기능 1-1. 데이터 불러오기 : data_load()

In [None]:
# 데이터 불러오기 기능
def data_load(filepath):
    '''
    입력값 정보
    filepath : csv 파일 디렉토리
    '''
    # open 함수로 파일을 열기 (처리가 끝나면 파일을 닫도록 with문 활용)
    with open(file=filepath) as csv_file:
        # csv 모듈로 파일을 읽기 (reader)
        csv_reader = csv.reader(csv_file)
        # 첫 열 따로 저장
        row0 = next(csv_reader)
        # 두번째 열부터 데이터를 list로 저장
        data = list(csv_reader)
    
    # pandas로 DataFrame 생성 (column : row0)
    dataframe = pd.DataFrame(data=data, columns=row0)
    # 독립변수(x1~x8)는 float로, 종속변수(y)는 int로 변환
    dataframe = dataframe.astype(float).astype({'y':int})
    
    # 출력 : DataFrame
    return dataframe

# 🛠️ 2. 데이터 전처리 및 분리

## ⚙️ 기능 2-1. 데이터 뒤섞기 기능 : data_shuffle()

In [None]:
# 데이터 뒤섞기 기능
def data_shuffle(data, reset_index=True, seed=2023):
    '''
    입력값 정보
    data : 뒤섞을 데이터(DataFrame)
    reset_index : 기본값 True, 뒤섞인 index를 재설정할지 여부
    seed : 기본값 2023, numpy.random.seed 값
    '''
    # 데이터 길이를 data_length변수에 저장
    data_length = data.shape[0]

    # index 초기값 : 0 ~ n-1
    shuffle_idx = np.arange(data_length)
    # seed 설정 (기본값은 2023)
    np.random.seed(seed)
    # index array 뒤섞기
    np.random.shuffle(shuffle_idx)

    # index를 초기화하지 않을 경우
    if reset_index==False:
        # shuffle_idx로 reindex만 실시
        df = data.reindex(index=shuffle_idx)
    # index를 초기화할 경우(기본값으로 설정되어있음)
    elif reset_index==True:
        # shuffle_idx로 reindex 실시 + reset_index도 실시
        df = data.reindex(index=shuffle_idx).reset_index(drop=True)
    
    # 출력 : 뒤섞인 dataframe
    return df

## ⚙️ 기능 2-2. 데이터 표준화 기능 : standard_scaler()

In [None]:
# 데이터 표준화 기능
def standard_scaler(data, output_dim):
    '''
    입력값 정보
    data : 표준화시킬 데이터(DataFrame)
    output_dim : 출력층의 노드 수(종속변수 수)
    '''
    # X와 y를 일단 분리시키도록 함
    X = data.iloc[:,:-output_dim]
    y = data.iloc[:,-output_dim:]

    # pandas연산에 의해 column 별로 연산이 진행된다(평균값을 빼고 표준편차로 나누기)
    std_scale_X = (X-X.mean()) / X.std()

    # 표준화한 X를 y와 concat하여 다시 DataFrame을 정의
    df = pd.concat([std_scale_X, y], axis = 1)

    # 출력 표준화를 마친 dataframe
    return df

## ⚙️ 기능 2-3. 학습 및 테스트 데이터 분리 기능 : train_test_split()

In [None]:
# 학습 및 테스트 데이터 분리 기능
def train_test_split(data, train_ratio, batch_size):
    '''
    입력값 정보
    data : 분리할 데이터(DataFrame)
    trian_ratio : 훈련 데이터 비율 ( 0 ~ 1 )
    batch_size : mini-batch size (integer)
    '''
    # 데이터의 행 갯수를 data_size로 지정
    data_size = data.shape[0]
    # train_ratio를 곱한 값을 배치사이즈로 나누어 batch데이터의 갯수를 구함
    batch_count = int(data_size*train_ratio) // batch_size
    
    # bacth데이터 갯수와 사이즈를 곱한 값이 test 데이터의 시작 인덱스가 됨
    test_1st_idx = batch_count*batch_size

    # train, test 데이터 분리(pandas)
    train = data.iloc[:test_1st_idx,:]
    test = data.iloc[test_1st_idx:,:]

    # 출력 : train, test dataframe
    return train, test

## ⚙️ 기능 2-4. 독립변수 및 종속변수 분리 기능 : X_y_split()

In [None]:
# 독립변수 및 종속변수 분리
def X_y_split(data, output_dim):
    '''
    입력값 정보
    data : 분리할 데이터 (DataFrame)
    output_dim : 출력층 노드 수 (종속변수 수)
    '''
    # X는 뒤에서 부터 output_dim직전까지의 column 데이터
    X = np.array(data.iloc[:,:-output_dim])
    # y는 뒤에서부터 output_dim이후의 column데이터 (끝에 : 를 꼭 붙여주자)
    y = np.array(data.iloc[:,-output_dim:])

    # 출력 : X, y ndarray
    return X, y

# 🧠 3. 인공신경망(ANN) 프로그래밍

## ⚙️ 기능 3-1. 가중치, 편향 생성 기능 : initialization_parameter()

In [None]:
# 가중치, 편향 생성 기능
def initialization_parameter(input_dim, output_dim, seed=2023):
    '''
    입력값 정보
    input_dim : 입력층 노드수
    output_dim : 출력층 노드수
    seed : 기본값은 2023, numpy.random.seed값
    '''
    # seed 설정 (기본값은 2023으로 설정)
    np.random.seed(seed)
    
    # weight ndarray생성, shape : (input_dim,output_dim)
    weights = np.random.randn(input_dim,output_dim)
    # bias ndarray생성, shape : (output_dim,)
    bias = np.random.randn(output_dim)

    # 출력 : weights, bias
    return weights, bias

## 📊 기능 3-2. 가중치 시각화 기능 : weights_plot()

In [None]:
# 가중치 시각화 기능
def weights_plot(weights):
    '''
    입력값 정보
    weights : 가중치 (ndarray)
    '''
    # 가중치 ndarray -> pandas Series로 변환(인덱싱이 더 편해짐)
    weights_series = pd.Series(np.reshape(weights,(-1)))
    
    # plot 크기를 6x6으로 지정
    plt.figure(figsize=(6,6))
    # barplot 생성
    weights_bar = plt.bar(weights_series.index, weights_series, color = 'pink')
    
    # 각 bar 마다 값 text가 위쪽에 표시되도록 해보기
    for rect in weights_bar:
        height = rect.get_height()
        plt.text(rect.get_x() + rect.get_width()/2.0, height, '%.3f' % height,
                 ha='center', va='bottom', size=8)
    # x값의 순서의 의미가 있는 건 아니라서 x축은 표시되지 않도록 설정
    plt.gca().axes.xaxis.set_visible(False)

    # barplot 출력
    plt.title("Barplot of weights")
    plt.show()

## ⚙️ 기능 3-3. 배치데이터 얻는 함수 : get_batch_data()

In [None]:
# 배치데이터 얻는 기능
def get_batch_data(X, y, size, n):
    '''
    입력값정보
    X : 독립변수(x1~x8)
    y : 종속변수(y)
    size : mini-batch 크기
    n : 0 ~ (batch수 - 1) - for 문을 통해 입력받을 값임
    '''
    X_batch = X[size*n : size*(n+1)]
    y_batch = y[size*n : size*(n+1)]
    # 출력 : 인덱스 batch크기*n부터 batch크기 만큼의 데이터
    return X_batch, y_batch

## ⚙️ 기능 3-4. 활성화함수 : sigmoid()

In [None]:
# 활성화함수 : sigmoid (y : logits)
def sigmoid(y):
    '''
    입력값 정보
    y : 가중치와 편향으로 연산된 logits
    '''
    Y = 1 / (1 + np.exp(-y))
    
    # 출력값 Y : 활성화함수를 거친 최종 확률값
    return Y

## ⚙️ 기능 3-5. 손실함수 : sigmoid_crossentropy_logits()

In [None]:
# 손실 함수 기능 (from_logits=True, 활성화 기능과 손실함수 기능을 한번에 실시함)
def sigmoid_crossentropy_logits(z, y):
    '''
    입력값 정보
    z : 정답 data (Label data)
    y : 가중치와 편향으로 연산된 logits
    '''
    E = y*(1-z) + np.log(1 + np.exp(-y))
    
    # 출력값 E : 손실값
    return E

## ⚙️ 기능 3-6. 손실함수 편미분함수 : sigmoid_crossentropy_logits_prime()

In [None]:
# 역전파를 위한 손실함수 편미분함수(logit값으로 바로 편미분하여 활성화함수의 편미분 단계를 건너뜀)
def sigmoid_crossentropy_logits_prime(z, y):
    '''
    입력값 정보
    z : 정답 data (Label data)
    y : 가중치와 편향으로 연산된 logits
    '''
    dE_dy = -z + sigmoid(y)
    
    # 출력값 dE_dy : 손실값 E를 logit값 y로 편미분하여 얻어낸 기울기값
    return dE_dy

## ⚙️ 기능 3-7. 정확도 연산 기능 : accuracy_score()

In [None]:
# 정확도 연산 기능
def accuracy_score(z, y):
    '''
    입력값 정보
    z : 정답 data (Label data)
    y : 가중치와 편향으로 연산된 logits
    '''    
    # pred : logit의 부호에 따라서 boolean으로 반환 (0이하는 False(0), 0초과는 True(1))
    pred = np.greater(y, 0)
    # real : 0과 1로 이뤄진 label data를 boolean으로 반환 (0은 False(0), 1은 True(1))
    real = np.greater(z, 0.5)
    
    # correct : 예측이 label과 같은 경우 True(1)로 반환
    correct = np.equal(pred, real)
    
    # accuracy : correct를 평균내면 Boolean이 int로 연산되어 정확도가 바로 구해진다
    accuracy = np.mean(correct)
    
    # 출력값 : accuracy
    return accuracy

## 🧠 인공신경망(ANN) 클래스(Class) 구현 : NeuralNetwork() - 순전파, 예측, 검증 기능

In [None]:
# 순전파까지만 Class로 구현해보기(이후 추가기능들은 상속을 통해 추가할 예정)
class NeuralNetwork:
    # 초기함수(Class 선언시 실행되는 초기 함수)
    def __init__(self, input_dim, output_dim):
        '''
        입력값 정보
        input_dim : 입력층 노드 수
        output_dim : 출력층 노드 수
        '''
        # initialization_parameter 함수를 실행시켜 초기 가중치와 편향을 저장
        self.w, self.b = initialization_parameter(input_dim, output_dim)
    
    # 가중치 시각화 함수(가중치 변화를 확인해보기 위해 class내부에 포함시켰음)
    def weights_plot(self):
        # self.w로 저장되어있는 가중치를 bar plot으로 시각화함
        weights_plot(self.w)
    
    # 가중합 함수 (가중치와 편향에대한 연산 결과 값(logit)을 얻어내는 기능)
    def weight_sum(self, X):
        '''
        입력값 정보
        X : 독립변수(x1~x8)
        '''
        # y : X와 가중치행렬(self.w)의 행렬곱 + 편향값
        y = np.matmul(X, self.w) + self.b
        return y
    
    # 순전파 기능(학습과정이 아닌 단순 순전파 기능)
    def feed_forward(self, X_data, y_data, mb_size, verbose=1):
        '''
        입력값 정보
        X_data : input 변수(x1~x8), X_train
        y_data : label 변수(y), y_train
        mb_size : Mini-Batch 크기
        verbose : 기본값은 1, 훈련 결과를 출력 할지 여부를 결정함
            verbose = 0 : 출력안함
            verbose = 1 : 1 epoch에 대한 평균 loss와 accuracy출력
            verbose = 2 : batch별 데이터까지 함께 출력
        '''
        # Batch 갯수를 계산
        batch_count = int(X_data.shape[0]) // mb_size
        # 손실과 정확도를 담을 빈 리스트를 생성
        losses, accs = [], []
        
        # for 문으로 batch 연산을 반복함
        for batch in range(batch_count):
            # X(input), z(label)를 get_batch_data 함수로부터 얻어냄
            X, z = get_batch_data(X_data, y_data, size=mb_size, n=batch)
            # X(input) -> y(가중합) 연산
            y = self.weight_sum(X=X)
            # 각 값들의 손실값(E) 연산
            E = sigmoid_crossentropy_logits(z=z, y=y)
            # loss : 미니배치의 평균 손실값
            loss = np.mean(E)
            # accuracy : 미니배치의 정확도값
            accuracy = accuracy_score(z=z, y=y)
            
            # 손실값과 정확도를 리스트에 append하기(소수점 셋째자리로 반올림)
            losses.append(round(loss, 3))
            accs.append(round(accuracy, 3))
        
        # verbose 기능(기본값은 1)
        if verbose in [1,2]:
            # 평균 loss와 평균 accuracy를 출력(소수점 셋째자리로 반올림)
            print("[Epoch 1] TrainData - Loss = {:.3f}, Accuracy = {:.3f}"
                  .format(np.mean(losses), np.mean(accs)))
            if verbose == 2:
                # 각 배치별 loss와 accuracy를 확인하기 위한 출력(verbose==2로 설정해야 보임)
                print(f"\tBatch Size : {mb_size}\n\tMini-Batches : {batch_count}\
                    \n\tLoss : {losses}\n\tAccuracy : {accs}")
        elif verbose == 0:
            # verbose = 0 으로 주어지면 출력안하고 pass함
            pass
    
    # 예측 기능(prediction, ndarray로 반환함)
    def predict(self, X_data, threshold=0.5):
        '''
        입력값 정보
        X_data : input 변수(x1~x8)
        threshold : 기본값은 0.5, 0과 1을 구분할 임계치(기준선)
        '''
        # 가중합 연산
        y = self.weight_sum(X_data)
        # 활성화 함수를 거쳐 확률값을 얻음
        Y = sigmoid(y)
        # Threshold보다 작으면 0으로 예측, 크거나 같으면 1로 예측
        pred = np.where(Y < threshold, 0, 1)
        
        # 출력 : 예측값(ndarray)
        return pred
    
    # 검증 기능(evaluation, 테스트 데이터에 대한 순전파 결과를 바로 출력)
    def evaluate(self, X_data, y_data):
        '''
        입력값 정보
        X_data : input 변수(x1~x8), X_test
        y_data : label 변수(y), y_test
        '''
        # X(input), z(label) 정의
        X, z = X_data, y_data
        # 가중합(logits) 연산
        y = self.weight_sum(X_data)
        
        # 평균 loss값 연산
        loss = np.mean(sigmoid_crossentropy_logits(z=z, y=y))
        # 평균 accuracy 연산
        accuracy = accuracy_score(z=z, y=y)
        
        # 검증 결과 출력하기 (소수점 셋째자리로 반올림)
        print("[Evaluation] TestData - Loss = {:.3f}, Accuracy = {:.3f}".format(loss, accuracy))

## 🧠 인공신경망(ANN) 클래스(Class) 추가구현 : NeuralNetwork_additional() - 역전파, 학습 기능을 추가 구현

In [None]:
# 이전에 정의한 NeuralNetwork class를 상속받고 추가적으로 역전파와 학습기능을 구현하기
class NeuralNetwork_additional(NeuralNetwork):
    # 초기함수(Class 선언시 실행되는 초기함수 : 상속받아서 그대로 사용함)
    def __init__(self, input_dim, output_dim):
        '''
        입력값 정보
        input_dim : 입력층 노드 수
        output_dim : 출력층 노드 수
        '''
        # super()를 통해서 self.w, self.b 변수정보를 그대로 받아와서 실행함
        super().__init__(input_dim, output_dim)
    
    # 역전파 기능 (활성화 단계도 함께 계산하는 sigmoid_crossentropy_with_logits 방식 사용)
    def feed_backward(self, X, z, y):
        '''
        입력값 정보
        X : input 변수(x1~x8), X_train
        z : label 변수(y), y_train
        y : 가중합 연산을 통해 얻은 logits
        '''
        # dE/dy 연산 (sigmoid_crossentropy_logits_prime)
        dE_dy = sigmoid_crossentropy_logits_prime(z=z, y=y)
        # dy/dw = X의 역행렬(transpose)
        dy_dw = X.T
        
        # dE/dw 가중치 기울기값 연산 (matmul을 통한 행렬곱)
        self.dE_dw = np.matmul(dy_dw, dE_dy)
        # dE/db 편향 기울기값 연산 (dE/dy의 합), bias와 형태 맞추기위해 axis=0으로 설정(열 방향)
        self.dE_db = np.sum(dE_dy, axis=0) # (4x1) -> (1x1)
    
    # 학습 기능 (Epoch별 batch를 고려한 학습을 진행)
    def training(self, X_data, y_data, mb_size, epochs, learning_rate=1, verbose=1):
        '''
        입력값 정보
        X_data : input 변수(x1~x8), X_train
        y_data : label 변수(y), y_train
        mb_size : Mini-Batch 크기
        epochs : Epoch 수
        learning_rate : 기본값은 1로 설정함, 학습률
        verbose : 기본값은 1, 훈련 결과를 출력 할지 여부를 결정함
            verbose = 0 : 출력안함
            verbose = 1 : 1 epoch에 대한 평균 loss와 accuracy출력
            verbose = 2 : batch별 데이터까지 함께 출력
        '''
        # learning_rate를 lr로 저장함
        lr = learning_rate
        # Batch 갯수를 계산
        batch_count = int(X_data.shape[0]) // mb_size
        # Epoch별 손실과 정확도 리스트를 담을 history 딕셔너리를 지정
        self.history = {'Loss': [], 'Accuracy': []}
        
        # 지정한 epoch 수에 따라서 반복을 진행
        for epoch in range(epochs):
            # 손실과 정확도를 담을 빈 리스트 지정(epoch마다 초기화됨)
            losses, accs = [], []
            
            # batch 단위로 반복을 진행함
            for batch in range(batch_count):
                # X(input), z(label)를 get_batch_data 함수로부터 얻어냄
                X, z = get_batch_data(X_data, y_data, size=mb_size, n=batch)
                # X(input) -> y(가중합) 연산
                y = self.weight_sum(X=X)
                
                # 각 값들의 손실값(E) 연산
                E = sigmoid_crossentropy_logits(z=z, y=y)
                # loss : 미니배치의 평균 손실값
                loss = np.mean(E)
                # accuracy : 미니배치의 정확도값
                accuracy = accuracy_score(z=z, y=y)
                
                # 손실값과 정확도값을 append (소수점 셋째자리로 반올림)
                losses.append(round(loss, 3))
                accs.append(round(accuracy, 3))
                
                # 역전파 실시(self.dE_dw, self.dE_db가 업데이트 됨)
                self.feed_backward(X=X, z=z, y=y)
                # 가중치 업데이트 실시
                self.w -= lr*self.dE_dw
                # 편향 업데이트 실시
                self.b -= lr*self.dE_db
            
            # epoch당 평균 loss와 평균 accuracy를 딕셔너리의 리스트에 append (소수점 셋째자리로 반올림)
            self.history['Loss'].append(round(np.mean(losses), 3))
            self.history['Accuracy'].append(round(np.mean(accs), 3))
            
            # verbose 기능(기본값은 1)
            if verbose in [1,2]:
                # 평균 loss와 평균 accuracy를 출력(소수점 셋째자리로 반올림)
                print("\n[Epoch {}/{}] TrainData - Loss = {:.3f}, Accuracy = {:.3f}"
                      .format(epoch+1, epochs, np.mean(losses), np.mean(accs)))
                if verbose == 2:
                    # 각 배치별 loss와 accuracy를 확인하기 위한 출력(verbose==2로 설정해야 보임)
                    print(f"\tBatch Size : {mb_size}\n\tMini-Batchs : {batch_count}\
                        \n\tLoss : {losses}\n\tAccurracy : {accs}")
            elif verbose == 0:
                # verbose = 0 으로 주어지면 출력안하고 pass함
                pass

## 📈 기능3-8. 학습 곡선 시각화 기능 : plot_history()

In [None]:
# 학습 곡선 시각화 기능
def plot_history(NN, key, show_value=False, show_xticks=False):
    '''
    입력값 정보
    NN : NeuralNetwork 인스턴스
    key : history변수의 key값, 'Loss' or 'Accuracy'
    show_value : 그래프에 값을 표시할지 여부 (기본값은 False로 설정)
    show_xticks : 그래프에 x축 값을 표시할지 여부 (기본값은 False로 설정)
    '''
    # 그래프 y 값 : key에 해당하는 리스트
    y = NN.history[key]
    # 그래프 x 값 : epoch(1 ~ y길이)
    x = np.arange(len(y))+1
    
    # 그래프 크기를 6x6으로 설정
    plt.figure(figsize=(6,6))
    # 선그래프 plot ('o-': 점표시)
    plt.plot(y, 'o-', color='orange')
    
    if show_xticks == True:
        # x축 눈금 설정
        plt.xticks(np.arange(len(y)), labels=x)
    elif show_xticks == False:
        plt.xticks([])
    
    # y축 범위 설정
    ylim = [0 , np.trunc(max(y)+1)]
    plt.ylim(ylim)
    
    # 축 이름, 제목 설정
    plt.xlabel("Epoch")
    plt.ylabel(f"{key}")
    plt.title(f'History of "{key}" in Neural Network')
    
    if show_value == True:
        # 그래프에 데이터 값을 표시하기
        for i in range(len(x)):
            height = y[i]
            plt.text(x[i]-1, height + ylim[1]/100, '%.3f' % height,
                     ha='center', va='bottom', size = 8)
    
    plt.show()

# 💽 4. 전체 기능 동작함수 정의

## 💽 기능 4-1. 전체 기능 동작 기능(순전파) : main_test()

In [None]:
def main_test():
    # 1-1. 데이터 불러오기 기능
    df = data_load(filepath=csv_dir)
    
    # 2-1. 데이터 뒤섞기 기능
    df_sf = data_shuffle(df)
    
    # 2-2. 데이터 표준화 기능
    df_sc = standard_scaler(df_sf, output_dim=1)
    
    # 2-3. 학습 및 테스트 데이터 분리 기능
    train, test = train_test_split(df_sc, train_ratio=0.8, batch_size=4)
    
    # 2-4. 독립변수 및 종속변수 분리
    X_train, y_train = X_y_split(train, output_dim=1)
    X_test, y_test = X_y_split(test, output_dim=1)
    
    # 3. 인공신경망(ANN) - 순전파 (가중치 편향 생성 기능 포함)
    NN = NeuralNetwork(input_dim=8, output_dim=1)
    NN.feed_forward(X_train, y_train, mb_size=4, verbose=1)

## 💽 기능 4-2. 전체 기능 동작 기능(학습, 시각화, 예측, 검증) : main_additional_test()


In [None]:
def main_additional_test():
    # 1-1. 데이터 불러오기 기능
    df = data_load(filepath=csv_dir)
    
    # 2-1. 데이터 뒤섞기 기능
    df_sf = data_shuffle(df)
    
    # 2-2. 데이터 표준화 기능
    df_sc = standard_scaler(df_sf, output_dim=1)
    
    # 2-3. 학습 및 테스트 데이터 분리 기능
    train, test = train_test_split(df_sc, train_ratio=0.8, batch_size=4)
    
    # 2-4. 독립변수 및 종속변수 분리
    X_train, y_train = X_y_split(train, output_dim=1)
    X_test, y_test = X_y_split(test, output_dim=1)
    
    # 3-1. 인공신경망(ANN) - 역전파, 학습 추가 구현
    NN = NeuralNetwork_additional(input_dim=8, output_dim=1)
    NN.training(X_train, y_train, mb_size=4, epochs=5, learning_rate=1, verbose=2)
    
    # 3-2. 학습곡선 시각화
    plot_history(NN, key='Accuracy', show_value=True, show_xticks=True)
    
    # 3-3. TestData 예측
    print(f"[Label_data] TestData\n{y_test}\n")
    y_pred = NN.predict(X_test)
    print(f"[Prediction] TestData\n{y_pred}\n")
    
    # 3-4. TestData 검증
    NN.evaluate(X_test, y_test)