# Libraries & Sample Data
The first step is to load our Python libraries and read in the sample data. Judging by the file name (`GOOG_2008-2009_6m_RAW_1d.csv`), the dataset contains Google (GOOG) daily (1d) price bars covering six months of 2008-2009.

In [None]:
# Load Python Libraries
import datetime
import math
import random
from collections import deque

import keras
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from IPython.display import display, HTML
from sklearn.preprocessing import StandardScaler
from tqdm.notebook import tqdm

# DataFrame display: lift pandas' row limit so full frames are rendered;
# display_df keeps the output compact by making it scrollable.
pd.options.display.max_rows = None


def display_df(df):
    """Render df as an HTML table inside a 200px-high scrollable container."""
    table_html = df.to_html()
    # The fit-content width keeps the scrollbar right next to the table
    wrapped = f"<div style='height: 200px; overflow: auto; width: fit-content'>{table_html}</div>"
    display(HTML(wrapped))

# For reproducibility of answers: fix the random seed through Keras' utility
keras.utils.set_random_seed(42)


In [None]:
# Load the sample data from the local CSV GOOG_2008-2009_6m_RAW_1d
data = pd.read_csv('GOOG_2008-2009_6m_RAW_1d.csv')
# Track index to remember which feature is which.
# The normalization step below builds normlist with one scaler per non-'Date'
# column of the state-space dataset, in the order Close, MA20, BB_upper,
# BB_lower. Position 1 is therefore MA20, and the Bollinger Bands sit at
# positions 2 and 3.
idx_close = 0
idx_ma20 = 1
idx_bb_upper = 2
idx_bb_lower = 3

# Clean Data

In [None]:
# Count missing values per column before cleaning
null_counts = data.isnull().sum()
print('Number of Null Values =', null_counts)

In [None]:
# Forward fill missing values: each gap takes the last preceding valid value
# in its column, then show the cleaned frame.
data=data.ffill()
display_df(data)

In [None]:
# Re-count missing values per column after the forward fill
remaining_nulls = data.isnull().sum()
print('Number of Null Values =', remaining_nulls)

# Define Features (copy from training code)

In [None]:
# 20-row rolling mean and standard deviation of the close, plus Bollinger
# Bands placed two standard deviations above and below the moving average.
close_window = data['Close'].rolling(window=20)
data['MA20'] = close_window.mean()
data['STD20'] = close_window.std()
band_offset = data['STD20'] * 2
data['BB_upper'] = data['MA20'] + band_offset
data['BB_lower'] = data['MA20'] - band_offset
display_df(data)

In [None]:
# Drop every row that still contains a NaN (e.g. rows where the 20-row
# rolling features are not yet defined)
data = data.dropna()
display_df(data)

In [None]:
# Plot Features: Close, MA20, BB Upper, BB Lower
for feature_name in ('Close', 'MA20', 'BB_upper'):
    data[feature_name].plot()
# The last call also rotates the x tick labels by 45 degrees
data['BB_lower'].plot(rot=45)

# State Space Matrix

In [None]:
# Construct the State Space Matrix: the date plus the four price features,
# re-indexed from zero
state_columns = ['Date', 'Close', 'MA20', 'BB_upper', 'BB_lower']
dataset = data.reset_index()[state_columns]
display_df(dataset)

# Normalize (match normalizer from training)

In [None]:
# Normalize Dataset with StandardScaler.
# Each non-'Date' column gets its own scaler, fitted on that column of this
# dataset; the scalers are kept in normlist (in column order) so values can be
# inverse-transformed later. 'Date' is copied through unchanged.
normlist = []
static_normed_dataset = pd.DataFrame(index=dataset.index)
for col in dataset.columns:
    if col != 'Date':
        normalizer = StandardScaler()
        column_data = dataset[[col]]
        static_normed_dataset[col] = normalizer.fit_transform(column_data).flatten()
        normlist.append(normalizer)
    else:
        static_normed_dataset[col] = dataset[col]

In [None]:
# Plot Normalized Features: Close, MA20, BB Upper, BB Lower
for feature_name in ('Close', 'MA20', 'BB_upper'):
    static_normed_dataset[feature_name].plot()
static_normed_dataset['BB_lower'].plot()

# Type Conversion 

