### 1. 동작 환경 세팅 및 학습 데이터 불러오기
-----

In [None]:
!pip install --upgrade pytorch-lightning ta
!mkdir ./check_points ./lightning_logs ./lightning_logs/LSTM_MODEL

In [None]:
# Extract the dataset archive that sits next to the notebook.
!unzip "./Huge_Stock_Market_Dataset.zip"

### 2. 데이터 전처리 및 데이터 로더 생성시키기
-----

In [4]:
import pandas as pd
import numpy as np
import glob
import ta

# Manages the preprocessing of raw stock-price CSV files into model-ready arrays.
class Raw_Stock_Data_Processor:
  """Build, hold, save and load sliding-window samples made from OHLC CSV files.

  ``DATA_SETS`` is a dict holding two NumPy arrays under the keys
  ``"X_DATAS"`` (inputs) and ``"Y_DATAS"`` (labels); both start out empty.
  """

  def __init__(self):
    self.DATA_SETS = {"X_DATAS": np.array([]), "Y_DATAS": np.array([])}

  # Preprocess a single CSV file and return its samples.
  def __processing_Raw_Stock_Data(self, csv_path, used_prev_days=30, pred_after_days=7):
    """Turn one CSV file into ``(x, y)`` float32 arrays.

    Args:
      csv_path: Path of a CSV file with "Open", "High", "Low", "Close" columns.
      used_prev_days: Number of consecutive rows in each input window.
      pred_after_days: Label horizon; the label of row ``t`` is
        ``(Close[t + pred_after_days - 1] - Open[t]) / Open[t]``.

    Returns:
      ``x`` of shape ``(samples, 3, used_prev_days, features)`` (the same
      window repeated on three channels) and ``y`` of shape ``(samples,)``.
      When the file is too short to produce a sample both arrays are empty
      1-D arrays.
    """
    # The tiny offset is presumably there to keep ratio features away from a
    # division by an exact zero price.
    CSV_DATA = pd.read_csv(csv_path).loc[:, ["Open", "High", "Low", "Close"]] + 1e-6

    CSV_DATA = Raw_Stock_Data_Processor.add_Technic_Indexes(CSV_DATA)
    # Drop the first 30 rows (presumably the indicator warm-up rows that still hold NaN).
    CSV_DATA = CSV_DATA.iloc[30:]

    LABELS = ((CSV_DATA["Close"].shift(-pred_after_days+1) - CSV_DATA["Open"]) / CSV_DATA["Open"]).dropna().tolist()
    # Only the derived indicator columns are used as model inputs.
    FEATURES = CSV_DATA.drop(["Open", "High", "Low", "Close"], axis=1).values

    x_datas = []
    y_datas = []
    for start in range(len(FEATURES) - used_prev_days - pred_after_days):
      window = FEATURES[start:start + used_prev_days]
      x_datas.append(np.stack([window, window, window]))
      # The window covers rows [start, start + used_prev_days); the label is
      # read from row start + used_prev_days + 1, so one row is skipped.
      # NOTE(review): confirm that this one-row gap is intended.
      y_datas.append(LABELS[start + used_prev_days + 1])

    return np.array(x_datas, dtype=np.float32), np.array(y_datas, dtype=np.float32)

  # Preprocess the data found at several paths and merge the results.
  def processing_Raw_Stock_Datas(self, csv_paths, used_prev_days=30, pred_after_days=7):
    """Preprocess every file in ``csv_paths`` and store the merged samples.

    Files that do not yield a 4-D input array (i.e. too short to produce a
    single sample) are skipped.

    Raises:
      ValueError: If no file produced any sample.
    """
    x_datas_stack, y_datas_stack = [], []
    for csv_path_index, csv_path in enumerate(csv_paths):
      print(f"[*] {csv_path_index+1}/{len(csv_paths)} 번째 csv 파일 처리중... : {csv_path}")
      X_DATAS, Y_DATAS = self.__processing_Raw_Stock_Data(csv_path, used_prev_days, pred_after_days)
      if X_DATAS.ndim != 4:
        continue

      x_datas_stack.append(X_DATAS)
      y_datas_stack.append(Y_DATAS)

    # np.concatenate raises an opaque ValueError on an empty list; fail with
    # a message that names the real cause instead.
    if not x_datas_stack:
      raise ValueError("No usable samples were produced from the given csv paths.")

    self.DATA_SETS["X_DATAS"] = np.concatenate(x_datas_stack, axis=0)
    self.DATA_SETS["Y_DATAS"] = np.concatenate(y_datas_stack, axis=0)

    print(self.DATA_SETS["X_DATAS"].shape)
    print(self.DATA_SETS["Y_DATAS"].shape)

  # Add the technical indicators that are used as model features.
  @staticmethod
  def add_Technic_Indexes(csv_datas):
    """Add 16 indicator columns to ``csv_datas`` in place and return it.

    ``csv_datas`` must provide "Open", "High", "Low" and "Close" columns.
    The column insertion order is kept stable because it defines the feature
    order seen by the model.
    """
    CLOSE = csv_datas["Close"]

    csv_datas["Open_Pct"] = csv_datas["Open"].pct_change()
    csv_datas["High_Pct"] = csv_datas["High"].pct_change()
    csv_datas["Low_Pct"] = csv_datas["Low"].pct_change()
    csv_datas["Close_Pct"] = CLOSE.pct_change()

    csv_datas["Open_Close_Pct"] = (CLOSE - csv_datas["Open"]) / csv_datas["Open"]
    csv_datas["High_Low_Pct"] = (csv_datas["High"] - csv_datas["Low"]) / csv_datas["Low"]

    csv_datas["Close_WMA_5_Pct"] = ta.trend.WMAIndicator(close=CLOSE, window=5).wma().pct_change()
    csv_datas["Close_WMA_10_Pct"] = ta.trend.WMAIndicator(close=CLOSE, window=10).wma().pct_change()

    # One indicator object serves all four Bollinger-band columns.
    BOLLINGER = ta.volatility.BollingerBands(close=CLOSE, window=20)
    csv_datas["BB_High_Band_Pct"] = BOLLINGER.bollinger_hband().pct_change()
    csv_datas["BB_Low_Band_Pct"] = BOLLINGER.bollinger_lband().pct_change()
    csv_datas["BB_Channel_Pct"] = BOLLINGER.bollinger_pband()
    csv_datas["BB_Width_Pct"] = BOLLINGER.bollinger_wband().pct_change()

    csv_datas["MACD_Pct"] = ta.trend.MACD(close=CLOSE).macd().pct_change()
    csv_datas["RSI_Pct"] = ta.momentum.RSIIndicator(close=CLOSE, window=20).rsi().pct_change()
    csv_datas["CCI_Pct"] = ta.trend.CCIIndicator(high=csv_datas["High"], low=csv_datas["Low"], close=CLOSE, window=20).cci().pct_change()
    csv_datas["ATR_Pct"] = ta.volatility.AverageTrueRange(high=csv_datas["High"], low=csv_datas["Low"], close=CLOSE, window=20).average_true_range().pct_change()
    return csv_datas

  # Save the preprocessed data to a file.
  def save(self, save_path):
    """Write ``DATA_SETS`` to ``save_path`` with ``np.save``.

    Raises:
      ValueError: If either stored array is empty.
    """
    if len(self.DATA_SETS["X_DATAS"]) == 0 or len(self.DATA_SETS["Y_DATAS"]) == 0:
      raise ValueError("Datas to save are not exist !")
    np.save(save_path, self.DATA_SETS)

  # Load data that has already been preprocessed.
  def load(self, load_path):
    """Replace ``DATA_SETS`` with the dict stored at ``load_path``."""
    # SECURITY: allow_pickle=True unpickles the file content, which can run
    # arbitrary code; only load files that were produced by save().
    self.DATA_SETS = np.load(load_path, allow_pickle=True).item()

  def __len__(self):
    """Return the number of stored input samples."""
    return len(self.DATA_SETS["X_DATAS"])

  def __getitem__(self, name):
    """Return the stored array for ``name`` ("X_DATAS" or "Y_DATAS")."""
    return self.DATA_SETS[name]

