In [None]:
import numpy as np
import matplotlib.pyplot as plt
import random
import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
import torch.optim as optim
import pandas as pd
from sklearn.preprocessing import MinMaxScaler

# Report the installed PyTorch version.
print(torch.__version__)

In [None]:
# Hyper-parameters
learning_rate = 0.001   # step size passed to the Adam optimizer
n_epochs = 100          # number of train/validation passes
batch_size = 32         # samples per mini-batch in both data loaders
seq_length = 5          # consecutive rows in each input window
data_dim = 5            # features per row (LSTM input size)
out_dim = 1             # values predicted per window
val_data_ratio = 0.2    # fraction of sequences held out for validation
random_seed = 777

# Apply the seed to every random number generator the notebook relies on, so
# that weight initialisation and DataLoader shuffling are repeatable after
# Restart & Run All. Without this `random_seed` was defined but never used.
random.seed(random_seed)
np.random.seed(random_seed)
torch.manual_seed(random_seed)

In [None]:
# Name of the stock to look up (Hyundai Motor), as it appears in the listing's Name column
company_name = "현대차"

In [None]:
# Finance-datareader 설치
!pip install -U finance-datareader

In [None]:
import FinanceDataReader as fdr

In [None]:
# Fetch the list of stocks listed on the Korea Exchange (KRX)
stock_df = fdr.StockListing('KRX')

In [None]:
# Preview the first rows of the listing
stock_df.head()

In [None]:
# Keep only the company name and ticker symbol columns
stock_df = stock_df.loc[:, ['Name', 'Symbol']]
stock_df.head()

In [None]:
# Build a dictionary keyed by company name; each value is the list of the
# remaining column values for that company.
name_indexed = stock_df.set_index('Name')
stock_dict = name_indexed.T.to_dict('list')
stock_dict

In [None]:
# Look up the ticker symbol for the chosen company (first entry of its list)
symbols = stock_dict[company_name]
code = symbols[0]
code

In [None]:
# Download the price history for the ticker
df = fdr.DataReader(code)
df

In [None]:
# Plot the closing price from row 5000 onward (i.e. skipping the first 5000 rows).
# NOTE(review): this is not "the most recent 5000 rows"; that would be iloc[-5000:] — confirm intent.
df['Close'].iloc[5000:].plot()

In [None]:
# Data normalisation: min-max scale the price/volume columns, overwriting them in `df`.
# NOTE(review): the scaler is fit on every row, including the rows that later
# become the validation set, so validation statistics leak into the scaling.
feature_columns = ['Open','High','Low','Close','Volume']
scaler = MinMaxScaler()
df[feature_columns] = scaler.fit_transform(df[feature_columns])

In [None]:
# Inspect the result of the normalisation
df

In [None]:
# Plot the closing price from row 5000 onward
df['Close'].iloc[5000:].plot()

In [None]:
# Plot the Open, High, Low, Volume and Close columns together
df[['Open', 'High', 'Low', 'Volume', 'Close']].plot()

In [None]:
# Select the compute device: CUDA when available, otherwise the CPU
if torch.cuda.is_available():
  device = 'cuda'
else:
  device = 'cpu'
print('Using {} device'.format(device))

In [None]:
# Pull the model inputs and the target out of the frame as NumPy arrays
input_columns = ['Open', 'High', 'Low', 'Volume', 'Close']
x = df[input_columns].values
y = df['Close'].values

In [None]:
# Convert to sequence data
def seq_data(x, y, seq_length):
  """Slice `x` into overlapping windows and pair each with the following `y` value.

  Window `i` is `x[i:i+seq_length]` and its target is `y[i+seq_length]`, for
  every `i` in `range(len(x) - seq_length)`.

  Returns:
    A pair of float tensors moved to the notebook-level `device`: the stacked
    windows, and the targets reshaped into a single column.
  """
  n_windows = len(x) - seq_length
  windows = np.array([x[start:start+seq_length] for start in range(n_windows)])
  targets = np.array([y[start+seq_length] for start in range(n_windows)])

  window_tensor = torch.FloatTensor(windows).to(device)
  target_tensor = torch.FloatTensor(targets).to(device).view([-1,1])
  return window_tensor, target_tensor

In [None]:
# Split into training / validation sets: the first (1 - val_data_ratio) share
# of the sequences is used for training, the remainder for validation.
x_seq, y_seq = seq_data(x, y, seq_length)
n_train = int(x_seq.size(0)*(1-val_data_ratio))

x_train_seq, x_val_seq = x_seq[:n_train], x_seq[n_train:]
y_train_seq, y_val_seq = y_seq[:n_train], y_seq[n_train:]

for inputs, targets in ((x_train_seq, y_train_seq), (x_val_seq, y_val_seq)):
  print(inputs.size(), targets.size())

In [None]:
# Build the data loaders
train_data = TensorDataset(x_train_seq, y_train_seq)
val_data = TensorDataset(x_val_seq, y_val_seq)

# Training batches are shuffled and an incomplete final batch is dropped;
# validation batches keep their original order.
train_dataloader = DataLoader(train_data, shuffle=True, drop_last=True, batch_size=batch_size)
val_dataloader = DataLoader(val_data, shuffle=False, batch_size=batch_size)