In [None]:
# Convert the normalized features to a NumPy array with dtype=float.
# static_normed_dataset still carries the unscaled 'Date' column, which is not
# a model feature: date values cannot be cast to float, and both normlist and
# the idx_* feature positions count columns with 'Date' left out. Drop it
# before converting.
X_backtest = static_normed_dataset.drop(columns=['Date']).values.astype(float)
# print the shape of X_backtest to remind yourself how many examples and features are in the dataset
X_backtest.shape

# Agent Class & Helper Functions (copy from training code)


In [None]:
class Agent:
    """Trading agent that chooses hold/buy/sell actions from a Keras Q-value model.

    In test mode the model is loaded from the checkpoint file ``model_name``
    and actions are always greedy. Otherwise the agent calls ``self._model()``
    to build a model and follows an epsilon-greedy policy.

    NOTE: ``_model`` is not defined in this class as shown here, so only
    ``test_mode=True`` is usable in this file.
    """

    def __init__(self, window_size, num_features, test_mode=False, model_name=''):
        # State layout: window_size lookback days with num_features values per day
        self.window_size = window_size
        self.num_features = num_features
        self.state_size = window_size * num_features
        self.action_size = 3  # 0=hold, 1=buy, 2=sell
        # Bounded memory: once 1000 entries are stored, the oldest (left-hand)
        # entries are discarded as new ones are appended on the right
        self.memory = deque(maxlen=1000)
        self.inventory = []  # prices of currently held trades
        self.model_name = model_name  # filename of the saved model checkpoint
        self.test_mode = test_mode  # when set, load the checkpoint and never explore

        # Hyperparameters; within this class only epsilon is read (by act)
        self.gamma = 0.95
        self.epsilon = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995

        if test_mode:
            self.model = keras.models.load_model(model_name)
        else:
            self.model = self._model()

    def get_q_values_for_state(self, state):
        """Return the model's prediction for ``state``.

        The state is flattened and reshaped into a single-row batch of
        ``state_size`` values before being passed to ``model.predict``.
        """
        flat_state = state.flatten()
        batch = flat_state.reshape(1, self.state_size)
        return self.model.predict(batch)

    def act(self, state):
        """Pick an action (0, 1, or 2) for ``state``.

        Outside test mode a random action is taken with probability epsilon
        (epsilon-greedy); otherwise the action with the highest predicted
        Q-value is returned. In test mode the choice is always greedy.
        """
        if not self.test_mode:
            if random.random() <= self.epsilon:
                return random.randrange(self.action_size)
        q_values = self.get_q_values_for_state(state)
        return np.argmax(q_values[0])

In [None]:
# Format price string
def format_price(n):
    """Return n as a dollar string with two decimals, e.g. '$12.50' or '-$3.25'."""
    sign_prefix = '-$' if n < 0 else '$'
    return f'{sign_prefix}{abs(n):.2f}'

def sigmoid(x):
    """Return the logistic sigmoid 1 / (1 + e^-x) of a scalar x.

    Uses a numerically stable form: for negative x the expression is rewritten
    as e^x / (1 + e^x), so math.exp is never called with a large positive
    argument and cannot raise OverflowError.
    """
    if x >= 0:
        return 1 / (1 + math.exp(-x))
    exp_x = math.exp(x)
    return exp_x / (1 + exp_x)

