In [1]:
import tensorflow as tf
import numpy as np
import pandas as pd
import math

tf.enable_eager_execution()

In [2]:
# update batch size
batch_size = 100

# number of batch in training
num_of_batch = 1000

# fixed number of time steps in one episode (not used)
trading_period = 500

# 1 is zscore
num_features = 1

# 0 is no position. 1 is long the spread. 2 is short the spread.
a_num = position_num = 3

# RNN hidden state dimension
h_dim = 100

# number of RNN layer
num_layers = 1

# number of layer1 output
layer1_out_num = 50

# learning rate
lr = 1e-3

# discount factor in reinforcement learning
gamma = 1

# random action probability
rand_action_prob = 0.15

batch_per_print = 50

In [3]:
# functions
def get_random_history(batch_size):
    """Sample some pairs and get the history of those pairs. The history should have
    three dimension. The first dimension is for time. The second dimension is indexed
    by features name. The third dimension is the index of training instance.
    """
    df = pd.read_csv("/home/u21376/fyp/dataset/GOOGL-GOOG-1.csv")
    history = df[['close1', 'close2', 'spread']].values
    arr = []
    for i in range(batch_size):
        temp = history[i*trading_period:(i+1)*trading_period]
        arr.append(history[i*trading_period:(i+1)*trading_period])
    hist = np.array(arr)
    return np.transpose(hist, (1, 2, 0))
    

def compute_input_history(history):
    """Slicing history in its second dimension."""
    return history[:,2:,:]

def sample_action(logits, random=False):
    if random:
        dist = tf.distributions.Categorical(logits=tf.zeros([batch_size, a_num]))
    else:
        dist = tf.distributions.Categorical(logits=logits)
    
    # 1-D Tensor where the i-th element correspond to a sample from
    # the i-th categorical distribution
    return dist.sample()

def discount_rewards(r, all_actions):
    """
    r is a numpy array in the shape of (n, batch_size).
    all_actions is a numpy array in the same shape as r.
    
    return the discounted and cumulative rewards"""
    
    result = np.zeros_like(r, dtype=float)
    n = r.shape[0]
    sum_ = np.zeros_like(r[0], dtype=float)
    pre_action = all_actions[n-1]
    for i in range(n-1,-1,-1):
        sum_ *= gamma
        
        # when the previous action(position) not equal to the current one,
        # set the previous sum of reward to be zero.
        sum_ = sum_*(all_actions[i]==pre_action) + r[i]
        result[i] = sum_
        
        # update pre_action
        pre_action = all_actions[i]
    
    return result

def loss(all_logits, all_actions, all_advantages):
    neg_log_select_prob = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=all_logits, labels=all_actions)
    
    # 0 axis is the time axis. 1 axis is the batch axis
    return tf.reduce_mean(neg_log_select_prob * all_advantages, 0)

# classes
class TradingPolicyModel(tf.keras.Model):
    def __init__(self):
        super(TradingPolicyModel, self).__init__()
        self.dense1 = tf.layers.Dense(units=layer1_out_num,
                                      activation=tf.keras.layers.LeakyReLU(),
                                      kernel_initializer=tf.contrib.layers.xavier_initializer()
                                     )
        self.dense2 = tf.layers.Dense(units=layer1_out_num,
                                      activation=tf.keras.layers.LeakyReLU(),
                                      kernel_initializer=tf.contrib.layers.xavier_initializer()
                                     )
        self.dense3 = tf.layers.Dense(units=layer1_out_num,
                                      activation=tf.keras.layers.LeakyReLU(),
                                      kernel_initializer=tf.contrib.layers.xavier_initializer()
                                     )
        self.dense4 = tf.layers.Dense(units=layer1_out_num,
                                      activation=tf.keras.layers.LeakyReLU(),
                                      kernel_initializer=tf.contrib.layers.xavier_initializer()
                                     )
        self.logits = tf.layers.Dense(units=a_num,
                                      activation=tf.keras.layers.LeakyReLU(),
                                      kernel_initializer=tf.contrib.layers.xavier_initializer()
                                     )

    def call(self, inputs):
        # Forward pass
        inputs = self.dense1(inputs)
        inputs = self.dense2(inputs)
        inputs = self.dense3(inputs)
        inputs = self.dense4(inputs)
        logits = self.logits(inputs)
        return logits


class StateEncodingModel(tf.keras.Model):
    def __init__(self):
        super(StateEncodingModel, self).__init__()
        self.cell_layer = tf.contrib.rnn.LSTMCell(h_dim)
        self.cell = tf.contrib.rnn.MultiRNNCell([self.cell_layer] * num_layers)
        self.state = self.cell.zero_state(batch_size, tf.float32)
    
    def call(self, inputs):
        oberservation, self.state = self.cell(inputs, self.state)
        return oberservation
        
    def reset_state(self):
        self.state = self.cell.zero_state(batch_size, tf.float32)


class TradingEnvironment():
    """Trading environment for reinforcement learning training.
    
    Arguments:
        state_encoding_model: the model that encode past input_history data into a state
        vector which will be fed as input to the policy network.
    """
    def __init__(self, state_encoding_model):
        # do some initialization
        self.state_encoding_model = state_encoding_model
        self._reset_env()
        
    def _reset_env(self):
        self.t = 0
        self.state_encoding_model.reset_state()

        # 0 is no position. 1 is long the spread. 2 is short the spread
        self.position = np.zeros(batch_size, dtype=int)

        # prepare a batch of history and input_history
        self.history = get_random_history(batch_size)
        self.input_history = compute_input_history(self.history)
        
        # create or update self.state variable
        self.update_state()
    
    def reset(self):
        """Return an initial state for the trading environment"""
        if self.t == 0:
            return self.state
        else:
            self._reset_env()
            return self.state
    
    def compute_reward(self, action):
        # if action is 0, no reward.
        # if action is 1 or 2, can compute immediate return as immediate reward
        r = np.zeros_like(action, dtype=float)        
        cur_his = self.history[self.t]
        nex_his = self.history[self.t+1]
        
        # compute for each training instance in a batch
        for i, a in enumerate(action):
