#### 패키지 Import

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time
from sklearn.preprocessing import MinMaxScaler
from keras.models import Sequential
from keras.layers import GRU, Dense

#### 전역변수 설정

In [None]:
trainGraphTitle = "Train Data"
testGraphTitle = "Test Data"
loss = "mse" # mse

resultComment = "DataSize - 40Year"
fileSavePath = "./result/DataSize/40year"

depth = "SimpleGRU" # SimpleGRU, TwoGRU , ThreeGRU
# depth = "TwoGRU"
# depth = "ThreeGRU"

# 변수 "" 가 Default
hiddenState = 32 # units: 16 "32" 64
timeStep = 20 # input_length 10 "20" 40
activation = "tanh" # "tanh" sigmoid
epochs = 100 # 50 "100" 200
batchSize = 64 # 32 "64" 256
dataSetYear = 40 # 5 "10" 40
optimizer = "adam" # "adam" sgd

#### Pandas Setting

In [None]:
pd.set_option('display.max_rows', None) # row 생략 없이 출력
pd.set_option('display.max_columns', None) # col 생략 없이 출력

#### Data Load

In [None]:
apple = pd.read_csv("Apple_5Y.csv")

if dataSetYear == 10:
    apple = pd.read_csv("Apple_10Y.csv")
elif dataSetYear == 40:
    apple = pd.read_csv("Apple_Whole_Period.csv")

#### Describe 확인

In [None]:
apple.describe()

#### trainData, testData 가공하는 함수

In [None]:
def transformData(data: [[float]]):
    # 날짜 제외
    data = data.drop(columns=["Date"])

    # 데이터 분리 test: 200개와 나머지
    trainSet = data[ : -200]
    testSet = data[-200 : ]
    
    # 데이터 0~1로 정규화
    sc = MinMaxScaler(feature_range=(0, 1)) 
    sc.fit(trainSet)
    trainSet = sc.transform(trainSet)
    testSet = sc.transform(testSet)
    
    # trainX, trainY, testX, testY 분리
    def parsingData(dataSet: [[float]]) -> ([[[float]]], [[float]]):
        dataX, dataY = [], []
        for index in range(len(dataSet) - timeStep):
            temp = []
            for step in range(timeStep):
                temp.append(dataSet[index + step])
            dataX.append(temp)
            dataY.append(dataSet[index + timeStep])

        return np.array(dataX), np.array(dataY)

    trainDataX, trainDataY = parsingData(trainSet)
    testDataX, testDataY = parsingData(testSet)
    
    
    return trainDataX, trainDataY, testDataX[:-30], testDataY, sc

#### Data Parsing

In [None]:
trainX, trainY, testX, testY, scaler = transformData(apple)

#### Model Implementation

In [None]:
model = Sequential()

if depth == "ThreeGRU":
    model.add(
        GRU(
            units = hiddenState,
            input_length = trainX.shape[1],
            input_dim = trainX.shape[2],
            activation = activation,
            return_sequences = True
        )
    )
    model.add(
        GRU(
            units = hiddenState,
            activation = activation,
            return_sequences = True
        )
    )
    model.add(
        GRU(
            units = hiddenState,
            activation = activation
        )
    )
    
    model.add(Dense(6))
    
elif depth == "TwoGRU":
    model.add(
        GRU(
            units = hiddenState,
            input_length = trainX.shape[1],
            input_dim = trainX.shape[2],
            activation = activation,
            return_sequences = True
        )
    )
    model.add(
        GRU(
            units = hiddenState,
            activation = activation
        )
    )
    
    model.add(Dense(6))
elif depth == "SimpleGRU":
    model.add(
    GRU(
        units = hiddenState,
        input_length = trainX.shape[1],
        input_dim = trainX.shape[2],
        activation = activation
        )
    )
    model.add(Dense(6))

model.summary()

#### Model Complie

In [None]:
model.compile(
    loss = loss,
    optimizer = optimizer,
    metrics = ["mae"]
)

#### Model Training

In [None]:
fitStartTime = time.time()
history = model.fit(
    trainX,
    trainY,
    epochs = epochs,
    batch_size = batchSize
)
fitEndTime = time.time()

#### 시간 및 평가 기록

In [None]:
fitTime = fitEndTime - fitStartTime
score = model.evaluate(testX, testY[:-30])

f = open("result.txt", "a")
f.write(f"{resultComment}\n모델 학습 시간: {fitTime:.3} sec\n평가 손실: {score[0]}\n\n")
f.close()

In [None]:
f"모델 학습 시간: {fitTime:.3} sec"

In [None]:
f"평가 손실: {score[0]}"

#### 예측

In [None]:
trainPrediction = scaler.inverse_transform(model.predict(trainX))
testPrediction = scaler.inverse_transform(model.predict(testX))

inversingTrainY = scaler.inverse_transform(trainY)
inversingTestY = scaler.inverse_transform(testY)

#### 그래프

#### Loss

In [None]:
loss = history.history["loss"]
plt.title("Loss")
plt.plot(loss, label="loss")
plt.grid(True)
plt.savefig(f"{fileSavePath}/Loss.png")
plt.legend()
plt.show()

Train Data Graph

In [None]:
plt.title(trainGraphTitle)
plt.plot(inversingTrainY[:, 4], label="Train ADJ.Close")
plt.plot(trainPrediction[:, 4], label="Predict ADJ.Close")
plt.grid(True)
plt.savefig(f"{fileSavePath}/Train.png")
plt.xlabel('Day')
plt.ylabel('Price')
plt.legend()
plt.show()

Test Data Graph

In [None]:
plt.title(testGraphTitle)
plt.plot(inversingTestY[:, 4], label="Test ADJ.Close")
plt.plot(testPrediction[:, 4], label="Predict ADJ.Close")
plt.grid(True)
plt.savefig(f"{fileSavePath}/Test.png")
plt.xlabel('Day')
plt.ylabel('Price')
plt.axvline(x=len(testY)-30, color='green', linestyle='-', linewidth=1)
plt.legend()
plt.show()

#### 30일 후 예측

In [None]:
for _ in range(30):
    nextPredict = model.predict(testX)
    newData = nextPredict[-timeStep:]
    newData = np.reshape(newData, (1, newData.shape[0], newData.shape[1]))
    testX = np.append(testX, newData, axis = 0)

30일 예측 그래프

In [None]:
ThirtyDaysAfterpredict = scaler.inverse_transform(model.predict(testX))

plt.title("30 Days After")
plt.plot(inversingTestY[:, 4], label="Test ADJ.Close")
plt.plot(ThirtyDaysAfterpredict[:, 4], label="30 Days predict ADJ.Close")
plt.grid(True)
plt.savefig(f"{fileSavePath}/30Predict.png")
plt.xlabel('Day')
plt.ylabel('Price')
plt.axvline(x=len(testY)-30, color='green', linestyle='-', linewidth=1)
plt.legend()
plt.show()