In [2]:
# Library
import sys, os
sys.path.append(os.path.abspath('..'))

from hdf5_loader import StockDatasetHDF5
from config import *
import subclass as sc

import pandas as pd
import numpy as np
import seaborn as sns
from tqdm import tqdm
import time
import matplotlib.pyplot as plt
from collections import defaultdict, OrderedDict
from datetime import datetime, timedelta
import os
import gc
import re
import h5py
import random
import importlib
import shutil

import torch
import torch.nn as nn
from torch.utils.data import IterableDataset, DataLoader
import torch.nn.functional as F

# Compact NumPy array printing: 4 decimals, no scientific notation, 120-char lines.
np.set_printoptions(
    linewidth=120,
    suppress=True,
    precision=4,
)

In [2]:
import gymnasium as gym
from gymnasium.spaces import Dict, Box, Discrete

In [3]:
def initialize_log_dir(base_log_dir="./tensorboard_logs"):
    """Wipe and recreate the log root, then name a fresh timestamped run directory.

    Any existing ``base_log_dir`` is removed together with its contents and an
    empty directory is created in its place. The returned run path is only
    computed; this function does not create it on disk.

    Args:
        base_log_dir: Root directory for the logs.

    Returns:
        ``base_log_dir`` joined with the current local time formatted as
        ``%Y%m%d-%H%M%S``.
    """
    log_root_present = os.path.exists(base_log_dir)
    if log_root_present:
        # Delete the existing log directory.
        shutil.rmtree(base_log_dir)

    # Create a new, empty log directory.
    os.makedirs(base_log_dir, exist_ok=True)

    run_stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    return os.path.join(base_log_dir, run_stamp)

---

In [4]:
class StockTradingEnv(gym.Env):
    """Gymnasium environment that streams ``(state, label)`` samples from ``sc.get_samples``.

    The action space is ``Discrete(2)``. ``step`` interprets the pair
    ``(position, action)`` as:

    * ``(0, 0)`` stay out   -> ``tout`` is incremented
    * ``(0, 1)`` enter      -> ``enter_price`` is recorded, ``position`` becomes 1
    * ``(1, 1)`` hold       -> ``tin`` is incremented
    * ``(1, 0)`` exit       -> the episode terminates

    Attributes:
        tout: Number of "stay out" steps since the last reset.
        tin: Number of "hold" steps since the last reset.
        position: 0 when out of the market, 1 after entering.
        current_price: Value popped from the latest state's ``'current_price'`` key
            (``None`` until the first non-empty state is read).
        enter_price: ``current_price`` at the moment of entering (``None`` before that).
    """

    def __init__(self, hz_dim, targ_hz, ticker_list, date_range, reward_weight=None, render_mode=None, **kwargs):
        """Create the dataset, the sample generator and the gym spaces.

        Args:
            hz_dim: Mapping ``hz -> size`` used as the last axis of each
                observation ``Box`` and forwarded to ``sc.get_samples``.
            targ_hz: Forwarded unchanged to ``sc.get_samples``.
            ticker_list: Forwarded to ``StockDatasetHDF5``.
            date_range: Forwarded to ``StockDatasetHDF5``.
            reward_weight: Mapping ``hz -> weight`` applied to the mean label of
                each hz. Defaults to ``0.2`` for every hz in ``THZ``.
            render_mode: Must be ``None`` or listed in ``self.metadata["render_modes"]``.
            **kwargs: Forwarded to the parent initializer.
        """
        super().__init__(**kwargs)
        self.hz_dim = hz_dim
        self.targ_hz = targ_hz
        self.ticker_list = ticker_list
        self.date_range = date_range
        self.reward_weight = reward_weight if reward_weight else {hz: 0.2 for hz in THZ}

        self.tout = 0
        self.tin = 0
        self.position = 0
        # Defined up-front so step() never hits an AttributeError before the first
        # observation / entry has happened.
        self.current_price = None
        self.enter_price = None

        self.hdf5_inst = StockDatasetHDF5(
            ticker_list=ticker_list,
            date_range=date_range,
        )
        self.envgen = sc.get_samples(self.hdf5_inst, hz_dim, targ_hz)

        self.observation_space = Dict(
            {hz: Box(-100, 100, shape=(sc.FEATURE_NUM, hz_dim[hz]), dtype=np.float32) for hz in THZ}
        )
        self.action_space = Discrete(2)

        assert render_mode is None or render_mode in self.metadata["render_modes"]
        self.render_mode = render_mode

    def _get_obs(self):
        """Pull the next ``(state, label)`` pair from the sample generator.

        When the state is truthy, its ``'current_price'`` entry is removed from
        the state and stored on ``self.current_price``.

        Returns:
            ``(state, label)``, or ``(None, None)`` once the generator is
            exhausted, so callers can always unpack two values.
        """
        try:
            state, label = next(self.envgen)
        except StopIteration:
            return None, None

        if state:
            self.current_price = state.pop('current_price')
        return state, label

    def _get_info(self):
        """Return auxiliary info for ``reset``/``step`` (currently an empty dict)."""
        return dict()

    def _get_reward(self, label):
        """Weighted sum over ``THZ`` of the mean label of each hz.

        Each hz's mean is scaled by its own entry in ``self.reward_weight``.
        """
        return sum(np.mean(label[hz]) * self.reward_weight[hz] for hz in THZ)

    def reset(self, seed=None, options=None):
        """Reset the per-episode counters and return the next observation.

        The sample generator is NOT rewound; the new episode continues from
        wherever the previous one stopped.

        Returns:
            ``(observation, info)``.

        Raises:
            RuntimeError: If the sample generator has no samples left.
        """
        super().reset(seed=seed)

        self.tout = 0
        self.tin = 0
        self.position = 0
        self.enter_price = None

        observation, _ = self._get_obs()
        if observation is None:
            raise RuntimeError(
                "sample generator is exhausted; no observation available for reset()"
            )

        return observation, self._get_info()

    def step(self, action):
        """Apply ``action``, advance to the next sample and compute the reward.

        Args:
            action: 0 or 1, interpreted together with ``self.position`` as
                described in the class docstring.

        Returns:
            ``(observation, reward, terminated, truncated, info)``. ``terminated``
            is True when an open position is exited. ``truncated`` is True when
            no further sample is available; in that case ``observation`` is
            whatever ``_get_obs`` produced (``None`` on exhaustion) and the
            reward is ``0.0``.
        """
        terminated = False

        if self.position == 0 and action == 0:
            self.tout += 1
        elif self.position == 0 and action == 1:
            self.enter_price = self.current_price
            self.position = 1
        elif self.position == 1 and action == 1:
            self.tin += 1
        elif self.position == 1 and action == 0:
            terminated = True

        observation, label = self._get_obs()
        info = self._get_info()

        # `observation == 0` is kept from the original check.
        # NOTE(review): whether sc.get_samples ever yields 0 as a state is not
        # visible from this notebook — confirm.
        truncated = observation is None or observation == 0
        # Without a valid label there is nothing to score.
        reward = 0.0 if (truncated or label is None) else self._get_reward(label)

        return observation, reward, terminated, truncated, info

    def render(self):
        """Rendering is not implemented."""
        pass

    def close(self):
        """No resources are released here."""
        pass

