In [1]:
# 필요한 모듈 불러오기
import pandas as pd
import numpy as np
import csv
import time

# 난수 발생 패턴을 고정하기 위해 난수는 seed(1234)로 설정
np.random.seed(1234)

# 고정되지 않는 난수를 발생하기 위해서 randomize함수를 이용해 현재시간을 기준으로 난수를 발생하는 함수
def randomize() :
    np.random.seed(time.time)

In [2]:
# 정규분포 난수값의 평균과 표준편차를 설정하여 가중치의 파라미터를 초기화 할때 사용한다
RND_MEAN = 0
RND_STD = 0.0030

# 학습률을 설정(하이퍼파라미터)
LEARNING_LATE = 0.001

In [3]:
# 메인 함수인 abalone_exec를 정의한다
# 메인함수는 abalone데이터를 로드하는 함수와 모델의 파라미터를 초기화 하는 함수,학습 및 평가과정을 수행하는 함수로 구성되어 있다.
# 학습에 필요한 하이퍼파라미터값(학습횟수,미니배치 크기,중간보고주기)을 매개변수로 지정하여 받는다.
def abalone_exec(epoch_count=10, mb_size=10, report=1):
    load_abalone_dataset()
    init_model()
    train_and_test(epoch_count, mb_size, report)

In [4]:
# 데이터셋 파일을 읽어와서 활용 할 수 있게 하는 함수정의
# Next함수는 파일의 첫 행을 읽지 않고 건너뛰게 한다.
# 인풋과 아웃풋의 벡터 크기를 10,1로 지정해준다.
# 파일에 담긴 전복 개체별 정보를 rows리스트에 수집한다
# 비선형인 성별 정보는 웟-핫 벡터로 변환한다
def load_abalone_dataset():
    with open('abalone.csv') as csvfile:
        csvreader = csv.reader(csvfile)
        next(csvreader,None)
        rows = []
        for row in csvreader:
            rows.append(row)
    
    global data, input_cnt, output_cnt
    input_cnt, output_cnt = 10, 1
    data = np.zeros([len(rows), input_cnt+output_cnt])

    for n, row in enumerate(rows):
        if row[0] == 'I' : data[n, 0] = 1
        if row[0] == 'M' : data[n, 1] = 1
        if row[0] == 'F' : data[n, 2] = 1
        data[n, 3:] = row[1:]

In [5]:
# 파라미터를 초기화하는 함수 정의
# weight는 정규분포를 갖는 나숫값으로 초기화 한다.
# bias는 초기에 지난친 영향을 주지 않도록 0으로 초기화 한다.
def init_model():
    global weight, bias, input_cnt, output_cnt
    weight = np.random.normal(RND_MEAN, RND_STD,[input_cnt,output_cnt])
    bias = np.zeros([output_cnt])

In [6]:
# 학습과 평가를 하는 함수 정의
def train_and_test(epoch_count, mb_size, report):
    step_count = arrange_data(mb_size)
    test_x, test_y = get_test_data()

    for epoch in range(epoch_count):
        losses, accs = [], []

        for n in range(step_count):
            train_x, train_y = get_train_data(mb_size, n)
            loss, acc = run_train(train_x, train_y)
            losses.append(loss)
            accs.append(acc)

        if report > 0 and (epoch+1) % report == 0:
            acc = run_test(test_x, test_y)
            print('Epoch {}: loss ={:5.3f}, accuracy={:5.3f}/{:5.3f}'. \
                  format(epoch+1, np.mean(losses), np.mean(accs), acc))
    
    final_acc = run_test(test_x, test_y)
    print('\nFinal Test: final accuracy = {:5.3f}'.format(final_acc))      

In [7]:
# 데이터 수만큼 일련번호를 발생시킨 후 무작위로 순서를 섞는다
def arrange_data(mb_size):
    global data, shuffle_map, test_begin_idx
    shuffle_map = np.arange(data.shape[0])
    np.random.shuffle(shuffle_map)
    step_count = int(data.shape[0] * 0.8) // mb_size
    test_begin_idx = step_count * mb_size
    return step_count

# 평가 데이터 공급 함수 arrange_data함수가 test_begin_idx에 구해놓은 위치를 경계삼아 shuffle_map의 후반부를 평가용 데이터로 활용
def get_test_data():
    global data, shuffle_map, test_begin_idx, output_cnt
    test_data = data[shuffle_map[test_begin_idx:]]
    return test_data[:, :-output_cnt], test_data[:, -output_cnt:]

# 학습데이터 공급 함수
def get_train_data(mb_size, nth):
    global data, shuffle_map, test_begin_idx, output_cnt
    if nth == 0:
        np.random.shuffle(shuffle_map[:test_begin_idx])
    train_data = data[shuffle_map[mb_size*nth:mb_size*(nth+1)]]
    return train_data[:, :-output_cnt], train_data[:, -output_cnt:]

In [8]:
# 학습 실행 함수
def run_train(x, y):
    output, aux_nn = forward_neuralnet(x)
    loss, aux_pp = forward_postproc(output, y)
    accuracy = eval_accuracy(output, y)

    G_loss = 1.0
    G_output = backprop_postproc(G_loss, aux_pp)
    backprop_neuralnet(G_output, aux_nn)

    return loss, accuracy

# 테스트 실행 함수
def run_test(x, y):
    output, _ = forward_neuralnet(x)
    accuracy = eval_accuracy(output, y)
    return accuracy

In [9]:
# 순전파 함수 정의
def forward_neuralnet(x):
    global weight, bias
    output = np.matmul(x, weight) + bias
    return output, x

# 역전파 함수 정릐
def backprop_neuralnet(G_output, x):
    global weight, bias
    g_output_w = x.transpose()

    G_w = np.matmul(g_output_w, G_output)
    G_b = np.sum(G_output, axis = 0)

    weight -= LEARNING_LATE * G_w
    bias -= LEARNING_LATE * G_b

In [10]:
# 순전파 처리 함수
def forward_postproc(output, y):
    diff = output -y
    square = np.square(diff)
    loss = np.mean(square)
    return loss, diff

# 역전파 처리 함수
def backprop_postproc(G_loss, diff):
    shape = diff.shape

    g_loss_square = np.ones(shape) / np.prod(shape)
    g_square_diff = 2 * diff
    g_diff_output = 1

    G_square = g_loss_square * G_loss
    G_diff = g_square_diff * G_square
    G_output = g_diff_output * G_diff

    return G_output

In [11]:
# 정확도 계산 함수 정의
def eval_accuracy(output, y):
    mdiff = np.mean(np.abs((output - y)/y))
    return 1- mdiff