<a href="https://colab.research.google.com/github/sugiyama404/ReinfoceLearningForTrading/blob/main/impala_train.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
import pandas as pd
import time
from datetime import datetime
import random
import copy
import pickle

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import Dense, LSTM, Conv1D, MaxPool1D, Activation, concatenate
from tensorflow.keras import Input
from tensorflow.keras.models import clone_model

from sklearn.preprocessing import StandardScaler
import math

from google.colab import drive

from concurrent.futures import ThreadPoolExecutor

from dataclasses import dataclass, field
from typing import List

# Run configuration: data split to load and the experiment name used for outputs.
mode = 'train'
name = 'impala'

# Google Drive holds both the input dataset and the export folders.
drive.mount('/content/drive/')

# Input: S&P 500 prices for the selected split.
nov_dir = 'Colab Notebooks/dataset/reinforcement_learning/'
nov_path = f'/content/drive/My Drive/{nov_dir}sp500_{mode}.csv'

# Outputs: model weights / pickled scalers, and the per-episode CSV log.
exp_dir = 'Colab Notebooks/workspace/export/'
mdl_dir = f'/content/drive/My Drive/{exp_dir}models'
csv_path = f'/content/drive/My Drive/{exp_dir}csv_data/{name}_{mode}.csv'

# Load the price table and parse its Date column.
df = pd.read_csv(nov_path)
df['Date'] = pd.to_datetime(df['Date'], format='%Y-%m-%d')

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


In [2]:
class Environment:
    """Long-only, all-in trading environment over the ``'SP500'`` price column of ``df``.

    Actions (``action_space``): 0 = buy (only while flat; buys whole units while
    cash exceeds the price), 1 = hold, 2 = sell (closes the whole position).
    The state is ``[units held, current price, cash]``. A step's reward is the
    change in portfolio value (cash + units * price). On the last row any open
    position is closed. In ``'test'`` mode closed trades are counted in
    ``trade_time``, and trades whose closing cash exceeds the cash held before the
    buy are counted in ``trade_win``.
    """

    def __init__(self, df, initial_money=100000, mode = 'test'):

        self.df = df.dropna().reset_index()

        self.df_total_steps  = len(self.df)-1
        self.initial_money   = initial_money
        self.mode            = mode
        self.trade_time      = None
        self.trade_win       = None
        self.brfore_buy_cash = None
        self.action_space    = np.array([0, 1, 2]) # buy,hold,sell
        self.hold_a_position = None
        self.now_price       = None
        self.cash_in_hand    = None

        self.reset()

    def reset(self):
        """Restart at the first row with all cash and no position; return the state."""
        self.trade_time      = 0
        self.trade_win       = 0
        self.brfore_buy_cash = 0
        self.end_step        = self.df_total_steps
        self.now_step        = 0
        self.hold_a_position = 0.0
        self.now_price       = self.df.loc[self.now_step, 'SP500']
        self.cash_in_hand    = self.initial_money

        return self._get_now_state()

    def step(self, action):
        """Advance one row, apply ``action`` at the new price.

        Returns ``(state, reward, done, info)``. ``info`` always has
        ``'cur_revenue'``; in test mode it also has ``'trade_time'`` and ``'trade_win'``.
        """
        prev_revenue = self._get_revenue()
        self.now_step += 1
        self.now_price = self.df.loc[self.now_step, 'SP500']

        done = (self.end_step == self.now_step)

        self._trade(action, done)
        cur_revenue = self._get_revenue()

        reward = cur_revenue - prev_revenue

        if self.mode == 'test':
            info = { 'cur_revenue' : cur_revenue , 'trade_time' : self.trade_time, 'trade_win' : self.trade_win }
        else:
            info = { 'cur_revenue' : cur_revenue }

        return self._get_now_state(), reward, done, info

    def _get_now_state(self):
        state = np.empty(3)
        state[0] = self.hold_a_position
        state[1] = self.now_price
        state[2] = self.cash_in_hand
        return state

    def _get_revenue(self):
        """Portfolio value: position marked at the current price plus cash."""
        return self.hold_a_position * self.now_price + self.cash_in_hand

    def _trade(self, action, lastorder = False):
        """Apply ``action``; on the last step (``lastorder``) force-close instead."""
        if lastorder:
            self._close_position()
        elif action == self.action_space[0]:   # buy
            self._open_position()
        elif action == self.action_space[2]:   # sell
            self._close_position()

    def _open_position(self):
        """Spend cash on whole units while cash exceeds the price (only when flat)."""
        if self.hold_a_position != 0:
            return
        # A non-positive price would make the buying loop below never terminate.
        if self.now_price <= 0:
            return
        if self.mode == 'test':
            self.brfore_buy_cash = self.cash_in_hand
        while self.cash_in_hand > self.now_price:
            self.hold_a_position += 1
            self.cash_in_hand -= self.now_price

    def _close_position(self):
        """Sell the whole position; record the trade in test mode.

        Does nothing while flat, so a forced close at the end of the episode
        does not count a phantom trade (or re-count the previous win).
        """
        if self.hold_a_position == 0:
            return
        self.cash_in_hand += self.now_price * self.hold_a_position
        self.hold_a_position = 0
        if self.mode == 'test':
            self.trade_time += 1
            if self.cash_in_hand > self.brfore_buy_cash:
                self.trade_win += 1