In [5]:
from torch.utils.data.dataset import Dataset
import math

# Dataset that exposes one contiguous slice of the preprocessed stock samples.
class Stock_Dataset(Dataset):
  """Slice of a processor's "X_DATAS" / "Y_DATAS" arrays.

  The slice starts at ``int(len * div_ratio_start)`` and is shortened so that
  its length is a whole multiple of ``unit``.
  """

  def __init__(self, raw_stock_data_processor, div_ratio_start=0.0, div_ratio_end=1.0, unit=32):
    total = len(raw_stock_data_processor)
    first = int(total * div_ratio_start)
    last = int(total * div_ratio_end)

    # Keep only as many samples as fill complete groups of `unit`.
    whole_units = math.floor((last - first) / unit)
    span = slice(first, first + whole_units * unit)

    self.X_DATAS = raw_stock_data_processor["X_DATAS"][span]
    self.Y_DATAS = raw_stock_data_processor["Y_DATAS"][span]

  def __len__(self):
    """Return the number of samples in the slice."""
    return len(self.X_DATAS)

  def __getitem__(self, i):
    """Return the ``(input, label)`` pair at position ``i``."""
    sample = self.X_DATAS[i]
    target = self.Y_DATAS[i]
    return sample, target

In [None]:
# Preprocess the real stock-price data into a form suitable for training:
# 30-row input windows with a 7-day label horizon, then persist the result.
RAW_STOCK_DATA_PROCESSOR = Raw_Stock_Data_Processor()
RAW_STOCK_DATA_PROCESSOR.processing_Raw_Stock_Datas(["/content/Huge_Stock_Market_Dataset/Stocks/a.us.txt"], 30, 7)
RAW_STOCK_DATA_PROCESSOR.save("train_sets_a_us.npy")

