# RNN모델을 통한 주가 예측 예제

In [30]:
import torch
import torch.nn as nn
import yfinance as yf
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt
import numpy as np

# 데이터 다운로드
data = yf.download('AAPL', '2018-1-1', '2023-1-1')
df = data['Close'].values
df = df.astype('float32')

# 데이터 정규화
scaler = MinMaxScaler(feature_range=(-1, 1))
df = scaler.fit_transform(df.reshape(-1, 1))

# 시퀀스 길이 및 예측할 미래 값 설정
seq_length = 30
future_pred = 30

# 시퀀스 생성
sequences = [([df[i+j] for j in range(seq_length)], [df[i+j+seq_length] for j in range(future_pred)]) for i in range(len(df)-seq_length-future_pred)]

# 훈련 및 테스트 데이터 분할
train_size = int(len(sequences) * 0.8)
train_data = sequences[:train_size]
test_data = sequences[train_size:]

# 모델 생성
class StockPredictor(nn.Module):
    def __init__(self, n_features, n_hidden, seq_len, n_layers=2):
        super(StockPredictor, self).__init__()
        self.seq_len = seq_len
        self.n_hidden = n_hidden
        self.n_layers = n_layers

        self.lstm = nn.LSTM(
          input_size=n_features,
          hidden_size=n_hidden,
          num_layers=n_layers,
          dropout=0.5
        )

        self.linear = nn.Linear(in_features=n_hidden, out_features=future_pred)

    def reset_hidden_state(self):
        self.hidden = (
            torch.zeros(self.n_layers, 1, self.n_hidden),
            torch.zeros(self.n_layers, 1, self.n_hidden)
        )

    def forward(self, sequences):
        lstm_out, self.hidden = self.lstm(sequences.view(len(sequences), 1, -1), self.hidden)
        last_time_step = lstm_out.view(self.seq_len, 1, self.n_hidden)[-1]
        y_pred = self.linear(last_time_step)

        return y_pred


model = StockPredictor(1, 512, seq_length)
loss_function = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# 훈련 루프
epochs = 10

for epoch in range(epochs):
    for seq, labels in train_data:
        optimizer.zero_grad()
        model.reset_hidden_state()

        seq = torch.FloatTensor(seq).view(seq_length, 1, 1)
        labels = torch.FloatTensor(labels).view(future_pred)

        y_pred = model(seq)

        loss = loss_function(y_pred, labels)
        loss.backward()
        optimizer.step()

    print(f'Epoch {epoch} Loss {loss.item()}')

# 모델 평가
model.eval()
test_preds = []

for seq, _ in test_data:
    seq = torch.FloatTensor(seq).view(seq_length, 1, 1)
    with torch.no_grad():
        model.reset_hidden_state()
        test_preds.append(model(seq).view(-1).tolist())






In [30]:
# 모델 평가
model.eval()
test_preds = []

for seq, _ in test_data:
    seq = torch.FloatTensor(seq).view(seq_length, 1, 1)
    with torch.no_grad():
        model.reset_hidden_state()
        test_preds.append(model(seq).view(-1).tolist())

In [34]:
# plot against original input
plt.figure(figsize=(14, 7))
plt.title('예측 결과')
plt.xlabel('Days')
plt.ylabel('Price')
plt.plot(df, 'b')
# plot test_preds
plt.plot(np.arange(len(df)-future_pred, len(df), 1), np.array(test_preds), 'r')

RNN output shape: torch.Size([1, 200, 20])
RNN hn shape: torch.Size([1, 1, 20])


---


In [40]:
import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
import torch
from torch import nn

# Yahoo Finance에서 주식 데이터 다운로드
df = yf.download('AAPL', start='2020-01-01', end='2023-01-01')
df = df[['Close']]
df = df.dropna()

print(df.head())

tensor([ 0.1184,  0.5487,  0.3543, -0.2683,  0.8106,  0.1308, -0.7525,  0.3996,
         0.4866, -0.3970,  0.1684, -0.3137,  0.1926,  0.0098,  0.3533,  0.1244,
        -0.5722, -0.0304,  0.0546, -0.7848], grad_fn=<SqueezeBackward0>)

In [41]:
%matplotlib inline

plt.figure(figsize=(14,6))
plt.title('Apple Stock Prices')
plt.xlabel('Dates')
plt.ylabel('Prices')
plt.plot(df['Close'])
plt.show()

tensor([ 0.1184,  0.5487,  0.3543, -0.2683,  0.8106,  0.1308, -0.7525,  0.3996,
         0.4866, -0.3970,  0.1684, -0.3137,  0.1926,  0.0098,  0.3533,  0.1244,
        -0.5722, -0.0304,  0.0546, -0.7848], grad_fn=<SqueezeBackward0>)

In [32]:
# 데이터 스케일링
scaler = MinMaxScaler(feature_range=(-1, 1))
df['Close'] = scaler.fit_transform(df['Close'].values.reshape(-1,1))

# 학습과 테스트 데이터 분할
test_data_size = 60
train_data = df[:-test_data_size]
test_data = df[-test_data_size:]

train_data = torch.FloatTensor(train_data.values)