In [3]:
class Critic:
    """V-trace actor-critic update (IMPALA) applied to ``self.model``.

    ``self.model`` is called as ``model([state, prev_action, prev_reward])`` on a
    window of T steps and must return ``(action_probs, values)`` that reshape to
    (T, n_action) and (T,). The update uses ``self.model.optimizer``, so the
    model must have been compiled with an optimizer.
    """

    def __init__(self, model):
        self.model = model
        self.n_action = 3
        self.gamma = 0.9
        self.alfa = 0.5        # weight of the baseline (value) loss
        self.beta = 0.00025    # weight of the entropy regulariser
        self.rho_bar = 1.0     # truncation level of the importance weights rho
        self.c_bar = 1.0       # truncation level of the trace coefficients c

    def valuenetwork(self, state, next_state, prev_action, action, prev_reward, reward, done, v, v_next, mu):
        """Apply one V-trace actor-critic gradient step on a window of T transitions.

        Args:
            state: states of the window; reshaped to (1, T, -1) for the model.
            next_state: unused; kept for interface compatibility.
            prev_action, prev_reward: model side inputs; reshaped to (1, T, 1).
            action: actions taken (length T; may be stored as floats).
            reward: per-step rewards (length T).
            done: per-step terminal flags (length T); a terminal step does not bootstrap.
            v: value estimates recorded by the actor for each state (length T).
            v_next: value estimates recorded for each next state; the last one
                bootstraps the end of the window.
            mu: behaviour-policy action probabilities, reshapeable to (T, n_action).
        """
        seq_len = len(action)
        # Actions come from float arrays; index an identity matrix with ints instead of tf.one_hot.
        action_idx = np.asarray(action).astype(np.int64).reshape(seq_len)
        one_hot = np.eye(self.n_action, dtype=np.float32)[action_idx]                      # (T, A)
        behaviour_probs = np.reshape(np.asarray(mu, dtype=np.float64), (seq_len, self.n_action))
        mu_taken = np.sum(one_hot * behaviour_probs, axis=1)                               # (T,)

        rewards = np.reshape(np.asarray(reward, dtype=np.float64), seq_len)
        discounts = self.gamma * (1.0 - np.reshape(np.asarray(done, dtype=np.float64), seq_len))
        values = np.reshape(np.asarray(v, dtype=np.float64), seq_len)
        bootstrap_value = float(np.reshape(np.asarray(v_next, dtype=np.float64), seq_len)[-1])

        model_inputs = [
            np.reshape(state, (1, seq_len, -1)),
            np.reshape(prev_action, (1, seq_len, 1)),
            np.reshape(prev_reward, (1, seq_len, 1)),
        ]

        with tf.GradientTape() as tape:
            probs, v_theta = self.model(model_inputs)
            probs = tf.reshape(probs, [seq_len, self.n_action])
            v_theta = tf.reshape(tf.cast(v_theta, tf.float32), [seq_len])

            # Importance weights pi(a|x)/mu(a|x) are constants for the gradient.
            pi_taken = tf.reduce_sum(one_hot * tf.stop_gradient(probs), axis=1).numpy()
            rho = np.divide(pi_taken, mu_taken, out=np.zeros(seq_len), where=mu_taken > 0)

            vs, pg_advantages = self._vtrace(rho, rewards, discounts, values, bootstrap_value,
                                             self.rho_bar, self.c_bar)
            vs = tf.convert_to_tensor(vs.astype(np.float32))
            pg_advantages = tf.convert_to_tensor(pg_advantages.astype(np.float32))

            total_loss = (
                self._compute_policy_gradient_loss(probs, one_hot, pg_advantages)
                + self.alfa * self._compute_baseline_loss(vs - v_theta)
                + self.beta * self._compute_entropy_loss(probs)
            )

        gradients = tape.gradient(total_loss, self.model.trainable_variables)
        self.model.optimizer.apply_gradients(zip(gradients, self.model.trainable_variables))

    @staticmethod
    def _vtrace(rho, rewards, discounts, values, bootstrap_value, rho_bar=1.0, c_bar=1.0):
        """Compute V-trace value targets and policy-gradient advantages.

        All sequence inputs are 1-D and of equal length T, in time order.

        Args:
            rho: raw importance ratios pi(a_t|x_t) / mu(a_t|x_t).
            rewards: r_t.
            discounts: gamma_t, already zeroed at terminal steps.
            values: V(x_t).
            bootstrap_value: V(x_T), the value after the last step of the window.
            rho_bar, c_bar: truncation levels for rho and for the trace coefficient c.

        Returns:
            ``(vs, pg_advantages)`` as float64 arrays of length T.
        """
        rho = np.asarray(rho, dtype=np.float64)
        rewards = np.asarray(rewards, dtype=np.float64)
        discounts = np.asarray(discounts, dtype=np.float64)
        values = np.asarray(values, dtype=np.float64)
        if values.size == 0:
            return np.zeros(0), np.zeros(0)

        clipped_rho = np.minimum(rho_bar, rho)
        cs = np.minimum(c_bar, rho)
        values_tp1 = np.append(values[1:], bootstrap_value)
        deltas = clipped_rho * (rewards + discounts * values_tp1 - values)

        # Backward recursion: vs_t - V(x_t) = delta_t + gamma_t * c_t * (vs_{t+1} - V(x_{t+1})).
        vs_minus_v = np.empty_like(values)
        acc = 0.0
        for t in range(len(values) - 1, -1, -1):
            acc = deltas[t] + discounts[t] * cs[t] * acc
            vs_minus_v[t] = acc
        vs = values + vs_minus_v

        vs_tp1 = np.append(vs[1:], bootstrap_value)
        pg_advantages = clipped_rho * (rewards + discounts * vs_tp1 - values)
        return vs, pg_advantages

    def _compute_baseline_loss(self, advantages):
        """Half the sum of squared value errors."""
        return .5 * tf.reduce_sum(tf.square(advantages))

    def _compute_policy_gradient_loss(self, probs, actions, advantages):
        """Sum over steps of -log pi(a_t|x_t) * advantage_t (advantages get no gradient)."""
        cross_entropy = tf.losses.categorical_crossentropy(y_true=actions, y_pred=probs)  # (T,)
        advantages = tf.stop_gradient(tf.reshape(advantages, tf.shape(cross_entropy)))
        return tf.reduce_sum(cross_entropy * advantages)

    def _compute_entropy_loss(self, probs):
        """Negative total entropy of the policy; minimising it encourages exploration."""
        # xlogy(p, p) is p*log(p) with 0*log(0) := 0, so zero probabilities do not give NaN.
        return tf.reduce_sum(tf.math.xlogy(probs, probs))