In [None]:
from torch.utils.data.dataloader import DataLoader

# Build the train / validation / test datasets from the preprocessed stock data.
RAW_STOCK_DATA_PROCESSOR = Raw_Stock_Data_Processor()
RAW_STOCK_DATA_PROCESSOR.load("/content/train_sets_a_us.npy")


# Split the samples 60 % / 20 % / 20 % by position and create the data loaders.
# Each split is trimmed to a multiple of 32 samples, matching batch_size=32.
TRAIN_DATASET = Stock_Dataset(RAW_STOCK_DATA_PROCESSOR, 0.0, 0.6, 32)
VALID_DATASET = Stock_Dataset(RAW_STOCK_DATA_PROCESSOR, 0.6, 0.8, 32)
TEST_DATASET = Stock_Dataset(RAW_STOCK_DATA_PROCESSOR, 0.8, 1.0, 32)

# Only the training loader shuffles its samples.
TRAIN_LOADER = DataLoader(TRAIN_DATASET, batch_size=32, shuffle=True, drop_last=True)
VALID_LOADER = DataLoader(VALID_DATASET, batch_size=32, shuffle=False, drop_last=True)
TEST_LOADER = DataLoader(TEST_DATASET, batch_size=32, shuffle=False, drop_last=True)

# Show the three split sizes as the cell output.
len(TRAIN_DATASET), len(VALID_DATASET), len(TEST_DATASET)

### 3. 신경망 생성하기
------

In [8]:
import pytorch_lightning as pl

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim.adam import Adam

