# GP2 Trading Model

### Imports

In [2]:
from data_processor import DataProcessor
from plot import backtest_stats, backtest_plot, get_baseline, get_daily_return, drop_dup_dates
from pprint import pprint
import numpy as np
import pandas as pd
import pickle
import os



In [None]:
from config_private import ALPACA_API_KEY, ALPACA_API_SECRET
API_BASE_URL = 'https://paper-api.alpaca.markets'
from config_tickers import GP2_TICKERS
from config import INDICATORS
from config import CDL

### Environment

In [None]:
import gymnasium as gym
import numpy as np
from numpy import random as rd


class StockTradingEnv(gym.Env):
    """Multi-asset stock-trading environment backed by pre-computed arrays.

    At step ``t`` the environment reads ``price_array[t]`` (one price per
    asset; ``price_array`` has one column per asset) and ``tech_array[t]``
    (technical features). The agent outputs one value in ``[-1, 1]`` per
    asset; it is scaled to a share count and executed as a sell (negative)
    or buy (positive) when its magnitude exceeds ``min_stock_rate`` of the
    per-asset share cap. On steps whose turbulence exceeds
    ``turbulence_thresh`` every position is liquidated instead.

    Args:
        config: dict with keys ``"price_array"``, ``"tech_array"``,
            ``"turbulence_array"``, ``"date_array"`` and ``"if_train"``.
        gamma: discount for ``gamma_reward``, which replaces the reward on
            the terminal step.
        turbulence_thresh: turbulence level above which positions are closed.
        max_stock: stored, but overwritten on every ``step`` by a
            price-based per-asset share cap.
        min_stock_rate: fraction of the share cap an action must exceed to
            trade.
        initial_capital: starting cash. In training it is multiplied by a
            factor drawn from [0.95, 1.05) and reduced by the value of the
            random starting holdings.
        reward_scaling: multiplier on the per-step change in total assets.
        initial_stocks: starting holdings per asset (zeros by default).
    """

    def __init__(
        self,
        config,
        gamma=0.99,
        turbulence_thresh=99,
        max_stock=None,
        min_stock_rate=0.1,
        initial_capital=1e5,
        reward_scaling=2**-11,
        initial_stocks=None,
    ):
        price_ary = config["price_array"]
        tech_ary = config["tech_array"]
        turbulence_ary = config["turbulence_array"]
        self.date_ary = config["date_array"]
        if_train = config["if_train"]

        self.price_ary = price_ary.astype(np.float32)
        # Technical features are scaled by 2**-7 before entering the state.
        self.tech_ary = tech_ary.astype(np.float32) * 2**-7
        # 1.0 on steps whose raw turbulence exceeds the threshold; step()
        # liquidates all positions on those steps.
        self.turbulence_bool = (turbulence_ary > turbulence_thresh).astype(np.float32)
        self.turbulence_ary = (
            self.sigmoid_sign(turbulence_ary, turbulence_thresh) * 2**-5
        ).astype(np.float32)

        stock_dim = self.price_ary.shape[1]
        self.gamma = gamma
        self.max_stock = max_stock
        self.min_stock_rate = min_stock_rate
        self.reward_scaling = reward_scaling
        self.initial_capital = initial_capital
        self.initial_stocks = (
            np.zeros(stock_dim, dtype=np.float32)
            if initial_stocks is None
            else initial_stocks
        )

        # Episode state, populated by reset().
        self.current_step = None
        self.num_trades = None
        self.cash = None
        self.stocks = None
        self.stocks_cool_down = None
        self.total_assets = None
        self.gamma_reward = None
        self.initial_total_assets = None

        # Environment information.
        self.env_name = "StockEnv"
        # Must equal len(get_state(...)): cash (1) + price, holdings and
        # cool-down counters (one each per asset) + one row of tech features.
        tech_dim = int(np.prod(self.tech_ary.shape[1:]))
        self.state_dim = 1 + 3 * stock_dim + tech_dim
        self.action_dim = stock_dim
        self.max_step = self.price_ary.shape[0] - 1
        self.if_train = if_train
        self.if_discrete = False
        self.episode_return = 0.0
        self.observation_space = gym.spaces.Box(
            low=-5000, high=5000, shape=(self.state_dim,), dtype=np.float32
        )
        self.action_space = gym.spaces.Box(
            low=-1, high=1, shape=(self.action_dim,), dtype=np.float32
        )

    def reset(self, seed=None, options=None):
        """Start a new episode at step 0 and return ``(observation, info)``.

        In training mode the starting holdings get 0-16 extra shares per
        asset and the starting capital is jittered. Both draws use
        ``self.np_random`` (seeded by ``reset(seed=...)``), so a seeded
        reset is reproducible.
        """
        super().reset(seed=seed)
        self.current_step = 0
        self.num_trades = 0
        price = self.price_ary[self.current_step]

        if self.if_train:
            extra_shares = self.np_random.integers(0, 17, size=self.initial_stocks.shape)
            self.stocks = (self.initial_stocks + extra_shares).astype(np.float32)
            self.cash = (
                self.initial_capital * self.np_random.uniform(0.95, 1.05)
                - (self.stocks * price).sum()
            )
        else:
            # astype() copies, so trading never mutates initial_stocks.
            self.stocks = self.initial_stocks.astype(np.float32)
            self.cash = self.initial_capital
        self.stocks_cool_down = np.zeros_like(self.stocks)

        self.total_assets = self.cash + (self.stocks * price).sum()
        self.initial_total_assets = self.total_assets
        self.gamma_reward = 0.0
        observation = self.get_state(price)
        info = {}
        return observation, info

    def step(self, action):
        """Advance one step, execute trades, and return the gymnasium 5-tuple.

        Sells are processed before buys, so sale proceeds are available to
        buys in the same step. The reward is the scaled change in total
        assets; on the terminal step it is replaced by ``gamma_reward``.
        """
        self.current_step += 1
        price = self.price_ary[self.current_step]
        # Per-asset share cap of roughly 100_000 worth of stock. Non-positive
        # (or NaN) prices get a cap of 0 instead of dividing by zero; those
        # assets are never traded anyway.
        safe_price = np.where(price > 0, price, np.inf)
        self.max_stock = np.floor(100_000 / safe_price).astype(int)
        action = np.round(action * self.max_stock).astype(int)
        min_action = np.round(self.max_stock * self.min_stock_rate).astype(int)
        self.stocks_cool_down += 1

        if self.turbulence_bool[self.current_step] == 0:
            # The counters were just incremented, so they are >= 1 here and
            # the `> 0` mask never filters an asset; it is kept as written.
            ready = self.stocks_cool_down > 0

            # Sell logic: only count a trade when shares actually change hands.
            for index in np.flatnonzero((action < -min_action) & ready):
                if price[index] <= 0:
                    continue
                sell_num_shares = min(self.stocks[index], -action[index])
                if sell_num_shares <= 0:
                    continue
                self.stocks[index] -= sell_num_shares
                self.cash += price[index] * sell_num_shares
                self.stocks_cool_down[index] = 0
                self.num_trades += 1

            # Buy logic: bounded by available cash.
            for index in np.flatnonzero((action > min_action) & ready):
                if price[index] <= 0:
                    continue
                buy_num_shares = min(self.cash // price[index], action[index])
                if buy_num_shares <= 0:
                    continue
                self.stocks[index] += buy_num_shares
                self.cash -= price[index] * buy_num_shares
                self.stocks_cool_down[index] = 0
                self.num_trades += 1

        else:
            # Turbulence: liquidate every position at the current price.
            self.cash += (self.stocks * price).sum()
            self.num_trades += np.count_nonzero(self.stocks)
            self.stocks[:] = 0
            self.stocks_cool_down[:] = 0

        # Reward calculations.
        observation = self.get_state(price)
        total_assets = self.cash + (self.stocks * price).sum()
        reward = (total_assets - self.total_assets) * self.reward_scaling
        self.total_assets = total_assets
        self.gamma_reward = self.gamma_reward * self.gamma + reward
        terminated = self.current_step == self.max_step
        truncated = False
        info = {}
        if terminated:
            reward = self.gamma_reward
            self.episode_return = total_assets / self.initial_total_assets

        return observation, reward, terminated, truncated, info

    def get_state(self, price):
        """Concatenate scaled cash, prices, holdings, cool-downs and tech features."""
        cash = np.array(self.cash * (2**-12), dtype=np.float32)
        scale = np.array(2**-6, dtype=np.float32)
        observation = np.hstack(
            (
                cash,
                price * scale,
                self.stocks * scale,
                self.stocks_cool_down,
                self.tech_ary[self.current_step],
            )
        )
        return observation

    @staticmethod
    def sigmoid_sign(ary, thresh):
        """Squash ``ary`` into ``(-thresh/2, thresh/2)`` with a shifted sigmoid."""

        def sigmoid(x):
            return 1 / (1 + np.exp(-x * np.e)) - 0.5

        return sigmoid(ary / thresh) * thresh

### DRL Agent Class

In [None]:
import ray

from ray import tune
from ray.tune.schedulers.pb2 import PB2 # Dependencies: pip install GPy sklearn
from ray.rllib.algorithms import Algorithm
from ray.tune import register_env

from ray.air import RunConfig, FailureConfig
from ray.tune.tune_config import TuneConfig
from ray.air.config import CheckpointConfig
from ray.tune.callback import Callback

from typing import Dict, Optional, Any, List, Union


class DRLlibv2:
    """
    Train / tune an RLlib algorithm through Ray Tune.

    Params
    -------------------------------------
    trainable:
        RLlib algorithm name (e.g. "PPO"; passed to Tune as given) or any
        Trainable class that takes config as parameter
    params: dict
        hyperparameter search space. It is copied before the RLlib settings
        below are added, so the caller's dict is not modified.
    train_env:
        Environment class/factory (called with the env config) or an
        already-built environment instance (returned as-is for every config)
    train_env_name: str
        Name under which the training environment is registered
    run_name: str
        tune run name
    framework: str
        "torch" or "tf" for tensorflow
    local_dir: str
        directory for results and tensorboard plots
    num_workers: int
        number of workers
    num_envs_per_worker: int
        number of vectorized environments per worker
    restart_failed_sub_environments: bool
        try to restart faulty sub-environments
    recreate_failed_workers: bool
        try to recreate failed workers
    num_samples: int
        Number of samples of hyperparameters config to run
    scheduler:
        Tune trial scheduler (e.g. PB2)
    log_level: str = "WARN"
        RLlib log verbosity
    num_gpus: Union[float, int] = 1
        GPUs for ray.init and for each trial
    num_cpus: Union[float, int] = 20
        CPUs made available to ray.init
    dataframe_save: str
        CSV path for the tune results
    metric: str
        Metric optimized by Tune and used to rank checkpoints
    mode: str
        "max" or "min" for the metric
    max_failures: int
        Number of trial failures tolerated
    timeout: int
        Time budget of the experiment in seconds
    checkpoint_num_to_keep: int
        Number of checkpoints to keep
    checkpoint_freq: int
        Checkpoint frequency in training iterations
    reuse_actors: bool
        Reuse actors for tuning
    callbacks:
        Ray Tune callbacks

    Methods
    -------------------------------------
        train_tune_model: runs tune.Tuner(...).fit() on the params search space
        restore_agent: resumes an errored or stopped experiment
        infer_results: returns the results dataframe and the best result
        get_test_agent: restores an Algorithm from a checkpoint for inference

    Example
    ---------------------------------------
    drl_agent = DRLlibv2(
        trainable="PPO",
        train_env=train_env_instance,
        train_env_name="StockTradingEnv",
        params=sample_ppo_params(),
        run_name='gp2_train_mlp',
        local_dir="gp2_train_mlp",
        num_samples=16,
        scheduler=pb2,
    )
    res = drl_agent.train_tune_model()
    results_df, best_result = drl_agent.infer_results()
    test_agent = drl_agent.get_test_agent(test_env_instance, 'StockTrading_testenv')
    """

    def __init__(
        self,
        trainable: Union[str, Any],
        params: Dict,
        train_env=None,
        train_env_name: str='',
        run_name: str = "tune_run",
        framework: str = "torch",
        local_dir: str = "tune_results",
        num_workers: int = 16,
        num_envs_per_worker: int = 32,
        restart_failed_sub_environments: bool = True,
        recreate_failed_workers: bool = True,
        num_samples: int = 0,
        scheduler=None,
        log_level: str = "WARN",
        num_gpus: Union[float, int] = 1,
        num_cpus: Union[float, int] = 20,
        dataframe_save: str = "tune.csv",
        metric: str = "episode_reward_mean",
        mode: Union[str, List[str]] = "max",
        max_failures: int = 1,
        timeout: int = 3600,
        checkpoint_num_to_keep: Union[None, int] = None,
        checkpoint_freq: int = 0,
        reuse_actors: bool = True,
        callbacks: Optional[List["Callback"]] = None
    ):
        if train_env is not None:
            register_env(train_env_name, self._as_env_creator(train_env))

        # Copy so the caller's search-space dict is not mutated.
        self.params = dict(params)
        self.params["framework"] = framework
        self.params["log_level"] = log_level
        self.params["num_gpus"] = num_gpus
        self.params["num_workers"] = num_workers
        self.params["num_envs_per_worker"] = num_envs_per_worker
        self.params["restart_failed_sub_environments"] = restart_failed_sub_environments
        self.params["recreate_failed_workers"] = recreate_failed_workers
        self.params["env"] = train_env_name

        self.run_name = run_name
        self.local_dir = local_dir
        self.scheduler = scheduler
        self.num_samples = num_samples
        # Algorithm names are looked up by Tune as given; no case folding.
        self.trainable = trainable
        self.num_cpus = num_cpus
        self.num_gpus = num_gpus
        self.dataframe_save = dataframe_save
        self.metric = metric
        self.mode = mode
        self.max_failures = max_failures
        self.timeout = timeout
        self.checkpoint_freq = checkpoint_freq
        self.checkpoint_num_to_keep = checkpoint_num_to_keep
        self.reuse_actors = reuse_actors
        self.callbacks = callbacks
        # Set by train_tune_model() / restore_agent().
        self.results = None

    @staticmethod
    def _as_env_creator(env):
        """Return an RLlib env-creator for ``env``.

        A callable (env class or factory) is used directly and receives the
        env config. Anything else is treated as a ready-made env instance
        and returned for every config.
        """
        if callable(env):
            return env
        return lambda config: env

    def train_tune_model(self):
        """
        Tuning and training the model
        Returns the results object
        """
        ray.init(
            num_cpus=self.num_cpus, num_gpus=self.num_gpus, ignore_reinit_error=True
        )

        tuner = tune.Tuner(
            self.trainable,
            param_space=self.params,
            tune_config=TuneConfig(
                num_samples=self.num_samples,
                metric=self.metric,
                mode=self.mode,
                time_budget_s=self.timeout,
                reuse_actors=self.reuse_actors,
                scheduler=self.scheduler
            ),
            run_config=RunConfig(
                name=self.run_name,
                local_dir=self.local_dir,
                callbacks=self.callbacks,
                failure_config=FailureConfig(
                    max_failures=self.max_failures, fail_fast=False
                ),
                checkpoint_config=CheckpointConfig(
                    num_to_keep=self.checkpoint_num_to_keep,
                    checkpoint_score_attribute=self.metric,
                    checkpoint_score_order=self.mode,
                    checkpoint_frequency=self.checkpoint_freq,
                    checkpoint_at_end=True,
                ),
                verbose=3,
            ),
        )

        self.results = tuner.fit()

        return self.results

    def infer_results(self, to_dataframe: Optional[str] = None, mode: str = "a"):
        """
        Write the tune results to CSV and return (results_df, best_result).

        to_dataframe defaults to ``dataframe_save``; ``mode`` is the file
        mode passed to ``DataFrame.to_csv``.
        """
        results_df = self.results.get_dataframe()

        if to_dataframe is None:
            to_dataframe = self.dataframe_save

        results_df.to_csv(to_dataframe, mode=mode)

        best_result = self.results.get_best_result()
        print("Best hyperparameters found were: ", best_result.config)

        return results_df, best_result

    def restore_agent(
        self,
        checkpoint_path: str = "",
        resume_unfinished: bool = True,
        resume_errored: bool = True,
        restart_errored: bool = False,
    ):
        """
        Resume an errored or stopped experiment and return the new results.

        ``checkpoint_path`` is the experiment directory handed to
        ``tune.Tuner.restore``. It defaults to ``<local_dir>/<run_name>``,
        the directory ``train_tune_model`` writes to. A single trial's
        checkpoint is not an experiment directory.
        """
        if checkpoint_path == "":
            checkpoint_path = os.path.abspath(
                os.path.expanduser(os.path.join(self.local_dir, self.run_name))
            )

        restored_agent = tune.Tuner.restore(
            checkpoint_path,
            restart_errored=restart_errored,
            resume_unfinished=resume_unfinished,
            resume_errored=resume_errored,
        )
        print(restored_agent)
        self.results = restored_agent.fit()

        return self.results

    def get_test_agent(self, test_env, test_env_name: str, checkpoint=None):
        """
        Register ``test_env`` and restore an Algorithm from ``checkpoint``.

        ``checkpoint`` defaults to the best result's checkpoint.
        NOTE(review): Algorithm.from_checkpoint rebuilds the algorithm from
        the config stored in the checkpoint (whose "env" is the training env
        name). Registering the test env here may not by itself switch the
        restored algorithm to it. Confirm how the test env is meant to be used.
        """
        if test_env is not None:
            register_env(test_env_name, self._as_env_creator(test_env))

        if checkpoint is None:
            checkpoint = self.results.get_best_result().checkpoint

        testing_agent = Algorithm.from_checkpoint(checkpoint)

        return testing_agent

### Train Environment Function

In [None]:
def get_train_env(
    start_date,
    end_date,
    ticker_list,
    data_source,
    time_interval,
    technical_indicator_list,
    env,
    model_name,
    if_vix=True,
    if_cdl=True,
    **kwargs,
):
    """Build the training environment, caching processed data under ./train_data.

    On a cache miss the data is downloaded with ``DataProcessor``, cleaned,
    enriched (technical indicators, optional candlestick features, then VIX
    if ``if_vix`` else turbulence) and converted to arrays. The processed
    DataFrame and the arrays are pickled for later calls.

    Args:
        start_date, end_date: download window passed to ``download_data``.
        ticker_list, data_source, time_interval, technical_indicator_list,
        if_vix, if_cdl: data selection. All of them are part of the cache key.
        env: environment class/factory called with the env config dict.
        model_name: unused; kept for call-site compatibility.
        **kwargs: forwarded to ``DataProcessor``. They are not part of the
            cache key.

    Returns:
        ``env(config)`` with ``config["if_train"] = True``.
    """
    import hashlib  # local: only used to fingerprint the cache key

    # Create 'train_data' folder in the current working directory if it doesn't exist.
    folder_path = os.path.join(os.getcwd(), "train_data")
    os.makedirs(folder_path, exist_ok=True)

    # The cache name must reflect every input that changes the data; keying
    # on dates alone silently reused data built for other tickers/features.
    fingerprint_src = repr((
        data_source,
        time_interval,
        list(ticker_list),
        list(technical_indicator_list or ()),
        bool(if_vix),
        bool(if_cdl),
    ))
    fingerprint = hashlib.sha1(fingerprint_src.encode("utf-8")).hexdigest()[:12]
    stem = f"{start_date}_{end_date}_{fingerprint}"
    data_file = os.path.join(folder_path, f"data_{stem}.pkl")
    arrays_file = os.path.join(folder_path, f"arrays_{stem}.pkl")

    if os.path.exists(data_file) and os.path.exists(arrays_file):
        # Only the arrays are needed to build the env. NOTE: pickle can run
        # code on load; only reuse cache files this project wrote itself.
        with open(arrays_file, "rb") as f:
            price_array, tech_array, turbulence_array, date_array = pickle.load(f)
    else:
        dp = DataProcessor(data_source, **kwargs)
        data = dp.download_data(ticker_list, start_date, end_date, time_interval)
        data = dp.clean_data(data)
        data = dp.add_technical_indicator(data, technical_indicator_list)
        if if_cdl:
            data = dp.add_cdl(data)
        if if_vix:
            data = dp.add_vix(data)
        else:
            data = dp.add_turbulence(data)
        price_array, tech_array, turbulence_array, date_array = dp.df_to_array(data, if_vix, if_cdl)

        # Save the data and arrays for subsequent calls.
        with open(data_file, "wb") as f:
            pickle.dump(data, f)
        with open(arrays_file, "wb") as f:
            pickle.dump((price_array, tech_array, turbulence_array, date_array), f)

    train_env_config = {
        "price_array": price_array,
        "tech_array": tech_array,
        "turbulence_array": turbulence_array,
        "date_array": date_array,
        "if_train": True,
    }

    return env(train_env_config)

### Test Environment Function

In [None]:
def get_test_env(
    start_date,
    end_date,
    ticker_list,
    data_source,
    time_interval,
    technical_indicator_list,
    env,
    model_name,
    if_vix=True,
    if_cdl=True,
    **kwargs,
):
    """Build the evaluation environment, caching processed data under ./test_data.

    On a cache miss the data is downloaded with ``DataProcessor``, cleaned,
    enriched (technical indicators, optional candlestick features, then VIX
    if ``if_vix`` else turbulence) and converted to arrays. The processed
    DataFrame and the arrays are pickled for later calls.

    Args:
        start_date, end_date: download window passed to ``download_data``.
        ticker_list, data_source, time_interval, technical_indicator_list,
        if_vix, if_cdl: data selection. All of them are part of the cache key.
        env: environment class/factory called with the env config dict.
        model_name: unused; kept for call-site compatibility.
        **kwargs: forwarded to ``DataProcessor``. They are not part of the
            cache key.

    Returns:
        ``env(config)`` with ``config["if_train"] = False``.
    """
    import hashlib  # local: only used to fingerprint the cache key

    # Create 'test_data' folder in the current working directory if it doesn't exist.
    folder_path = os.path.join(os.getcwd(), "test_data")
    os.makedirs(folder_path, exist_ok=True)

    # The cache name must reflect every input that changes the data; keying
    # on dates alone silently reused data built for other tickers/features.
    fingerprint_src = repr((
        data_source,
        time_interval,
        list(ticker_list),
        list(technical_indicator_list or ()),
        bool(if_vix),
        bool(if_cdl),
    ))
    fingerprint = hashlib.sha1(fingerprint_src.encode("utf-8")).hexdigest()[:12]
    stem = f"{start_date}_{end_date}_{fingerprint}"
    data_file = os.path.join(folder_path, f"data_{stem}.pkl")
    arrays_file = os.path.join(folder_path, f"arrays_{stem}.pkl")

    if os.path.exists(data_file) and os.path.exists(arrays_file):
        # Only the arrays are needed to build the env. NOTE: pickle can run
        # code on load; only reuse cache files this project wrote itself.
        with open(arrays_file, "rb") as f:
            price_array, tech_array, turbulence_array, date_array = pickle.load(f)
    else:
        dp = DataProcessor(data_source, **kwargs)
        data = dp.download_data(ticker_list, start_date, end_date, time_interval)
        data = dp.clean_data(data)
        data = dp.add_technical_indicator(data, technical_indicator_list)
        if if_cdl:
            data = dp.add_cdl(data)
        if if_vix:
            data = dp.add_vix(data)
        else:
            data = dp.add_turbulence(data)
        price_array, tech_array, turbulence_array, date_array = dp.df_to_array(data, if_vix, if_cdl)

        # Save the data and arrays for subsequent calls.
        with open(data_file, "wb") as f:
            pickle.dump(data, f)
        with open(arrays_file, "wb") as f:
            pickle.dump((price_array, tech_array, turbulence_array, date_array), f)

    test_env_config = {
        "price_array": price_array,
        "tech_array": tech_array,
        "turbulence_array": turbulence_array,
        "date_array": date_array,
        "if_train": False,
    }

    return env(test_env_config)

### Build Environment

In [None]:
# Date windows (YYYY-MM-DD): the training period, then the held-out test period.
TRAIN_START_DATE, TRAIN_END_DATE = "2016-01-01", "2022-12-31"
TEST_START_DATE, TEST_END_DATE = "2023-01-01", "2023-04-30"

In [None]:
# Show the configured ticker universe.
print(GP2_TICKERS)

In [None]:
# Show the technical-indicator list fed to the data processor.
print(INDICATORS)

In [None]:
# Show the candlestick-pattern (CDL) feature list.
print(CDL)

In [None]:
# One action per ticker; presumably matches the env's action_dim
# (one column of price_array per ticker) — confirm.
action_dim = len(GP2_TICKERS)

In [None]:
# NOTE(review): hard-coded. get_state() stacks cash (1) + price, holdings and
# cool-down (one each per asset) + one row of tech features, so this must
# equal 1 + 3 * n_assets + n_tech_features — confirm it still matches.
state_dim = 109 # length of env.get_state() array

In [None]:
# Training env over the TRAIN window. Starting at TEST_START_DATE
# (2023-01-01) with TRAIN_END_DATE (2022-12-31) would give an inverted,
# empty range.
train_env_instance = get_train_env(
    start_date=TRAIN_START_DATE,
    end_date=TRAIN_END_DATE,
    ticker_list=GP2_TICKERS,
    data_source="alpaca",
    time_interval="15Min",
    technical_indicator_list=INDICATORS,
    env=StockTradingEnv,
    model_name="PPO"
)

### MLP optimization

In [3]:
from ray.rllib.algorithms.ppo import PPOConfig


# Print RLlib's default PPO configuration as a reference for the search space.
pprint(PPOConfig().to_dict())

{'_disable_action_flattening': False,
 '_disable_execution_plan_api': True,
 '_disable_preprocessor_api': False,
 '_enable_learner_api': False,
 '_enable_rl_module_api': False,
 '_fake_gpus': False,
 '_learner_hps': PPOLearnerHPs(kl_coeff=0.2,
                               kl_target=0.01,
                               use_critic=True,
                               clip_param=0.3,
                               vf_clip_param=10.0,
                               entropy_coeff=0.0,
                               vf_loss_coeff=1.0,
                               lr_schedule=None,
                               entropy_coeff_schedule=None),
 '_tf_policy_handles_more_than_one_loss': False,
 '_validate_exploration_conf_and_rl_modules': True,
 'action_space': None,
 'actions_in_input_normalized': False,
 'always_attach_evaluation_results': False,
 'auto_wrap_old_gym_envs': True,
 'batch_mode': 'truncate_episodes',
 'callbacks': <class 'ray.rllib.algorithms.callbacks.DefaultCallbacks'>,
 'ch

In [None]:
def sample_ppo_params():
    """Return the PPO search space for Ray Tune.

    Tuned: entropy coefficient and learning rate (log-uniform), SGD
    minibatch size and GAE lambda (categorical). Fixed: torch framework and
    a two-layer 256-unit MLP.
    """
    searched = {
        "entropy_coeff": tune.loguniform(1e-8, 0.1),
        "lr": tune.loguniform(5e-5, 1e-3),
        "sgd_minibatch_size": tune.choice([32, 64, 128, 256, 512]),
        "lambda": tune.choice([0.1, 0.3, 0.5, 0.7, 0.9, 1.0]),
    }
    fixed = {
        "framework": "torch",
        "model": {"fcnet_hiddens": [256, 256]},
    }
    return {**searched, **fixed}

In [None]:
# Population Based Bandits scheduler.
# - metric/mode are NOT set here: DRLlibv2 passes them to TuneConfig, and
#   Tune raises when both the scheduler and TuneConfig define them.
# - hyperparam_bounds must be non-empty (PB2 refuses an empty dict). PB2
#   proposes real-valued points inside these ranges, so only continuous
#   hyperparameters are listed (sgd_minibatch_size is integer-valued).
#   The ranges mirror the sample_ppo_params() search space.
pb2 = PB2(
    time_attr='episodes_total',
    perturbation_interval=5,
    quantile_fraction=0.25,
    hyperparam_bounds={
        "lr": [5e-5, 1e-3],
        "entropy_coeff": [1e-8, 0.1],
        "lambda": [0.1, 1.0],
    },
)
# https://docs.ray.io/en/latest/tune/api/doc/ray.tune.schedulers.pb2.PB2.html#ray-tune-schedulers-pb2-pb2

In [None]:
# PPO on the registered training env, tuned with the PB2 scheduler over the
# sample_ppo_params() search space.
# NOTE(review): each trial presumably needs num_workers + 1 CPUs (RLlib
# default), so with num_cpus=20 only one trial runs at a time, which limits
# PB2's population — confirm.
drl_agent = DRLlibv2(
    # algorithm, environment, search space
    trainable="PPO",
    train_env=train_env_instance,
    train_env_name="StockTradingEnv",
    params=sample_ppo_params(),
    # RLlib execution settings
    framework="torch",
    log_level="WARN",
    num_workers=16,
    num_envs_per_worker=32,
    restart_failed_sub_environments=True,
    recreate_failed_workers=True,
    # Ray resources
    num_gpus=1,
    num_cpus=20,
    # Tune experiment settings
    run_name="gp2_train_mlp",
    local_dir="gp2_train_mlp",
    num_samples=16,
    timeout=3600,
    scheduler=pb2,
    checkpoint_num_to_keep=16,
    checkpoint_freq=5,
)