In [None]:
import ray
import numpy as np
import pandas as pd
import yfinance as yf
import pandas_ta as ta
from datetime import datetime, timedelta

from ray import tune
from ray.tune.registry import register_env

import tensortrade.env.default as default

from tensortrade.feed.core import DataFeed, Stream
from tensortrade.oms.exchanges import Exchange,ExchangeOptions
from tensortrade.oms.services.execution.simulated import execute_order
from tensortrade.oms.wallets import Wallet, Portfolio
from tensortrade.env.default.rewards import TensorTradeRewardScheme
from tensortrade.feed.core import Stream, DataFeed

from gym.spaces import Discrete
from tensortrade.env.default.actions import TensorTradeActionScheme
from tensortrade.env.generic import ActionScheme, TradingEnv
from tensortrade.core import Clock
from tensortrade.oms.instruments import ExchangePair, Instrument
from tensortrade.oms.wallets import Portfolio
from tensortrade.oms.orders import (
    Order,
    proportion_order,
    TradeSide,
    TradeType
)

import matplotlib.pyplot as plt

from tensortrade.env.generic import Renderer

import ray.rllib.agents.ppo as ppo

### Renderer (Plot)

In [None]:
class PositionChangeChart(Renderer):
    """Plot price with position-change markers next to portfolio performance.

    Reads the observer's ``renderer_history`` (uses its ``action`` and
    ``price`` columns) and the action scheme portfolio's ``performance``
    record, and draws two side-by-side matplotlib panels.

    Args:
        color: Line color used for the price series (default ``"orange"``).
    """

    def __init__(self, color: str = "orange"):
        # Honor the caller's choice; previously this was always "orange".
        self.color = color

    def render(self, env, **kwargs):
        """Draw the trading chart and the performance chart for ``env``."""
        history = pd.DataFrame(env.observer.renderer_history)

        actions = list(history.action)
        prices = list(history.price)

        # A 0 -> 1 transition is treated as a buy, any other change as a sell.
        # The marker is placed at the step just before the change.
        buy = {}
        sell = {}
        for i, (prev_action, next_action) in enumerate(zip(actions, actions[1:])):
            if prev_action == next_action:
                continue
            if prev_action == 0 and next_action == 1:
                buy[i] = prices[i]
            else:
                sell[i] = prices[i]

        # Explicit dtype avoids pandas' empty-Series dtype warning when no trades occur.
        buy = pd.Series(buy, dtype=float)
        sell = pd.Series(sell, dtype=float)

        fig, axs = plt.subplots(1, 2, figsize=(15, 5))
        fig.suptitle("Performance")

        axs[0].plot(np.arange(len(prices)), prices, label="price", color=self.color)
        axs[0].scatter(buy.index, buy.values, marker="^", color="green")  # BUY
        axs[0].scatter(sell.index, sell.values, marker="v", color="red")  # SELL
        axs[0].set_title("Trading Chart")
        axs[0].legend(['Price', 'Buys', 'Sells'])

        # from_dict is a classmethod; call it on the class, not on an empty instance.
        performance_df = pd.DataFrame.from_dict(env.action_scheme.portfolio.performance, orient='index')
        performance_df.plot(ax=axs[1])
        axs[1].set_title("Net Worth")

        plt.show()

---

### DRL Model

### Train