# Residual convolution block that mixes features and reduces the size a little.
class Basic_ResNet_Downsample_Layer(nn.Module):
  """Two-convolution residual block with a convolutional shortcut.

  The first convolution and the shortcut both use a ``(3, kernel_width)``
  kernel with ``(1, padding_width)`` padding, so the two branches produce the
  same output size and can be summed.

  Attribute names and the order in which sub-modules are created are part of
  the checkpoint / seeded-initialisation contract and must stay as they are.
  """

  def __init__(self, in_channels, out_channels, kernel_width, padding_width):
    super().__init__()

    wide_kernel = (3, kernel_width)
    wide_padding = (1, padding_width)

    self.CONV_1 = nn.Conv2d(in_channels, out_channels, kernel_size=wide_kernel, padding=wide_padding)
    self.CONV_2 = nn.Conv2d(out_channels, out_channels, kernel_size=(3, 3), padding=1)

    self.BN_1 = nn.BatchNorm2d(out_channels)
    self.BN_2 = nn.BatchNorm2d(out_channels)

    self.LERU = nn.ReLU()

    # Shortcut branch: brings the input to the main branch's channels and size.
    self.CONV_DOWN_SAMPLE = nn.Conv2d(in_channels, out_channels, kernel_size=wide_kernel, padding=wide_padding)

    # Main branch: conv -> batch norm -> ReLU -> conv -> batch norm.
    main_branch = [self.CONV_1, self.BN_1, self.LERU, self.CONV_2, self.BN_2]
    self.LAYER_SEQ = nn.Sequential(*main_branch)

  def forward(self, x):
    """Return ``ReLU(main_branch(x) + shortcut(x))``."""
    main_out = self.LAYER_SEQ(x)
    shortcut_out = self.CONV_DOWN_SAMPLE(x)
    return self.LERU(main_out + shortcut_out)

# Layer for extracting order-dependent features.
class Basic_Self_Attension_Skip_Layer(nn.Module):
  """Self-attention followed by batch normalisation, with a skip connection.

  The batch-norm layer is always built for 32 features, independent of
  ``input_size``. For a 3-D input ``BatchNorm1d`` treats dim 1 as the feature
  axis, so ``x.shape[1]`` must be 32.
  NOTE(review): with ``MultiheadAttention``'s default (seq, batch, embed)
  layout dim 1 is the batch axis -- confirm that normalising over it is
  intended before changing anything; existing checkpoints depend on it.
  """

  def __init__(self, input_size):
    super().__init__()

    self.ATTENTION = torch.nn.MultiheadAttention(embed_dim=input_size, num_heads=8)
    self.BN = nn.BatchNorm1d(32)

  def forward(self, x):
    """Return ``BN(self_attention(x)) + x``."""
    skip = x
    attended, _weights = self.ATTENTION(x, x, x)
    normalised = self.BN(attended)
    return normalised + skip

class LSTM_Module(pl.LightningModule):
  """Lightning module: residual conv stack -> self-attention stack -> MLP head.

  Despite its name, no LSTM layer is used. Trained with mean-squared error
  and Adam (lr=1e-4). Attribute names and the creation order of sub-modules
  are kept stable so that existing checkpoints keep loading.
  """

  def __init__(self):
    super().__init__()

    # One 3 -> 32 channel block, then seven 32 -> 32 blocks whose kernel
    # width goes 15, 13, ..., 3 with padding 6, 5, ..., 0.
    conv_blocks = [Basic_ResNet_Downsample_Layer(in_channels=3, out_channels=32, kernel_width=16, padding_width=7)]
    conv_blocks.extend(
      Basic_ResNet_Downsample_Layer(in_channels=32, out_channels=32, kernel_width=15 - 2 * step, padding_width=6 - step)
      for step in range(7)
    )
    self.RES_NET_DOWNSAMPLE_SEQ = nn.Sequential(*conv_blocks)

    self.SELF_ATTENTION_SKIP_SEQ = nn.Sequential(
      *(Basic_Self_Attension_Skip_Layer(32) for _ in range(3))
    )

    self.FC_1 = nn.Linear(960, 480)
    self.FC_2 = nn.Linear(480, 1)
    self.LERU = nn.ReLU()
    self.FC_LAYER_SEQ = nn.Sequential(self.FC_1, self.LERU, self.FC_2)

  def forward(self, x):
    """Return one prediction per sample, shape ``(batch, 1)``."""
    conv_out = self.RES_NET_DOWNSAMPLE_SEQ(x)
    batch_size = conv_out.shape[0]

    # (batch, 32, rest) -> (rest, batch, 32) for the attention layers.
    sequence = conv_out.view(batch_size, 32, -1).permute(2, 0, 1)
    attended = self.SELF_ATTENTION_SKIP_SEQ(sequence)

    # Back to batch-first, then flatten everything per sample for the head.
    flat = attended.permute(1, 0, 2).reshape(batch_size, -1)
    return self.FC_LAYER_SEQ(flat)

  def training_step(self, batch, batch_idx):
    """Log and return the training loss of ``batch``."""
    LOSS = self.__calc_Loss(batch)
    self.log('train_loss', LOSS, on_epoch=True, prog_bar=True)
    return LOSS

  def validation_step(self, batch, batch_idx):
    """Log the validation loss of ``batch``."""
    LOSS = self.__calc_Loss(batch)
    self.log('valid_loss', LOSS, on_epoch=True, prog_bar=True)

  def test_step(self, batch, batch_idx):
    """Log the test loss of ``batch``."""
    LOSS = self.__calc_Loss(batch)
    self.log('test_loss', LOSS, on_epoch=True, prog_bar=True)

  def predict_step(self, batch, batch_idx):
    """Return the model output for the inputs of ``batch``; labels are ignored."""
    inputs, _labels = batch
    return self(inputs)

  def __calc_Loss(self, batch):
    """Return the mean-squared error between flattened predictions and labels."""
    inputs, targets = batch
    return F.mse_loss(self(inputs).flatten(), targets)

  def configure_optimizers(self):
      """Return the Adam optimizer over all parameters."""
      return Adam(self.parameters(), lr=1e-4)