# 시퀀스 데이터 생성
def create_sequences(data, seq_length):
    sequences = []
    for i in range(len(data)-seq_length-1):
        seq = data[i:i+seq_length]
        label = data[i+seq_length:i+seq_length+1]
        sequences.append((seq, label))
    return sequences

seq_length = 30
sequences = create_sequences(train_data, seq_length)

# LSTM 모델 정의
class StockPredictor(nn.Module):
    def __init__(self, n_features, n_hidden, seq_len, n_layers=2):
        super(StockPredictor, self).__init__()

        self.n_hidden = n_hidden
        self.seq_len = seq_len
        self.n_layers = n_layers

        self.lstm = nn.LSTM(
          input_size=n_features,
          hidden_size=n_hidden,
          num_layers=n_layers,
          dropout=0.5
        )

        self.linear = nn.Linear(in_features=n_hidden, out_features=future_pred)

    def reset_hidden_state(self):
        self.hidden = (
            torch.zeros(self.n_layers, self.seq_len, self.n_hidden),
            torch.zeros(self.n_layers, self.seq_len, self.n_hidden)
        )

    def forward(self, sequences):
        lstm_out, self.hidden = self.lstm(sequences.view(len(sequences), self.seq_len, -1), self.hidden)
        last_time_step = lstm_out.view(self.seq_len, len(sequences), self.n_hidden)[-1]
        y_pred = self.linear(last_time_step)

        return y_pred

model = StockPredictor(1, 128, 2, 1)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

epochs = 100
seq_length = 30
future_pred = 30
sequences = [([df[i+j] for j in range(seq_length)], [df[i+j+seq_length] for j in range(future_pred)]) for i in range(len(df)-seq_length-future_pred)]

train_size = int(len(sequences) * 0.8)
train_data = sequences[:train_size]
test_data = sequences[train_size:]

# 훈련 루프
for epoch in range(epochs):
    for seq, y_train in train_data:
        optimizer.zero_grad()
        model.hidden = (torch.zeros(1, 1, model.hidden_dim),
                        torch.zeros(1, 1, model.hidden_dim))

        seq = torch.FloatTensor(seq).view(seq_length, 1, 1)
        y_train = torch.FloatTensor(y_train).view(future_pred, 1)

        y_pred = model(seq)

        loss = criterion(y_pred, y_train)
        loss.backward()
        optimizer.step()

    print(f'Epoch {epoch} Loss {loss.item()}')





KeyError: 0

In [47]:
for seq, labels in sequences:
    break

LSTM output shape: torch.Size([1, 200, 20])
LSTM hn shape: torch.Size([1, 1, 20])
LSTM cn shape: torch.Size([1, 1, 20])


In [48]:
test_inputs = df['Close'][-test_data_size-seq_length:].values
print(test_inputs.shape)

tensor([-0.0362, -0.1588, -0.1701,  0.0795,  0.1226,  0.0474, -0.0110,  0.0525,
        -0.0491,  0.0599,  0.0788, -0.0597, -0.0438,  0.0516, -0.1031,  0.2357,
         0.0581, -0.1118,  0.1886,  0.0307], grad_fn=<SqueezeBackward0>)

In [49]:
# 예측 수행
model.eval()

test_inputs = df['Close'][-test_data_size-seq_length:].values
test_inputs = torch.FloatTensor(test_inputs).view(-1, 1)

future_pred = 30
test_preds = test_inputs[-seq_length:].tolist()
test_preds = [x[0] for x in test_preds]

for i in range(future_pred):
    seq = torch.FloatTensor(test_preds[-seq_length:])
    seq = seq.view(1, seq_length, 1)
    with torch.no_grad():
        model.hidden = (torch.zeros(1, 1, model.hidden_dim),
                        torch.zeros(1, 1, model.hidden_dim))
        test_preds.append(model(seq).item())


tensor([-0.0362, -0.1588, -0.1701,  0.0795,  0.1226,  0.0474, -0.0110,  0.0525,
        -0.0491,  0.0599,  0.0788, -0.0597, -0.0438,  0.0516, -0.1031,  0.2357,
         0.0581, -0.1118,  0.1886,  0.0307], grad_fn=<SqueezeBackward0>)

In [50]:
# 예측 결과를 원래의 스케일로 변환합니다.
actual_predictions = scaler.inverse_transform(np.array(test_preds[seq_length:]).reshape(-1, 1))

# 예측 결과를 플로팅합니다.
plt.figure(figsize=(10,6))

# 실제 데이터 플로팅
plt.plot(df.index, scaler.inverse_transform(df['Close'].values.reshape(-1, 1)), label='Actual')

# 예측 데이터 플로팅
plt.plot(df.index[-future_pred:], actual_predictions, label='Predicted')

# 예측의 시작을 나타내는 점선 추가
plt.axvline(df.index[-future_pred], color='r', linestyle='dashed', linewidth=2)

plt.title('Apple Stock Price Prediction')
plt.xlabel('Dates')
plt.ylabel('Price')
plt.legend()
plt.show()


tensor([-0.0800, -0.2543, -0.2359,  0.2524,  0.3194,  0.1062, -0.0356,  0.1488,
        -0.0877,  0.1343,  0.1579, -0.1364, -0.0794,  0.1050, -0.4178,  0.3564,
         0.1088, -0.1730,  0.4015,  0.0471], grad_fn=<SqueezeBackward0>)