In [1]:
import quandl
from pyspark.sql.functions import udf
from pyspark.sql.types import *
import pyspark
from pyspark.sql import functions as sqlf
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import matplotlib.lines as mlines
import numpy as np
import time
import pandas as pd
import StringIO
import random
from random import randint
from time import gmtime, strftime
import os
import re
import json
import math
import pytz
import datetime

# Access config
# Credentials are read from environment variables when they are set, so real
# keys never have to be typed into the notebook. The fallback values are
# randomized placeholders: set AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY (or
# replace the placeholders) with your own Amazon S3 details.
ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID", "AKIZIGCCH4PVSGRZD7MQ")
SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY",
                            "LPv8QAFxmiSJNKJRNqlMzzhml1X0pXBCDOF1dpm6")
# The secret key is embedded in the mount URI below, so '/' must be escaped.
ENCODED_SECRET_KEY = SECRET_KEY.replace("/", "%2F")
AWS_BUCKET_NAME = "bucket-name"
LOGFILE = 'log'
TRIAL_LOGFILE = 'trial_log.parquet'
OPTIMIZED_TRIAL_LOGFILE = 'optimized_trial_log.parquet'
TEST_LOGFILE = 'test_log.parquet'
EXP_RESULTS = 'exp_results.json'

Q_FILE = 'Q.csv'
CONFIG_FILE = 'config.json'

# Remount the bucket. Unmount only when "/mnt" is already mounted: unmounting
# a mount point that does not exist may raise, which would break a fresh run.
if any(mount.mountPoint == "/mnt" for mount in dbutils.fs.mounts()):
  dbutils.fs.unmount("/mnt")
dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME), "/mnt")

# See if csv files in details directory exist.
PROJECT_DIR = "/mnt/rl_trading"
display(dbutils.fs.ls(PROJECT_DIR))

# Quandl setup - set QUANDL_API_KEY (or replace the placeholder) with your own Quandl API key.
quandl.ApiConfig.api_key = os.environ.get("QUANDL_API_KEY", 'zhiR5Rz7eJqp_XNcZz3c')

In [2]:
# Helper functions
def discretize_bins(data, steps=10):
  """ Compute discretization thresholds from sample data.

  The data is sorted and split into `steps` equally sized chunks. Each
  threshold is the last (largest) value of its chunk, so the result is
  ascending and can be passed straight to `discretize`.

  Args:
    data (array-like): 1-D sample values.
    steps (int): Number of thresholds/bins to produce (must be >= 1).

  Returns:
    numpy.ndarray: `steps` ascending thresholds.

  Raises:
    ValueError: If `steps` < 1 or `data` has fewer elements than `steps`.
  """
  if steps < 1:
    raise ValueError(
      'number of discretization steps must be at least 1 (currently {})'.format(steps))
  npdata = np.sort(np.array(data))
  count = npdata.shape[0]
  if count < steps:
    raise ValueError(
      'number of data (currently {}) must be equal or higher than number of discretization steps (currently {})'.format(count, steps))
  # Floor division keeps the indices integral on Python 3 as well as Python 2.
  stepsize = count // steps
  indices = (np.arange(steps) + 1) * stepsize - 1
  return npdata[indices]

def discretize(value, thresholds):
  """ Convert a real value to the index of its bin.

  Args:
    value (float): Value to discretize.
    thresholds (sequence of float): Ascending thresholds, e.g. from
      `discretize_bins` (list, tuple or numpy array).

  Returns:
    int: Index of the first threshold that is >= `value`. Values above every
    threshold are clamped to the last index.

  Raises:
    ValueError: If `thresholds` is empty or `value` is NaN.
  """
  thresholds = np.asarray(thresholds)
  if thresholds.size == 0:
    raise ValueError('thresholds must not be empty')
  if value != value:  # NaN is the only value that differs from itself
    raise ValueError('cannot discretize NaN')
  idx_r = np.where(thresholds >= value)[0]
  if len(idx_r) == 0:
    # Every threshold is below `value`: clamp into the last bin.
    return thresholds.shape[0] - 1
  return int(idx_r[0])

# Sanity checks for the helpers above: ten samples split into three bins
# give the thresholds [0.3, 5, 21].
th = discretize_bins(
    np.array([0.1, 0.2, 0.3, 0.31, 4, 5, 6, 20, 21, 22]),
    steps=3)
assert discretize(0.15, th) == 0, 'below the first threshold -> bin 0'
assert discretize(5, th) == 1, 'equal to the second threshold -> bin 1'
assert discretize(20.5, th) == 2, 'below the third threshold -> bin 2'
  
def combine_ints(intlist, digits=None):
  """ Combine multiple non-negative integers into a single integer.

  Each value is zero-padded to `digits` characters and the padded strings are
  concatenated, e.g. [10, 53, 10, 7] -> 10531007. A leading zero field is lost
  by the final int conversion ([0, 5, 10, 7] -> 51007); `split_ints` recovers
  it through its `size` argument.

  Args:
    intlist (sequence of int): Non-negative integers to combine.
    digits (int): Width of every field. Defaults to the number of digits of
      the largest value.

  Returns:
    int: The combined integer.

  Raises:
    ValueError: If `intlist` is empty, or a value is negative or wider than
      `digits` (either would make the result impossible to split back).
  """
  intlist = list(intlist)
  if not intlist:
    raise ValueError('intlist must not be empty')
  if digits is None:
    digits = len(str(max(intlist)))
  parts = []
  for value in intlist:
    text = str(value)
    if value < 0 or len(text) > digits:
      raise ValueError(
        'cannot fit {} into a field of {} digit(s)'.format(value, digits))
    parts.append(text.zfill(digits))
  return int(''.join(parts))

# combine_ints zero-pads each value to a shared width before concatenating.
assert combine_ints([10, 53, 10, 7]) == 10531007
assert combine_ints([0, 5, 10, 7]) == 51007, 'leading zero field is dropped'
assert combine_ints([6, 0], digits=2) == 600