In [None]:
# LSTM model
class LSTM(nn.Module):
  """Stacked LSTM followed by a linear layer applied to the last time step.

  Args:
    input_size: number of features per time step.
    hidden_size: size of the LSTM hidden state.
    num_layers: number of stacked LSTM layers.
    seq_length: stored on the instance; not used by `forward`.
    device: stored on the instance for backward compatibility; `forward` does
      not need it because the initial states follow the input tensor.
    output_size: number of output features. When None (the default) the
      notebook-level `out_dim` is used, as before.
  """
  def __init__(self, input_size, hidden_size, num_layers, seq_length, device, output_size=None):
    super().__init__()
    self.hidden_size = hidden_size
    self.num_layers = num_layers
    self.seq_length = seq_length
    self.device = device
    if output_size is None:
      # Fall back to the notebook-level setting to keep existing calls working.
      output_size = out_dim
    self.LSTM = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
    self.fc = nn.Linear(hidden_size, output_size)

  def forward(self, x):
    """Map a (batch, seq, input_size) tensor to a (batch, output_size) tensor."""
    # No explicit (h_0, c_0): nn.LSTM then starts from zero states, which is
    # what the previous hand-built tensors were, but without tying them to
    # `self.device` (that raised a device mismatch once the model was moved).
    out, _ = self.LSTM(x)
    # Only the output at the last time step feeds the linear head.
    return self.fc(out[:, -1, :])

In [None]:
# Create the model instance and move it to the selected device
num_layers = 2
input_size = data_dim
hidden_size = 2 * data_dim
model = LSTM(input_size, hidden_size, num_layers, seq_length, device)
model = model.to(device)

In [None]:
# Show the model's module structure
print(model)

In [None]:
# Loss function: mean squared error
loss_fn = nn.MSELoss()

In [None]:
# Optimizer: Adam over all model parameters
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
# Train function
def train_loop(dataloader, model, loss_fn, optimizer):
  """Run one training epoch, print the mean batch loss and return it.

  Args:
    dataloader: iterable of (data, label) batches that supports `len()`.
    model: module being optimised; batches are moved to the device of its
      parameters.
    loss_fn: callable `loss_fn(pred, label)` returning a scalar tensor.
    optimizer: optimizer that updates the model's parameters.

  Returns:
    The loss averaged over the batches, as a float.

  Raises:
    ValueError: if the dataloader yields no batches (previously this surfaced
      as a ZeroDivisionError after the loop).
  """
  n_batches = len(dataloader)
  if n_batches == 0:
    raise ValueError("dataloader yields no batches")
  # Move batches to wherever the model's parameters live instead of relying on
  # a notebook-level `device` variable.
  first_param = next(model.parameters(), None)
  model.train()
  train_loss = 0.
  for data, label in dataloader:
    if first_param is not None:
      data, label = data.to(first_param.device), label.to(first_param.device)
    pred = model(data)
    loss = loss_fn(pred, label)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    train_loss += loss.item()
  train_loss /= n_batches

  print(f"Training Loss: {train_loss:>8f}")
  return train_loss

In [None]:
# Validation function
def val_loop(dataloader, model, loss_fn):
  """Evaluate the model on every batch, print the mean batch loss and return it.

  The model is switched to eval mode and gradients are disabled for the pass.

  Args:
    dataloader: iterable of (data, label) batches that supports `len()`.
    model: module to evaluate; batches are moved to the device of its
      parameters.
    loss_fn: callable `loss_fn(pred, label)` returning a scalar tensor.

  Returns:
    The loss averaged over the batches, as a float.

  Raises:
    ValueError: if the dataloader yields no batches (previously this surfaced
      as a ZeroDivisionError after the loop).
  """
  n_batches = len(dataloader)
  if n_batches == 0:
    raise ValueError("dataloader yields no batches")
  # Move batches to wherever the model's parameters live instead of relying on
  # a notebook-level `device` variable.
  first_param = next(model.parameters(), None)
  model.eval()
  val_loss = 0.
  with torch.no_grad():
    for data, label in dataloader:
      if first_param is not None:
        data, label = data.to(first_param.device), label.to(first_param.device)
      pred = model(data)
      val_loss += loss_fn(pred, label).item()
  val_loss /= n_batches
  print(f"Validation Loss: {val_loss:>8f}\n")
  return val_loss

In [None]:
# Run training: one training pass followed by one validation pass per epoch
for epoch in range(n_epochs):
  print(f"<<Epoch {epoch+1}>>\n--------------------------------------------------------")
  train_loop(train_dataloader, model, loss_fn, optimizer)
  val_loop(val_dataloader, model, loss_fn)
print("Training Done!")

In [None]:
# Check the results: run the model on the whole validation set as one batch
model.eval()
test_loader = DataLoader(val_data, batch_size=x_val_seq.size(0))
data, label = next(iter(test_loader))
data = data.to(device)
# Inference only: disable autograd so no computation graph is built for the
# full validation set.
with torch.no_grad():
  pred = model(data)

In [None]:
# Detach both tensors and bring them back to the CPU as NumPy arrays
prediction = pred.detach().cpu().numpy()
label = label.detach().cpu().numpy()

In [None]:
# Overlay the actual and the predicted series for the validation set
fig, ax = plt.subplots(figsize=(15,7))
ax.plot(label)
ax.plot(prediction)
ax.set_xlabel("Time Period")
ax.set_ylabel("Stock Price")
ax.legend(['real', 'prediction'])
plt.show()

In [None]:
# Download the price history again into a separate frame
# (the price/volume columns of `df` were overwritten by the normalisation step).
df_temp = fdr.DataReader(code)

In [None]:
# Fit a separate scaler on the Close column alone so that single-column
# arrays can be mapped back with `inverse_transform`.
# NOTE(review): this only reproduces the scaling used for training if this
# download has the same Close min/max as the earlier one — confirm.
scaler_temp = MinMaxScaler()
df_temp['Close'] = scaler_temp.fit_transform(df_temp[['Close']])

In [None]:
# Predicted closing price for the last validation sample, mapped back through the Close scaler
scaler_temp.inverse_transform(prediction)[-1,0]

In [None]:
# Actual closing price for the last validation sample, mapped back through the Close scaler
scaler_temp.inverse_transform(label)[-1,0]