### 4. 신경망 학습하기
-----

In [None]:
# Load the TensorBoard notebook extension and open it on the Lightning log directory.
%load_ext tensorboard
%tensorboard --logdir lightning_logs/

In [None]:
from pytorch_lightning.callbacks import ModelCheckpoint

# Use CUDA when it is available, otherwise fall back to the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Checkpoints are written to ./check_points and ranked by "valid_loss" (lower is better).
CHECK_POINT_CALLBACK = ModelCheckpoint(
  dirpath="./check_points",
  filename="lstm-{epoch:02d}-{valid_loss:.10f}",
  monitor="valid_loss",
  mode="min",
)
# TensorBoard logs go to lightning_logs/LSTM_MODEL.
LOGGER = pl.loggers.TensorBoardLogger(save_dir='lightning_logs', name='LSTM_MODEL')

TRAINER = pl.Trainer(
  max_epochs=300,
  accelerator=DEVICE,
  callbacks=[CHECK_POINT_CALLBACK],
  logger=LOGGER,
)


# Train a freshly initialised model, validating on the validation loader.
LSTM_MODULE = LSTM_Module()
TRAINER.fit(LSTM_MODULE, train_dataloaders=TRAIN_LOADER, val_dataloaders=VALID_LOADER)

### 5. 모델 테스트하기
-----

In [None]:
# Restore the model from a saved checkpoint (the path is specific to one training run).
MODULE_TO_TEST = LSTM_Module.load_from_checkpoint("/content/check_points/lstm-epoch=228-valid_loss=0.0031873265.ckpt")
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
TRAINER = pl.Trainer(accelerator=DEVICE)

# Report the loss on each of the three splits in turn.
TRAINER.test(MODULE_TO_TEST, dataloaders=TRAIN_LOADER)
TRAINER.test(MODULE_TO_TEST, dataloaders=VALID_LOADER)
TRAINER.test(MODULE_TO_TEST, dataloaders=TEST_LOADER)

### 7. 모델로 실예측 해보기
-----

In [12]:
import statistics
import plotly.express as px
import plotly.graph_objs as go

