Commit

Don't early-terminate episodes, then we can fit batch_size & remove autoencoder. Switch from abs advantage to Sharpe & cumulative return. Conv step_window back-indexing.
lefnire committed Feb 10, 2018
1 parent d653b34 commit e0345c8
Showing 6 changed files with 111 additions and 117 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -72,6 +72,7 @@ This project is a [TensorForce](https://github.com/reinforceio/tensorforce)-base

- [Sutton & Barto](http://amzn.to/2EWvnVf): de-facto textbook on RL basics
- [CS 294](http://rll.berkeley.edu/deeprlcourse/): the modern deep-learning spin on ^.
- [Machine Learning for Trading](https://www.udacity.com/course/machine-learning-for-trading--ud501): teaches you algo-trading, stock stuff, and applied RL.

This project goes with Episode 26+ of [Machine Learning Guide](http://ocdevel.com/podcasts/machine-learning). Those episodes are a tutorial for this project, including an intro to Deep RL, hyperparameter decisions, etc.

165 changes: 80 additions & 85 deletions btc_env.py
@@ -10,7 +10,7 @@
env back to Gym format. Anyone wanna give it a go?
"""

import random, time, requests, pdb, gdax
import random, time, requests, pdb, gdax, math
from enum import Enum
import numpy as np
import pandas as pd
@@ -59,12 +59,10 @@ class Scaler(object):
def __init__(self):
self.scalers = {
self.REWARD: RobustScaler(quantile_range=(5., 95.)),
self.SERIES: RobustScaler(quantile_range=(5., 95.)),
self.STATIONARY: RobustScaler(quantile_range=(5., 95.))
}
self.data = {
self.REWARD: [],
self.SERIES: [],
self.STATIONARY: []
}
self.done = False
@@ -103,12 +101,10 @@ def transform(self, input, kind, force=False):
# keep this globally around for all runs forever
scalers = {}

# We don't want random-seeding for reproducibility! We _want_ two runs to give different results, because we only
# trust the hyper combo which consistently gives positive results!
ALLOW_SEED = False


class BitcoinEnv(Environment):
EPISODE_LEN = 5000

def __init__(self, hypers, name='ppo_agent'):
"""Initialize hyperparameters (done here instead of __init__ since OpenAI-Gym controls instantiation)"""
self.hypers = Box(hypers)
@@ -118,18 +114,19 @@ def __init__(self, hypers, name='ppo_agent'):
# cash/val start @ about $3.5k each. You should increase/decrease depending on how much you'll put into your
# exchange accounts to trade with. Presumably the agent will learn to work with what you've got (cash/value
# are state inputs); but starting capital does affect the learning process.
self.start_cash, self.start_value = .3, .3
self.start_cash, self.start_value = 1., 1.

# We have these "accumulator" objects, which collect values over steps, over episodes, etc. Easier to keep
# same-named variables separate this way.
self.acc = Box(
episode=dict(
i=0,
total_steps=0,
advantages=[],
sharpes=[],
returns=[],
uniques=[]
),
step=dict(i=0), # setup in reset()
step=dict(), # setup in reset()
tests=dict(
i=0,
n_tests=0
@@ -150,8 +147,8 @@

# Our data is too high-dimensional for the way MemoryModel handles batched episodes. Reduce it (don't like this)
all_data = data.db_to_dataframe(self.conn, arbitrage=self.hypers.arbitrage)
self.all_observations, self.all_prices = self._xform_data(all_data)
self.all_prices_diff = self._diff(self.all_prices, percent=True)
self.all_observations, self.all_prices = self.xform_data(all_data)
self.all_prices_diff = self.diff(self.all_prices, percent=True)

# Calculate a possible reward to be used as an average for repeat-punishing
self.possible_reward = self.start_value * np.median([p for p in self.all_prices_diff if p > 0])
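        # Illustrative sketch (numbers are made up, not from the data): if the median positive per-step
        # price change is ~0.2%, then with start_value=1.0 possible_reward ≈ 1.0 * 0.002 = 0.002,
        # a "typical win" used later to size the penalty for repeating the same action too long.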
@@ -193,21 +190,17 @@ def states(self): return self.states_
@property
def actions(self): return self.actions_

def seed(self, seed=None):
if not ALLOW_SEED: return
# self.np_random, seed = seeding.np_random(seed)
# return [seed]
random.seed(seed)
np.random.seed(seed)
tf.set_random_seed(seed)
    # We don't want random-seeding for reproducibility! We _want_ two runs to give different results, because we only
# trust the hyper combo which consistently gives positive results.
def seed(self, seed=None): return

def update_btc_price(self):
try:
self.btc_price = int(requests.get(f"https://api.cryptowat.ch/markets/{EXCHANGE.value}/btcusd/price").json()['result']['price'])
except:
self.btc_price = self.btc_price or 8000

def _diff(self, arr, percent=False):
def diff(self, arr, percent=False):
series = pd.Series(arr)
diff = series.pct_change() if percent else series.diff()
diff.iloc[0] = 0 # always NaN, nothing to compare to
@@ -219,14 +212,14 @@ def _diff(self, arr, percent=False):
# then forward-fill the NaNs.
return diff.replace([np.inf, -np.inf], np.nan).ffill().bfill().values
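        # Illustrative sketch of the behavior above (made-up prices): with percent=True,
        #   [100., 110., 110., 0., 50.].pct_change() -> [NaN, .10, .0, -1., inf]
        # The leading NaN is zeroed, the inf (divide-by-zero at the 0 price) becomes NaN and is
        # forward-filled with the previous finite value, so nothing non-finite reaches the network.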

def _xform_data(self, df):
def xform_data(self, df):
columns = []
use_indicators = self.hypers.indicators and self.hypers.indicators > 100
tables_ = data.get_tables(self.hypers.arbitrage)
percent = self.hypers.pct_change
for table in tables_:
name, cols, ohlcv = table['name'], table['cols'], table.get('ohlcv', {})
columns += [self._diff(df[f'{name}_{k}'], percent) for k in cols]
columns += [self.diff(df[f'{name}_{k}'], percent) for k in cols]

# Add extra indicator columns
if ohlcv and use_indicators:
@@ -236,10 +229,10 @@ def _xform_data(self, df):
ind[k] = df[f"{name}_{v}"]
columns += [
# TODO this is my naive approach, I'm not a TA expert. Could use a second pair of eyes
self._diff(SMA(ind, timeperiod=self.hypers.indicators), percent),
self._diff(EMA(ind, timeperiod=self.hypers.indicators), percent),
self._diff(RSI(ind, timeperiod=self.hypers.indicators), percent),
self._diff(ATR(ind, timeperiod=self.hypers.indicators), percent),
self.diff(SMA(ind, timeperiod=self.hypers.indicators), percent),
self.diff(EMA(ind, timeperiod=self.hypers.indicators), percent),
self.diff(RSI(ind, timeperiod=self.hypers.indicators), percent),
self.diff(ATR(ind, timeperiod=self.hypers.indicators), percent),
]

states = np.column_stack(columns)
@@ -258,18 +251,17 @@ def _xform_data(self, df):
# Currently we're reducing the dimensionality of our states (OHLCV + indicators + arbitrage => 5 or 6 weights)
# because TensorForce's memory branch changed Policy Gradient models' batching from timesteps to episodes.
# This takes up way too much GPU RAM for us, so we had to cut back in quite a few areas (num steps to train
# per episode, episode batch_size, and especially this:)
ae = AutoEncoder()
states = ae.fit_transform_tied(states)
# per episode, episode batch_size, and especially this:
# ae = AutoEncoder()
# states = ae.fit_transform_tied(states)

return states, prices

def use_dataset(self, mode, no_kill=False):
def use_dataset(self, mode, full_set=False):
"""Fetches, transforms, and stores the portion of data you'll be working with (ie, 80% train data, 20% test
data, or the live database). Make sure to call this before reset()!
"""
self.mode = mode
self.no_kill = no_kill
if mode in (Mode.LIVE, Mode.TEST_LIVE):
self.conn = data.engine_live.connect()
# Work with 6000 timesteps up until the present (play w/ diff numbers, depends on LSTM)
@@ -285,25 +277,24 @@ def use_dataset(self, mode, no_kill=False):
split = .9 # Using 90% training data.
n_train, n_test = int(row_ct * split), int(row_ct * (1 - split))
if mode == mode.TEST:
limit, offset = n_test, n_train
if no_kill is False:
limit = 50000 # he's not likely to get past that, so save some RAM (=time)
offset = n_train
limit = 30000 if full_set else 8000 # should be `n_test` in full_set, getting idx errors
else:
# Grab a random window from the 90% training data. The random bit is important so the agent
# sees a variety of data. The window-size bit is a hack: as long as the agent doesn't die (doesn't cause
# `terminal=True`), PPO's MemoryModel can keep filling up until it crashes TensorFlow. This ensures
# there's a stopping point (limit). I'd rather see how far he can get w/o dying, figure out a solution.
limit = 6000
offset = random.randint(0, n_train - limit)
limit = self.EPISODE_LEN
offset_start = 0 if not self.conv2d else self.hypers.step_window + 1
offset = random.randint(offset_start, n_train - self.EPISODE_LEN)
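            # Illustrative sketch (step_window=300 and n_train=400000 are made-up values): offset is
            # drawn from [301, 395000] and the episode covers rows offset..offset+4999. Starting past
            # step_window leaves get_next_state() room to back-index a full conv window even at i=0.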

# self.observations, self.prices = self._xform_data(df)
# self.prices_diff = self._diff(self.prices, percent=True)
self.observations = self.all_observations[offset:offset+limit]
self.offset, self.limit = offset, limit
self.prices = self.all_prices[offset:offset+limit]
self.prices_diff = self.all_prices_diff[offset:offset+limit]

def _get_next_state(self, i, cash, value, repeats):
series = self.observations[i]
def get_next_state(self, i, cash, value, repeats):
i = i + self.offset
series = self.all_observations[i]
stationary = [cash, value, repeats]
if self.hypers.scale:
# series already scaled in self._xform_data()
@@ -315,27 +306,24 @@ def _get_next_state(self, i, cash, value, repeats):
# Take note of the +1 here. LSTM uses a single index [i], which grabs the list's end. Conv uses a window,
# [-something:i], which _excludes_ the list's end (due to Python indexing). Without this +1, conv would
# have a 1-step-behind delayed response.
window = self.observations[i - self.hypers.step_window + 1:i + 1]
window = self.all_observations[i - self.hypers.step_window + 1:i + 1]
series = np.expand_dims(window, axis=1)
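            # Worked example of the back-indexing above (step_window=300 is a made-up value): at i=400
            # the slice is all_observations[101:401], i.e. the 300 rows ending at and including row 400.
            # Without the +1 on the right edge, the window would stop at row 399, one step behind.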
return dict(series=series, stationary=stationary)

def reset(self):
step_acc, ep_acc = self.acc.step, self.acc.episode
# Cash & value are the real scores - how much we end up with at the end of an episode
step_acc.i = 0
step_acc.cash, step_acc.value = self.start_cash, self.start_value
# But for our purposes, we care more about "how much better is what we made than if we held". We're training
# a trading bot, not an investing bot. So we compare these at the end, calling it "advantage"
step_acc.hold = Box(value=self.start_cash, cash=self.start_value)
start_timestep = 1
if self.conv2d:
# for conv2d, start at the end of the first window (grab a full window)
start_timestep = self.hypers.step_window
step_acc.i = start_timestep
step_acc.signals = [0] * start_timestep
step_acc.hold_value = self.start_value
step_acc.totals = Box(
trade=[self.start_cash + self.start_value],
hold=[self.start_cash + self.start_value]
)
step_acc.signals = []
step_acc.repeats = 0
ep_acc.i += 1

return self._get_next_state(start_timestep, self.start_cash, self.start_value, 0.)
return self.get_next_state(0, self.start_cash, self.start_value, 0.)

def execute(self, actions):
if self.hypers.single_action:
@@ -360,32 +348,35 @@ def execute(self, actions):
}[EXCHANGE]
reward = 0
abs_sig = abs(signal)
before = Box(cash=step_acc.cash, value=step_acc.value, total=step_acc.cash+step_acc.value)
total_before = step_acc.cash + step_acc.value
# Perform the trade. In training mode, we'll let it dip into negative here, but then kill and punish below.
# In testing/live, we'll just block the trade if they can't afford it
if signal > 0 and not (self.no_kill and abs_sig > step_acc.cash):
if signal > 0 and abs_sig <= step_acc.cash:
step_acc.value += abs_sig - abs_sig*fee
step_acc.cash -= abs_sig
elif signal < 0 and not (self.no_kill and abs_sig > step_acc.value):
elif signal < 0 and abs_sig <= step_acc.value:
step_acc.cash += abs_sig - abs_sig*fee
step_acc.value -= abs_sig

# next delta. [1,2,2].pct_change() == [NaN, 1, 0]
diff_loc = step_acc.i + 1
pct_change = self.prices_diff[diff_loc]
pct_change = self.prices_diff[step_acc.i + 1]

step_acc.value += pct_change * step_acc.value
total = step_acc.value + step_acc.cash
reward += total - before.total
total_now = step_acc.value + step_acc.cash
step_acc.totals.trade.append(total_now)
# Reward is in dollar-change. As we build a great portfolio, the reward should get bigger and bigger (and
# the agent should notice this)
reward += (total_now - total_before)
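        # Illustrative numbers (a 0.25% fee is assumed here, not taken from the table above): with
        # cash=0.6, value=0.4, the agent buys 0.1 -> value=0.49975, cash=0.5; a +1% price move then
        # gives value≈0.50475, total_now≈1.00475, so reward ≈ 1.00475 - 1.0 = +0.00475 for this step.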

# calculate what the reward would be "if I held", to calculate the actual reward's _advantage_ over holding
before = step_acc.hold
before.value += pct_change * before.value
step_acc.hold_value += pct_change * step_acc.hold_value
step_acc.totals.hold.append(step_acc.hold_value + self.start_cash)

# Collect repeated same-action count (homogeneous actions punished below)
recent_actions = np.array(step_acc.signals[-step_acc.repeats:])
if np.any(recent_actions > 0) and np.any(recent_actions < 0) and np.any(recent_actions == 0):
step_acc.repeats = 0 # reset repeat counter
else:
elif self.hypers.punish_repeats < self.EPISODE_LEN:
step_acc.repeats += 1
# by the time we hit punish_repeats, we're doubling punishments / canceling rewards. Note: we don't want to
# multiply by `reward` here because repeats are often 0, which means 0 penalty. Hence `possible_reward`
@@ -396,16 +387,11 @@ def execute(self, actions):
step_acc.i += 1
ep_acc.total_steps += 1

next_state = self._get_next_state(step_acc.i, step_acc.cash, step_acc.value, step_acc.repeats)
next_state = self.get_next_state(step_acc.i, step_acc.cash, step_acc.value, step_acc.repeats)
if self.hypers.scale:
reward = self.scaler.transform([reward], Scaler.REWARD)[0]

terminal = int(step_acc.i + 1 >= len(self.observations))
# Kill and punish if (a) agent ran out of money; (b) is doing nothing for way too long
# The repeats bit isn't just for punishment, but because training can get stuck too long on losers
if not self.no_kill and (step_acc.cash < 0 or step_acc.value < 0 or step_acc.repeats >= self.hypers.punish_repeats):
reward -= 1. # Big penalty. BTC, like $12k
terminal = True
terminal = int(step_acc.i + 1 >= self.limit)
if terminal and self.mode in (Mode.TRAIN, Mode.TEST):
# We're done.
step_acc.signals.append(0) # Add one last signal (to match length)
@@ -455,8 +441,8 @@ def execute(self, actions):
time.sleep(20)
self.last_timestamp = new_timestamp
self.df = pd.concat([self.df.iloc[-1000:], new_data], axis=0) # shed some used data, add new
self.observations, self.prices = self._xform_data(self.df)
self.prices_diff = self._diff(self.prices, percent=True)
self.observations, self.prices = self.xform_data(self.df)
self.prices_diff = self.diff(self.prices, percent=True)
step_acc.i = self.df.shape[0] - n_new - 1

if live:
@@ -475,22 +461,31 @@ def execute(self, actions):
def episode_finished(self, runner):
step_acc, ep_acc, test_acc = self.acc.step, self.acc.episode, self.acc.tests
signals = step_acc.signals

advantage = ((step_acc.cash + step_acc.value) - (self.start_cash + self.start_value)) - \
((step_acc.hold.value + step_acc.hold.cash) - (self.start_cash + self.start_value))
# per step average advantage, then bring it to a reasonable number (up from ~.0001)
advantage = advantage / step_acc.i * 10000
if advantage == 0.: advantage = -.01 # no HODLing!
self.acc.episode.advantages.append(advantage)
totals = step_acc.totals
n_uniques = float(len(np.unique(signals)))
self.acc.episode.uniques.append(n_uniques)

# Calculate the Sharpe ratio.
diff = (pd.Series(totals.trade).pct_change() - pd.Series(totals.hold).pct_change())[1:]
mean, std, sharpe = diff.mean(), diff.std(), 0
if (std, mean) != (0, 0):
# Usually Sharpe has `sqrt(num_trades)` (or `num_trading_days`?) in front. Experimenting with being creative w/
# trade-diversity, etc., to give Sharpe some extra info
# breadth = math.sqrt(np.uniques(signals))
breadth = np.std([np.sign(x) for x in signals]) # get signal direction, amount not as important (and adds complications)
sharpe = breadth * (mean / std)

cumm_ret = (totals.trade[-1] / totals.trade[0] - 1) - (totals.hold[-1] / totals.hold[0] - 1)
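        # Illustrative sketch (all numbers made up): if the per-step trade-vs-hold return differences
        # have mean=0.0004 and std=0.004, and signals are an even mix of buys/sells/holds (std of their
        # signs ≈ 0.82), then sharpe ≈ 0.82 * (0.0004 / 0.004) ≈ 0.082. cumm_ret is end-to-end: trading
        # up 12% while holding is up 9% gives cumm_ret = 0.12 - 0.09 = 0.03.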

ep_acc.sharpes.append(float(sharpe))
ep_acc.returns.append(float(cumm_ret))
ep_acc.uniques.append(n_uniques)

# Print (limit to note-worthy)
lt_0 = len([s for s in signals if s < 0])
eq_0 = len([s for s in signals if s == 0])
gt_0 = len([s for s in signals if s > 0])
completion = int(test_acc.i / test_acc.n_tests * 100)
print(f"{completion}%\tSteps: {step_acc.i}\tAdvantage: {'%.3f'%advantage}\tTrades:\t{lt_0}[<0]\t{eq_0}[=0]\t{gt_0}[>0]")
print(f"{completion}%\tSteps: {step_acc.i}\tSharpe: {'%.3f'%sharpe}\tReturn: {'%.3f'%cumm_ret}\tTrades:\t{lt_0}[<0]\t{eq_0}[=0]\t{gt_0}[>0]")
return True

def run_deterministic(self, runner, print_results=True):
@@ -515,8 +510,8 @@ def train_and_test(self, agent, n_steps, n_tests, early_stop):
self.use_dataset(Mode.TEST)
self.run_deterministic(runner, print_results=True)
if early_stop > 0:
advantages = np.array(self.acc.episode.advantages[-early_stop:])
if test_acc.i >= early_stop and np.all(advantages > 0):
sharpes = np.array(self.acc.episode.sharpes[-early_stop:])
if test_acc.i >= early_stop and np.all(sharpes > 0):
test_acc.i = n_tests
test_acc.i += 1
except KeyboardInterrupt:
@@ -527,7 +522,7 @@ def train_and_test(self, agent, n_steps, n_tests, early_stop):

# On last "how would it have done IRL?" run, without getting in the way (no killing on repeats, 0-balance)
print('Running no-kill test-set')
self.use_dataset(Mode.TEST, no_kill=True)
self.use_dataset(Mode.TEST, full_set=True)
self.run_deterministic(runner, print_results=True)

def run_live(self, agent, test=True):
Expand All @@ -544,5 +539,5 @@ def run_live(self, agent, test=True):
print(f'Starting total: {self.start_cash + self.start_value}')

runner = Runner(agent=agent, environment=self)
self.use_dataset(Mode.TEST_LIVE if test else Mode.LIVE, no_kill=True)
self.use_dataset(Mode.TEST_LIVE if test else Mode.LIVE)
self.run_deterministic(runner, print_results=True)