In [9]:
import shutup
shutup.please()

import gym
import src.environments.continuous.stock_trading  

import numpy as np
import pandas as pd 

from tqdm.notebook import trange, tqdm

### Unit tests for the trading environment

In [10]:
def environment():
    """Create the stock-trading environment exercised by the tests in this notebook.

    Returns:
        The object returned by ``gym.make('StockTradingEnvironment-v0', ...)``,
        with its ``success_threshold`` attribute set to 0.25.
    """
    technical_indicators = [
        "macd",
        "boll_ub",
        "boll_lb",
        "rsi_30",
        "cci_30",
        "dx_30",
        "close_30_sma",
        "close_60_sma",
    ]
    trading_env = gym.make(
        'StockTradingEnvironment-v0',
        use_technical_indicators=technical_indicators,
    )

    # 0.25 stands for 25%.
    trading_env.success_threshold = 0.25
    return trading_env


In [11]:
# Build the environment and keep what reset() returns for dataset 1;
# a later cell compares a hand-built state against `original_state`.
env = environment()
original_state = env.reset(dataset_id=1);

#### Dataset tests

In [12]:
# Summary statistics of the normalized dataframe, transposed so each column is a row.
env.df_norm.describe().T

Unnamed: 0,count,mean,std,min,25%,50%,75%,max
open,147.0,0.00126,0.01581,-0.041566,-0.00879,0.000942,0.008769,0.053349
high,147.0,0.001226,0.01279,-0.029289,-0.006018,0.000722,0.00661,0.050532
low,147.0,0.001218,0.015288,-0.041874,-0.00544,0.001935,0.008419,0.051613
close,147.0,0.001293,0.015363,-0.04339,-0.005748,0.000524,0.008084,0.05891
volume,147.0,0.045043,0.326347,-0.7005,-0.17233,0.00068,0.196014,1.856035
day,147.0,0.505102,0.349252,0.0,0.25,0.5,0.75,1.0
vix,147.0,0.159149,0.045959,0.0982,0.1263,0.1479,0.17965,0.3732
turbulence,147.0,0.090042,0.065095,0.018619,0.047781,0.067448,0.109249,0.39741
boll_lb,147.0,0.901515,0.049326,0.786436,0.864879,0.915659,0.939232,0.977058
boll_ub,147.0,1.000958,0.038575,0.934068,0.97142,0.992185,1.021261,1.141195


In [13]:
# Row counts of the normalized and the raw dataframe, side by side.
len(env.df_norm),len(env.df)

(147, 147)

In [14]:
# Sanity bounds: no column of the normalized dataframe may exceed 2 or drop below -2.
assert not np.any(env.df_norm.max() > 2), 'Normalized dataset has unusual values'
assert not np.any(env.df_norm.min() < -2), 'Normalized dataset has unusual values'

In [15]:
# Train and Test datasets

In [16]:
# The first test dataframe id must sit exactly `window_size` ids after the last training id.
assert env.train_dataframe_id_range[1] + env.window_size == env.test_dataframe_id_range[0], 'Train and test datasets are intersecting'
# The test range must end on the last available dataframe id.
# (This checks the *test* range, so the failure message names the test dataset.)
assert env.test_dataframe_id_range[1] == env.n_dataframes - 1, 'Test dataset indices not matching'

In [28]:
# Verify if testing in seen days
# Load the last training dataset and copy env.df as it stands afterwards.
env.load_dataset_by_index(env.train_dataframe_id_range[1])
df_train_end = env.df.copy()

# Load the first test dataset and copy env.df again.
# NOTE(review): `env` is left on this test dataset when the cell finishes; later cells read
# env.df / env.df_norm, so confirm whether a reset is needed before they run.
env.load_dataset_by_index(env.test_dataframe_id_range[0])
df_test_start = env.df.copy()