# Extract the predictions and the matching labels of a dataloader as flat lists.
def predict_From_Dataloader(trainer, model_to_pred, dataloader):
  """Predict every sample of ``dataloader.dataset`` in dataset order.

  Args:
    trainer: Object with a ``predict(model, dataloaders=...)`` method that
      returns one tensor per batch.
    model_to_pred: Model handed to ``trainer.predict``.
    dataloader: Loader whose dataset, batch size and ``drop_last`` setting
      are reused; its shuffling is deliberately not.

  Returns:
    ``(predictions, labels)`` as two flat lists of equal length in which
    index ``i`` of both lists refers to the same sample.
  """
  # Local import so the function does not rely on another notebook cell.
  from torch.utils.data import DataLoader

  # Labels are read in dataset order below, so predictions must be made in
  # that order too: a shuffling loader (e.g. the training loader) would pair
  # every prediction with the label of a different sample.
  ordered_loader = DataLoader(
    dataloader.dataset,
    batch_size=dataloader.batch_size,
    shuffle=False,
    drop_last=dataloader.drop_last,
  )

  PREDS = trainer.predict(model_to_pred, dataloaders=ordered_loader)
  FLATTEN_PREDS = torch.cat([pred.flatten() for pred in PREDS]).tolist()
  # With drop_last the tail of the dataset gets no prediction; trim the
  # labels so both lists stay aligned.
  FLATTEN_LABELS = dataloader.dataset[:][1].flatten().tolist()[:len(FLATTEN_PREDS)]
  return FLATTEN_PREDS, FLATTEN_LABELS

# Derive the adjustment values from the training predictions and labels.
def get_Adjust_Values(train_preds, train_labels):
  """Return ``(mean of predictions, stdev(labels) / stdev(predictions))``.

  Both inputs need at least two values, as required by ``statistics.stdev``.
  """
  pred_center = statistics.mean(train_preds)
  label_spread = statistics.stdev(train_labels)
  pred_spread = statistics.stdev(train_preds)
  return pred_center, label_spread / pred_spread

# Rescale predictions with the adjustment values obtained from the training data.
def adjust_Preds(adjust_mean, adjust_std_dev, preds_to_adjust):
  """Return ``(value - adjust_mean) * adjust_std_dev`` for every prediction, as a list."""
  return [(value - adjust_mean) * adjust_std_dev for value in preds_to_adjust]

# Plot the obtained predictions and labels together in one graph.
def show_Graphs(preds, preds_name, labels, labels_name):
  """Show a plotly figure with one line for ``preds`` and one for ``labels``."""
  traces = [
    go.Scatter(y=preds, name=preds_name),
    go.Scatter(y=labels, name=labels_name),
  ]
  go.Figure(traces).show()

In [None]:
# Restore the model from a saved checkpoint and create a trainer used only for prediction.
MODULE_TO_PRED = LSTM_Module.load_from_checkpoint("/content/check_points/lstm-epoch=228-valid_loss=0.0031873265.ckpt")
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
TRAINER = pl.Trainer(accelerator=DEVICE)

In [None]:
# Predict on the training split and derive the adjustment values from it.
TRAIN_PREDS, TRAIN_LABELS = predict_From_Dataloader(TRAINER, MODULE_TO_PRED, TRAIN_LOADER)
ADJUST_MEAN, ADJUST_STD_DEV = get_Adjust_Values(TRAIN_PREDS, TRAIN_LABELS)
# Show both values as the cell output.
ADJUST_MEAN, ADJUST_STD_DEV

In [None]:
# Predict on the validation and test splits as well.
VALID_PREDS, VALID_LABELS = predict_From_Dataloader(TRAINER, MODULE_TO_PRED, VALID_LOADER)
TEST_PREDS, TEST_LABELS = predict_From_Dataloader(TRAINER, MODULE_TO_PRED, TEST_LOADER)

# Every split is rescaled with the values derived from the training split.
ADJUST_TRAIN_PREDS = adjust_Preds(ADJUST_MEAN, ADJUST_STD_DEV, TRAIN_PREDS)
ADJUST_VALID_PREDS = adjust_Preds(ADJUST_MEAN, ADJUST_STD_DEV, VALID_PREDS)
ADJUST_TEST_PREDS = adjust_Preds(ADJUST_MEAN, ADJUST_STD_DEV, TEST_PREDS)

In [None]:
# Show the adjustment values again as the cell output.
ADJUST_MEAN, ADJUST_STD_DEV