In [4]:
class Learner(Critic):
    """Owns and trains the shared actor-critic network.

    The network takes a 10-step window of (state[3], previous action[1],
    previous reward[1]) and outputs, per step, a softmax over 3 actions and a
    scalar value.
    """

    def __init__(self):
        look_back = 10
        units = 28
        conv_filter = 12
        optimizer = Adam(learning_rate=0.001)

        state_in = Input(shape=(look_back, 3))
        prev_action_in = Input(shape=(look_back, 1))
        prev_reward_in = Input(shape=(look_back, 1))

        # Per-step feature extraction on the state (kernel_size=1: no mixing across time).
        features = Conv1D(filters=conv_filter, kernel_size=1, padding="same", activation="tanh")(state_in)
        features = MaxPool1D(pool_size=1, padding="same")(features)
        features = Activation("relu")(features)

        merged = concatenate([features, prev_action_in, prev_reward_in], axis=-1)
        hidden = LSTM(units, return_sequences=True)(merged)
        hidden = Dense(units, kernel_initializer="random_uniform")(hidden)
        hidden = Activation("relu")(hidden)

        policy_head = Dense(3, activation="softmax")(hidden)
        value_head = Dense(1, activation="linear")(hidden)

        net = keras.Model([state_in, prev_action_in, prev_reward_in], [policy_head, value_head])
        net.compile(loss="mean_absolute_error", optimizer=optimizer)
        net.summary()
        # Critic.__init__ stores the network as self.model.
        super().__init__(net)

    def load(self, name):
        """Load network weights from the file ``name``."""
        self.model.load_weights(name)

    def save(self, name):
        """Save network weights to the file ``name``."""
        self.model.save_weights(name)

    def placement(self, memory):
        """Run one update per 10-step window of ``memory`` reported by ``max_length_memory``."""
        window = 10
        for start in range(memory.max_length_memory()):
            experiences = memory.get_experiences(start, start + window)
            self.valuenetwork(*experiences)