# The index label of the last training row must equal the label of row `lookback - 1`
# in the first test dataframe.
assert df_train_end.iloc[-1].name == df_test_start.iloc[env.lookback-1].name , 'Not a perfect test but the train should end [lookback -1] days after of first test dataset start'

(Timestamp('2019-04-03 00:00:00'), Timestamp('2019-04-03 00:00:00'))

#### Target tests

#### Target tests

#### Trade tests

#### Reset tests

In [9]:
from collections import deque

In [10]:
# Three independent rolling buffers, each capped at `env.lookback` entries.
orders_history, portfolio_history, market_history = (
    deque(maxlen=env.lookback) for _ in range(3)
)

##### 1. State and next state functions

In [11]:
def _state():
    state = np.concatenate((
    orders_history,
    portfolio_history,
    market_history)
    ,axis=1)

    return state

# def _next_state(self):
#     i = env.current_step

#     held = 1
#     if env.stock_sold > 0 or env.stock_bought > 0:
#         held = 0

#     # # Add order tracking
#     self.orders_history.append([held,self.stock_sold,self.stock_bought])

#     # # Add portfoluio state tracking
#     self.portfolio_history.append(self._normalize_portfolio(i))  # % % %

#         # # Market history tracks OHLC
#         self.market_history.append(self.df_norm.iloc[i])

#         self._state()

In [12]:
# Rebuild the expected initial state by hand and compare it with what env.reset() returned.
orders_history.clear()
portfolio_history.clear()
market_history.clear()

# The dataframe rows used are 0 .. lookback - 1, in ascending order.
current_steps = []
for current_step in range(env.lookback):
    # Orders history tracks recent trader activity.
    orders_history.append([0, 0, 0])  # Held, Sold, Bought
    # Portfolio: portfolio_value_%, cash_held_%, stocks_held_%, stock_price_avg_comp_%
    portfolio_history.append([1, 1, 0, 1])
    # Market history: one row of the normalized dataframe per step.
    market_history.append(env.df_norm.iloc[current_step])

    current_steps.append(current_step)

state = _state()

assert current_steps[0] == 0 , "Wrong first df iloc"
assert current_steps[-1] == env.lookback -1 , "Wrong last df iloc"
assert env.lookback == len(state), "Wrong state length"
assert env.current_step == env.lookback , 'Wrong current step'
assert state.shape==env.observation_space.shape , 'Wrong state vs observation shapes'
assert np.all(state == original_state) , 'Test state doesn\'t match original state'
# The first 7 columns are the 3 order + 4 portfolio entries; the market row starts at column 7.
assert np.all(state[-1,7:] == env.df_norm.iloc[env.current_step-1].values) , 'Normalized technical indicators dont match'

In [13]:
### NEXT STATE
# Step once with action 1 and keep the first element of the result as the new state.
state = env.step(1)[0]

# Columns 7 onward of the newest state row must equal the normalized dataframe row at current_step - 1.
assert np.all(state[-1,7:] == env.df_norm.iloc[env.current_step-1].values) , 'Normalized technical indicators dont match'

##### Actions

In [14]:
# Local mirrors of the holdings and cash, updated by hand to cross-check the environment.
stock_held = []
stock_prices = []

# No `global` statement is needed here: a notebook cell already runs at module level.
cash_in_hand = env.initial_investment
cash_in_hand

408.4999847412109

In [15]:
# The locally tracked cash must equal the environment's cash.
assert cash_in_hand == env.cash_in_hand , 'Cash in hand doesnt match'

In [16]:
# Each row: action passed to extract_action, the tuple it must return, message on failure.
extract_action_cases = (
    (0, (0, 0), "Sell action failed"),
    (1, (1, 0), "Hold action failed"),
    (2, (2, 1), "Failed to buy one stock"),
)
for action, expected, failure_message in extract_action_cases:
    assert env.extract_action(action) == expected, failure_message