In [None]:
# Plot at most the first MAX_SHOW_INDEX points of each split: adjusted predictions vs. labels.
MAX_SHOW_INDEX = 1000
show_Graphs(ADJUST_TRAIN_PREDS[:MAX_SHOW_INDEX], "adjust_train_preds", TRAIN_LABELS[:MAX_SHOW_INDEX], "train_labels")
show_Graphs(ADJUST_VALID_PREDS[:MAX_SHOW_INDEX], "adjust_valid_preds", VALID_LABELS[:MAX_SHOW_INDEX], "valid_labels")
show_Graphs(ADJUST_TEST_PREDS[:MAX_SHOW_INDEX], "adjust_test_preds", TEST_LABELS[:MAX_SHOW_INDEX], "test_labels")

### 8. 예측을 통한 투자시뮬레이션 해보기
-----

In [25]:
from torch.utils.data import TensorDataset, DataLoader
import numpy as np
import math

class Predict_Next_Percentage:
  """Run a checkpointed model over every sliding window of a chart frame."""

  def __init__(self, model_path):
    """Load the model from ``model_path`` and create a quiet prediction trainer."""
    self.MODEL = LSTM_Module.load_from_checkpoint(model_path)
    DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
    self.TRAINER = pl.Trainer(accelerator=DEVICE, enable_progress_bar=False, logger=False)

  # Return the preprocessed form of the given data frame.
  def processing_Raw_Stock_Data(self, chart_train_datas, used_prev_days):
    """Cut the indicator columns of ``chart_train_datas`` into input windows.

    Returns:
      A list of float32 tensors of shape ``(3, used_prev_days, features)``,
      one per window of ``used_prev_days`` consecutive rows (the same window
      repeated on three channels). The list is empty when the frame has
      fewer than ``used_prev_days`` rows.
    """
    features = chart_train_datas.drop(["Open", "High", "Low", "Close"], axis=1).values.tolist()

    x_datas = []
    for start in range(len(features) - used_prev_days + 1):
      window = np.array(features[start:start + used_prev_days], dtype=np.float32)
      x_datas.append(torch.tensor(np.stack([window, window, window])))
    return x_datas

  # Predict the next percentages for every window, in order.
  def predict_Next_Percentages(self, chart_train_datas, used_prev_days, adjust_mean, adjust_std_dev, unit=32):
      """Return one adjusted prediction per window of ``chart_train_datas``.

      Args:
        chart_train_datas: Frame with "Open", "High", "Low", "Close" and the
          indicator columns.
        used_prev_days: Number of rows per input window.
        adjust_mean: Value subtracted from every raw prediction.
        adjust_std_dev: Factor applied after the subtraction.
        unit: Batch size; the inputs are padded to a multiple of it.

      Returns:
        A list of floats, empty when the frame is too short for one window.
      """
      PROCEED_STOCK_DATAS = self.processing_Raw_Stock_Data(chart_train_datas, used_prev_days)

      DATA_PREDICT_LEN = len(PROCEED_STOCK_DATAS)
      # Nothing to predict; torch.stack would otherwise fail on an empty list.
      if DATA_PREDICT_LEN == 0:
        return []

      # Pad with copies of the last window so every batch holds exactly
      # `unit` samples (presumably because the model needs full batches);
      # the padded predictions are cut off again below.
      BATCH_DATA_PREDICT_LEN = math.ceil(DATA_PREDICT_LEN / unit) * unit
      PADDING = [PROCEED_STOCK_DATAS[-1]] * (BATCH_DATA_PREDICT_LEN - DATA_PREDICT_LEN)
      DATAS_STACK = torch.stack(list(PROCEED_STOCK_DATAS) + PADDING)

      # The zero labels only satisfy the (input, label) batch format.
      DATASET = TensorDataset(DATAS_STACK, torch.zeros(BATCH_DATA_PREDICT_LEN))
      DATA_LOADER = DataLoader(DATASET, batch_size=unit)

      PREDS = self.TRAINER.predict(self.MODEL, dataloaders=DATA_LOADER)
      FLATTEN_PREDS = torch.cat([pred.flatten() for pred in PREDS]).tolist()[:DATA_PREDICT_LEN]
      return [(pred - adjust_mean) * adjust_std_dev for pred in FLATTEN_PREDS]

In [26]:
USED_PREV_DAYS = 30 # Number of previous days used per input window
DATA_UNIT = 32 # Unit of data that forms one batch
PREV_AFTER_DAYS = 7 # Total time span (in days) of the predicted rate of change