In [5]:
class Actor:
    """Acting side of IMPALA for one environment.

    Holds its own copy of ``learner.model`` and feeds it a sliding window of the
    last ``window`` steps. Until the window is full it returns fixed placeholder
    outputs. Learning, loading and saving are delegated to ``learner``.
    """

    window = 10

    def __init__(self, learner):

        self.learner = learner
        self.model = self._copy_learner_model()
        self.n_action = 3
        self.state_arr = np.array([])
        self.p_action_arr = np.array([])
        self.p_reward_arr = np.array([])

        self.next_state_arr = np.array([])
        self.action_arr = np.array([])
        self.reward_arr = np.array([])

    def _copy_learner_model(self):
        """Clone the learner's architecture and copy the learner's current weights into it."""
        model = clone_model(self.learner.model)
        # clone_model creates new layers with freshly initialised weights; copy them explicitly.
        model.set_weights(self.learner.model.get_weights())
        return model

    def reset(self):
        """Empty both sliding windows at the start of an episode."""
        self.state_arr = np.empty((0,3), int)
        self.p_action_arr = np.array([])
        self.p_reward_arr = np.array([])

        self.next_state_arr = np.empty((0,3), int)
        self.action_arr = np.array([])
        self.reward_arr = np.array([])

    def policynetwork(self, state, prev_action, prev_reward):
        """Push the inputs into the window and sample an action.

        Returns ``(action, value, action_probs)`` for the newest window position,
        or the placeholder ``(1, 1, [0.0, 1.0, 0.0])`` while the window is filling.
        """
        if len(self.state_arr) == self.window:
            # Slide the window by one step and put the newest inputs at the end.
            self.state_arr[0:-1] = self.state_arr[1:]
            self.p_action_arr[0:-1] = self.p_action_arr[1:]
            self.p_reward_arr[0:-1] = self.p_reward_arr[1:]
            self.state_arr[-1] = state
            self.p_action_arr[-1] = prev_action
            self.p_reward_arr[-1] = prev_reward
            act_p, v = self.model([
                self.state_arr.reshape(1, self.window, 3),
                self.p_action_arr.reshape(1, self.window, 1),
                self.p_reward_arr.reshape(1, self.window, 1),
            ])
            probs = act_p.numpy()[0][-1]
            value = v.numpy()[0][-1][0]
            return np.random.choice(self.n_action, p=probs), value, probs

        self.state_arr = np.append(self.state_arr, np.array([state]), axis=0)
        self.p_action_arr = np.append(self.p_action_arr, np.array([prev_action]))
        self.p_reward_arr = np.append(self.p_reward_arr, np.array([prev_reward]))
        return 1, 1, [0.0, 1.0, 0.0]

    def policynetwork_next(self, next_state, action, reward):
        """Push the next-step inputs into their window and return the value of the newest position.

        Returns ``1.0`` while the window is still filling.
        """
        if len(self.next_state_arr) == self.window:
            self.next_state_arr[0:-1] = self.next_state_arr[1:]
            self.action_arr[0:-1] = self.action_arr[1:]
            self.reward_arr[0:-1] = self.reward_arr[1:]
            self.next_state_arr[-1] = next_state
            self.action_arr[-1] = action
            self.reward_arr[-1] = reward

            _, v_next = self.model([
                self.next_state_arr.reshape(1, self.window, 3),
                self.action_arr.reshape(1, self.window, 1),
                self.reward_arr.reshape(1, self.window, 1),
            ])
            return v_next.numpy()[0][-1][0]

        self.next_state_arr = np.append(self.next_state_arr, np.array([next_state]), axis=0)
        self.action_arr = np.append(self.action_arr, np.array([action]))
        self.reward_arr = np.append(self.reward_arr, np.array([reward]))
        return 1.0

    def load(self, name):
        """Load learner weights from ``name`` and refresh this actor's copy."""
        self.learner.load(name)
        # Without this the actor would keep acting with its previous weights.
        self.integration()

    def save(self, name):
        """Save the learner's weights to ``name``."""
        self.learner.save(name)

    def integration(self):
        """Copy the learner's current weights into this actor's network."""
        self.model.set_weights(self.learner.model.get_weights())

    def placement(self, memory):
        """Hand an episode's experiences to the learner for training."""
        self.learner.placement(memory)