#             y_p = cur_his["y_close"][i]
#             x_p = cur_his["x_close"][i]
#             nex_y_p = nex_his["y_close"][i]
#             nex_x_p = nex_his["x_close"][i]
            
            y_p = cur_his[0][i]
            x_p = cur_his[1][i]
            nex_y_p = nex_his[0][i]
            nex_x_p = nex_his[1][i]
            if a == 1:
                r[i] = math.log(nex_y_p/y_p) + math.log(x_p/nex_x_p)
            elif a == 2:
                r[i] = math.log(nex_x_p/x_p) + math.log(y_p/nex_y_p)
        return r
    
    def update_state(self):
        # concate next_input_history and next position to form next partial state
        partial_state = tf.concat([self.input_history[self.t].T, tf.one_hot(self.position, position_num)], 1)
        
        # update state
        self.state = self.state_encoding_model(partial_state)        
    
    def step(self, action):
        """Given the current state and action, return the reward, next state and done.
        This function should be called after reset.
        
        reward is of type numpy array. state is of type tensor. done is of type boolean.
        
        
        Arguments:
            action: a numpy array containing the current action for each training pair.

        Note that we follow the convention where the trajectory is indexed as s_0, a_0, r_0,
        s_1, ... . Therefore t is updated just after computing the reward is computed and
        before computing next state.
        """
        # r_t
        r = self.compute_reward(action)

        # t = t+1
        self.t += 1
        
        # compute s_(t+1)
        self.position = action
        self.update_state()

        return r, self.state, self.t == batch_size

In [4]:
pi = TradingPolicyModel()
state_encoding_model = StateEncodingModel()
env = TradingEnvironment(state_encoding_model)
optimizer = tf.train.AdamOptimizer(learning_rate=lr)

# for training reference only
average_total_r = np.zeros(batch_size)

for batch in range(num_of_batch):
    
    # saving for update
    all_logits = []
    all_actions = []
    all_rewards = []
    with tf.GradientTape() as gt:
        done = False
        s = env.reset()

        # internally the episode length is fixed by trading_period
        while not done:
            logits = pi(s)
            a = sample_action(logits, random=np.random.rand() <= rand_action_prob)
            r, next_s, done = env.step(a.numpy())

            # save the episode
            all_logits.append(logits)
            all_actions.append(a)
            all_rewards.append(r)
            
            average_total_r += r
        
        all_logits_stack = tf.stack(all_logits)
        all_actions_stack = tf.stack(all_actions)
        all_rewards_stack = np.array(all_rewards)
        
        # compute cummulative rewards for each action
        all_cum_rewards = discount_rewards(all_rewards_stack, all_actions_stack.numpy())
        all_cum_rewards -= np.mean(all_cum_rewards)
        all_cum_rewards /= np.std(all_cum_rewards)
        all_cum_rewards = tf.convert_to_tensor(all_cum_rewards, dtype=tf.float32)

        loss_value = loss(all_logits_stack, all_actions_stack, all_cum_rewards)
    
    if (batch+1) % batch_per_print == 0:
        print("batch: {}, average_total_r_per_ep: {}".format(batch, np.mean(average_total_r)/batch_per_print))
        average_total_r = np.zeros(batch_size)
    
    grads = gt.gradient(loss_value, state_encoding_model.variables + pi.variables)
    optimizer.apply_gradients(zip(grads, state_encoding_model.variables + pi.variables))

batch: 49, average_total_r_per_ep: 1.6464054135444144e-05
batch: 99, average_total_r_per_ep: -2.2561814368062148e-05
batch: 149, average_total_r_per_ep: -1.3817527085588554e-05
batch: 199, average_total_r_per_ep: 5.936656222624392e-05
batch: 249, average_total_r_per_ep: 2.0207143590096053e-05
batch: 299, average_total_r_per_ep: 2.5382190553506685e-05
batch: 349, average_total_r_per_ep: 5.874221155951589e-05
batch: 399, average_total_r_per_ep: 3.5309044906404675e-05
batch: 449, average_total_r_per_ep: 7.370314040504169e-05
batch: 499, average_total_r_per_ep: 7.194255206300513e-05
batch: 549, average_total_r_per_ep: 3.744918510306042e-05
batch: 599, average_total_r_per_ep: 2.2135442501452317e-05
batch: 649, average_total_r_per_ep: 4.44327209753318e-05
batch: 699, average_total_r_per_ep: 5.214230257825284e-05
batch: 749, average_total_r_per_ep: 1.8597701248665535e-05
batch: 799, average_total_r_per_ep: 4.5274625407673044e-05
batch: 849, average_total_r_per_ep: 3.1649892616269556e-05
batch

In [5]:
# test time
average_total_r = np.zeros(batch_size)
done = False
s = env.reset()

# internally the episode length is fixed by trading_period
while not done:
    logits = pi(s)
    a = sample_action(logits)
    r, next_s, done = env.step(a.numpy())

    # save the episode
    all_logits.append(logits)
    all_actions.append(a)
    all_rewards.append(r)

    average_total_r += r

print("At test time, average_total_r_per_ep: {}".format(np.mean(average_total_r)))

At test time, average_total_r_per_ep: 4.2436951307533045e-05