In [None]:
import pandas as pd

# Use the first 2656 rows of the price file; the commented-out line selects the remaining rows instead.
CHART_DATAS = pd.read_csv("/content/Huge_Stock_Market_Dataset/Stocks/a.us.txt")[:2656]
# CHART_DATAS = pd.read_csv("/content/Huge_Stock_Market_Dataset/Stocks/a.us.txt")[2656:]

# Same preparation as in Raw_Stock_Data_Processor: 1e-6 offset, indicators, drop the first 30 rows.
CHART_DATAS = CHART_DATAS.loc[:, ["Open", "High", "Low", "Close"]] + 1e-6
CHART_DATAS = Raw_Stock_Data_Processor.add_Technic_Indexes(CHART_DATAS)
CHART_DATAS = CHART_DATAS[30:]

# Predict one adjusted percentage per sliding window of the prepared frame.
PREDICT_NEXT_PERCENTAGE = Predict_Next_Percentage("/content/check_points/lstm-epoch=228-valid_loss=0.0031873265.ckpt")
PERCENTAGE_PREDS = PREDICT_NEXT_PERCENTAGE.predict_Next_Percentages(CHART_DATAS, USED_PREV_DAYS, ADJUST_MEAN, ADJUST_STD_DEV, DATA_UNIT)

In [34]:
# Decision per period: True where the prediction for that period is positive.
# One prediction is taken every PREV_AFTER_DAYS windows.
SLICED_BEHAVE_PREDS = np.array(PERCENTAGE_PREDS[:-PREV_AFTER_DAYS:PREV_AFTER_DAYS]) > 0.00
# Realised open-to-close change of the same periods, sampled at the same stride.
# The USED_PREV_DAYS+1 offset mirrors the label offset used when building the training samples.
SLICED_ADVANTAGES = ((CHART_DATAS["Close"].shift(-PREV_AFTER_DAYS+1) - CHART_DATAS["Open"]) / CHART_DATAS["Open"]).dropna().tolist()[USED_PREV_DAYS+1::PREV_AFTER_DAYS]

# Depending on the number of rows, the two strided slices can differ in
# length by one, which would make the element-wise product fail; keep only
# the periods present in both.
COMMON_LEN = min(len(SLICED_BEHAVE_PREDS), len(SLICED_ADVANTAGES))
SLICED_BEHAVE_PREDS = SLICED_BEHAVE_PREDS[:COMMON_LEN]
SLICED_ADVANTAGES = SLICED_ADVANTAGES[:COMMON_LEN]

# Cumulative return when a period's change is only counted if the decision was True.
SIMULATED_REVENU = np.cumsum(np.asarray(SLICED_ADVANTAGES) * SLICED_BEHAVE_PREDS)

In [None]:
import plotly.express as px
import plotly.graph_objs as go
from plotly.subplots import make_subplots

# Close price sampled every PREV_AFTER_DAYS rows, starting at row USED_PREV_DAYS.
CLOSE_GRAPH = CHART_DATAS["Close"].tolist()[USED_PREV_DAYS::PREV_AFTER_DAYS]
# Convert to a list BEFORE prepending the starting value: `[0] + ndarray`
# broadcasts (adds 0 to every element) instead of prepending, which left this
# curve one point shorter than, and shifted against, the simulated one.
HAVE_REVEN_GRAPH = [0] + np.cumsum(SLICED_ADVANTAGES).tolist()
SIMUL_REVEN_GRAPH = [0] + SIMULATED_REVENU.tolist()


# Price on the primary y axis, both cumulative-return curves on the secondary one.
figures = make_subplots(specs=[[{"secondary_y": True}]])

figures.add_trace(go.Scatter(y=CLOSE_GRAPH, name="Close"), secondary_y=False)
figures.add_trace(go.Scatter(y=HAVE_REVEN_GRAPH, name="have_revenu"), secondary_y=True)
figures.add_trace(go.Scatter(y=SIMUL_REVEN_GRAPH, name="simul_revenu"), secondary_y=True)

go.Figure(figures).show()