# Transformer-Based Stock Price Prediction (Custom)

- 해당 파일은 Custom 데이터를 이용하여 자유롭게 파인튜닝할 수 있도록 설계되었습니다.

# 0. Import Libraries

# 1. Load Pre-trained Model

In [5]:
import numpy as np
import torch
import torch.nn as nn

# Fixed parameters (do not modify)

# Window sizes, in time steps.
context_length = 120  # length of the history window the model looks at (original: 512)
forecast_horizon = 20  # number of future steps the model predicts (original: 96)

# Patch size; it must evenly divide context_length. (original: 16)
patch_length = 15

# Number of dataloader workers.
num_workers = 1

from transformers import (
    EarlyStoppingCallback,
    PatchTSMixerConfig,
    PatchTSMixerForPrediction,
    Trainer,
    TrainingArguments,
)

# Run on the GPU when torch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# PatchTSMixer model
# The settings are collected in a mapping first so they can be inspected
# independently of the config object built from them.
patchtsmixer_settings = {
    # Window / patch geometry (non-overlapping patches: stride == patch length).
    "context_length": context_length,
    "prediction_length": forecast_horizon,
    "patch_length": patch_length,
    "patch_stride": patch_length,
    # Input channels: close, high, low, open, volume, stock_log_return
    "num_input_channels": 6,
    # Backbone size and regularisation.
    "d_model": 16,
    "num_layers": 8,
    "expansion_factor": 2,
    "dropout": 0.2,
    "head_dropout": 0.2,
    "mode": "common_channel",
    "scaling": "std",
    # Only channel 5 ('stock_log_return') is forecast.
    "prediction_channel_indices": [5],
}
config = PatchTSMixerConfig(**patchtsmixer_settings)

# Build the forecaster from the config and place it on the selected device.
ts_model = PatchTSMixerForPrediction(config)
ts_model.to(device)

# Custom Model
class CustomModel(nn.Module):
    """Forecasting backbone followed by a linear head that emits one value per sample.

    The backbone's forecast over ``forecast_horizon`` steps is reduced to a
    single scalar by ``self.fc`` (a ``Linear(forecast_horizon, 1)`` layer).

    Args:
        ts_model: Backbone module. It is called as ``ts_model(past_values=...)``
            and its result must expose ``prediction_outputs``. The default is
            the module-level ``ts_model``; note that Python evaluates this
            default once, when the class statement runs.
    """

    def __init__(self, ts_model=ts_model):
        super().__init__()
        self.ts_model = ts_model
        self.fc = nn.Linear(forecast_horizon, 1)

    def forward(self, past_values):
        """Return the head's output for a batch of history windows.

        Args:
            past_values: Input batch forwarded unchanged to the backbone.

        Returns:
            Tensor whose last dimension has size 1 (output of ``self.fc``).
        """
        prediction = self.ts_model(past_values=past_values)
        # Drop only the trailing channel axis. A bare ``squeeze()`` would also
        # remove the batch axis when the batch holds a single sample, making
        # the output shape depend on the batch size.
        # NOTE(review): assumes prediction_outputs is
        # (batch, forecast_horizon, 1) because a single prediction channel is
        # configured; confirm against the backbone's documentation.
        x = prediction.prediction_outputs.squeeze(-1)
        x = self.fc(x)
        return x

# Build the wrapper; with no argument it uses CustomModel's default backbone.
model = CustomModel()
# Move the whole model to the selected device. This is kept as a bare
# expression so that the notebook cell displays the module structure.
model.to(device)

