In [None]:
import os
import random

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import gym
from gym import spaces
from ray.rllib.agents import ppo
from ray.rllib.env.env_context import EnvContext

# Apply seaborn's "darkgrid" style to the figures drawn later in the notebook.
sns.set_style("darkgrid")

## Environment

In [None]:
def prepare_dict(df):
    """Split a feature frame into the two arrays consumed by the environment.

    Args:
        df: DataFrame holding a 'price' column plus any number of feature columns.

    Returns:
        dict with two float32 arrays:
            'price_array'  -- the 'price' column with a trailing axis added
                              (shape (n_rows, 1) for a single price column);
            'observations' -- every remaining column, shape (n_rows, n_features).

    The input frame is not modified.
    """
    prices = np.expand_dims(df['price'].to_numpy(dtype=np.float32), axis=1)
    features = df.drop(columns=['price']).to_numpy(dtype=np.float32)
    return {'price_array': prices, 'observations': features}


class CryptoEnv(gym.Env):
    """Single-asset trading environment driven by a pre-computed price series.

    Each episode starts at a random row of the data, with random cash and stock
    holdings (each drawn uniformly from [0, initial_capital)), and lasts exactly
    ``max_steps`` steps.

    Config keys read from ``config``:
        'price_array'     -- array indexed as [time_step][0] to get the asset price.
        'observations'    -- 2-D array of features, one row per time step.
        'initial_capital' -- base cash; scales the action and normalises state/reward.
        'max_steps'       -- episode length in steps.
        'gamma'           -- factor used to accumulate the "gamma return".

    Action: ``action[0]`` in [-1, 1]; negative sells, positive buys, up to
    ``|action[0]| * initial_capital`` USD (capped by current holdings / cash).

    State: [normalised cash, normalised stock value, *observations[time_step]].

    Reward: change in total asset divided by ``initial_capital``; on the final
    step the accumulated gamma return is reported instead.
    """

    def __init__(self, config: EnvContext):
        self._price_array = config['price_array']
        self._observations = config['observations']

        self._base_cash = config['initial_capital']
        self._cash_usd = None
        self._stocks_usd = None
        self._stocks = None
        self._total_asset = None
        self._initial_total_asset = None

        self._time_step = None
        self._initial_step = None
        self._max_steps = config['max_steps']
        self._final_step = None
        # Largest start index that still leaves max_steps rows after it.
        self._upper_bound_step = self._price_array.shape[0] - self._max_steps - 1
        # Fail early with a clear message instead of a cryptic randint/IndexError
        # raised later from reset()/step().
        if self._max_steps < 1:
            raise ValueError(f"max_steps must be at least 1, got {self._max_steps}")
        if self._upper_bound_step < 0:
            raise ValueError(
                f"max_steps={self._max_steps} does not fit in a price series of "
                f"{self._price_array.shape[0]} rows (at most "
                f"{self._price_array.shape[0] - 1} steps are available)"
            )

        self._gamma = config['gamma']
        self._gamma_return = None

        self._action_dim = self._price_array.shape[1]
        # buy or sell up to the base cash equivalent(usd)
        self.action_space = spaces.Box(low=-1.0, high=1.0, shape=(self._action_dim,), dtype=np.float32)
        # cash + stocks + observations
        self._state_dim = 2 + self._observations.shape[1]
        # NOTE(review): states are not clipped to these bounds anywhere in this class;
        # confirm the features and holdings really stay within [-5, 5].
        self.observation_space = spaces.Box(low=-5.0, high=5.0, shape=(self._state_dim,), dtype=np.float32)

        self._state = None
        self._episode_ended = None

    def reset(self):
        """Start a new episode at a random row and return the initial state."""
        self._time_step = self._initial_step = random.randint(0, self._upper_bound_step)
        self._final_step = self._initial_step + self._max_steps
        self._cash_usd = random.random() * self._base_cash
        self._stocks_usd = random.random() * self._base_cash
        price = self._price_array[self._time_step][0]
        if price > 0:
            self._stocks = self._stocks_usd / price
        else:
            # No valid price to convert USD into shares: start without a position
            # rather than dividing by a non-positive price.
            self._stocks = 0.0
            self._stocks_usd = 0.0
        self._total_asset = self._initial_total_asset = self._cash_usd + self._stocks_usd
        self._gamma_return = 0.0

        self._state = self._get_state()
        self._episode_ended = False
        return self._state

    def step(self, action):
        """Advance one time step, apply the trade and return (state, reward, done, info)."""
        self._time_step += 1
        price = self._price_array[self._time_step][0]
        # Re-value the existing position at the new price before trading.
        self._stocks_usd = self._stocks * price
        if price > 0:
            order = action[0]
            if order < 0:  # sell
                sell_shares_usd = min(self._base_cash * -order, self._stocks_usd)
                self._stocks_usd -= sell_shares_usd
                self._cash_usd += sell_shares_usd
            elif order > 0:  # buy
                money_to_spend = min(self._base_cash * order, self._cash_usd)
                self._stocks_usd += money_to_spend
                self._cash_usd -= money_to_spend
            self._stocks = self._stocks_usd / price
        # With a non-positive price no trade happens and the share count is kept
        # as is, instead of being turned into inf/nan by a division.

        self._episode_ended = self._time_step == self._final_step
        self._state = self._get_state()
        next_total_asset = self._cash_usd + self._stocks_usd

        reward = (next_total_asset - self._total_asset) / self._base_cash
        self._total_asset = next_total_asset
        self._gamma_return = self._gamma_return * self._gamma + reward
        if self._episode_ended:
            # The last step reports the accumulated gamma return as its reward.
            reward = self._gamma_return
        return self._state, reward, self._episode_ended, self._get_info()

    def _get_state(self):
        """Build the float32 state: normalised cash, normalised stock value, features."""
        cash_norm = (self._cash_usd - self._base_cash) / self._base_cash
        stocks_norm = (self._stocks_usd - self._base_cash) / self._base_cash
        observation = self._observations[self._time_step]
        return np.hstack((cash_norm, stocks_norm, observation)).astype(np.float32)

    def _get_info(self):
        """Return episode bookkeeping values for the current step."""
        return {"Initial step": self._initial_step,
                "Final step": self._final_step,
                "Initial total asset": self._initial_total_asset,
                "Final total asset": self._total_asset,
                "Gamma return": self._gamma_return}