# Plot behavior of trade output
def plot_behavior(data_input, bb_upper_data, bb_lower_data, states_buy, states_sell, profit, train=True):
    """Plot the close price, Bollinger Bands, and buy/sell markers.

    Parameters:
        data_input: close price series, drawn as the black line.
        bb_upper_data, bb_lower_data: band series, drawn in blue.
        states_buy, states_sell: positions in data_input to mark with a red
            '^' (buy) or a green 'v' (sell); passed to matplotlib's markevery.
        profit: total gains shown in the title.
        train: accepted for interface compatibility; not used.

    The x tick labels are taken from the index of the module-level
    static_normed_dataset (first value and every half-length step after it).
    """
    plt.figure(figsize = (15,5))
    plt.plot(data_input, color='k', lw=2., label= 'Close Price')
    plt.plot(bb_upper_data, color='b', lw=2., label = 'Bollinger Bands')
    plt.plot(bb_lower_data, color='b', lw=2.)
    plt.plot(data_input, '^', markersize=10, color='r', label = 'Buying signal', markevery = states_buy)
    plt.plot(data_input, 'v', markersize=10, color='g', label = 'Selling signal', markevery = states_sell)
    plt.title('Total gains: %f'%(profit))
    plt.legend()
    tick_labels = static_normed_dataset.index.values
    # Clamp the step to at least 1: with fewer than two rows the half-length
    # step would be 0, which range() and slicing both reject.
    tick_step = max(1, len(tick_labels) // 2)
    plt.xticks(range(0, len(tick_labels), tick_step), tick_labels[0::tick_step], rotation=45, fontsize='small')
    plt.show()

# Returns an n-day state representation ending just before time t
def get_state(data, t, n):
    """Build the state for time t from the n rows data[t-n:t].

    When fewer than n rows precede t, the window is filled with n copies of
    data[0]. The state holds, for each pair of consecutive rows in the window
    and each feature, the sigmoid of the feature's change between the rows.

    Returns an array of shape (1, n - 1, number of features).
    """
    start = t - n
    window = np.array([data[0]] * n) if start < 0 else data[start:t]
    num_features = data.shape[1]
    deltas = [
        [sigmoid(window[row + 1, col] - window[row, col]) for col in range(num_features)]
        for row in range(n - 1)
    ]
    return np.array([deltas])

# Define Parameters

In [None]:
# Backtest configuration
window_size = 1       # lookback days per state
episode_to_load = 5   # which saved training checkpoint to load

# Bookkeeping for the backtest loop
l_test = len(X_backtest) - 1
state = get_state(X_backtest, 0, window_size + 1)
total_profit = 0
done = False
states_sell_test, states_buy_test = [], []

# Get the trained model (test mode loads the checkpoint and disables exploration)
checkpoint_path = f'model_ep{episode_to_load}.keras'
agent = Agent(window_size, num_features=X_backtest.shape[1], test_mode=True, model_name=checkpoint_path)
agent.inventory = []

# Define normalizers for inverse transform back to true price

In [None]:
# Pick the fitted scaler for each tracked feature
normalizer_close, normalizer_bb_upper, normalizer_bb_lower = (
    normlist[feature_idx] for feature_idx in (idx_close, idx_bb_upper, idx_bb_lower)
)

# Undo the scaling column by column; each result is a single-column 2-D array
X_backtest_true_price = normalizer_close.inverse_transform(X_backtest[:, [idx_close]])
X_backtest_true_bb_upper = normalizer_bb_upper.inverse_transform(X_backtest[:, [idx_bb_upper]])
X_backtest_true_bb_lower = normalizer_bb_lower.inverse_transform(X_backtest[:, [idx_bb_lower]])

In [None]:
# Backtest loop: step through the data, let the agent act on each state, and
# track trades in true (inverse-transformed) dollar prices. The agent still
# sees the normalized features via get_state; only price bookkeeping, printing
# and plotting use X_backtest_true_price.
for t in range(l_test):
    action = agent.act(state)
    next_state = get_state(X_backtest, t + 1, window_size + 1)
    reward = 0

    if action == 1: # buy
        # true buy price in dollars (inverse-transformed close)
        buy_price = X_backtest_true_price[t, 0]
        agent.inventory.append(buy_price)
        states_buy_test.append(t)
        print(f'Buy: {format_price(buy_price)}')

    elif action == 2 and len(agent.inventory) > 0: # sell
        # sell the oldest held position first
        bought_price = agent.inventory.pop(0)
        # true sell price in dollars (inverse-transformed close)
        sell_price = X_backtest_true_price[t, 0]
        trade_profit = sell_price - bought_price

        # reward is the profit of the trade, floored at zero
        reward = max(trade_profit, 0)
        total_profit += trade_profit
        states_sell_test.append(t)
        print(f'Sell: {format_price(sell_price)} | Profit: {format_price(trade_profit)}')


    if t == l_test - 1:
        done = True

    # append to memory so we can re-train on 'live' (test) data later
    agent.memory.append((state, action, reward, next_state, done))
    state = next_state

    if done:
        print('------------------------------------------')
        print(f'Total Profit: {format_price(total_profit)}')
        print('------------------------------------------')

# Plot in dollars so the chart matches the reported total gains
plot_behavior(X_backtest_true_price.flatten(), X_backtest_true_bb_upper.flatten(), X_backtest_true_bb_lower.flatten(), states_buy_test, states_sell_test, total_profit, train=False)