In [5]:
class CustomAddLayer(nn.Module):
    """Linear-project ``cur`` to ``out_dim``, add ``next`` to it, then apply ReLU."""

    def __init__(self, in_dim, out_dim):
        """Build the ``in_dim -> out_dim`` linear projection and the ReLU."""
        super().__init__()
        self.affine = nn.Linear(in_dim, out_dim)
        self.relu = nn.ReLU()

    def forward(self, cur, next):
        """Return ``relu(affine(cur) + next)``.

        Args:
            cur: Tensor whose last dimension is ``in_dim``.
            next: Tensor added to the projected ``cur``.
        """
        projected = self.affine(cur)
        return self.relu(projected + next)

In [6]:
class CustomCNN(nn.Module):
    """Per-hz depthwise/pointwise 1-D CNN encoders chained into actor and critic heads.

    For every hz in ``hz_dim`` an extractor is built from five stages of
    (depthwise Conv1d, stride 2) -> BatchNorm -> ReLU -> (pointwise Conv1d that
    doubles the channels) -> BatchNorm -> ReLU, followed by global average
    pooling and flatten. The latent vectors are then folded together in
    ``hz_order`` with one ``CustomAddLayer`` per consecutive pair, and the final
    vector feeds a 2-logit actor head and a scalar critic head.
    """

    def __init__(self, hz_dim:dict, hz_order:list):
        """Build the extractors, the merge layers and the two heads.

        Args:
            hz_dim: Mapping ``hz -> number of input channels`` of that hz's
                extractor; the extractor's latent size is ``channels * 2**5``.
                NOTE(review): ``StockTradingEnv`` uses a dict of the same name
                as the LAST axis of its observations (with ``sc.FEATURE_NUM`` as
                the channel axis) — confirm which dict is meant to be passed here.
            hz_order: Order in which the per-hz latents are merged; must be
                non-empty and every entry must be a key of ``hz_dim``.

        Raises:
            ValueError: If ``hz_order`` is empty.
        """
        super().__init__()

        if not hz_order:
            raise ValueError("hz_order must contain at least one hz key")

        self.hzs = hz_order
        extractors = {}
        hz_latent_dim = {}

        #region extractors
        for hz, feat_dim in hz_dim.items():
            layers = []
            for layer_num in range(5):
                # Depthwise convolution: one filter per channel, halves the length.
                layers.append((f"{hz}_depth_conv{layer_num}",
                               nn.Conv1d(feat_dim, feat_dim, groups=feat_dim, kernel_size=5, stride=2, padding=2)))
                layers.append((f"{hz}_batch_norm{layer_num}-1", nn.BatchNorm1d(feat_dim)))
                layers.append((f"{hz}_relu{layer_num}-1", nn.ReLU()))

                # Pointwise convolution: mixes channels and doubles their count.
                layers.append((f"{hz}_point_conv{layer_num}", nn.Conv1d(feat_dim, feat_dim*2, kernel_size=1)))
                layers.append((f"{hz}_batch_norm{layer_num}-2", nn.BatchNorm1d(feat_dim*2)))
                layers.append((f"{hz}_relu{layer_num}-2", nn.ReLU()))

                feat_dim *= 2

            layers.append((f"{hz}_avg_pool", nn.AdaptiveAvgPool1d(1)))
            layers.append((f"{hz}_flatten", nn.Flatten()))

            extractors[hz] = nn.Sequential(OrderedDict(layers))  # -> (batch, latent_dim)
            hz_latent_dim[hz] = feat_dim
        #endregion

        self.extractors = nn.ModuleDict(extractors)

        # ModuleDict (not a plain dict) so the merge layers' parameters are
        # registered: they show up in parameters()/state_dict() and follow .to().
        self.mergers = nn.ModuleDict()
        for cur_hz, next_hz in zip(self.hzs[:-1], self.hzs[1:]):
            self.mergers[cur_hz] = CustomAddLayer(hz_latent_dim[cur_hz], hz_latent_dim[next_hz])

        # The merged state ends with the latent size of the last hz in the order.
        # Indexing by self.hzs[-1] also works when there is a single hz and
        # therefore no merge layer at all.
        self.features_dim = hz_latent_dim[self.hzs[-1]]

        self.actor = nn.Sequential(
            nn.Linear(self.features_dim, 2)
        )
        self.critic = nn.Sequential(
            nn.Linear(self.features_dim, 1)
        )

    def forward(self, observations) -> tuple[torch.Tensor, torch.Tensor]:
        """Encode every hz, merge the latents in order and apply both heads.

        Args:
            observations: Mapping ``hz -> tensor`` accepted by that hz's
                extractor (Conv1d input, i.e. channels on axis 1).

        Returns:
            ``(actor_out, critic_out)``: the 2 action logits and the 1 value
            produced from the merged state.
        """
        latent_tensors = {hz: self.extractors[hz](observations[hz]) for hz in self.hzs}

        state = latent_tensors[self.hzs[0]]
        for cur_hz, next_hz in zip(self.hzs[:-1], self.hzs[1:]):
            state = self.mergers[cur_hz](state, latent_tensors[next_hz])

        actor_out = self.actor(state)
        critic_out = self.critic(state)
        return actor_out, critic_out