## Prepare data

In [None]:
# Rolling-window lengths (time-offset strings) used to build the open/high/low features.
PERIODS = ['3min', '5min', '15min', '30min', '1h', '2h', '4h', '6h', '8h', '12h', '1d', '3d', '7d', '30d']
# Start of the data window used for plotting and feature computation.
START_DATE = '2022-01-01'
# Start of the rows kept for training; earlier rows are dropped once the rolling features exist.
INITIAL_DATE = '2022-01-31'
# End of the data window.
FINISH_DATE = '2022-03-01'

In [None]:
try:
    df = pd.read_parquet('data/BTC-USDT.parquet')
except FileNotFoundError:
    os.makedirs('data', exist_ok=True)
    !kaggle datasets download -d jorijnsmit/binance-full-history -f BTC-USDT.parquet -p ./data --unzip

In [None]:
# Close price over the whole START_DATE..FINISH_DATE window.
price_window = df[START_DATE:FINISH_DATE]
fig, ax = plt.subplots(1, 1, figsize=(14, 5))
ax.plot(price_window.index, price_window['close'])
ax.set_xlabel("Time")
ax.set_ylabel("Price")

In [None]:
# Build the training frame: normalised OHLC columns, the raw close as 'price',
# and rolling open/high/low features for every window in PERIODS.
OHLC_COLUMNS = ['open', 'high', 'low', 'close']

df_window = df[START_DATE:FINISH_DATE]
df_train = df_window[OHLC_COLUMNS].copy()
# A single shared offset and scale (mean of the column means / stds) is applied to all
# four columns, so they stay comparable with each other.
df_train[OHLC_COLUMNS] = df_train[OHLC_COLUMNS] - df_train[OHLC_COLUMNS].mean().mean()
df_train[OHLC_COLUMNS] = df_train[OHLC_COLUMNS] / df_train[OHLC_COLUMNS].std().mean()
# The un-normalised close is kept separately; the environment trades at this price.
df_train['price'] = df_window['close']

