[모델 병렬적 ANN + 선형회귀]
1. ANN을 병렬적으로 사용해 특징을 추출한 후, 추가적인 ANN 모델에서 한번 더 학습한 후 최종적으로 선형 회귀 모델로 예측을 수행
- 1개의 은닉층을 사용하는 ANN 2개 + 1개
- 절단 정규 분포 (tf.truncated_normal), ReLU 활성화 함수 사용
- 선형 회귀 모델 y=W*x+b
2. 손실함수 : MSE
3. 옵티마이저 : Adam Optimizer (learning rate=0.0001)
4. 학습 epoch=400
- batch size=32

In [45]:
import tensorflow as tf
import numpy as np
from sklearn.preprocessing import StandardScaler

In [46]:
# 데이터 로드 함수
def load_csv(file_path):
    data = np.genfromtxt(file_path, delimiter=',', skip_header=1, usecols=range(8))
    x_data = np.delete(data, 1, axis=1)  # 두 번째 열 제거 (입력 변수)
    y_data = data[:, 1].reshape(-1, 1)  # 두 번째 열이 목표 변수
    return x_data, y_data

# 데이터 분할 및 표준화
def split_data(x_data, y_data):
    x_test = x_data[:400]
    y_test = y_data[:400]
    x_train = x_data[400:]
    y_train = y_data[400:]

    sc = StandardScaler()
    x_train = sc.fit_transform(x_train)
    x_test = sc.transform(x_test)
    
    return x_train, x_test, y_train, y_test

In [47]:
# 데이터 로드 및 분할
x_data, y_data = load_csv('data-ori.csv')
x_train, x_test, y_train, y_test = split_data(x_data, y_data)

In [48]:
# 하이퍼파라미터 설정
learning_rate = 0.0001
epochs = 400
batch_size = 32
input_size = x_train.shape[1]
hidden1_size = 64
hidden2_size = 32
output_size = 1

In [49]:
# 입력 플레이스홀더 정의
x = tf.placeholder(tf.float32, shape=[None, input_size])
y = tf.placeholder(tf.float32, shape=[None, output_size])

In [50]:
def feature_transform(x):
    # 첫 번째 특성 변환 (128차원 출력)
    W_trans1 = tf.Variable(tf.truncated_normal([input_size, hidden1_size], stddev=0.05))
    b_trans1 = tf.Variable(tf.zeros([hidden1_size]))
    transform1 = tf.nn.relu(tf.matmul(x, W_trans1) + b_trans1)

    # 두 번째 특성 변환 (64차원 출력)
    W_trans2 = tf.Variable(tf.truncated_normal([input_size, hidden2_size], stddev=0.05))
    b_trans2 = tf.Variable(tf.zeros([hidden2_size]))
    transform2 = tf.nn.relu(tf.matmul(x, W_trans2) + b_trans2)

    # 첫 번째 변환의 일부(64차원)와 두 번째 변환을 더함
    transform1_reduced = transform1[:, :hidden2_size]
    merged_features = transform1_reduced + transform2

    return merged_features

In [51]:
def residual_block(x):
    W_res = tf.Variable(tf.truncated_normal([x.shape[1].value, hidden1_size], stddev=0.05))
    b_res = tf.Variable(tf.zeros([hidden1_size]))
    transformed_x = tf.nn.relu(tf.matmul(x, W_res) + b_res)

    # 연결 수행
    res_output = tf.nn.relu(transformed_x + transformed_x)
    return res_output

In [52]:
transformed_features = feature_transform(x)

residual_output = residual_block(transformed_features)

In [53]:
# 최종 출력층 (회귀 예측)
W_out = tf.Variable(tf.truncated_normal([hidden1_size, output_size], stddev=0.1))
b_out = tf.Variable(tf.zeros([output_size]))
y_pred = tf.matmul(residual_output, W_out) + b_out

In [54]:
# 손실 함수 (MSE) 및 최적화 정의
loss = tf.reduce_mean(tf.square(y - y_pred))
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(loss)

In [55]:
# 세션을 열고 학습 진행
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())

    total_batch = int(len(x_train) / batch_size)

    # 학습 과정
    for epoch in range(epochs):
        for i in range(total_batch):
            batch_x = x_train[i * batch_size:(i + 1) * batch_size]
            batch_y = y_train[i * batch_size:(i + 1) * batch_size]
            _, loss_val = sess.run([optimizer, loss], feed_dict={x: batch_x, y: batch_y})
        
        if epoch % 50 == 0:
            print(f"Epoch {epoch}, Loss: {loss_val}")

    # 테스트 데이터로 성능 평가
    y_pred_test = sess.run(y_pred, feed_dict={x: x_test})

    # 예측값과 실제값의 차이 합 계산
    difference_sum = sum(abs(y_pred_test - y_test))
    print(f'\n차이 합: {difference_sum}')

Epoch 0, Loss: 107.83441162109375
Epoch 50, Loss: 0.02653881534934044
Epoch 100, Loss: 0.0022442606277763844
Epoch 150, Loss: 0.0008728340035304427
Epoch 200, Loss: 0.0003840989666059613
Epoch 250, Loss: 0.0002302185312146321
Epoch 300, Loss: 0.00018801717669703066
Epoch 350, Loss: 0.00019421064644120634

차이 합: [4.88016796]