To use our custom environment with Ray, we first write a function that builds a `TradingEnv` instance from a configuration dictionary (RLlib's `env_config`) and register it under a name.

In [None]:
def generate_train_test_datasets(ticker, train_test_split, output_dir='../../data/testing'):
    """Download hourly Yahoo! Finance data for a B3 ticker and split it by date.

    Adds log-return (length 16), RSI (length 14) and MACD (12/26) columns via
    pandas_ta, drops rows with missing values, lower-cases the OHLCV column
    names and splits the frame at ``train_test_split``.

    Args:
        ticker: Ticker without exchange suffix (e.g. ``'PETR4'``); ``'.SA'`` is appended.
        train_test_split: Date string ``'YYYY-MM-DD'``. Training covers rows up to
            and including this day; evaluation starts on the following day.
        output_dir: Directory where ``training.csv`` and ``evaluation.csv`` are
            written; created if it does not exist. Defaults to the original path.

    Returns:
        Tuple ``(df_training, df_evaluation)``.

    Raises:
        ValueError: if Yahoo! Finance returns no rows for the ticker, or if
            ``train_test_split`` is not in ``'%Y-%m-%d'`` format.
    """
    from pathlib import Path

    df = yf.Ticker(ticker=f'{ticker}.SA').history(period='2y', interval='1h')
    if df.empty:
        raise ValueError(f"No Yahoo! Finance data returned for '{ticker}.SA'")

    df = df.drop(columns=['Dividends', 'Stock Splits'], errors='ignore')
    df["Volume"] = df["Volume"].fillna(0).astype(int)
    # pandas_ta appends the indicator columns to df itself.
    df.ta.log_return(append=True, length=16)
    df.ta.rsi(append=True, length=14)
    df.ta.macd(append=True, fast=12, slow=26)
    df = df.dropna().rename(
        columns={'Open': 'open', 'High': 'high', 'Low': 'low', 'Close': 'close', 'Volume': 'volume'}
    )

    # Evaluation starts the day after the split date so the two sets never overlap.
    next_day = (datetime.strptime(train_test_split, '%Y-%m-%d') + timedelta(days=1)).strftime('%Y-%m-%d')

    df_training = df.loc[:train_test_split].copy()
    df_evaluation = df.loc[next_day:].copy()

    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    df_training.to_csv(out_dir / 'training.csv', index=True)
    df_evaluation.to_csv(out_dir / 'evaluation.csv', index=True)

    return df_training, df_evaluation

In [None]:
# df_train, df_test = generate_train_test_datasets('PETR4', '2022-01-03')

In [None]:
class PBR(TensorTradeRewardScheme):
    """Position-based reward scheme (PBR).

    The reward at each step is the price difference multiplied by the
    current position: +1 after action 1, -1 after action 0. Holding
    position +1 is rewarded when the price rises, and position -1 is
    rewarded when it falls.
    """

    registered_name = "pbr"

    def __init__(self, price: 'Stream'):
        super().__init__()
        self.position = -1

        # Price difference between consecutive steps.
        price_change = Stream.sensor(price, lambda src: src.value, dtype="float").diff()
        # Live view of self.position, read each time the feed advances.
        current_position = Stream.sensor(self, lambda scheme: scheme.position, dtype="float")

        reward_stream = (price_change * current_position).fillna(0).rename("reward")

        self.feed = DataFeed([reward_stream])
        self.feed.compile()

    def on_action(self, action: int):
        """Listener hook: update the position from the action just taken."""
        if action == 0:
            self.position = -1
        else:
            self.position = 1

    def get_reward(self, portfolio: 'Portfolio'):
        """Advance the reward feed one step and return its "reward" value."""
        step_values = self.feed.next()
        return step_values["reward"]

    def reset(self):
        """Restore the initial position and rewind the reward feed."""
        self.position = -1
        self.feed.reset()

In [None]:
class BSH(TensorTradeActionScheme):
    """Two-action Buy/Sell/Hold scheme: 0 = hold cash, 1 = hold the asset.

    When the requested action differs from the current position, a single
    ``proportion_order`` moves funds between the cash and asset wallets.
    Buys spend only what is needed for a whole number of lots of
    ``lot_size`` shares. Sells liquidate the entire asset wallet. Attached
    listeners (e.g. a reward scheme) receive the position actually held
    after each decision.

    Args:
        cash: Wallet holding the quote currency.
        asset: Wallet holding the traded asset.
        lot_size: Shares per tradable lot (default 100).
        verbose: Print a summary of every order created (default True).
    """

    registered_name = "bsh"

    def __init__(self, cash: 'Wallet', asset: 'Wallet', lot_size: float = 100.0, verbose: bool = True):
        super().__init__()
        self.cash = cash
        self.asset = asset
        self.lot_size = lot_size
        self.verbose = verbose

        self.listeners = []
        # Current position: 0 = in cash, 1 = in the asset.
        self.action = 0

    @property
    def action_space(self):
        return Discrete(2)

    def attach(self, listener):
        """Register a listener whose ``on_action`` is called every step; returns self for chaining."""
        self.listeners += [listener]
        return self

    def _buy_size(self, cash_wallet, portfolio):
        """Return ``(proportion, num_shares)`` for the largest whole-lot buy the cash allows.

        ``proportion`` is 0.0 when not even one lot is affordable (or the
        balance/price is non-positive), so the caller can skip the order.
        """
        current_price = float(portfolio.exchange_pairs[0].price)
        source_balance = cash_wallet.balance.as_float()
        if source_balance <= 0 or current_price <= 0:
            return 0.0, 0.0

        # Round down to whole lots ("n lots of lot_size shares").
        lots = int(source_balance // (self.lot_size * current_price))
        num_shares = lots * self.lot_size
        # Clamp: float round-off must not push the ratio above 1.0
        # (proportion_order expects a proportion in (0, 1]).
        proportion = min(1.0, (num_shares * current_price) / source_balance)
        return proportion, num_shares

    def _print_order(self, buying, src, tgt, proportional_lot_size, num_shares, portfolio):
        """Print a human-readable summary of the order about to be created."""
        print('--' * 50)
        if buying:
            current_price = float(portfolio.exchange_pairs[0].price)
            print ('CASH TO ASSET')
            print('Source Balance: ', src.balance.as_float())
            print('Target Balance: ', tgt.balance.as_float())
            print('Proportional Lot Size', proportional_lot_size)
            print('Current Price: ', current_price)
            print('# Shares: ', num_shares)
            print('Current Price x # Shares: ', num_shares * current_price)
        else:
            print ('ASSET TO CASH')
            print('Source Balance: ', src.balance.as_float())
            print('Target Balance: ', tgt.balance.as_float())
            print('Proportional Lot Size', proportional_lot_size)

    def get_orders(self,
                   action: int,
                   portfolio: 'Portfolio'):
        """Return a one-element list with the order for ``action`` (``None`` when no trade)."""
        order = None

        if action != self.action:
            buying = self.action == 0
            src = self.cash if buying else self.asset
            tgt = self.asset if buying else self.cash

            if buying:
                proportional_lot_size, num_shares = self._buy_size(src, portfolio)
            else:
                proportional_lot_size, num_shares = 1.0, None

            if proportional_lot_size > 0.0:
                if self.verbose:
                    self._print_order(buying, src, tgt, proportional_lot_size, num_shares, portfolio)
                order = proportion_order(
                            portfolio,
                            src,
                            tgt,
                            proportional_lot_size
                        )
                # Switch position only once an order has actually been created.
                self.action = action
            elif self.verbose:
                print('--' * 50)
                print('SKIPPED CASH TO ASSET: balance too low for one lot')

        # Notify listeners of the position actually held; this equals
        # ``action`` whenever the requested switch succeeded.
        for listener in self.listeners:
            listener.on_action(self.action)

        return [order]

    def reset(self):
        """Reset the parent scheme and return to the all-cash position."""
        super().reset()
        self.action = 0

---

In [None]:
def create_training_env(config):
    """Create the TensorTrade training environment used by RLlib.

    Args:
        config: RLlib ``env_config`` dict. Keys read:
            - ``"window_size"`` (required): observation window length.
            - ``"max_allowed_loss"`` (optional, default 0.4): loss fraction
              passed to ``default.create``.
            - ``"csv_path"`` (optional): training CSV; defaults to the PETR3
              training file.

    Returns:
        A ``TradingEnv`` trading BRL/ASSETS on a simulated B3 exchange with the
        ``BSH`` action scheme and ``PBR`` reward scheme.
    """
    # 1. Data and TensorTrade objects
    dataset = pd.read_csv(config.get("csv_path", "../../data/market/ohlcv_daily_TA/PETR3_train.csv"))

    # Price stream driving the simulated exchange (appears as
    # "B3:/BRL-ASSETS" in portfolio.performance).
    price = Stream.source(list(dataset["close"]), dtype="float").rename("BRL-ASSETS")

    b3_commission = 0.0035
    b3_options = ExchangeOptions(commission=b3_commission)
    b3_exchange = Exchange("B3", service=execute_order, options=b3_options)(price)

    # Instruments
    BRL = Instrument("BRL", 2, "Brazilian Currency")
    ASSETS = Instrument("ASSETS", 2, "Assets")

    # Portfolio: start fully in cash.
    cash = Wallet(b3_exchange, 100000 * BRL)  # Money
    asset = Wallet(b3_exchange, 0 * ASSETS)  # Stocks/Assets
    portfolio = Portfolio(BRL, [cash, asset])

    # Observation features: every column except the first.
    # NOTE(review): assumes the first CSV column is the date — confirm.
    feed = DataFeed([
        Stream.source(list(dataset[c]), dtype="float").rename(c)
        for c in dataset.columns[1:]
    ])
    feed.compile()

    # 2. Rewards and actions: PBR listens to BSH so it sees each position change.
    reward_scheme = PBR(price=price)
    action_scheme = BSH(cash=cash, asset=asset).attach(reward_scheme)

    # 3. Visualization
    renderer_feed = DataFeed([
        Stream.source(list(dataset['date'])).rename("date"),
        Stream.source(list(dataset["open"]), dtype="float").rename("open"),
        Stream.source(list(dataset["high"]), dtype="float").rename("high"),
        Stream.source(list(dataset["low"]), dtype="float").rename("low"),
        Stream.source(list(dataset["close"]), dtype="float").rename("close"),
        Stream.source(list(dataset["volume"]), dtype="float").rename("volume")
    ])

    # 4. Environment — honor the loss limit requested in env_config.
    environment = default.create(
        feed=feed,
        portfolio=portfolio,
        action_scheme=action_scheme,
        reward_scheme=reward_scheme,
        renderer_feed=renderer_feed,
        renderer=default.renderers.PlotlyTradingChart(display=True, auto_open_html=False, save_format="png"),
        window_size=config["window_size"],
        max_allowed_loss=config.get("max_allowed_loss", 0.4)
    )

    return environment

register_env("TradingEnv", create_training_env)

---

With the environment registered, we can train an agent using the Proximal Policy Optimization (PPO) algorithm implemented in RLlib.

In [None]:
window_size = 14

# Forwarded to create_training_env through RLlib's "env_config".
env_config_training = {
    # Number of past bars stacked into each observation.
    "window_size": window_size,
    # 10% loss threshold meant to end losing episodes early.
    # NOTE(review): confirm create_training_env actually reads this key.
    "max_allowed_loss": 0.10,
}

# Piecewise learning-rate schedule as [timestep, lr] pairs. When a schedule
# is given, RLlib uses it instead of the plain "lr" value below.
lr_schedule = [
    [0, 1e-1],
    [int(1e2), 1e-2],
    [int(1e3), 1e-3],
    [int(1e4), 1e-4],
    [int(1e5), 1e-5],
    [int(1e6), 1e-6],
    [int(1e7), 1e-7],
]

ppo_config = {
    "env": "TradingEnv",
    "env_config": env_config_training,
    "log_level": "WARNING",
    "framework": "tf2",
    "eager_tracing": True,
    "ignore_worker_failures": True,
    "num_workers": 1,
    "num_gpus": 1,
    "clip_rewards": True,
    "lr": 8e-6,
    "lr_schedule": lr_schedule,
    "gamma": 0.5,
    "observation_filter": "MeanStdFilter",
    "lambda": 0.72,
    "vf_loss_coeff": 0.5,
    "entropy_coeff": 0.01,
}

# Train the built-in RLlib PPO agent until the mean episode reward reaches
# 0.03, saving a checkpoint when the run ends.
analysis = tune.run(
    run_or_experiment="PPO",
    name="BrazilianStockTrading",
    metric="episode_reward_mean",
    mode="max",
    stop={"episode_reward_mean": 0.03},
    config=ppo_config,
    checkpoint_at_end=True,
)

---

After training completes, we look up the best checkpoint and build a PPO trainer with a matching configuration so that the trained policy can be restored.

In [None]:
# Pick the best checkpoint (by episode_reward_mean) of the best trial,
# rather than whichever checkpoint happens to be listed first.
best_trial = analysis.get_best_trial("episode_reward_mean", mode="max")
if best_trial is None:
    raise RuntimeError("No trial reported 'episode_reward_mean'; nothing to restore.")

checkpoints = analysis.get_trial_checkpoints_paths(
    trial=best_trial,
    metric="episode_reward_mean"
)
if not checkpoints:
    raise RuntimeError("Best trial has no checkpoints; was checkpoint_at_end=True?")

checkpoint_path = max(
    checkpoints,
    key=lambda pair: float("-inf") if pair[1] is None else pair[1],
)[0]

# Build a trainer whose config mirrors the training run so the checkpoint loads cleanly.
agent = ppo.PPOTrainer(
    env="TradingEnv",
    config={
        "env_config": env_config_training,
        "log_level": "WARNING",
        "framework": "tf2",
        "ignore_worker_failures": True,
        "num_workers": 1,
        "num_gpus": 1,
        "clip_rewards": True,
        "lr": 8e-6,
        "lr_schedule": [
            [0, 1e-1],
            [int(1e2), 1e-2],
            [int(1e3), 1e-3],
            [int(1e4), 1e-4],
            [int(1e5), 1e-5],
            [int(1e6), 1e-6],
            [int(1e7), 1e-7]
        ],
        "gamma": 0.5,
        "observation_filter": "MeanStdFilter",
        "model": {
            # NOTE(review): training used RLlib's default model config;
            # keep these sizes in sync with it or the checkpoint won't load.
            "fcnet_hiddens": [256, 256],
        },
        "lambda": 0.72,
        "vf_loss_coeff": 0.5,
        "entropy_coeff": 0.01
    },
)

---

Next, we load the checkpoint into the trainer and run one full episode on the training data to visualize the agent's policy.

In [None]:
# Load the trained weights into the trainer built above.
agent.restore(checkpoint_path)

# Rebuild the training environment with the same env_config used in training.
env = create_training_env(env_config_training)

# Roll out one full episode with the restored policy.
episode_reward = 0
done = False
obs = env.reset()

while not done:
    action = agent.compute_single_action(obs)
    obs, reward, done, info = env.step(action)
    episode_reward += reward

env.render()

# Total reward collected over the episode.
episode_reward

---

### Validation Set

In [None]:
def create_eval_env(config):
    """Create the evaluation environment on the held-out PETR3 test data.

    Uses the same BSH action scheme and PBR reward scheme as
    ``create_training_env``, so the policy's two discrete actions mean the
    same thing at evaluation time as during training.

    Args:
        config: Dict with ``"window_size"`` (required) and optionally
            ``"max_allowed_loss"`` (default 0.4) and ``"csv_path"``.

    Returns:
        Tuple ``(environment, portfolio)``. The portfolio is returned so its
        ledger and performance can be inspected after the episode.
    """
    dataset = pd.read_csv(config.get("csv_path", "../../data/market/ohlcv_daily_TA/PETR3_test.csv"))

    # Price series (appears as "B3:/BRL-ASSETS" in portfolio.performance).
    price = Stream.source(list(dataset["close"]), dtype="float").rename("BRL-ASSETS")

    b3_commission = 0.0035
    b3_options = ExchangeOptions(commission=b3_commission)
    b3_exchange = Exchange("B3", service=execute_order, options=b3_options)(price)

    # Instruments
    BRL = Instrument("BRL", 2, "Brazilian Currency")
    ASSETS = Instrument("ASSETS", 2, "Assets")

    # Portfolio: start fully in cash.
    cash = Wallet(b3_exchange, 100000 * BRL)  # Money
    asset = Wallet(b3_exchange, 0 * ASSETS)  # Stocks/Assets
    portfolio = Portfolio(BRL, [cash, asset])

    # Observation features: every column except the first.
    # NOTE(review): assumes the first CSV column is the date — confirm.
    feed = DataFeed([
        Stream.source(list(dataset[c]), dtype="float").rename(c)
        for c in dataset.columns[1:]
    ])
    feed.compile()

    # Same reward/action schemes as training; a different action scheme
    # would reinterpret the agent's 0/1 actions.
    reward_scheme = PBR(price=price)
    action_scheme = BSH(cash=cash, asset=asset).attach(reward_scheme)

    # Visualization
    renderer_feed = DataFeed([
        Stream.source(list(dataset['date'])).rename("date"),
        Stream.source(list(dataset["open"]), dtype="float").rename("open"),
        Stream.source(list(dataset["high"]), dtype="float").rename("high"),
        Stream.source(list(dataset["low"]), dtype="float").rename("low"),
        Stream.source(list(dataset["close"]), dtype="float").rename("close"),
        Stream.source(list(dataset["volume"]), dtype="float").rename("volume")
    ])

    environment = default.create(
        feed=feed,
        portfolio=portfolio,
        action_scheme=action_scheme,
        reward_scheme=reward_scheme,
        renderer_feed=renderer_feed,
        renderer=default.renderers.PlotlyTradingChart(display=True, auto_open_html=False, save_format="png"),
        window_size=config["window_size"],
        max_allowed_loss=config.get("max_allowed_loss", 0.4)
    )

    return environment, portfolio

In [None]:
# 1. Build the evaluation environment and keep a handle on its portfolio.
env, portfolio = create_eval_env({"window_size": window_size})

# 2. Roll out one full episode with the restored policy.
obs = env.reset()
episode_reward = 0
done = False
while not done:
    obs, reward, done, info = env.step(agent.compute_single_action(obs))
    episode_reward += reward

env.render()

# 3. First ten ledger entries recorded during the episode.
portfolio.ledger.as_frame().head(10)

In [None]:
ledger_frame = portfolio.ledger.as_frame()
ledger_frame.head(10)

In [None]:
# One column per recorded step, one row per performance metric.
df = pd.DataFrame.from_dict(portfolio.performance, orient="columns")

In [None]:
# Every performance metric over the episode; trailing ';' hides the Axes repr.
df.T.plot(title="Portfolio performance (all metrics)");

In [None]:
df.loc["B3:/BRL-ASSETS"].plot(title="BRL-ASSETS close price");

In [None]:
df.loc["B3:/ASSETS:/worth"].plot(title="ASSETS wallet worth");

---

### Net Worth Analysis (with Quantstats)

In [None]:
import quantstats as qs

# Attach quantstats metric methods to pandas objects.
qs.extend_pandas()

In [None]:
# Dates must come from the same file create_eval_env trades on, otherwise the
# net-worth values are paired with the wrong days.
dataset = pd.read_csv("../../data/market/ohlcv_daily_TA/PETR3_test.csv").bfill().ffill()
dataset = dataset.rename(columns={'date': 'Date'})

net_worth = df.loc["net_worth"].rename('Close')
# NOTE(review): assumes performance has one entry per dataset row after the first.
net_worth.index = pd.to_datetime(dataset['Date'].loc[1:])

# resample('D') inserts NaN rows for days with no data (weekends/holidays);
# drop them so pct_change does not turn them into spurious 0% returns.
net_worth = net_worth.resample('D').last().dropna()
net_worth.index = net_worth.index.date
net_worth.index = net_worth.index.rename('Date')

In [None]:
# Period-over-period simple returns, with a DatetimeIndex for quantstats.
net_returns = (
    net_worth
    .pct_change()
    .dropna()
)
net_returns.index = pd.to_datetime(net_returns.index)
net_returns

In [None]:
# Sharpe ratio of the evaluation-set returns.
sharpe_ratio = qs.stats.sharpe(net_returns)
sharpe_ratio

In [None]:
# Overall performance snapshot (cumulative return, drawdown, etc.).
snapshot_title = 'Evaluation Set Performance'
qs.plots.snapshot(net_returns, title=snapshot_title)

In [None]:
# Full HTML tearsheet, benchmarked against the Ibovespa index (^BVSP).
qs.reports.html(net_returns, benchmark="^BVSP")

---