In [None]:
class CustomA2C:
    """Rollout driver that samples actions from an actor-critic model in an env."""

    def __init__(self, env:"gym.Env", model:nn.Module):
        """Store the environment and the model.

        Args:
            env: Environment exposing ``reset()`` and ``step(action)`` with the
                five-element step return.
            model: Callable returning ``(logits, value)`` for a state; logits
                are indexed as ``[0, action]``, i.e. a batch of one.
        """
        self.env = env
        self.model = model

    def episode(self):
        """Run one episode, sampling each action from the softmax of the logits.

        The state returned by the environment is passed to the model as-is.
        NOTE(review): no tensor conversion or batching happens here — confirm
        the env's observations are already in the form the model expects.

        Returns:
            ``(log_probs, values, rewards)``: three equally long lists with, per
            step, the log-probability of the sampled action, the model's value
            output and the reward returned by the environment.
        """
        log_probs = []
        values = []
        rewards = []

        state = self.env.reset()[0]
        done = False
        while not done:
            logits, value = self.model(state)
            prob = F.softmax(logits, dim=-1)
            action = torch.multinomial(prob, 1).item()

            state, reward, terminated, truncated, _ = self.env.step(action)

            log_probs.append(F.log_softmax(logits, dim=-1)[0, action])
            values.append(value)
            rewards.append(reward)

            done = terminated or truncated

        # Hand the trajectory back so the caller can compute returns and losses.
        return log_probs, values, rewards

In [9]:
# Run configuration: tickers, date range, per-hz size and target hz.
ticker_list = [
    'AAPL',   # Apple Inc.
    'MSFT',   # Microsoft Corporation
    'GOOGL',  # Alphabet Inc. (Google)
    'META',   # Meta Platforms, Inc. (Facebook)
    'IBM',    # International Business Machines Corporation
    'INTC',   # Intel Corporation
]

# ST / ED / THZ are not defined in this notebook; presumably they come from
# `from config import *` — verify.
date_range = [ST, ED]

# Same size (128) for every hz in THZ.
hz_dim = dict.fromkeys(THZ, 128)

targ_hz = '5m'

In [None]:
# Instantiate the dataset object for the configured tickers and date range.
# NOTE(review): StockTradingEnv builds `StockDatasetHDF5` (imported from
# `hdf5_loader`) with keyword arguments, while this cell goes through
# `sc.StockDatasetHDF5` with positional arguments — confirm both names refer to
# the same class and that the positional order is (ticker_list, date_range).
hdf5_inst = sc.StockDatasetHDF5(ticker_list, date_range)