In [17]:
# All three getters are checked against the close of the dataframe row at current_step - 1.
reference_close = env.df.iloc[env.current_step - 1].close
assert env.get_current_price() == reference_close, 'Current price doesnt match'
assert env.get_current_buying_price() == reference_close * (1+env.fees.BUY), 'Current buying price doesnt match'
assert env.get_current_selling_price() == reference_close * (1-env.fees.SELL), 'Current selling price doesnt match'

In [18]:
# Buy one stock
current_buying_price = env.get_current_buying_price()


def _buy():
    """Mirror a one-share purchase locally.

    Appends 1 to ``stock_held``, appends the environment's current buying price to
    ``stock_prices`` and subtracts that price from the module-level ``cash_in_hand``.
    """
    global cash_in_hand
    price = env.get_current_buying_price()
    stock_held.append(1)
    stock_prices.append(price)
    cash_in_hand -= price


# Update the local mirror first, then let the environment execute action 2.
_buy()

env.step(2);

assert env.cash_in_hand == cash_in_hand, 'Cash in hand doenst match'
assert env.stock_held == 1, 'Failed to buy stock'
assert env.initial_investment - current_buying_price == cash_in_hand, 'Cash in hand update after buy doesnt match'

In [19]:
# Buy all stocks possible: one share is already held, so repeat maximum_stocks_held - 2 times.
remaining_purchases = env.maximum_stocks_held - 2
for purchase in range(remaining_purchases):
    _buy()
    env.step(2);

# Try to buy past that; these extra attempts are not mirrored locally.
extra_attempts = 5
for attempt in range(extra_attempts):
    env.step(2);

assert env.stock_held == env.maximum_stocks_held - 1, 'Failed to buy stock'
assert env.cash_in_hand > 0, 'Cannot be in debt'
assert np.all(stock_prices == env.stock_prices), 'Stock purchase prices must match'

In [20]:
## means 

In [21]:
# With a full position, action 0 must resolve to selling maximum_stocks_held - 1 shares.
assert env.extract_action(0) == (0,env.maximum_stocks_held -1) , "Sell action failed"

transaction_profits = []


def _sell():
    """Mirror a full liquidation locally.

    For every locally recorded lot, adds its proceeds at the environment's current
    selling price to the module-level ``cash_in_hand`` and appends the lot's relative
    profit to ``transaction_profits``. ``stock_held`` and ``stock_prices`` are left as is.
    """
    global cash_in_hand
    current_price = env.get_current_selling_price()
    for quantity, purchase_price in zip(stock_held, stock_prices):
        proceeds = quantity * current_price
        cost = quantity * purchase_price
        cash_in_hand += proceeds
        transaction_profits.append((proceeds - cost) / cost)


# Capture the selling price before stepping; a later cell compares it with the trading history.
sold_price = env.get_current_selling_price()
_sell()
# Sell all stocks
env.step(0)

assert env.stock_held == 0, 'Failed to sell all stocks'
# The original `assert env.cash_in_hand, cash_in_hand == '...'` only tested truthiness.
# Compare the two values with a tolerance: the local mirror sums lot by lot, so the
# float result may differ in the last bits from the environment's own arithmetic.
assert np.isclose(env.cash_in_hand, cash_in_hand), 'Cash in hand doesn\'t match'

In [22]:
# Cross-check every recorded trade against the locally tracked prices.
# NOTE(review): buys are matched to stock_prices by their position in trading_history;
# this only lines up if the buy entries are the first entries recorded. Verify.
for i, th in enumerate(env.trading_history):
    if th['action'] == "buy":
        assert th['current_price'] == stock_prices[i], 'Buying prices dont match'
    elif th['action'] == "sell":
        assert th['current_price'] == sold_price, 'Selling prices dont match'

In [23]:
rewards = [1, 2, 3, 4, -1, 0, -2, 1]

# Look at the most recent 1000 rewards at most and count those that are zero or negative.
r = np.array(rewards[-1000:])
np.count_nonzero(r <= 0)

3

#### Step tests