def split_ints(combined, digits, size=None):
  """ Split an integer produced by `combine_ints` back into its parts.

  Args:
  - combined(int): The combined integer.
  - digits(int): Number of digits, which can be found by counting the digits of
                 `discretization_steps` of a States object.
  - size(int): Size of final intlist. Can be found by counting
               number of columns of a States object. Inferred from digits when None
               as the smallest number of fields that holds every digit.

  Returns:
    list of int: The fields, most significant first. If `size` is too small to
    hold every digit, the extra leading fields are returned rather than
    silently dropped.

  Raises:
    ValueError: If `digits` is not positive.
  """
  if digits <= 0:
    raise ValueError('digits must be positive (currently {})'.format(digits))
  text = str(combined)
  # Ceiling division: a leading zero dropped by `combine_ints` leaves a
  # shorter first field that still has to be counted.
  needed = -(-len(text) // digits)
  if size is None:
    size = needed
  text = text.zfill(max(size, needed) * digits)
  return [int(text[pos:pos + digits]) for pos in range(0, len(text), digits)]
# split_ints is the inverse of combine_ints; `size` restores dropped zero fields.
assert split_ints(10531007, digits=2) == [10, 53, 10, 7]
assert split_ints(51007, digits=2, size=4) == [0, 5, 10, 7], 'leading zero restored'
assert split_ints(600, digits=2, size=2) == [6, 0]
    
def to_epoch(date_time, pattern='%Y-%m-%d %H:%M:%S'):
  """ Convert a timestamp into integer seconds since the Unix epoch.

  Args:
    date_time: Anything whose str() matches `pattern`, e.g. a naive pandas
      Timestamp.
    pattern (str): `time.strptime` format of str(date_time).

  Returns:
    int: Epoch seconds, interpreting the timestamp as UTC. This makes
    `to_datetime` (`pd.to_datetime(..., unit='s')`, which is UTC) its exact
    inverse on any host; `time.mktime` would apply the host's local time zone.
  """
  import calendar
  return int(calendar.timegm(time.strptime(str(date_time), pattern)))

def to_datetime(epoch):
  """ Turn epoch seconds into a (naive, UTC-based) pandas Timestamp."""
  stamp = pd.to_datetime(epoch, unit='s')
  return stamp

def str2t(text, pattern='%Y-%m-%d'):
  """ Parse a date string written in `pattern` into a pandas Timestamp."""
  parsed = datetime.datetime.strptime(text, pattern)
  as_index = pd.DatetimeIndex([parsed])
  return as_index[0]

def create_Q_from_file(Q_file):
  """ Load a Q-table CSV into a nested dict.

  The CSV is expected in the layout written by `Environment.save_Q`: an index
  column, then 'equity', 'state' and one column per action.

  Returns:
    dict: {equity: {state: {action: value}}}.

  NOTE(review): the file is read through `dbutils.fs.head`; confirm that
  large Q-tables are not truncated by it.
  """
  buf = StringIO.StringIO(dbutils.fs.head(Q_file))
  table = pd.read_csv(buf)
  # Skip the saved index column and 'equity'; keep 'state' plus the actions.
  value_columns = table.columns[2:]
  Q = {}
  for equity in frozenset(table['equity']):
    equity_rows = table.loc[table['equity'] == equity, value_columns]
    Q[equity] = equity_rows.set_index('state').transpose().to_dict()
  return Q

def get_rolling_mean(values, window):
  """ Return rolling mean of given values, using specified window size.

  Args:
    values (Series/DataFrame): Input data.
    window (int): Window length; the first `window - 1` results are NaN.

  Returns:
    Same type as `values`.
  """
  if hasattr(values, 'rolling'):
    return values.rolling(window=window).mean()
  # Legacy pandas (no `.rolling` method) or non-pandas input: the old
  # module-level function, which newer pandas no longer provides.
  return pd.rolling_mean(values, window=window)


def get_rolling_std(values, window):
  """ Return rolling standard deviation of given values, using specified window size.

  Uses the sample standard deviation (ddof=1), the pandas default.

  Args:
    values (Series/DataFrame): Input data.
    window (int): Window length; the first `window - 1` results are NaN.

  Returns:
    Same type as `values`.
  """
  if hasattr(values, 'rolling'):
    return values.rolling(window=window).std()
  # Legacy pandas (no `.rolling` method) or non-pandas input: the old
  # module-level function, which newer pandas no longer provides.
  return pd.rolling_std(values, window=window)


def get_bollinger_bands(rm, rstd):
  """ Return (upper, lower) Bollinger Bands: the rolling mean shifted up and
  down by two rolling standard deviations. """
  band_width = 2 * rstd
  return rm + band_width, rm - band_width
# END - Helper functions

In [3]:
# Report-related functions
def make_tz_aware(df):
  """ Return a copy of `df` whose index timestamps are labelled as UTC.

  Only the index labels change; the values are kept as they are. Reindexing
  with the aware labels would instead look each label up in the old naive
  index, which depending on the pandas version can raise or produce NaN rows
  rather than relabel the data.

  Args:
    df (DataFrame or Series): Data indexed by datetime-like values.

  Returns:
    A copy of `df` with a UTC-aware DatetimeIndex.
  """
  aware = df.copy()
  aware.index = pd.DatetimeIndex(
    [idx.replace(tzinfo=pytz.UTC) for idx in df.index])
  return aware
  
def make_test_df_tz_aware(test_df):
  """ Return a copy of `test_df` indexed by its 'time' column, with that index
  made UTC-aware by `make_tz_aware`. `test_df` itself is left untouched."""
  indexed = test_df.copy().set_index('time')
  return make_tz_aware(indexed)

def select_test_df_treasury_duration(test_df):
  """ Pick the treasury duration that matches the time span of `test_df`.

  Args:
    test_df (DataFrame): Must have a datetime-like 'time' column.

  Returns:
    str: A duration label from `select_treasury_duration` (e.g. '1year').

  Raises:
    ValueError: If `test_df` is empty.
  """
  if len(test_df) == 0:
    raise ValueError('test_df must not be empty')
  # Use the extremes of the 'time' column itself. Combining the first
  # positional index label with the largest index label gives a wrong span
  # whenever the index is not sorted.
  times = test_df['time']
  return select_treasury_duration(times.min(), times.max())
  
def calculate_beta(test_df, bm, tdf):
  """ Beta of the strategy relative to a benchmark, on excess returns over
  the treasury rate.

  Args:
    test_df (DataFrame): Needs 'time' and 'return_pct' columns.
    bm: Benchmark values indexed by datetime (presumably a Series, since
      `.cov(other)` is called on the result -- TODO confirm).
    tdf (DataFrame): Treasury rates as returned by `get_treasury_data`.

  Returns:
    cov(strategy excess, benchmark excess) / var(benchmark excess).

  NOTE(review): when `test_df` comes from the Environment's varlog,
  'return_pct' is the cumulative return since the start, not a per-period
  return -- confirm this is what beta should be computed on.
  """
  rfr = tdf[select_test_df_treasury_duration(test_df)]
  strategy_excess = make_test_df_tz_aware(test_df)['return_pct'] - rfr
  benchmark_excess = make_tz_aware(bm) - rfr
  return strategy_excess.cov(benchmark_excess) / benchmark_excess.var()

def calculate_sharpe(test_df, tdf, col='return_pct'):
  """ Annualized Sharpe ratio (252 trading days) over the treasury rate.

  Args:
    test_df(DataFrame): A pandas dataframe of the data. Must have column 'time' and 'return_pct'
                        when `col` is 'return_pct'; otherwise it must be indexed by datetime.
    tdf(DataFrame): A pandas dataframe returned by function `get_treasury_data`.
    col(str or None): Column holding the returns. None uses `test_df` as a whole.

  Returns:
    float: sqrt(252) * mean(excess) / std(excess, ddof=1).

  NOTE(review): the treasury values are used as a "daily risk-free-return",
  but they look like yearly yields (get_treasury_data only rescales
  percentages) -- confirm whether they should be de-annualized.
  """
  if col == 'return_pct':
    rfr = tdf[select_test_df_treasury_duration(test_df)]  # daily risk-free-return
    excess = make_test_df_tz_aware(test_df)['return_pct'] - rfr
  else:
    span = select_treasury_duration(min(test_df.index), max(test_df.index))
    rfr = tdf[span]  # daily risk-free-return
    aware = make_tz_aware(test_df)
    excess = (aware if col is None else aware[col]) - rfr
  return math.sqrt(252) * excess.mean() / excess.std(ddof=1)

def max_dd(ser):
  """ Max drawdown of `ser`.

  Returns the most negative difference between a value and the running
  maximum up to that point, in the same units as `ser` (not a percentage).
  A series that never decreases gives 0.
  """
  if hasattr(ser, 'expanding'):
    max2here = ser.expanding(min_periods=1).max()
  else:
    # Legacy pandas (no `.expanding` method) or non-pandas input: the old
    # module-level function, which newer pandas no longer provides.
    max2here = pd.expanding_max(ser)
  dd2here = ser - max2here
  return dd2here.min()

In [4]:
# https://github.com/quantopian/zipline/blob/master/zipline/data/treasuries.py
#
# Copyright 2013 Quantopian, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.  
from operator import itemgetter
import re

import numpy as np
import pandas as pd


def get_unit_and_periods(groups):
    """Return the ('unit', 'periods') pair from a regex match groupdict."""
    return groups['unit'], groups['periods']


def parse_treasury_csv_column(column):
    """
    Parse a treasury CSV column into a more human-readable format.

    Columns start with 'RIFLGFC', followed by Y or M (year or month), followed
    by a two-digit number signifying number of years/months, followed by _N.B.
    We only care about the middle two entries, which we turn into a string like
    3month or 30year.

    Raises:
        ValueError: If `column` does not follow that naming scheme.
    """
    column_re = re.compile(
        r"^(?P<prefix>RIFLGFC)"
        r"(?P<unit>[YM])"
        r"(?P<periods>[0-9]{2})"
        r"(?P<suffix>_N\.B)$"  # the dot is a literal '.', not "any character"
    )

    match = column_re.match(column)
    if match is None:
        raise ValueError("Couldn't parse CSV column %r." % column)
    unit, periods = match.group('unit', 'periods')

    # Roundtrip through int to coerce '06' into '6'.
    return str(int(periods)) + ('year' if unit == 'Y' else 'month')


def earliest_possible_date():
    """
    The earliest date for which we can load data from this module:
    midnight UTC on 1980-01-01.
    """
    # The US Treasury publishes data from earlier years too, but pricing data
    # that old is rarely available, so there is no point making people
    # download benchmarks back to 1950 that they will never be able to use.
    return pd.Timestamp('1980-01-01', tz='UTC')


def get_treasury_data(start_date, end_date):
    """Download Federal Reserve H.15 treasury yields between two dates.

    Returns:
        DataFrame indexed by UTC date, one column per maturity (named by
        `parse_treasury_csv_column`, e.g. '3month', '30year'), holding the
        yields as fractions (2.57% -> 0.0257). Rows with no data in any
        column are dropped.
    """
    url = (
        "http://www.federalreserve.gov/datadownload/Output.aspx"
        "?rel=H15"
        "&series=bf17364827e38702b42a58cf8eaa3f78"
        "&lastObs="
        "&from="  # An unbounded query is ~2x faster than specifying dates.
        "&to="
        "&filetype=csv"
        "&label=omit"
        "&layout=seriescolumn"
        "&type=package"
    )
    raw = pd.read_csv(
        url,
        skiprows=1,  # First row is a useless header.
        parse_dates=['Time Period'],
        na_values=['ND'],  # Presumably this stands for "No Data".
        index_col=0,
    )
    in_range = raw.loc[start_date:end_date].dropna(how='all')
    by_maturity = in_range.rename(columns=parse_treasury_csv_column)
    return by_maturity.tz_localize('UTC') * 0.01  # Convert from 2.57% to 0.0257.


def dataconverter(s):
    """Convert a percentage string such as '2.57' into a fraction (0.0257).

    Values that cannot be parsed as a number (e.g. a no-data marker such as
    'ND', or None) become NaN.
    """
    try:
        return float(s) / 100
    except (TypeError, ValueError):
        # Catch only conversion failures; a bare `except:` would also swallow
        # KeyboardInterrupt and SystemExit.
        return np.nan


def get_daily_10yr_treasury_data():
    """Download daily 10 year treasury rates from the Federal Reserve.

    Returns:
        pandas.Series indexed by date, with rates converted to fractions by
        `dataconverter` (unparseable entries become NaN).
    """
    url = ("http://www.federalreserve.gov/datadownload/Output.aspx?rel=H15"
           "&series=bcb44e57fb57efbe90002369321bfb3f&lastObs=&from=&to="
           "&filetype=csv&label=include&layout=seriescolumn")
    read_options = dict(
        header=5,
        index_col=0,
        names=['DATE', 'BC_10YEAR'],
        parse_dates=True,
        converters={1: dataconverter},
        squeeze=True,
    )
    return pd.read_csv(url, **read_options)
  
def select_treasury_duration(start_date, end_date):
    """Map the length of [start_date, end_date] to a treasury duration label.

    Returns the shortest maturity whose day limit (with a small allowance for
    leap days) covers the span; anything longer than the 10 year limit maps
    to '30year'.
    """
    span_days = (end_date - start_date).days
    # (inclusive upper bound in days, label), shortest maturity first.
    limits = (
        (31, '1month'),
        (93, '3month'),
        (186, '6month'),
        (366, '1year'),
        (365 * 2 + 1, '2year'),
        (365 * 3 + 1, '3year'),
        (365 * 5 + 2, '5year'),
        (365 * 7 + 2, '7year'),
        (365 * 10 + 2, '10year'),
    )
    for max_days, label in limits:
        if span_days <= max_days:
            return label
    return '30year'

In [5]:
class Commission(object):
  """ Commission Model.

  Charges `per_share` for every share traded, with a floor of `min_cost`
  per order.
  """
  def __init__(self, per_share=0.0075, min_cost=1):
    self.per_share = per_share
    self.min_cost = min_cost

  def calculate(self, num_share):
    """ Return the commission for trading `num_share` shares.

    `num_share` may be negative (short positions are tracked as negative hold
    sizes); the fee depends only on the number of shares. Without abs(), a
    negative count would always fall below `min_cost`, so any short order
    would be charged only the minimum.
    """
    return max(abs(num_share) * self.per_share, self.min_cost)

In [6]:
class Environment(object):
  """ Environment of this project.
  
  About equity_events property:
  Each equity has one huge events data stored via DataFrame.
    
  This is a bit different from experience tuple stored in the
  agent that does not require dates:
  (state, action, new_state, reward)
  """
  def __init__(self, capital=100000, logger=None,
               commission=None,
               discretization_steps=20, config=None):
    """ Create an environment.

    Args:
      capital (float): Starting cash for every trial.
      logger (Logger): Receives `log`/`log_trial` output; printed when None.
      commission (Commission): Commission model; no commission when None.
      discretization_steps (int): Number of bins per feature.
      config (dict): Attribute overrides, applied after the defaults.
    """
    self.valid_actions = [None, 'buy', 'sell']
    self.commission = commission

    # All traders in the environment. Each trader
    # tracks a single equity.
    self.trader_states = {}

    # Spark DataFrames of events, keyed by equity.
    self.equity_events = {}

    # {equity: {column: thresholds}}, thresholds as returned by the
    # `discretize_bins` helper. Used in the discretization step.
    self.equity_bins = {}

    self.trader = None
    self.done = False
    self.Q_to_load = None

    self.logger = logger
    self.capital = capital
    self.discretization_steps = discretization_steps

    self.resetted = False

    # step() related variables, (re)initialised by `reset`. Defined here too
    # so `current_event`/`next_event` don't raise AttributeError before it.
    self.current_event_ = None
    self.previous_event_ = None
    self.current_datetime_ = None

    if config is not None:
      for key in config.keys():
        setattr(self, key, config[key])

  def _join_dfs(self, df1, df2,
                 exclude_cols='time', on=None, how='inner'):
    """ Join two Spark DataFrames, keeping every column of df1 and only the
    columns of df2 that are not in `exclude_cols`.

    When `on` is None, the join condition is equality on the first excluded
    column ('time' by default).
    """
    if isinstance(exclude_cols, str):
      exclude_cols = [exclude_cols]

    # `is None` rather than `not on`: a Spark Column cannot be used as a bool.
    if on is None:
      on = (df1[exclude_cols[0]] == df2[exclude_cols[0]])

    # Explicit lists so the concatenation also works where map() is lazy.
    selected = [df1[col] for col in df1.columns] + \
      [df2[col] for col in df2.columns if col not in exclude_cols]
    return df1.join(df2, on=on, how=how).select(selected)

  def log(self, text):
    """ Send `text` to the logger, or print it when there is no logger."""
    if self.logger is None:
      print(text)
    else:
      self.logger.write(text)

  def log_trial(self, *args, **kwargs):
    """ Record one trial summary; printed when there is no logger."""
    if self.logger is None:
      # Print the trial data itself (there is no other variable to show).
      print(kwargs if not args else (args, kwargs))
    else:
      self.logger.log_trial(*args, **kwargs)

  def load_Q(self, Q_file):
    """ Load a Q-table CSV (the layout written by `save_Q`).

    The table is kept in `Q_to_load` for `create_trader` and handed to the
    current trader, if any.

    Returns:
      self, for chaining.
    """
    self.Q_to_load = create_Q_from_file(Q_file)
    # NOTE(review): Trader/RLTrader as defined in this notebook have no
    # `load_Q` method; confirm the trader in use implements it.
    if self.trader is not None:
      self.trader.load_Q(self.Q_to_load)
    return self

  def save_Q(self, Q_file):
    """ Save the current trader's Q-table to `Q_file` as CSV.

    Columns: index, equity, state, hold, buy, sell (the layout read by
    `create_Q_from_file`). Does nothing when there is no trader.
    """
    if self.trader is None:
      return
    fields = ['equity', 'state', 'hold', 'buy', 'sell']
    rows = [[self.trader.equity,
             state,
             actions['hold'],
             actions['buy'],
             actions['sell']]
            for state, actions in self.trader.Q.items()]
    df = pd.DataFrame(rows, columns=fields)
    s = StringIO.StringIO()
    df.to_csv(s)
    dbutils.fs.put(Q_file, s.getvalue(), overwrite=True)

  def prepare_discretization_bins(self):
    """ Prepare the bins/thresholds for discretization step.
    
    This is an expensive function, as it loops over each column of
    each event_df, get all data, and extract the bins using
    `discretize_bins` helper method.
    """
    for equity, event_df in self.equity_events.items():
      for column in event_df.columns:
        data = event_df.select(column).rdd.flatMap(list).collect()
        if equity not in self.equity_bins:
          self.equity_bins[equity] = {}
        self.log("create bins from this data:\n{}\nsteps:{}".format(
            data, self.discretization_steps))
        self.equity_bins[equity][column] = discretize_bins(
          data,
          self.discretization_steps).tolist()
        self.log("created bins: {}".format(
            self.equity_bins[equity][column]))

  def current_event(self, events_df):
    """ A shortcut to get the current event (the earliest one before any
    step has been taken)."""
    if self.current_datetime_ is None:
      event = events_df.sort(sqlf.asc('time')).first()
    else:
      event = events_df.filter(
          events_df['time'] == self.current_datetime_
      ).first()
    return event

  def next_event(self, events_df):
    """ A shortcut to get the next event (usually next day); None when
    there are no later events."""
    if self.current_datetime_ is None:
      event = self.current_event(events_df)
    else:
      event = events_df.filter(
          events_df['time'] > self.current_datetime_
        ).sort(sqlf.asc('time')).first()
    return event

  def add_feature(self, data, equity, name):
    """ Add another feature to stored events (self.equity_events).

    Args:
      data (Series or DataFrame): Feature values indexed by timestamps that
        `to_epoch` accepts. For a DataFrame, the first column is used.
      equity (str): Equity the feature belongs to.
      name (str): Column name of the feature.

    Returns:
      self, for chaining.
    """
    values = np.asarray(data.values)
    # 2-D values come from a DataFrame: take its first column.
    if values.ndim > 1:
      values = values[:, 0]
    rows = [(to_epoch(index), float(values[idx]))
            for idx, index in enumerate(data.index)]
    df = sqlContext.createDataFrame(rows, schema=['time', name])
    if equity not in self.equity_events:
      self.equity_events[equity] = df
    else:
      self.equity_events[equity] = \
        self._join_dfs(self.equity_events[equity], df)

    return self

  def add_daily_returns(self, data, equity):
    """ Add a daily return feature.
    """
    return self.add_feature(data, equity, 'daily_return')

  def create_trader(self, trader_class, *args, **kwargs):
    """ Instantiate `trader_class` in this environment and make it the
    current trader, handing it any Q-table loaded by `load_Q`."""
    trader = trader_class(self, *args, **kwargs)
    if self.Q_to_load is not None:
      trader.load_Q(self.Q_to_load)
    self.trader = trader
    return trader

  def discretize(self, value, equity, feature):
    """ Discretize a value of given equity and feature.

    Usage example:
    >>> discretize(3.3, 'GOOG', 'adj_sma_ratio')
    """
    bins = self.equity_bins[equity][feature]
    # Module-level `discretize` helper (method names are not in scope here).
    result = discretize(value, bins)
    return result

  def reset(self, testing=False, trial_num=0):
    """Called at the beginning of a new trial.
    """
    self.done = False

    # step() related variables.
    self.current_event_ = None
    self.previous_event_ = None
    self.current_datetime_ = None
    self.cash = self.capital
    self.total_portfolio = self.capital

    self.trader.reset(testing, trial_num=trial_num)

    # Cache the event DataFrames once, on the first reset.
    if not self.resetted:
      self.resetted = True
      for _, df in self.equity_events.items():
        df.cache()

  def step(self, event=None):
    """ Stepping into next event.
    
    At the end of this step, return actions chosen by the trader.
    """
    
    # Stepping up for each equity.
    # Todo: Is it possible to do this in parallel?
    # Todo: If one trader needs to handle multiple equities, then
    #       equity events might need to be structured such that
    #       there is a single DataFrame object instead.
    if self.current_event_ is not None:
      self.previous_event_ = self.current_event_

    if event is not None:
      self.current_event_ = event
    else:
      events_df = self.equity_events[self.trader.equity]
      self.current_event_ = self.next_event(events_df)

    if self.current_event_ is None:
      self.done = True
      q_cell = {}
      varlog = {}
    else:
      self.current_datetime_ = self.current_event_['time']
      (q_cell, varlog) = self.trader.update(self.current_event_)
    self.log("epsilon: {}".format(self.trader.epsilon))
    self.log("set event to event ID {}".format(self.current_datetime_))
    return (q_cell, varlog)

  def evaluate_last_action(self, trader):
    """Evaluate trader's previous action.

    Applies `trader.last_action` at the previous event's close price, updates
    `self.cash` and `self.total_portfolio`, and builds the trader's next
    state.

    Returns:
      tuple: (reward, next_state, varlog). (None, None, {}) when the trader
      has no previous state.
    """
    varlog = {}
    reward = None
    next_state = None
    if trader.last_state is not None:
      event = self.previous_event_
      daily_return = self.current_event_['daily_return']

      # For reward calculation
      previous_close = event['adj_close']

      # For portfolio calculation
      current_close = self.current_event_['adj_close']

      hold_size = trader.hold_size
      initial_hold_size = hold_size
      action = trader.last_action
      action_size = 0
      # Float division: under Python 2 integer semantics clearance_pct/100
      # was 0 for any clearance_pct below 100, so nothing could be traded.
      clearance_size = trader.clearance_pct / 100.0 * self.cash

      # Daily return since entry
      new_return = trader.return_since_entry

      commission = 0

      state = trader.last_state
      self.log("allow short? {}".format(trader.allow_short))
      self.log("evaluate state {}".format(state))
      self.log("  {} hold size (before action): {}".format(
          state, hold_size))
      self.log("  {} previous close price: {}".format(
          state, previous_close))
      self.log("  {} current close price: {}".format(
          state, current_close))
      self.log("  {} daily return: {}".format(state, daily_return))

      # Calculate reward from doing the action
      # --------------------------------------
      # Todo: For our first iteration, the agent would just
      #       go all-in when buying or selling. This means
      #       when the trader is in LONG position (hold_size > 0),
      #       it can't buy any more, and vice-versa for SHORT position.
      # Actions are compared with == (value), not `is` (identity).
      # NOTE(review): for short positions (hold_size < 0) the `-=` updates
      # below add daily_return * |hold_size|, so a price rise increases
      # new_return/reward. RLTrader.create_state divides by the signed hold
      # size, so both conventions are coupled -- confirm the intended sign.
      # NOTE(review): action_size for no-op actions differs between the long
      # and short branches (long 'hold' -> initial size, short 'hold' -> 0).
      if hold_size > 0:
        if action == 'buy':
          # Todo: Later we may allow the agent to buy more.
          new_return += daily_return * hold_size
          action_size = 0
        elif action == 'sell':
          reward = new_return
          hold_size = 0
          new_return = 0
          trader.entry_price = 0
          self.cash += initial_hold_size * previous_close
          if self.commission is not None:
            commission = self.commission.calculate(
              initial_hold_size)
          action_size = initial_hold_size
        elif action == 'hold':
          new_return += daily_return * hold_size
          action_size = initial_hold_size

      elif hold_size == 0:
        if action == 'buy':
          hold_size = int(clearance_size/previous_close)
          new_return += daily_return * hold_size
          trader.entry_price = previous_close
          self.cash -= hold_size * previous_close
          if self.commission is not None:
            commission = self.commission.calculate(hold_size)
        elif action == 'sell':
          # Open a short position: negative hold size, proceeds added to cash.
          hold_size = -int(clearance_size/previous_close)
          new_return -= daily_return * hold_size
          trader.entry_price = previous_close
          self.cash -= hold_size * previous_close
          if self.commission is not None:
            commission = self.commission.calculate(hold_size)
        # 'hold' while flat leaves the position unchanged.
        action_size = hold_size

      elif hold_size < 0:
        if action == 'buy':
          reward = new_return
          hold_size = 0
          new_return = 0
          trader.entry_price = 0
          self.cash += initial_hold_size * previous_close
          if self.commission is not None:
            commission = self.commission.calculate(initial_hold_size)
          action_size = initial_hold_size
        elif action == 'sell':
          new_return -= daily_return * hold_size
          action_size = initial_hold_size
        elif action == 'hold':
          new_return -= daily_return * hold_size
          action_size = 0
      # --------------------------------------

      if reward is None:
        reward = new_return

      self.cash -= commission
      self.log("  {} action: {} {}".format(
          state, action, action_size))
      self.log("  {} new return: {}".format(state, new_return))
      self.log("  {} reward: {}".format(state, reward))
      self.log("  {} cash: {}".format(state, self.cash))
      previous_total_portfolio = self.cash + \
        (hold_size * previous_close)
      self.total_portfolio = self.cash + (hold_size * current_close)
      self.log("  {} total portfolio: {}".format(
          state, previous_total_portfolio))

      next_state = trader.create_state(
        event=event,
        return_since_entry=new_return,
        hold_size=hold_size,
        update=True
      )
      self.log("  {} next state: {}".format(state, next_state))
      varlog['time'] = event['time']
      varlog['state'] = state
      varlog['cash'] = self.cash
      varlog['total_portfolio'] = previous_total_portfolio
      # Cumulative return relative to the starting capital.
      varlog['return_pct'] = (previous_total_portfolio - self.capital)/self.capital
      varlog['close'] = previous_close
      varlog['next_close'] = current_close
      varlog['daily_return'] = daily_return
      varlog['action'] = action
      varlog['action_size'] = action_size
      varlog['reward'] = reward
      varlog['initial_hold_size'] = initial_hold_size
      varlog['final_hold_size'] = hold_size

    return (reward, next_state, varlog)

In [7]:
class Logger(object):
  """ Buffered text logger plus a Spark DataFrame of per-trial summaries.

  Text is buffered in memory and written to numbered files (log_1.txt,
  log_2.txt, ...) under `log_dirname` when the buffer reaches `buffer_size`
  characters or `save` is called. Trial rows accumulate in `trial_df` and are
  written as parquet to `trial_filename` by `save`.
  """
  def __init__(self, log_dirname, trial_filename=None, erase=False, buffer_size=65500):
    """
    Args:
      log_dirname (str): Directory for the numbered text log files.
      trial_filename (str): Parquet path for trial summaries; None to skip.
      erase (bool): Delete existing logs/trial file instead of loading them.
      buffer_size (int): Flush the text buffer once it reaches this length.
    """
    self.log_dirname = log_dirname
    self.trial_filename = trial_filename
    self.erase = erase
    self.buf = ""
    self.buffer_size = buffer_size
    # Set on every path so `save` and `log_trial` work even when no trial
    # file is loaded (erase=False with trial_filename=None).
    self.trial_df = None
    if self.erase:
      dbutils.fs.rm(self.log_dirname, recurse=True)
      dbutils.fs.mkdirs(self.log_dirname)
      if self.trial_filename is not None:
        dbutils.fs.rm(self.trial_filename, recurse=True)
    elif self.trial_filename is not None:
      self.trial_df = sqlContext.read.format('parquet') \
                      .load(self.trial_filename)

  @staticmethod
  def _log_index(name):
    """ Return N for a file named 'log_N.txt', or -1 for any other name. """
    match = re.match(r'log_(\d+)\.txt$', name)
    return int(match.group(1)) if match else -1

  def write(self, text):
    """ Buffer `text` with a UTC timestamp; flush the text log when full. """
    timestr = strftime("%Y-%m-%d %H:%M:%S", gmtime())
    self.buf += "[{}] {}\n".format(timestr, text)
    if len(self.buf) >= self.buffer_size:
      self.save(save_trial=False)

  def save(self, save_trial=True):
    """ Flush the text buffer to the next numbered log file and, when
    `save_trial` is True, overwrite the trial parquet file with `trial_df`.
    """
    # Next index after the highest existing log_N.txt (log_1.txt if none).
    last_idx = max([0] + [self._log_index(f.name)
                          for f in dbutils.fs.ls(self.log_dirname)])
    filename = 'log_{}.txt'.format(last_idx + 1)

    path = os.path.join(self.log_dirname, filename)
    dbutils.fs.put(path, self.buf, overwrite=True)
    self.buf = ""
    if save_trial and self.trial_df is not None:
      self.trial_df.write.format('parquet') \
        .save(self.trial_filename, mode='overwrite')

  def p(self, begin_with=None):
    """ Print from file.

    Files are printed in numeric log order (log_2 before log_10).

    Sample use: `logger.p(begin_with='(Chosen|values)')` will print
    everything that begins with "Chosen" or "values".
    
    Args: 
      begin_with: Read lines that begin with given regex
    """
    files = sorted(dbutils.fs.ls(self.log_dirname),
                   key=lambda f: self._log_index(f.name))
    for f in files:
      path = str(f.path)
      if begin_with is not None:
        for match in re.finditer(
          r'\[.*\] {}.*'.format(begin_with), dbutils.fs.head(path)):
          print(match.group(0))
      else:
        print(dbutils.fs.head(path))

  def log_trial(self, **kwargs):
    """ Append one trial summary row; keyword names follow the schema below."""
    row = dict(kwargs)

    schema = StructType([
        StructField('training', BooleanType(), True),
        StructField('trial', IntegerType(), True),
        StructField('equity', StringType(), True),
        StructField('start_time', TimestampType(), True),
        StructField('end_time', TimestampType(), True),
        StructField('times_profit', IntegerType(), True),
        StructField('times_draw', IntegerType(), True),
        StructField('times_loss', IntegerType(), True),
        StructField('total_rewards', FloatType(), True),
        StructField('total_portfolio', FloatType(), True),
        StructField('return_pct', FloatType(), True),
        StructField('parameters', MapType(
            StringType(), FloatType()), True)
      ])

    df = sqlContext.createDataFrame(
      sc.parallelize([row]), schema=schema)
    if self.trial_df is None:
      self.trial_df = df
    else:
      self.trial_df = self.trial_df.unionAll(df)

In [8]:
class Trader(object):
  """ Base trader: keeps the environment, the traded equity and the position
  bookkeeping. Subclasses override `update`.
  """
  def __init__(self, env, equity=None, allow_short=False):
    self.env = env
    self.equity = equity
    self.allow_short = allow_short
    self.experiences = []
    self.hold_size = 0
    # return_since_entry = hold_size * equity value.
    self.return_since_entry = 0
    self.valid_actions = ['hold', 'buy', 'sell']

  def update(self, event):
    """ Run from Environment class each time there is a new event.

    The base implementation does nothing and returns None.
    """
    return None
    
class RLTrader(Trader):
  """ Reinforcement-Learning-powered Trader

  Learns a tabular Q-function `self.Q` ({state: {action: value}}) over the
  integer states built by `create_state`, and chooses actions
  epsilon-greedily.
  """
  def __init__(self, env, equity=None, learning=True, learning_rate=0.2,
               discount_rate=0.1, epsilon=1.0,
               test_learning_rate=0.02, test_discount_rate=0.01,
               test_epsilon=0, discretization_steps=20,
               allow_short=True, clearance_pct=100,
               earning_pct_bins=[-0.03, -0.01, 0, 0.005, 0.01, 0.02],
               config=None, Q=None, epsilon_decay_f=None):
    """ Initialize trader
    
    Args:
      clearance_pct(float): Percentage of capital the agent
                            is allowed to trade.
      earning_pct_bins(list): Ascending thresholds used to discretize the
                              return since entry (never mutated).
      Q(dict): Initial Q-table, either {state: {action: value}} or keyed by
               equity first. Defaults to a new empty dict per trader: a
               shared `{}` default would leak Q-values between traders.
      epsilon_decay_f(callable): Maps a trial number to epsilon; when None,
                                 `reset` uses cos(0.05 * trial_num).
      config(dict): Attribute overrides applied after the arguments.
    """
    super(RLTrader, self).__init__(env, equity=equity, allow_short=allow_short)

    self.learning_rate = learning_rate
    self.discount_rate = discount_rate
    self.epsilon = epsilon
    self.test_learning_rate = test_learning_rate
    self.test_discount_rate = test_discount_rate
    self.test_epsilon = test_epsilon
    self.Q = {} if Q is None else Q
    self.discretization_steps = discretization_steps
    self.learning = learning
    self.clearance_pct = clearance_pct
    self.earning_pct_bins = earning_pct_bins
    self.entry_price = 0
    self.epsilon_decay_f = epsilon_decay_f
    if config is not None:
      for key in config.keys():
        setattr(self, key, config[key])

    # A Q-table keyed by equity (as built by `create_Q_from_file`) is
    # narrowed down to this trader's equity.
    if self.equity in self.Q:
      self.Q = self.Q[self.equity]

  def reset(self, testing=False, trial_num=0):
    """ Prepare for a new trial: forget the last state/action, start flat
    and, when learning, recompute epsilon for `trial_num`.
    """
    self.last_state = None
    self.last_action = None
    self.last_exp = None
    self.testing = testing
    self.entry_price = 0
    self.hold_size = 0
    # The position is flat again, so there is no return since entry; a stale
    # value would otherwise leak into the first reward of the new trial.
    self.return_since_entry = 0

    if self.learning:
      if self.epsilon_decay_f is not None:
        self.epsilon = self.epsilon_decay_f(trial_num)
      else:
        # NOTE(review): cos(0.05 * t) reaches 0 around trial 31, stays
        # negative (no random exploration) until about trial 94 and then
        # rises again -- confirm this schedule is intended.
        self.epsilon = math.cos(0.05*trial_num)

  def update(self, event):
    """ Learn from the previous action (if any), then choose the next one.

    Returns:
      tuple: (Q-values of the new state, varlog dict from the environment).
    """
    if self.last_action is not None:
      (reward, next_state, varlog) = self.env.evaluate_last_action(self)
      self.create_Q(next_state)
      self.add_experience(
        state=self.last_state,
        action=self.last_action,
        reward=reward,
        next_state=next_state
      )
    else:
      # First update
      next_state = self.create_state(
        event=event,
        hold_size=self.hold_size,
        return_since_entry=self.return_since_entry
      )
      self.create_Q(next_state)
      varlog = {}

    self.last_action = self.choose_action(next_state)
    self.last_state = next_state
    return (self.Q[next_state], varlog)

  def get_maxQ(self, state, valid_actions):
    """ Return the largest Q-value of `valid_actions` in `state` (None when
    `valid_actions` is empty).
    """
    maxQ = None
    for key in valid_actions:
      if maxQ is None or maxQ < self.Q[state][key]:
        maxQ = self.Q[state][key]
    return maxQ

  def choose_action(self, state):
    """ Choose an action for `state`.

    While learning: a random valid action with probability epsilon
    (test_epsilon when testing), otherwise one of the highest-valued actions
    (ties broken at random). When not learning: always random. 'sell' is not
    offered while flat if shorting is not allowed.
    """
    if self.testing:
      epsilon = self.test_epsilon
    else:
      epsilon = self.epsilon

    # If held no equity and short is not allowed, do not sell
    valid_actions = list(self.valid_actions)
    if self.hold_size == 0 and self.allow_short is False:
      valid_actions.remove('sell')

    n = len(valid_actions) - 1
    if self.learning:
      rand = randint(0, 99)
      if rand < epsilon*100:
        action = valid_actions[randint(0, n)]
        self.env.log(
          "Chosen action '{}' randomly ({} < epsilon {})".format(
            action, rand/100.0, epsilon))
      else:
        # Find the maximum Q-value for the given state
        maxQ = self.get_maxQ(state, valid_actions)

        # Choose a best action based on that value (random among ties)
        action = random.choice([candidate for candidate in valid_actions
                                if self.Q[state][candidate] == maxQ])
        self.env.log(
          "Chosen max action '{}'".format(
            action))
    else:
      action = valid_actions[randint(0, n)]
      self.env.log(
        "Chosen action {} randomly (not learning)".format(
          action))
    return action

  def create_state(self, event=None,
                   return_since_entry=None,
                   hold_size=None,
                   update=False):
    """ Create an integer state from an event and the position.

    The discretized features, the discretized return since entry and the
    position sign (short/flat/long) are packed with `combine_ints`. When
    `update` is True, `return_since_entry` and `hold_size` are also stored on
    the trader.
    """
    # Todo: return_since_entry is tricky since we don't know what
    #       thresholds we should use in its discretization step,
    #       so let's exclude it for now.
    values = []
    features = ['adj_sma_ratio',
                'bollinger_upper',
                'bollinger_lower',
                'daily_return']
    self.env.log("event: {}".format(event))
    for feature in features:
      self.env.log(
        "  setting event of feature {}".format(feature))
      self.env.log(
        "  {}".format(event[feature]))
      value = event[feature]
      values.append(self.env.discretize(
          value,
          self.equity,
          feature
        ))

    if hold_size*self.entry_price == 0:
      return_pct = 0
    else:
      return_pct = return_since_entry/(hold_size*self.entry_price)

    self.env.log(
      "return_since_entry: {}".format(return_since_entry))
    self.env.log(
      "hold_size: {}".format(hold_size))
    self.env.log(
      "entry_price: {}".format(self.entry_price))
    self.env.log(
      "return_pct: {}".format(return_pct))
    values.append(
      discretize(
        return_pct,
        self.earning_pct_bins
      )
    )
    if hold_size > 0:
      hold_position = 1
    elif hold_size == 0:
      hold_position = 0
    else:
      hold_position = -1
    values.append(discretize(hold_position,
                             np.array([-1,0,1])))
    digits = len(str(self.discretization_steps))
    state = combine_ints(values, digits=digits)
    self.env.log(
      "values: {}".format(values))
    self.env.log(
      "state: {}".format(state))

    if update is True:
      self.return_since_entry = return_since_entry
      self.hold_size = hold_size

    return state

  def create_Q(self, state):
    """ Add `state` to the Q-table with every action valued 0, unless it is
    already present. """
    if state not in self.Q:
      self.Q[state] = {action: 0 for action in self.valid_actions}

  def add_experience(self, state=None, action=None, reward=None,
                     next_state=None):
    """ Apply the Q-learning update for one transition and remember it in
    `self.last_exp`:

      Q(s, a) <- (1 - lr) * Q(s, a) + lr * (reward + dr * max_a' Q(s', a'))

    The test-time rates are used while testing. When not learning, Q(s, a)
    is set to 0 instead.
    """
    if self.learning:
      if self.testing:
        learning_rate = self.test_learning_rate
        discount_rate = self.test_discount_rate
      else:
        learning_rate = self.learning_rate
        discount_rate = self.discount_rate

      old_value = (1-learning_rate)*self.Q[state][action]
      # Bootstrap from the best action available in next_state (Q-learning),
      # not from taking the same action again.
      disc_value = discount_rate*self.get_maxQ(next_state, self.valid_actions)
      new_value = learning_rate*(reward+disc_value)
      # Keep the fractional part: truncating with int() discarded every
      # update smaller than 1 in magnitude.
      self.Q[state][action] = old_value + new_value
    else:
      # NOTE(review): when not learning, visited entries are zeroed; confirm
      # this is intended rather than leaving the Q-table untouched.
      self.Q[state][action] = 0
    self.last_exp = {'state': state,
                     'action': action,
                     'reward': reward,
                     'next_state': next_state}

In [9]:
class Simulator(object):
  """ Replay an Environment through training trials, then test trials.

  This class helps us to get started without having to trade in real time.
  `run` lets the environment's trader learn over repeated trials until its
  epsilon falls below a tolerance or the trial counter exceeds a hard limit.
  It then runs a fixed number of test trials. Every trial is summarized
  through `env.log_trial`.
  """
  def __init__(self, env, save_config_to=None,
               hard_limit=100, trial=1):
    """
    Args:
      env: Environment to simulate; `run` drives its `trader`.
      save_config_to: optional path; when set, the run configuration is
        written there as JSON (via dbutils) after every trial.
      hard_limit: training stops once the trial counter exceeds this value.
      trial: value stored in the saved configuration. `run` keeps its own
        trial counter and does not update this attribute.
    """
    self.env = env
    self.save_config_to = save_config_to

    # NOTE(review): `quit` is not read anywhere in this class.
    self.quit = False
    self.hard_limit = hard_limit
    self.trial = trial

  @staticmethod
  def _return_pct(total_portfolio, capital):
    """ Fractional return of `total_portfolio` relative to `capital`.

    The denominator is forced to float: this notebook runs under Python 2,
    where two integer amounts would otherwise be floor-divided (a small loss
    would be logged as -1, i.e. -100%).
    """
    return (total_portfolio - capital) / float(capital)

  @staticmethod
  def _trial_parameters(trader, testing):
    """ Learning ('a'), exploration ('e') and discount ('d') factors that
    apply to a training trial, or to a test trial when `testing` is True. """
    if testing:
      return {'a': trader.test_learning_rate,
              'e': trader.test_epsilon,
              'd': trader.test_discount_rate}
    return {'a': trader.learning_rate,
            'e': trader.epsilon,
            'd': trader.discount_rate}

  def _save_config(self):
    """ Write the simulation, environment, commission and trader settings to
    `self.save_config_to` as JSON, overwriting any existing file. """
    config = {
      'simulation': {
        'hard_limit': self.hard_limit,
        'trial': self.trial
      },
      'env': {
        'equity_bins': self.env.equity_bins,
        'discretization_steps': self.env.discretization_steps
      },
      'commission': {
        'per_share': self.env.commission.per_share,
        'min_cost': self.env.commission.min_cost
      },
      'trader': {
        'allow_short': self.env.trader.allow_short,
        'equity': self.env.trader.equity,
        'earning_pct_bins': self.env.trader.earning_pct_bins,
        'clearance_pct': self.env.trader.clearance_pct,
        'discretization_steps': self.env.trader.discretization_steps,
        'epsilon': self.env.trader.epsilon,
        'learning_rate': self.env.trader.learning_rate,
        'discount_rate': self.env.trader.discount_rate
      }
    }
    dbutils.fs.put(self.save_config_to, json.dumps(config),
                   overwrite=True)

  def run(self, n_test=10, tolerance=0.005):
    """ Train the environment's trader, then test it.

    Training trials repeat until `trader.epsilon < tolerance` or the trial
    counter exceeds `hard_limit`. The counter then restarts at 1 and `n_test`
    test trials are run. Each trial steps the environment until `env.done`
    and tallies the sign of each reward in `trader.last_exp`. It then records
    a summary via `env.log_trial` and, when `save_config_to` is set, re-saves
    the configuration.

    Args:
      n_test: number of test trials to run after training.
      tolerance: epsilon threshold below which training stops.
    """
    # Discretization bins are prepared only if the environment has none yet.
    if len(self.env.equity_bins.keys()) == 0:
      self.env.prepare_discretization_bins()
    testing = False
    # Todo: Multiple traders or one trader handles multiple equities?
    trader = self.env.trader
    trial = 1
    self.env.log("training starts")
    # NOTE(review): presumably this first reset is what populates
    # env.equity_events; the loop resets again before trial 1.
    self.env.reset(testing, trial_num=trial)
    equity = self.env.trader.equity
    events = self.env.equity_events[equity]
    start_time = to_datetime(events.sort('time').first()['time'])
    end_time = to_datetime(events.sort(sqlf.desc('time')).first()['time'])
    while True:
      if not testing:
        self.env.log("trader is learning, epsilon: {}, trial: {}".format(trader.epsilon, trial))
        if trader.epsilon < tolerance or trial > self.hard_limit:
          self.env.log("testing starts")
          testing = True
          trial = 1
      elif trial > n_test:
        break

      self.env.reset(testing, trial_num=trial)
      total_rewards = 0
      times_profit = 0
      times_loss = 0
      times_draw = 0
      while True:
        self.env.log("stepping")
        (action, varlog) = self.env.step()
        if trader is not None and trader.last_exp is not None:
          reward = trader.last_exp['reward']
          total_rewards += reward
          if reward > 0:
            times_profit += 1
          elif reward == 0:
            times_draw += 1
          elif reward < 0:
            times_loss += 1
        if self.env.done:
          break

      self.env.log_trial(
        training=(not testing),
        trial=trial,
        equity=trader.equity,
        start_time=start_time,
        end_time=end_time,
        times_profit=times_profit,
        times_draw=times_draw,
        times_loss=times_loss,
        total_rewards=total_rewards,
        total_portfolio=self.env.total_portfolio,
        return_pct=self._return_pct(self.env.total_portfolio, self.env.capital),
        parameters=self._trial_parameters(trader, testing)
      )
      trial += 1

      if self.save_config_to is not None:
        self._save_config()

In [10]:
def get_rolling_mean(values, window):
  """ Return rolling mean of given values, using specified window size.

  Windows with fewer than `window` observations yield NaN, so at least the
  first `window - 1` entries are NaN.

  Uses `values.rolling(...)` when the input supports it. The module-level
  `pd.rolling_mean` was deprecated in pandas 0.18 and removed in 0.23.
  Inputs without `.rolling` (e.g. plain arrays) still go through
  `pd.rolling_mean`, which only exists on older pandas.
  """
  if hasattr(values, 'rolling'):
    return values.rolling(window=window).mean()
  return pd.rolling_mean(values, window=window)


def get_rolling_std(values, window):
  """ Return rolling standard deviation of given values, using specified window size.

  The sample standard deviation (ddof=1) is used, matching the old
  `pd.rolling_std`. Windows with fewer than `window` observations yield NaN.

  Uses `values.rolling(...)` when the input supports it. The module-level
  `pd.rolling_std` was deprecated in pandas 0.18 and removed in 0.23.
  Inputs without `.rolling` still go through `pd.rolling_std` (older pandas
  only).
  """
  if hasattr(values, 'rolling'):
    return values.rolling(window=window).std()
  return pd.rolling_std(values, window=window)


def get_bollinger_bands(rm, rstd, num_std=2):
  """ Return upper and lower Bollinger Bands.

  Args:
    rm: rolling mean (scalar, array or Series).
    rstd: rolling standard deviation over the same window as `rm`.
    num_std: half-width of the bands in standard deviations; the default of
      2 keeps the previous behavior.

  Returns:
    (upper_band, lower_band) = (rm + num_std * rstd, rm - num_std * rstd).
  """
  offset = num_std * rstd
  upper_band = rm + offset
  lower_band = rm - offset
  return upper_band, lower_band

In [11]:
def max_dd(ser):
  """ Maximum drawdown of `ser`.

  Returns the most negative difference between a value and the running
  maximum up to that point. The result is 0 for a non-decreasing series and
  NaN for an empty one. Drawdowns are absolute differences
  (ser - running max), not ratios to the peak.

  `ser` may be a pandas Series or any 1-D sequence. `cummax` replaces
  `pd.expanding_max`, which was removed from pandas.
  """
  ser = pd.Series(ser)
  running_max = ser.cummax()
  return (ser - running_max).min()

def visualize(plt, trial_df, test_df, close_df, capital=100000, learning=True):
  """ Draw the training/testing dashboard for one experiment.

  Panels:
    - training total return per trial;
    - training profit/draw/loss counts;
    - training parameters, or a "Learning Disabled" note;
    - a text summary of the data;
    - test-period returns vs. SPY, with the close price and the trader's
      actions on a right-hand axis;
    - a text summary of test performance (alpha, beta, Sharpe ratios, max
      drawdown);
    - an SPY-vs-model daily-return scatter with its least-squares line.

  Args:
    plt: matplotlib.pyplot module, passed in by the caller.
    trial_df: pandas DataFrame of per-trial logs. Reads the columns
      'training', 'trial', 'total_portfolio', 'times_profit', 'times_draw',
      'times_loss', 'parameters' (dict with keys 'a', 'e', 'd'), 'equity',
      'start_time' and 'end_time'.
    test_df: pandas DataFrame with one row per test step. Reads the columns
      'time', 'close', 'return_pct', 'action', 'action_size' and
      'total_portfolio'. Its index labels are also used as positions.
    close_df: pandas DataFrame with an 'Adj. Close' column whose first rows
      are summarized as the training period.
    capital: starting capital used to turn portfolio values into returns.
    learning: when False, the parameters panel only says "Learning Disabled".

  Side effects: downloads SPY returns from Quandl, calls get_treasury_data,
  and shows the figure.
  """
  fig = plt.figure(figsize=(13,17))
  gs_train = plt.GridSpec(4, 2)
  gs_test = plt.GridSpec(4, 2)
  gs_details = plt.GridSpec(4, 2)
  trial1 = fig.add_subplot(gs_train[0,0])
  trial2 = fig.add_subplot(gs_train[0,1])
  params = fig.add_subplot(gs_train[1,0])
  test_params = fig.add_subplot(gs_train[1,1])
  test_params.axis('off')
  # Left and right axes share row 2: test2 (frameless, right-hand axis)
  # carries the close price and actions; test carries returns and shares x.
  test2 = fig.add_subplot(gs_test[2,:], frameon=False)
  test = fig.add_subplot(gs_test[2,:], sharex=test2)

  details = fig.add_subplot(gs_details[3,0])
  details.axis('off')
  ab = fig.add_subplot(gs_details[3,1])

  fmt = '${x:,.0f}'
  tick_dollar = mtick.StrMethodFormatter(fmt)
  fmt1 = '{x:,.2%}'
  tick_percentage = mtick.StrMethodFormatter(fmt1)

  first_idx = test_df.index[0]
  last_idx = max(test_df.index)
  start_date_obj = test_df['time'][first_idx]
  start_date = start_date_obj.strftime('%Y-%m-%d')
  end_date = test_df['time'][last_idx].strftime('%Y-%m-%d')

  # SPY daily relative returns over the test window. A 0.0 return is added
  # for the first test day (presumably absent from the 'rdiff' series) so
  # spy lines up with test_df.
  spy = quandl.get('GOOG/NYSE_SPY', start_date=start_date,
           end_date=end_date, transform='rdiff')['Close']
  spy.loc[start_date_obj] = 0.0  # adding a row
  spy = spy.sort_index()

  # Training Returns

  trial_train = trial_df[trial_df['training']==True]

  # float(): avoid Python 2 floor division when both amounts are integers.
  trial1.plot(trial_train['trial'],
              trial_train['total_portfolio'].apply(lambda x: (x-capital)/float(capital)))
  trial1.set_title("Training Portfolio\n(starting capital ${:,})".format(capital))
  trial1.get_yaxis().set_major_formatter(tick_percentage)
  trial1.set_xlim((1, max(trial_train['trial'])))
  trial1.set_xlabel("Trial")
  trial1.set_ylabel("Total Return")
  trial1.grid(True)

  # Training PDL

  p, = trial2.plot(trial_train['trial'], trial_train['times_profit'], color='green', label="# Profit")
  d, = trial2.plot(trial_train['trial'], trial_train['times_draw'], color='#AA8939', linestyle='-', label="# Draw")
  l, = trial2.plot(trial_train['trial'], trial_train['times_loss'], color='r', linestyle='-', label="# Loss")
  trial2.legend(loc='upper center', bbox_to_anchor=(0.5, 1.15),
                ncol=3, fontsize=10)
  trial2.set_xlim((1, max(trial_train['trial'])))
  trial2.set_xlabel("Trial")
  trial2.set_ylabel("Number of Occurences")
  trial2.grid(True)
  trial2.text(0.5, 1.16, "Training Profits, Draws, and Losses",
              horizontalalignment='center',
              transform=trial2.transAxes,
              fontsize=14)

  # Training Parameters

  if not learning:
    params.axis('off')
    params.text(0.5, 0.5, "Learning Disabled",
                horizontalalignment='center',
                verticalalignment='center',
                fontsize=20)
  else:
    params.plot(
      trial_train['trial'], trial_train['parameters'].apply(lambda x: x['a']),
      label="Learning Factor")
    params.plot(
      trial_train['trial'], trial_train['parameters'].apply(lambda x: x['e']),
      'g--', label="Exploration Factor")
    params.plot(
      trial_train['trial'], trial_train['parameters'].apply(lambda x: x['d']),
      'r-.', label="Discount Factor")
    params.legend(loc='upper center', bbox_to_anchor=(0.5, 1.15),
                  ncol=3, fontsize=10)
    params.grid(True)
    params.set_xlim((1, max(trial_train['trial'])))
    params.set_xlabel("Trial")
    params.set_ylabel("Parameter Value")
    params.text(0.5, 1.16, "Training Parameters",
                horizontalalignment='center',
                transform=params.transAxes,
                fontsize=14)

  # More Details

  test_params.text(0, 1.16, 'More Details', fontweight='bold')
  # The last logged trial is a test trial; its parameters are the test ones.
  last_parameters = trial_df.tail(1)['parameters'].values[0]
  test_learning_fct = last_parameters['a']
  test_discount_fct = last_parameters['d']
  # Positional access: trial_train is a filtered frame, so label 0 is not
  # guaranteed to exist.
  train_num_days = trial_train['times_profit'].iloc[0] + \
    trial_train['times_draw'].iloc[0] + trial_train['times_loss'].iloc[0]
  train_close = close_df['Adj. Close'].iloc[0:train_num_days]
  text = """Equity: ${}

Training:
Date range: {} to {}
Num. trading days: {}
Avg. close price: ${:,.2f}
Max. close price: ${:,.2f}
Min. close price: ${:,.2f}
Standard Deviation: ${:,.2f}

Testing:
Date range: {} to {}
Num. trading days: {}
Avg. close price: ${:,.2f}
Max. close price: ${:,.2f}
Min. close price: ${:,.2f}
Standard Deviation: ${:,.2f}

Testing Learning Factor: {}
Testing Discount Factor: {}""".format(
    trial_train['equity'].iloc[0],
    trial_train['start_time'].iloc[0].strftime('%Y-%m-%d'),
    trial_train['end_time'].iloc[0].strftime('%Y-%m-%d'),
    train_num_days,
    train_close.mean(),
    train_close.max(),
    train_close.min(),
    train_close.std(),

    start_date,
    end_date,
    len(test_df),
    test_df['close'].mean(),
    test_df['close'].max(),
    test_df['close'].min(),
    test_df['close'].std(),
    test_learning_fct,
    test_discount_fct
  )
  test_params.text(0, 1.07, text, verticalalignment='top')

  # Test Performance
  yvar = 'return_pct'
  ylabel1 = "Daily Return"
  ylabel2 = "Benchmark (SPY)"
  test.plot(test_df.index, test_df[yvar])
  test.plot(test_df.index, spy, 'r--')
  test.set_xticks(test_df.index)
  test.set_xticklabels(test_df['time'].apply(lambda x: x.strftime('%Y-%m-%d')))
  for tick in test.get_xticklabels():
    tick.set_rotation(45)
    tick.set_horizontalalignment('right')
  test.text(0.5, 1.16, "Testing Performance",
            horizontalalignment='center',
            transform=test.transAxes,
            fontsize=14)
  test.set_xlim((min(test_df.index), max(test_df.index)))
  test.get_yaxis().set_major_formatter(tick_percentage)
  test.set_ylabel(ylabel1)
  test.set_xlabel("Date")
  test.grid(True)

  # Right axis line
  ylabel3 = "Close Price"

  test2.plot(test_df.index,
             test_df['close'], 'k--',
             color='#B8B3B3')
  test2.yaxis.tick_right()
  test2.yaxis.set_label_position("right")
  test2.set_ylabel(ylabel3)
  plt.setp(test2.get_xticklabels(), visible=False)
  test2.get_yaxis().set_major_formatter(tick_dollar)

  # Actions
  yvar = 'close'
  test2.plot(test_df[test_df['action']=='hold'].index, test_df[test_df['action']=='hold'][yvar], 'wo', label="Hold")
  test2.plot(test_df[test_df['action']=='buy'].index, test_df[test_df['action']=='buy'][yvar], 'go', label="Buy")
  test2.plot(test_df[test_df['action']=='sell'].index, test_df[test_df['action']=='sell'][yvar], 'r^', label="Sell")

  action_sizes = test_df['action_size']
  # Labels are drawn on test2, whose y axis is the close price, so their
  # placement is judged against test2's y range (not its date/x ticks).
  ymin, ymax = test2.get_ylim()
  yrange = ymax - ymin
  last_x = max(test_df.index)

  for i, value in enumerate(action_sizes):
    idx = action_sizes.index[i]
    text = "{} {}".format(test_df['action'][idx], value)
    if (test_df[yvar][idx] - ymin) < yrange*10/100:
      # Within the bottom 10% of the axis: angle the label upwards.
      rotation = 45
      ha = 'left'
      va = 'bottom'
    else:
      rotation = -45
      ha = 'left'
      va = 'top'

    if test_df.index[idx] == last_x:
      # Last point: anchor on the right so the label stays inside the plot.
      ha = 'right'
      va = 'bottom'
    test2.annotate(text, (test_df.index[idx],
                         test_df[yvar][idx]),
                  fontsize=9,
                  verticalalignment=va,
                  horizontalalignment=ha,
                  rotation=rotation)

  # Create legends manually with artist rendition:
  # http://matplotlib.org/users/legend_guide.html#creating-artists-specifically-for-adding-to-the-legend-aka-proxy-artists
  line1_a = mlines.Line2D([], [], color='blue', label=ylabel1)
  line2_a = mlines.Line2D([], [], color='red', linestyle='--', label=ylabel2)
  line3_a = mlines.Line2D([], [], color='#B8B3B3', linestyle='--', label=ylabel3)
  point1_a = mlines.Line2D([], [], color='white', marker='o', label="Hold")
  point2_a = mlines.Line2D([], [], color='white', markerfacecolor='green', marker='o', label="Buy")
  point3_a = mlines.Line2D([], [], color='white', markerfacecolor='red', marker='^', label="Sell")
  test.legend(
    handles=[line1_a, line2_a, line3_a, point1_a, point2_a, point3_a],
    loc='upper center', bbox_to_anchor=(0.5, 1.15),
              ncol=6, fontsize=10)

  # Details: returns in excess of `rfr` (presumably the risk-free rate from
  # the treasury data), then alpha/beta from a least-squares fit of model
  # excess returns on SPY excess returns.
  tdf = get_treasury_data(start_date, end_date)
  rfr = tdf[select_test_df_treasury_duration(test_df)]
  spy_returns = make_tz_aware(spy) - rfr
  m_returns = make_test_df_tz_aware(test_df)['return_pct'] - rfr
  ab.scatter(spy_returns, m_returns)
  (b, a) = np.polyfit(spy_returns, m_returns, 1)
  f = np.poly1d((b, a))

  alpha = a
  beta = b
  # calculate_beta(test_df, spy, tdf) should give the same value as `b`.
  sharpe = calculate_sharpe(test_df, tdf)
  spy_sharpe = calculate_sharpe(spy, tdf, col=None)

  details.text(0, 1.09, 'Testing Performance Result', fontweight='bold',
               verticalalignment='top')
  dtext = """
Starting Capital: ${:,.0f}
Final Portfolio: ${:,.0f}
Total Returns: {:.2%}
Alpha: {:.2f}
Beta: {:.2f}
Sharpe Ratio: {:.2f}
SPY Sharpe Ratio: {:.2f}
Max Drawdown: {:.2%}""".format(
    capital,
    test_df.tail(1)['total_portfolio'].values[0],
    test_df.tail(1)['return_pct'].values[0],
    alpha,
    beta,
    sharpe,
    spy_sharpe,
    max_dd(test_df['return_pct'])
  )
  details.text(0, 1.0, dtext,
               verticalalignment='top')

  # Alpha and Beta: square limits covering the wider of the two series.
  rng1 = max(m_returns) - min(m_returns)
  rng2 = max(spy_returns) - min(spy_returns)
  if rng1 > rng2:
    lim = (min(m_returns)-rng1/10, max(m_returns)+rng1/10)
  else:
    lim = (min(spy_returns)-rng2/10, max(spy_returns)+rng2/10)
  ab.set_xlim(lim)
  ab.set_ylim(lim)

  x = ab.get_xticks().tolist()
  ab.plot(x, f(x), 'r--')
  ab.get_yaxis().set_major_formatter(tick_percentage)
  ab.get_xaxis().set_major_formatter(tick_percentage)
  ab.set_xlabel("SPY Daily Return")
  ab.set_ylabel("Model Daily Return")
  ab.text(1, 0.96, "y=%.6fx+(%.6f)"%(b,a),
          horizontalalignment='right',
          verticalalignment='top',
          transform=ab.transAxes)
  ab.set_title("Daily Returns")

  plt.tight_layout(h_pad=2)
  display(plt.show())
  
# visualize(plt, e2_trial_df, e2_test_df, close_df=e2_close_df, capital=100000)

# Experiments

In [13]:
def split_df(df, end):
  """ Split `df` into (all but the last `end` rows, the last `end` rows).

  Typically used to hold out the final `end` rows as a test set.
  """
  n_head = len(df) - end
  head_part = df.head(n_head)
  tail_part = df.tail(end)
  return head_part, tail_part
  
def _first_value_at(obj, label):
  """ Value at row `label` of a Series, or of the first column of a
  DataFrame (replacement for the removed `obj.ix[label, 0]`). """
  value = obj.loc[label]
  # A DataFrame row comes back as a Series; a Series lookup is a scalar.
  return value.iloc[0] if isinstance(value, pd.Series) else value


def experiment(start_date, end_date, equity,
               recalculate=False, plt=None,
               rolling_window=20, test_days=20,
               learning=True, capital=100000,
               dsteps=10, a=0.2, e=1.0, d=0.1, ta=0, te=0, td=0,
               epsilon_decay_f=None, allow_short=False,
               epb=None,
               hard_limit=200, tol=0.005,
               trial_logfile=TRIAL_LOGFILE, test_logfile=TEST_LOGFILE,
               config_file=CONFIG_FILE, Q_file=Q_FILE,
               project_dir=PROJECT_DIR, logfile=LOGFILE):
  """ Train and test an RLTrader on one equity, or reload a saved run.

  Adjusted close prices (Quandl column 11) for `equity` between `start_date`
  and `end_date` are turned into features: close, close/SMA ratio, Bollinger
  bands and daily price differences. The last `test_days` rows are held out.
  The trader is trained on the rest through a Simulator and its Q-table is
  saved. It is then stepped through the held-out days one event at a time,
  and each step is written to the test log.

  Unless `recalculate` is True, existing trial and test logs under
  `project_dir` are loaded instead of re-running the experiment.

  Args:
    start_date, end_date: date range passed to quandl.get.
    equity: Quandl dataset code, e.g. 'WIKI/TSLA'.
    recalculate: ignore existing logs and re-run.
    plt: matplotlib.pyplot; when given, results are drawn with visualize().
    rolling_window: window for the SMA and rolling std features.
    test_days: number of final rows held out for testing.
    learning, a, e, d, ta, te, td: passed to create_trader as learning,
      learning_rate, epsilon, discount_rate and their test_* counterparts.
    capital: starting capital of the Environment.
    dsteps: Environment discretization_steps.
    epsilon_decay_f, allow_short: passed to create_trader.
    epb: earning_pct_bins for the trader; defaults to
      [-0.03, -0.01, 0, 0.005, 0.01, 0.02].
    hard_limit, tol: Simulator hard_limit and run() tolerance.
    trial_logfile, test_logfile, config_file, Q_file, logfile: file names,
      resolved relative to `project_dir`.
    project_dir: directory holding all files above.

  Returns:
    (trial_df, test_df, adj): pandas DataFrames with the trial log, the test
    log and the downloaded adjusted close prices.

  Raises:
    ValueError: if no more than `test_days` rows remain after the rolling
      window.
  """
  if epb is None:
    epb = [-0.03, -0.01, 0, 0.005, 0.01, 0.02]

  # Both log paths are resolved once here and reused below.
  trial_logfile = os.path.join(project_dir, trial_logfile)
  test_logfile = os.path.join(project_dir, test_logfile)

  # If not recalculate and files already exist, just load them
  # and visualize.
  load_dfs = False
  if not recalculate:
    try:
      dbutils.fs.ls(trial_logfile)
      dbutils.fs.ls(test_logfile)
      load_dfs = True
    except Exception:
      print("{} or {} not found, recalculate.".format(
        trial_logfile, test_logfile))

  adj = quandl.get(equity, start_date=start_date,
                   end_date=end_date, column_index=11)

  if load_dfs:
    trial_df = sqlContext.read.format('parquet') \
      .load(trial_logfile).toPandas()

    test_df = sqlContext.read.format('parquet') \
      .load(test_logfile).toPandas()
  else:
    Q_file = os.path.join(project_dir, Q_file)
    logfile = os.path.join(project_dir, logfile)
    config_file = os.path.join(project_dir, config_file)

    # Use logger = None to print out the log instead.
    logger = Logger(logfile, trial_logfile, erase=True)

    commission_mdl = Commission(per_share=0.0075, min_cost=1)
    env = Environment(capital=capital,
                      logger=logger,
                      commission=commission_mdl,
                      discretization_steps=dsteps)

    # Features start at the first complete rolling window.
    sma = get_rolling_mean(
      adj['Adj. Close'], rolling_window)[rolling_window-1:]
    # Divide row by row: a plain DataFrame / Series would align the Series'
    # dates with the DataFrame's columns instead of its rows.
    adj_sma_ratio = adj[rolling_window-1:].div(sma, axis=0)
    rstd = get_rolling_std(
      adj['Adj. Close'], rolling_window)[rolling_window-1:]
    upper_band, lower_band = get_bollinger_bands(sma, rstd)
    daily_returns = quandl.get(equity,
                               start_date=start_date,
                               end_date=end_date,
                               column_index=11,
                               transform='diff')[rolling_window-1:]

    num_days_min_w = len(adj_sma_ratio)
    if num_days_min_w <= test_days:
      raise ValueError(
        "number of test days (currently {}) must be smaller than number of total days minus rolling window (currently {}). Choose smaller test days or a wider range of dates.".format(test_days, num_days_min_w))

    train_adj_sma_ratio, test_adj_sma_ratio = split_df(adj_sma_ratio, test_days)
    train_sma, test_sma = split_df(sma, test_days)
    train_rstd, test_rstd = split_df(rstd, test_days)
    train_upper_band, test_upper_band = split_df(upper_band, test_days)
    train_lower_band, test_lower_band = split_df(lower_band, test_days)
    train_adj, test_adj = split_df(adj, test_days)
    train_daily_returns, test_daily_returns = split_df(daily_returns, test_days)

    env.add_feature(train_adj, equity=equity, name='adj_close')
    env.add_feature(train_adj_sma_ratio, equity=equity, name='adj_sma_ratio')
    env.add_feature(train_upper_band, equity=equity, name='bollinger_upper')
    env.add_feature(train_lower_band, equity=equity, name='bollinger_lower')

    env.add_daily_returns(train_daily_returns, equity=equity)

    env.create_trader(RLTrader, equity=equity,
                      learning=learning, learning_rate=a,
                      epsilon=e, discount_rate=d,
                      allow_short=allow_short, test_learning_rate=ta,
                      test_epsilon=te, test_discount_rate=td,
                      earning_pct_bins=epb,
                      epsilon_decay_f=epsilon_decay_f)
    sim = Simulator(env,
                    save_config_to=config_file,
                    hard_limit=hard_limit)
    sim.run(n_test=1, tolerance=tol)
    env.save_Q(Q_file)

    # Testing: step the trained trader through the held-out days.

    env.reset(testing=True)
    schema = StructType([
        StructField('time', TimestampType(), True),
        StructField('total_portfolio', FloatType(), True),
        StructField('return_pct', FloatType(), True),
        StructField('action', StringType(), True),
        StructField('action_size', IntegerType(), True),
        StructField('close', FloatType(), True),
        StructField('action_total', FloatType(), True),
        StructField('cash', FloatType(), True),
        StructField('commission', FloatType(), True)
      ])
    rows = []
    for idx in test_adj_sma_ratio.index:
      event = {
               'time': to_epoch(idx),
               'adj_close': _first_value_at(test_adj, idx),
               'adj_sma_ratio': _first_value_at(test_adj_sma_ratio, idx),
               'bollinger_upper': _first_value_at(test_upper_band, idx),
               'bollinger_lower': _first_value_at(test_lower_band, idx),
               'daily_return': _first_value_at(test_daily_returns, idx)
              }
      (q_cell, varlog) = env.step(event)
      if varlog is not None and 'total_portfolio' in varlog:
        # The commission column previously stored the cash balance; use the
        # step's own commission value when the environment reports one.
        commission = varlog.get('commission')
        rows.append({
          'time': idx,
          'total_portfolio': float(varlog['total_portfolio']),
          'return_pct': float(varlog['return_pct']),
          'action': varlog['action'],
          'action_size': varlog['action_size'],
          'close': float(varlog['close']),
          'action_total': float(varlog['action_size'] * varlog['close']),
          'cash': float(varlog['cash']),
          'commission': None if commission is None else float(commission)
        })

    # Build the Spark DataFrame once instead of chaining one unionAll per
    # row, which grows the query plan with every test day.
    test_sdf = None
    if rows:
      test_sdf = sqlContext.createDataFrame(
        sc.parallelize(rows), schema=schema)
      test_sdf.write.format('parquet') \
        .save(test_logfile, mode='overwrite')

    logger.save()

    trial_df = sqlContext.read.format('parquet') \
      .load(trial_logfile).toPandas()

    if test_sdf is not None:
      test_df = test_sdf.toPandas()
    else:
      # No step produced a portfolio record: return an empty frame with the
      # test-log columns instead of failing on None.toPandas().
      test_df = pd.DataFrame(columns=[field.name for field in schema.fields])

  if plt is not None:
    visualize(plt, trial_df, test_df, close_df=adj, capital=capital, learning=learning)
  return (trial_df, test_df, adj)

In [14]:
# Initial benchmark
# learning rate, exploration rate, discount factor

import tabletext

def absmin(l):
  """ Return the element of `l` closest to zero.

  The first such element wins ties; an empty `l` yields `np.inf`.
  """
  best = np.inf
  for candidate in l:
    best = candidate if abs(candidate) < abs(best) else best
  return best

def call(report, f, metric='sharpe'):
  """ Aggregate every value of `metric` in a nested report.

  `report` is nested as report[alpha][epsilon_name][gamma][metric].

  Args:
    report: the nested results dict.
    f: an aggregate applied to the list of collected values. It may be a
      callable, or the name of one (e.g. 'max', 'absmin'), which is looked
      up with eval().
    metric: key of the value to collect from each innermost dict.

  Returns:
    f(values).
  """
  items = [r3[metric]
           for r1 in report.values()
           for r2 in r1.values()
           for r3 in r2.values()]
  if not callable(f):
    # NOTE: eval() is only meant for trusted, hard-coded function names.
    f = eval(f)
  return f(items)
  
def show(report, metric='sharpe', best='max'):
  """ Print one metric of a nested report as a text table.

  `report` is nested as report[alpha][epsilon_name][gamma][metric]. Rows
  are alphas and columns are epsilon names, both sorted. Each cell lists the
  metric for every gamma (sorted), separated by '/'. Cells equal to the best
  value are marked with '*'; the best value is `call(report, best, metric)`,
  and the comparison is made after formatting to two decimals.
  """
  best_v = "{:.2f}".format(call(report, best, metric=metric))
  # Header columns use the same sorted order as the cells below; iterating
  # the dict unsorted could put the epsilon labels over the wrong columns.
  first_alpha = sorted(report)[0]
  disp = [[''] + sorted(report[first_alpha])]
  for a, r1 in sorted(report.items()):
    row = [a]
    for e, r2 in sorted(r1.items()):
      cells = []
      for g, r3 in sorted(r2.items()):
        v = "{:.2f}".format(r3[metric])
        if v == best_v:
          v = "{}*".format(v)
        cells.append(v)
      # Join however many gammas there are (previously exactly three).
      row.append('/'.join(cells))
    disp.append(row)
  print(tabletext.to_text(disp))

# Settings for the learning-rate / epsilon-decay / discount grid search.
equity = 'WIKI/TSLA'
start_date, end_date = '2016-05-02', '2016-09-14'
alphas = [0.1, 0.2, 0.3]  # learning rates to try
results = {}  # filled as results[alpha][epsilon_name][gamma]
def f1(t):
  """ Cosine epsilon decay (passed to experiment as epsilon_decay_f).

  Returns cos(0.05 * t): 1 at t = 0, turning negative once t exceeds
  about 31.4 (0.05 * t > pi / 2).
  """
  # Local import so this function does not rely on a module-level `math`
  # import.
  import math
  return math.cos(0.05*t)
def f2(t):
  """ Exponential epsilon decay: 0.05 raised to the power t. """
  return pow(0.05, t)
def f3(t):
  """ Inverse-square epsilon decay: 1 / t**2, or 0 when t == 0. """
  t2 = t**2
  if t2 == 0:
    return 0
  # Force true division: under Python 2, 1/(t2) is integer division for an
  # integer t and returned 0 for every t >= 2.
  return 1.0 / t2
# Grid search over learning rates (alphas), epsilon-decay functions and
# discount factors (gammas). Results are cached as JSON at `save_to`;
# delete that file to force a recomputation.
epsilons = [f1, f2, f3]
gammas = [0.05, 0.1, 0.2]

save_to = os.path.join(PROJECT_DIR, EXP_RESULTS)

try:
  results = json.loads(dbutils.fs.head(save_to))
  results_cached = True
except Exception:
  # Missing or unreadable cache: recompute below, outside this handler, so
  # errors raised by the experiments are not reported as chained exceptions.
  results_cached = False

if not results_cached:
  for a in alphas:
    results.setdefault(a, {})
    for e in epsilons:
      results[a].setdefault(e.__name__, {})
      for g in gammas:
        fname = 'a{}_e{}_g{}'.format(a,e.__name__,g)
        trial_df, test_df, close_df = experiment(
          start_date, end_date, equity, learning=True,
          a=a, epsilon_decay_f=e, d=g,
          trial_logfile='{}_trial.parquet'.format(fname),
          test_logfile='{}_test.parquet'.format(fname),
          Q_file='{}_Q.csv'.format(fname),
          config_file='{}_config.json'.format(fname),
          logfile='{}_log.parquet'.format(fname)
        )

        first_idx = test_df.index[0]
        last_idx = max(test_df.index)
        t_start_date_obj = test_df['time'][first_idx]
        t_start_date = t_start_date_obj.strftime('%Y-%m-%d')
        t_end_date = test_df['time'][last_idx].strftime('%Y-%m-%d')

        tdf = get_treasury_data(t_start_date, t_end_date)
        sharpe = calculate_sharpe(test_df, tdf)

        # SPY relative daily returns, with a 0.0 row added for the first test
        # day (same preparation as in visualize).
        spy = quandl.get('GOOG/NYSE_SPY', start_date=t_start_date,
           end_date=t_end_date, transform='rdiff')['Close']
        spy.loc[t_start_date_obj] = 0.0  # adding a row
        spy = spy.sort_index()

        rfr = tdf[select_test_df_treasury_duration(test_df)]
        spy_returns = make_tz_aware(spy) - rfr
        m_returns = make_test_df_tz_aware(test_df)['return_pct'] - rfr
        (beta, alpha) = np.polyfit(spy_returns, m_returns, 1)
        results[a][e.__name__][g] = {
          'alpha': alpha,
          'beta': beta,
          'sharpe': sharpe
        }

  dbutils.fs.put(save_to, json.dumps(results),
           overwrite=True)

print("Sharpe")
show(results, metric='sharpe', best='max')
print("Alpha")
show(results, metric='alpha', best='max')
print("Beta")
show(results, metric='beta', best='absmin')

In [15]:
# Scenario 1 - learning disabled, no short selling, at most 30 training
# trials.
e1_trial_df, e1_test_df, e1_close_df = experiment(
  start_date='2014-05-09',
  end_date='2015-03-27',
  equity='WIKI/TSLA',
  plt=plt,
  learning=False,
  allow_short=False,
  hard_limit=30,
  recalculate=False,
  trial_logfile='e1_trial.parquet',
  test_logfile='e1_test.parquet',
  Q_file='e1_Q.csv',
  config_file='e1_config.json',
  logfile='e1_log')

In [16]:
# Scenario 2 - learning during training (a=0.2, e=1.0, d=0.1); the test-time
# learning, exploration and discount rates are all 0.
e2_trial_df, e2_test_df, e2_close_df = experiment(
  start_date='2014-05-09',
  end_date='2015-03-27',
  equity='WIKI/TSLA',
  plt=plt,
  a=0.2, e=1.0, d=0.1,
  ta=0, te=0, td=0,
  allow_short=False,
  recalculate=False,
  trial_logfile='e2_trial.parquet',
  test_logfile='e2_test.parquet',
  Q_file='e2_Q.csv',
  config_file='e2_config.json',
  logfile='e2_log')

In [17]:
# Scenario 3 - same training as scenario 2, but learning continues during
# testing (test learning rate 0.1, test discount 0.05, no exploration).
e3_trial_df, e3_test_df, e3_close_df = experiment(
  start_date='2014-05-09',
  end_date='2015-03-27',
  equity='WIKI/TSLA',
  plt=plt,
  a=0.2, e=1.0, d=0.1,
  ta=0.1, te=0, td=0.05,
  allow_short=False,
  recalculate=False,
  trial_logfile='e3_trial.parquet',
  test_logfile='e3_test.parquet',
  Q_file='e3_Q.csv',
  config_file='e3_config.json',
  logfile='e3_log')

In [18]:
# Scenario 4 - 3 months training data, agent stops learning (default test
# rates ta=te=td=0).
e4_trial_df, e4_test_df, e4_close_df = experiment(
  start_date='2014-10-27',
  end_date='2015-03-27',
  equity='WIKI/TSLA',
  plt=plt,
  learning=True,
  allow_short=False,
  recalculate=False,
  trial_logfile='e4_trial.parquet',
  test_logfile='e4_test.parquet',
  Q_file='e4_Q.csv',
  config_file='e4_config.json',
  logfile='e4_log')

In [19]:
# Scenario 5 - 3 months training data, agent keeps learning (test learning
# rate 0.1, test discount 0.05).
e5_trial_df, e5_test_df, e5_close_df = experiment(
  start_date='2014-10-27',
  end_date='2015-03-27',
  equity='WIKI/TSLA',
  plt=plt,
  learning=True,
  recalculate=False,
  a=0.2, e=1.0, d=0.1,
  ta=0.1, te=0, td=0.05,
  allow_short=False,
  trial_logfile='e5_trial.parquet',
  test_logfile='e5_test.parquet',
  Q_file='e5_Q.csv',
  config_file='e5_config.json',
  logfile='e5_log')

# Exploratory Visualization

In [21]:
# Slice scenario 3's adjusted close into the training window, the testing
# window and the full range used by the plots below.
train_df = e3_close_df['Adj. Close'][str2t('2014-05-09'):str2t('2015-02-27')]
test_df = e3_close_df['Adj. Close'][str2t('2015-03-03'):str2t('2015-03-27')]
all_df = e3_close_df['Adj. Close'][str2t('2014-05-09'):str2t('2015-03-27')]

# Tick formatters: whole dollars, and percentages with two decimals.
fmt, fmt1 = '${x:,.0f}', '{x:,.2%}'
tick_dollar = mtick.StrMethodFormatter(fmt)
tick_percentage = mtick.StrMethodFormatter(fmt1)

In [22]:
# Full adjusted-close history of TSLA from scenario 3.
ax = e3_close_df.plot()
ax.set(title="Adjusted Close for TSLA", ylabel="Adjusted Close")
ax.yaxis.set_major_formatter(tick_dollar)
display(plt.show())

In [23]:
# Side-by-side histograms of the training and testing adjusted close.
fig = plt.figure()
gs_main = plt.GridSpec(1,2)
ax = fig.add_subplot(gs_main[0,0])
ax1 = fig.add_subplot(gs_main[0,1])
for _axis, _values, _title in ((ax, train_df, "Training Data Distribution (TSLA)"),
                               (ax1, test_df, "Testing Data Distribution (TSLA)")):
  _axis.hist(_values)
  _axis.set_title(_title)
  _axis.set_ylabel("Count")
  _axis.set_xlabel("Adjusted Close")
  _axis.xaxis.set_major_formatter(tick_dollar)
# Fixed small integer counts on the y axis of the (short) testing window.
ax1.set_yticks([1, 2, 3, 4, 5])

display(plt.show())

In [24]:
# Daily changes (transform='diff') of TSLA's adjusted close and SPY's close
# over the same period, on one axis.
dr = quandl.get('WIKI/TSLA', start_date='2014-05-09', end_date='2015-03-27',
                column_index=11, transform='diff')
spy_dr = quandl.get('GOOG/NYSE_SPY', start_date='2014-05-09',
                    end_date='2015-03-27', column_index=4, transform='diff')

fig = plt.figure()
gs_main = plt.GridSpec(1,1)
ax = fig.add_subplot(gs_main[0,0])
ax.plot(dr.index, dr['Adj. Close'], 'b-', label='TSLA')
ax.plot(spy_dr.index, spy_dr['Close'], 'r-', label='SPY')
ax.set(title="Daily Returns (TSLA vs SPY)", xlabel="Date", ylabel="Daily Return")
ax.yaxis.set_major_formatter(tick_dollar)
plt.setp(ax.get_xticklabels(), rotation=45, horizontalalignment='right')
ax.legend(loc='upper center', bbox_to_anchor=(0.5, 0.98), ncol=2, fontsize=10)
plt.tight_layout(h_pad=2)
display(plt.show())

In [25]:
# 20-day rolling mean of the daily changes from the previous cell.
rolling_window = 20
dr1 = get_rolling_mean(dr['Adj. Close'], rolling_window)[rolling_window-1:]
spy_dr1 = get_rolling_mean(spy_dr['Close'], rolling_window)[rolling_window-1:]

fig = plt.figure()
gs_main = plt.GridSpec(1,1)
ax = fig.add_subplot(gs_main[0,0])
ax.plot(dr1.index, dr1, 'b-', label='TSLA')
ax.plot(spy_dr1.index, spy_dr1, 'r-', label='SPY')
ax.set(title="20-days Rolling Daily Returns (TSLA vs SPY)",
       xlabel="Date", ylabel="Daily Return")
ax.yaxis.set_major_formatter(tick_dollar)
plt.setp(ax.get_xticklabels(), rotation=45, horizontalalignment='right')
ax.legend(loc='upper center', bbox_to_anchor=(0.5, 0.98), ncol=2, fontsize=10)
plt.tight_layout(h_pad=2)
display(plt.show())

In [26]:
# Scatter of the 20-day rolling mean of relative daily returns
# (transform='rdiff'): SPY on x, TSLA on y, with the least-squares line
# y = b*x + a.
rdr = quandl.get('WIKI/TSLA',
                start_date='2014-05-09',
                end_date='2015-03-27',
                column_index=11,
                transform='rdiff')
rspy_dr = quandl.get('GOOG/NYSE_SPY',
                    start_date='2014-05-09',
                    end_date='2015-03-27',
                    column_index=4,
                    transform='rdiff')
rolling_window = 20
rdr1 = get_rolling_mean(rdr['Adj. Close'], rolling_window)[rolling_window-1:]
rspy_dr1 = get_rolling_mean(rspy_dr['Close'], rolling_window)[rolling_window-1:]


fig = plt.figure()
gs_main = plt.GridSpec(1,1)
ab = fig.add_subplot(gs_main[0,0])
ab.scatter(rspy_dr1, rdr1)
(b, a) = np.polyfit(rspy_dr1, rdr1, 1)
f = np.poly1d((b, a))

# Square limits covering the wider of the two series (as visualize does),
# set before reading the x ticks so the fitted line spans the visible range.
rng = max(rdr1) - min(rdr1)
spy_rng = max(rspy_dr1) - min(rspy_dr1)
if rng > spy_rng:
  lim = (min(rdr1)-rng/10, max(rdr1)+rng/10)
else:
  lim = (min(rspy_dr1)-spy_rng/10, max(rspy_dr1)+spy_rng/10)
ab.set_xlim(lim)
ab.set_ylim(lim)

x = ab.get_xticks().tolist()
ab.plot(x, f(x), 'r--')
ab.get_yaxis().set_major_formatter(tick_percentage)
ab.get_xaxis().set_major_formatter(tick_percentage)
ab.set_xlabel("SPY Daily Return")
ab.set_ylabel("TSLA Daily Return")
ab.text(1, 0.96, "y=%.6fx+(%.6f)"%(b,a),
        horizontalalignment='right',
        verticalalignment='top',
        transform=ab.transAxes)
display(plt.show())

In [27]:
# Four views of the full TSLA range: adjusted close, its 20-day SMA, the
# close/SMA ratio, and series normalized to their first value.
rolling_window = 20

fig = plt.figure(figsize=(10, 6))
gs_main = plt.GridSpec(2,2)
ax = fig.add_subplot(gs_main[0,0])
ax1 = fig.add_subplot(gs_main[0,1])
ax2 = fig.add_subplot(gs_main[1,0])
ax3 = fig.add_subplot(gs_main[1,1])
ax.plot(all_df.index, all_df)
ax.set_title("Adjusted Close of Training Data (TSLA)")
ax.set_ylabel("Adjusted Close")
ax.set_xlabel("Date")
ax.get_yaxis().set_major_formatter(tick_dollar)
for tick in ax.get_xticklabels():
  tick.set_rotation(45)
  tick.set_horizontalalignment('right')

sma = get_rolling_mean(
      all_df, rolling_window)[rolling_window-1:]
ax1.plot(sma.index, sma)
ax1.set_title("SMA 20-day rolling window (TSLA)")
ax1.set_ylabel("SMA")
ax1.set_xlabel("Date")
ax1.get_yaxis().set_major_formatter(tick_dollar)
for tick in ax1.get_xticklabels():
  tick.set_rotation(45)
  tick.set_horizontalalignment('right')

ratio = all_df[rolling_window-1:]/sma
ax2.plot(ratio.index, ratio)
ax2.set_title("Close SMA Ratio (TSLA)")
ax2.set_ylabel("Close SMA Ratio")
ax2.set_xlabel("Date")
for tick in ax2.get_xticklabels():
  tick.set_rotation(45)
  tick.set_horizontalalignment('right')

rdr = quandl.get('WIKI/TSLA',
                start_date=min(ratio.index).strftime('%Y-%m-%d'),
                end_date=max(ratio.index).strftime('%Y-%m-%d'),
                column_index=11)
# `.iloc[0]` replaces the removed `.ix[0, :]`: a Series is divided by its
# first value, a DataFrame column-wise by its first row.
nratio = ratio / ratio.iloc[0]
nrdr = rdr / rdr.iloc[0]
adj = all_df / all_df.iloc[0]
ax3.plot(nratio.index, nratio, label='Ratio')
# NOTE(review): rdr is fetched without a transform, so this line is the
# normalized adjusted close over the ratio's date range rather than a daily
# return. Confirm the intended transform or label.
ax3.plot(nrdr.index, nrdr['Adj. Close'], label='Daily Return')
ax3.plot(adj.index, adj, label='Close')
ax3.set_xlabel("Date")
for tick in ax3.get_xticklabels():
  tick.set_rotation(45)
  tick.set_horizontalalignment('right')
ax3.legend(loc='upper center', bbox_to_anchor=(0.5, 1.14),
              ncol=3, fontsize=10)
ax3.text(0.5, 1.16, "Normalized Ratio vs Daily Return vs Close",
            horizontalalignment='center',
            transform=ax3.transAxes,
            fontsize=14)

plt.tight_layout()

display(plt.show())

In [28]:
# Inspect `adj`: as last set by the overview plotting cell, this is the
# training close (`all_df`) divided by its first row. The archive cells below
# reassign `adj`, so re-run the overview cell first if they have been run.
adj

In [29]:
# Re-fetch the TSLA 'Adj. Close' column over the ratio's date range and show
# it normalized to its first row.
rdr = quandl.get('WIKI/TSLA',
                start_date=min(ratio.index).strftime('%Y-%m-%d'),
                end_date=max(ratio.index).strftime('%Y-%m-%d'),
                column_index=11)
# `.iloc[0]` is the positional first row; it replaces the deprecated
# `.ix[0, :]`, which was removed in pandas 1.0.
rdr / rdr.iloc[0]

In [30]:
# Bollinger bands around the rolling SMA of the training close.
# Recompute `sma` here rather than reusing the global from the overview cell:
# the archive cells further down reassign `sma` for other tickers and date
# ranges. Re-running this cell after them would otherwise pair bands and an
# SMA built from different data.
sma = get_rolling_mean(all_df, rolling_window)[rolling_window-1:]
rstd = get_rolling_std(all_df, rolling_window)[rolling_window-1:]
upper_band, lower_band = get_bollinger_bands(sma, rstd)
close = all_df[rolling_window-1:]

fig = plt.figure(figsize=(8, 5))
gs_main = plt.GridSpec(1,1)
ax = fig.add_subplot(gs_main[0,0])
# Plot every series against its own index so misaligned inputs are visible
# rather than silently shifted.
ax.plot(upper_band.index, upper_band, label='Bol. upper')
ax.plot(lower_band.index, lower_band, label='Bol. lower')
ax.plot(sma.index, sma, label='SMA')
ax.plot(close.index, close, color='#B8B3B3', label='Adj. Close')
ax.set_title("Bollinger Bands (TSLA)")
ax.set_ylabel("Adjusted Close")
ax.set_xlabel("Date")
ax.get_yaxis().set_major_formatter(tick_dollar)
plt.setp(ax.get_xticklabels(), rotation=45, horizontalalignment='right')
ax.legend()

plt.tight_layout()

display(plt.show())

# Archives

The cells below are kept for reference and testing only; they do not need to be run.

In [32]:
# Training
equity = 'WIKI/TSLA'
Q_file = os.path.join(PROJECT_DIR, Q_FILE)
logfile = os.path.join(PROJECT_DIR, LOGFILE)
# Trials are logged to the "optimized" trial log; use TRIAL_LOGFILE instead
# for the plain trial log.
trial_logfile = os.path.join(PROJECT_DIR, OPTIMIZED_TRIAL_LOGFILE)
config_file = os.path.join(PROJECT_DIR, CONFIG_FILE)

# Use logger = None to print out the log instead.
logger = Logger(logfile, trial_logfile, erase=True)
commission_mdl = Commission()
env = Environment(capital=100000,
                  logger=logger,
                  commission=commission_mdl,
                  discretization_steps=10)

start_date = '2016-6-01'
end_date = '2016-7-20'
window = 20

adj = quandl.get(equity, start_date=start_date,
                 end_date=end_date, column_index=11)
sma = get_rolling_mean(adj['Adj. Close'], window)[window-1:]
# `adj` is a DataFrame and `sma` a Series. Plain `/` aligns the Series index
# (dates) with the DataFrame *columns*, so divide row-wise explicitly.
adj_sma_ratio = adj[window-1:].div(sma, axis=0)
rstd = get_rolling_std(adj['Adj. Close'], window)[window-1:]
upper_band, lower_band = get_bollinger_bands(sma, rstd)

env.add_feature(adj, equity=equity, name='adj_close')
env.add_feature(adj_sma_ratio, equity=equity, name='adj_sma_ratio')
env.add_feature(upper_band, equity=equity, name='bollinger_upper')
env.add_feature(lower_band, equity=equity, name='bollinger_lower')
# env.add_feature(pe_ratio, equity=equity, name='pe_ratio')

# The 'diff' transform presumably drops the first date (it has no previous
# value), so a positional `[window-1:]` slice would start one trading day
# after the first SMA date. Slicing by label from the first SMA date keeps
# returns aligned with the features either way.
daily_returns = quandl.get(equity,
                           start_date=start_date,
                           end_date=end_date,
                           column_index=11,
                           transform='diff').loc[sma.index[0]:]

env.add_daily_returns(daily_returns, equity=equity)

env.create_trader(RLTrader, equity=equity,
                  learning=False, learning_rate=0.2, allow_short=True,
                  earning_pct_bins=[-0.03, -0.01, 0, 0.005, 0.01, 0.02])
sim = Simulator(env,
                save_config_to=config_file,
                hard_limit=1)
sim.run(n_test=1, tolerance=0.005)
logger.save()
env.save_Q(Q_file)

In [33]:
# Use this block to find the right test_start_date: the first index of
# `adj_sma_ratio` is the first date the testing loop will step through.
# It uses the same ticker as the testing cell so the dates match exactly.
test_start_date = '2016-11-03'
test_end_date = '2016-12-31'
window = 20

adj = quandl.get('WIKI/TSLA', start_date=test_start_date,
                 end_date=test_end_date, column_index=11)
sma = get_rolling_mean(adj['Adj. Close'], window)[window-1:]
# Row-wise division: plain `DataFrame / Series` would align the Series'
# dates with the DataFrame's columns instead of its rows.
adj_sma_ratio = adj[window-1:].div(sma, axis=0)
adj_sma_ratio

In [34]:
# Testing - Keep this as a reference for real trading.

def _first_value_at(data, label):
  """Return the scalar value of `data` at row `label`.

  For a DataFrame this is the first column of that row (what the former
  `.ix[label, 0]` lookup selected); for a Series it is the value itself.
  This avoids `.ix`, which is deprecated and cannot take two indexers on a
  Series.
  """
  value = data.loc[label]
  if isinstance(value, pd.Series):
    return value.iloc[0]
  return value

equity = 'WIKI/TSLA'
Q_file = os.path.join(PROJECT_DIR, Q_FILE)
logfile = os.path.join(PROJECT_DIR, LOGFILE)
test_logfile = os.path.join(PROJECT_DIR, TEST_LOGFILE)
config_file = os.path.join(PROJECT_DIR, CONFIG_FILE)

# Use logger = None to print out the log instead.
logger = Logger(logfile, test_logfile, erase=True)
# Rebuild the commission model, environment and trader from the saved
# config and Q table.
config = json.loads(dbutils.fs.head(config_file))
Q = create_Q_from_file(Q_file)
commission_mdl = Commission(
  per_share=config['commission']['per_share'],
  min_cost=config['commission']['min_cost']
)
env = Environment(capital=100000,
                  logger=logger,
                  commission=commission_mdl,
                  config=config['env'])
env.create_trader(RLTrader, config=config['trader'], Q=Q)

# Choose test_start_date so that the SMA index (which skips the first
# `window - 1` rows) starts one trading day after the training's last date.
test_start_date = '2016-11-03'
test_end_date = '2016-12-31'

window = 20

adj = quandl.get(equity, start_date=test_start_date,
                 end_date=test_end_date, column_index=11)
sma = get_rolling_mean(adj['Adj. Close'], window)[window-1:]
# Row-wise division: plain `DataFrame / Series` would align the Series'
# dates with the DataFrame's columns instead of its rows.
adj_sma_ratio = adj[window-1:].div(sma, axis=0)
rstd = get_rolling_std(adj['Adj. Close'], window)[window-1:]
upper_band, lower_band = get_bollinger_bands(sma, rstd)

# Keep returns from the first SMA date onward. Slicing by label is correct
# whether or not the 'diff' transform dropped the first row.
daily_returns = quandl.get(equity,
                           start_date=test_start_date,
                           end_date=test_end_date,
                           column_index=11,
                           transform='diff').loc[sma.index[0]:]

# Begin testing
env.reset(testing=True)
schema = StructType([
    StructField('time', TimestampType(), True),
    StructField('total_portfolio', FloatType(), True),
    StructField('return_pct', FloatType(), True),
    StructField('action', StringType(), True),
    StructField('action_size', IntegerType(), True),
    StructField('close', FloatType(), True),
    StructField('action_total', FloatType(), True),
    StructField('cash', FloatType(), True),
    StructField('commission', FloatType(), True)
  ])
rows = []
for idx in adj_sma_ratio.index:
  event = {
           'time': to_epoch(idx),
           'adj_close': _first_value_at(adj, idx),
           'adj_sma_ratio': _first_value_at(adj_sma_ratio, idx),
           'bollinger_upper': _first_value_at(upper_band, idx),
           'bollinger_lower': _first_value_at(lower_band, idx),
           'daily_return': _first_value_at(daily_returns, idx)
          }
  (q_cell, varlog) = env.step(event)
  if varlog is not None and 'total_portfolio' in varlog:
    rows.append({
      'time': idx,
      'total_portfolio': float(varlog['total_portfolio']),
      'return_pct': float(varlog['return_pct']),
      'action': varlog['action'],
      # Cast to match the IntegerType column, as the float fields are cast.
      'action_size': int(varlog['action_size']),
      'close': float(varlog['close']),
      'action_total': float(varlog['action_size'] * varlog['close']),
      'cash': float(varlog['cash']),
      'commission': float(varlog['commission'])
    })

# Build the Spark DataFrame once instead of a one-row DataFrame plus a union
# per step. An empty `rows` list still gives an empty DataFrame with this
# schema, so the write below never hits `None`.
test_df = sqlContext.createDataFrame(sc.parallelize(rows), schema=schema)
test_df.write.format('parquet') \
  .save(test_logfile, mode='overwrite')

In [35]:
def _load_parquet_as_pandas(path):
  """Read the parquet dataset at `path` with Spark and return it as pandas."""
  reader = sqlContext.read.format('parquet')
  return reader.load(path).toPandas()

# Optimized trial log written by the training run.
trial_logfile = os.path.join(PROJECT_DIR, OPTIMIZED_TRIAL_LOGFILE)
trial_df = _load_parquet_as_pandas(trial_logfile)

# Per-step log written by the testing run.
test_logfile = os.path.join(PROJECT_DIR, TEST_LOGFILE)
test_df = _load_parquet_as_pandas(test_logfile)

In [36]:
# Example: filter a saved log with a regex selection.

logfile = os.path.join(PROJECT_DIR, 'e5_log')
logger = Logger(logfile, erase=False)

# Regex alternatives passed as `begin_with` (presumably matched at the start
# of each log line): the "evaluate state" header plus the indented detail
# lines, each prefixed with an optional run of digits.
state_log_pattern = 'evaluate state |  [0-9]* hold|  [0-9]* daily|  [0-9]* previous close|  [0-9]* current close|  [0-9]* action|  [0-9]* new return|  [0-9]* reward|  [0-9]* total portfolio|  [0-9]* cash'
# Alternative that follows a single state id instead:
#   'evaluate state 90109080201|  90109080201 hold|  90109080201 daily|  90109080201 action|  90109080201 new return|  90109080201 reward|Chosen[ a-z]*action'
logger.p(begin_with='(' + state_log_pattern + ')')