CustomModel(
  (ts_model): PatchTSMixerForPrediction(
    (model): PatchTSMixerModel(
      (encoder): PatchTSMixerEncoder(
        (patcher): Linear(in_features=15, out_features=16, bias=True)
        (mlp_mixer_encoder): PatchTSMixerBlock(
          (mixers): ModuleList(
            (0-7): 8 x PatchTSMixerLayer(
              (patch_mixer): PatchMixerBlock(
                (norm): PatchTSMixerNormLayer(
                  (norm): LayerNorm((16,), eps=1e-05, elementwise_affine=True)
                )
                (mlp): PatchTSMixerMLP(
                  (fc1): Linear(in_features=8, out_features=16, bias=True)
                  (dropout1): Dropout(p=0.2, inplace=False)
                  (fc2): Linear(in_features=16, out_features=8, bias=True)
                  (dropout2): Dropout(p=0.2, inplace=False)
                )
                (gating_block): PatchTSMixerGatedAttention(
                  (attn_layer): Linear(in_features=8, out_features=8, bias=True)
                  (attn_s

In [6]:
check_point = "ckpt_TS/model_01/ckpt-7.pt"  # path of the checkpoint to load; edit as needed

# map_location remaps the stored tensors onto the device selected for this
# session, so a checkpoint written on a GPU machine still loads on a CPU-only one.
ckpt = torch.load(check_point, map_location=device)
# The checkpoint is a dict; the model weights are stored under the 'model' key.
model.load_state_dict(ckpt['model'])

<All keys matched successfully>

# 2. Fine-tuning

In [67]:
from torch.optim import AdamW

# Hyperparameters
learning_rate = 0.001
epochs = 1

# Loss function: mean squared error between prediction and label.
loss_fn = nn.MSELoss()

# Optimizer over every parameter of the wrapped model.
optimizer = AdamW(params=model.parameters(), lr=learning_rate)

In [69]:
import json
import os

from tqdm.auto import tqdm

train_loss_list = []
valid_loss_list = []

# Fill these lists before running the loop.
# Inputs: a list of tensors shaped (batch_size, 120, 6), i.e.
#   (batch_size, context_length, num_input_channels). With batch_size=1,
#   apply unsqueeze(0) to each sample so the batch axis is present.
# Labels: a list of tensors shaped (batch_size,); each entry is the sum of
#   the 20 stock_log_return values of the forecast window.
TRAIN_INPUT_LIST = []
TRAIN_LABEL_LIST = []
VALID_INPUT_LIST = []
VALID_LABEL_LIST = []

# Fail fast with a clear message. Otherwise an empty list surfaces later as a
# ZeroDivisionError, and mismatched lengths are silently truncated by zip()
# while the average still divides by the untruncated length.
for split_name, split_inputs, split_labels in (
    ("TRAIN", TRAIN_INPUT_LIST, TRAIN_LABEL_LIST),
    ("VALID", VALID_INPUT_LIST, VALID_LABEL_LIST),
):
    if not split_inputs:
        raise ValueError(
            f"{split_name}_INPUT_LIST is empty; add at least one batch before training."
        )
    if len(split_inputs) != len(split_labels):
        raise ValueError(
            f"{split_name}_INPUT_LIST has {len(split_inputs)} batches but "
            f"{split_name}_LABEL_LIST has {len(split_labels)}."
        )

# Output directory for checkpoints and the loss history; edit as needed.
folder_path = 'ckpt_TS/model_01'
os.makedirs(folder_path, exist_ok=True)

for epoch in range(1, epochs+1):
    # Training
    model.train()
    train_loss = 0
    for inputs, label in tqdm(
        zip(TRAIN_INPUT_LIST, TRAIN_LABEL_LIST), total=len(TRAIN_INPUT_LIST)
    ):
        past_values = inputs.to(device)
        future_values = label.to(device)

        outputs = model(past_values)
        loss = loss_fn(outputs.squeeze(), future_values.squeeze())
        loss.backward()

        optimizer.step()
        optimizer.zero_grad()

        # Detach before accumulating so the running sum does not keep every
        # step's autograd graph alive.
        train_loss += loss.detach()

    # Validation (no gradients are needed, so none are recorded)
    model.eval()
    valid_loss = 0
    with torch.no_grad():
        for inputs, label in tqdm(
            zip(VALID_INPUT_LIST, VALID_LABEL_LIST), total=len(VALID_INPUT_LIST)
        ):
            past_values = inputs.to(device)
            future_values = label.to(device)

            outputs = model(past_values)
            valid_loss += loss_fn(outputs.squeeze(), future_values.squeeze())

    # Mean loss per batch for this epoch.
    train_loss /= len(TRAIN_INPUT_LIST)
    valid_loss /= len(VALID_INPUT_LIST)

    train_loss_list.append(train_loss)
    valid_loss_list.append(valid_loss)
    print(f"[epoch {epoch}] train loss: {train_loss:.7f}, valid loss: {valid_loss:.7f}")

    # One checkpoint per epoch, named after the 1-based epoch number.
    torch.save({
        'model': model.state_dict(),
        'optimizer': optimizer.state_dict(),
        'epoch': epoch
    }, os.path.join(folder_path, f'ckpt-{epoch}.pt'))

# Convert the recorded losses to plain floats so they are JSON-serialisable.
train_history = [float(value) for value in train_loss_list]
valid_history = [float(value) for value in valid_loss_list]

# NOTE(review): 'best_epoch' is the 0-based position of the lowest validation
# loss, whereas checkpoint files are numbered from 1 (position k corresponds
# to ckpt-{k+1}.pt). Confirm which convention the consumers of loss.json expect.
data = {'train_loss_list': train_history,
        'valid_loss_list': valid_history,
        'best_epoch': int(np.argmin(valid_history)),
       }

# Save the loss history as JSON next to the checkpoints.
with open(os.path.join(folder_path, 'loss.json'), 'w') as json_file:
    json.dump(data, json_file)

  0%|          | 0/115 [00:00<?, ?it/s]

KeyboardInterrupt: 