for period in PERIODS:
    # raw=True hands each window to the lambda as a NumPy array, so window[0] is plain
    # positional indexing (integer lookup on a datetime-indexed Series is deprecated)
    # and no Series has to be built per window.
    df_train['open' + period] = df_train['open'].rolling(period).apply(lambda window: window[0], raw=True)
    df_train['high' + period] = df_train['high'].rolling(period).max()
    df_train['low' + period] = df_train['low'].rolling(period).min()

In [None]:
# Keep only the rows from INITIAL_DATE onwards; presumably the earlier rows serve as
# warm-up history for the rolling-window features (the longest window is 30d).
df_train = df_train[INITIAL_DATE:]

## Train agent

In [None]:
# Training schedule and rollout geometry.
stop_iters = 200
n_workers = 5
n_envs_per_worker = 20
r_fragment_length = 2000
# One train batch holds one fragment from every environment of every worker.
train_batch_size = n_workers * n_envs_per_worker * r_fragment_length
sgd_minibatch_size = train_batch_size // 10
train_dict = prepare_dict(df_train)
# Trainer settings; the environment episode length equals the rollout fragment length.
config = dict(
    env=CryptoEnv,
    env_config=dict(
        price_array=train_dict['price_array'],
        observations=train_dict['observations'],
        initial_capital=1e4,
        gamma=0.99,
        max_steps=r_fragment_length,
    ),
    num_gpus=1,
    model=dict(vf_share_layers=False),
    num_workers=n_workers,
    num_envs_per_worker=n_envs_per_worker,
    rollout_fragment_length=r_fragment_length,
    train_batch_size=train_batch_size,
    sgd_minibatch_size=sgd_minibatch_size,
    batch_mode="complete_episodes",
    framework="tf",
)

In [None]:
# Start from the PPO defaults, overlay the notebook config and set the learning rate.
ppo_config = ppo.DEFAULT_CONFIG.copy()
ppo_config.update(config, lr=1e-5)
trainer = ppo.PPOTrainer(config=ppo_config, env=CryptoEnv)
# Run the training iterations, saving a checkpoint after each one.
for iteration in range(stop_iters):
    result = trainer.train()
    checkpoint = trainer.save()

## Visualize results 

In [None]:
# Roll the trained policy through one episode that spans the whole training frame.
# The evaluation settings go into a separate dict rather than being written into
# config["env_config"], so re-running the training cell later still uses the original
# max_steps.
eval_env_config = dict(config["env_config"], max_steps=df_train.shape[0] - 1)
env = CryptoEnv(eval_env_config)
actions = []
total_assets = []
rewards = []
observation = env.reset()
done = False
while not done:
    action = trainer.compute_action(observation)
    observation, reward, done, info = env.step(action)
    rewards.append(reward)
    # "Final total asset" holds the portfolio value after the step just taken.
    total_assets.append(info["Final total asset"])
    actions.append(action)

In [None]:
# Row positions (within the environment's arrays) where the evaluated episode began and ended.
init_step, final_step = info["Initial step"], info["Final step"]

In [None]:
# Price over the rows covered by the evaluated episode.
episode_rows = df_train.iloc[init_step: final_step]
fig, ax = plt.subplots(1, 1, figsize=(12, 5))
ax.plot(episode_rows.index, episode_rows['price'])
ax.set_xlabel("Time")
ax.set_ylabel("Price")
ax.set_title("Train set")

In [None]:
# total_assets[i] is recorded after the i-th step, i.e. at row init_step + 1 + i, so the
# matching timestamps run from init_step + 1 through final_step inclusive.
asset_times = df_train.iloc[init_step + 1: final_step + 1].index
fig, ax = plt.subplots(1, 1, figsize=(12, 5))
ax.scatter(asset_times, np.array(total_assets))
ax.set_xlabel("Time")
ax.set_ylabel("Total asset")
ax.set_title("Train set")