In [6]:
@dataclass
class ExperiencesMemory:
    """Per-episode buffer of transitions, one entry per stored step.

    ``state``, ``next_state`` and ``mu`` are 2-D with 3 columns; the other array
    fields are 1-D. ``minibatch_size`` is stored but not used by this class.
    """

    # default_factory gives every instance its own empty arrays; an ndarray used
    # directly as a default would be one object shared by all instances, and is
    # rejected by dataclasses on newer Pythons.
    state: np.ndarray = field(default_factory=lambda: np.empty((0, 3)))
    next_state: np.ndarray = field(default_factory=lambda: np.empty((0, 3)))
    prev_action: np.ndarray = field(default_factory=lambda: np.array([]))
    action: np.ndarray = field(default_factory=lambda: np.array([]))
    prev_reward: np.ndarray = field(default_factory=lambda: np.array([]))
    reward: np.ndarray = field(default_factory=lambda: np.array([]))
    done: np.ndarray = field(default_factory=lambda: np.array([]))
    v: np.ndarray = field(default_factory=lambda: np.array([]))
    v_next: np.ndarray = field(default_factory=lambda: np.array([]))
    mu: np.ndarray = field(default_factory=lambda: np.empty((0, 3)))
    minibatch_size: int = 64

    def append_experiences(self, state, next_state, prev_action, action, prev_reward, reward, done, v, v_next, mu):
        """Append one transition to every field."""
        self.state = np.append(self.state, np.array([state]), axis=0)
        self.next_state = np.append(self.next_state, np.array([next_state]), axis=0)
        self.prev_action = np.append(self.prev_action, np.array(prev_action))
        self.action = np.append(self.action, np.array(action))
        self.prev_reward = np.append(self.prev_reward, np.array(prev_reward))
        self.reward = np.append(self.reward, np.array(reward))
        self.done = np.append(self.done, np.array(done))
        self.v = np.append(self.v, np.array(v))
        self.v_next = np.append(self.v_next, np.array(v_next))
        self.mu = np.append(self.mu, np.array([mu]), axis=0)

    def max_length_memory(self, window=10):
        """Number of complete windows of ``window`` consecutive steps (0 if too few).

        Window ``i`` covers steps ``i .. i + window - 1``, so the last window ends
        on the last stored step.
        """
        return max(0, len(self.state) - window + 1)

    def get_experiences(self, min, max):
        """Return copies of steps ``min`` (inclusive) to ``max`` (exclusive).

        The order is (state, next_state, prev_action, action, prev_reward,
        reward, done, v, v_next, mu).

        Raises:
            IndexError: if the range starts before 0 or ends past the stored steps.
        """
        start, stop = min, max  # parameter names shadow builtins; kept for callers
        if start < 0 or stop > len(self.state):
            raise IndexError(f'experience range [{start}, {stop}) outside 0..{len(self.state)}')
        fields = (self.state, self.next_state, self.prev_action, self.action, self.prev_reward,
                  self.reward, self.done, self.v, self.v_next, self.mu)
        return tuple(np.array(values[start:stop]) for values in fields)

In [7]:
class Main:
    """Run one actor through ``episodes_times`` episodes of one environment.

    States are standardised with a ``StandardScaler`` fitted on a random-action
    rollout of ``env``. In ``'test'`` mode it is replaced by the pickled scaler
    and the weights are loaded. Results are printed and appended to the
    module-level ``csv_path``, which is truncated and given a header on
    construction; every instance writes to that same file.

    In any mode other than ``'test'``, each episode's transitions go to
    ``actor.placement`` and ``actor.integration`` is called afterwards. In
    ``'train'`` mode the weights and scaler are saved after the last episode.
    """

    def __init__(self, env, actor, num, mdl_dir, name, batch_size = 10, episodes_times = 1000, mode = 'test'):
        self.env = env
        self.actor = actor
        self.num = str(num)
        self.mdl_dir = mdl_dir
        self.scaler = self._standard_scaler(self.env)
        self.episodes_times = episodes_times
        self.batch_size = batch_size
        self.mode = mode
        self.name = name

        if self.mode == 'test':
            self._load()
            header = 'FixedProfit,TradeTimes,TradeWin'
        else:
            header = 'FixedProfit'
        with open(csv_path, 'w') as f:
            print(header, file=f)

    def play_game(self):
        """Play all episodes, learning after each one (non-test modes) and saving at the end (train)."""
        for episode in range(self.episodes_times):
            state = self.scaler.transform([self.env.reset()]).flatten()
            self.actor.reset()
            done = False
            start_time = datetime.now()
            memory = ExperiencesMemory()
            prev_action = 1
            prev_reward = 0
            step = 0

            while not done:
                action, v, mu = self.actor.policynetwork(state, prev_action, prev_reward)
                next_state, reward, done, info = self.env.step(action)
                next_state = self.scaler.transform([next_state]).flatten()
                v_next = self.actor.policynetwork_next(next_state, action, reward)

                # Presumably skips the steps produced while the actor's input window is filling.
                if (step > self.batch_size) and (self.mode == 'train'):
                    memory.append_experiences(state, next_state, prev_action, action, prev_reward, reward, done, v, v_next, mu)

                state, prev_action, prev_reward = next_state, action, reward
                step += 1

            play_time = datetime.now() - start_time
            if self.mode != 'test':
                self.actor.placement(memory)
                self.actor.integration()
            self._log_episode(episode, play_time, info)

        if self.mode == 'train':
            self._save()

    def _log_episode(self, episode, play_time, info):
        """Print one finished episode's result and append it to ``csv_path``."""
        if self.mode == 'test':
            print("Episode: {}/{} RapTime: {} FixedProfit: {:.0f} TradeTimes: {} TradeWin: {}".format(episode + 1, self.episodes_times, play_time, info['cur_revenue'], info['trade_time'], info['trade_win']))
            row = str(info['cur_revenue']) + ',' + str(info['trade_time']) + ',' + str(info['trade_win'])
        else:
            print("Episode: {}/{} RapTime: {} FixedProfit: {:.0f}".format(episode + 1, self.episodes_times, play_time, info['cur_revenue']))
            row = str(info['cur_revenue'])
        with open(csv_path, 'a') as f:
            print(row, file=f)

    def _standard_scaler(self, env):
        """Fit a StandardScaler on states visited by random actions until the episode ends."""
        states = []
        for _ in range(env.df_total_steps):
            action = np.random.choice(env.action_space)
            state, reward, done, info = env.step(action)
            states.append(state)
            if done:
                break

        scaler = StandardScaler()
        scaler.fit(states)
        return scaler

    def _load(self):
        """Load this runner's pickled scaler and the shared weights."""
        # NOTE(review): pickle.load executes code from the file; only load trusted files.
        with open('{}/{}_{}.pkl'.format(self.mdl_dir, self.name, self.num), 'rb') as f:
            self.scaler = pickle.load(f)
        self.actor.load('{}/{}.h5'.format(self.mdl_dir, self.name))

    def _save(self):
        """Save the shared weights and this runner's scaler."""
        self.actor.save('{}/{}.h5'.format(self.mdl_dir, self.name))
        with open('{}/{}_{}.pkl'.format(self.mdl_dir, self.name, self.num), 'wb') as f:
            pickle.dump(self.scaler, f)

In [8]:
initial_money = 1000000
episodes_times = 50
batch_size = 10
learner = Learner()

# Every runner's Actor wraps the same Learner, so all threads train one shared network.
thread_num = 4
envs = []
for i in range(thread_num):
    env = Environment(df, initial_money=initial_money, mode=mode)
    actor = Actor(learner)
    main = Main(env, actor, i, mdl_dir, name, batch_size, episodes_times, mode)
    envs.append(main)

datas = []
with ThreadPoolExecutor(max_workers=thread_num) as executor:
    for env in envs:
        # Submit the bound method itself: `lambda: env.play_game()` would look up
        # `env` only when the thread runs, possibly after the loop has moved on.
        datas.append(executor.submit(env.play_game))

# Re-raise any worker exception; without .result() it stays hidden in the future.
for job in datas:
    job.result()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 10, 3)]      0                                            
__________________________________________________________________________________________________
conv1d (Conv1D)                 (None, 10, 12)       48          input_1[0][0]                    
__________________________________________________________________________________________________
max_pooling1d (MaxPooling1D)    (None, 10, 12)       0           conv1d[0][0]                     
__________________________________________________________________________________________________
activation (Activation)         (None, 10, 12)       0           max_pooling1d[0][0]              
______________________________________________________________________________________________