<a href="https://colab.research.google.com/github/sugiyama404/LimitOrder/blob/main/LimitOrder_train.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import pandas as pd
import pickle
import numpy as np
import copy
import keras
import tensorflow as tf
import tensorflow.keras.layers as kl
from time import sleep
import os

from sklearn.metrics import classification_report, accuracy_score
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from multiprocessing import Manager

from google.colab import drive

name = 'limitorder'
mode = 'train'
long_or_short = ['long', 'short']
drive.mount('/content/drive/')

out_dir = 'Colab Notebooks/dataset/LimitOrder/'
csv_path = '/content/drive/My Drive/' + out_dir + f'predict_and_dataset_{mode}.csv'
mdl_long_path = '/content/drive/My Drive/' + out_dir + f'model_{name}_{long_or_short[0]}.h5'
mdl_short_path = '/content/drive/My Drive/' + out_dir + f'model_{name}_{long_or_short[1]}.h5'

df = pd.read_csv(csv_path, index_col=0)

Mounted at /content/drive/


In [2]:
class Environment:
    def __init__(self, df, initial_money=100000):

        self.df = df
        self.df_total_steps  = len(self.df)-1
        self.initial_money   = initial_money
        self.cash_in_hand    = None
        # hold, good_buy, good_sell, bad_buy, bad_sell, buy_cancel, sell_cancel
        self.action_space    = np.array([0, 1, 2, 3, 4, 5, 6])
        self.now_step  = None
        self.alpha = 0.001
        self.beta = 0.9
        self.minimum_number_of_shares = 10000 # 最低取得株数

        self.is_cancel = False
        self.profit_and_loss = None
        self.reserve_price = None
        self.reserve_type = None
        self.have_a_position = None
        self.have_a_position_type = None
        self.price_old = None
        self.Money_old = None

        self.reset()

    def reset(self):
        self.now_step  = 0
        self.now_price = (self.df['PRICE_ASK_0'][self.now_step] + self.df['PRICE_BID_0'][self.now_step]) / 2
        self.cash_in_hand = self.initial_money / self.minimum_number_of_shares
        self.hold_a_position = 0
        self.end_step        = self.df_total_steps

        self.is_cancel = False
        self.profit_and_loss = []
        self.reserve_price = 0
        self.reserve_type = 'none'
        self.have_a_position = 0
        self.have_a_position_type = None

        self.price_old = 0
        self.Money_old = 0

        return self._get_now_state()

    def step(self, action):
        '''
        状態の内4まで出力する。
        1. 現在ポジション株数/最大保有株数
        2. 既存買注文の反対気配との乖離率 (逆数)
        3. 既存売注文の反対気配との乖離率 (逆数)
        4. Bid-Ask スプレッド
        5. 短期(5 秒)リターン予測値
        6. 長期(60 秒)リターン予測値

        最大保有株数は 100 万円相当の株数(最小取引単位の
        100 株に満たない場合は 100 株)で, 
        ショートポジションも取り得るものとしている.

        取引エージェントの行動タイミングは各銘柄のティッ ク更新時に, 
        前回行動から 1 秒以上経過している場合に
        発注判断を行うものとする (各発注判断を 1 ステップと する).
        ある時点において市場に出すことのできる注 文は買い,
        売りのそれぞれに対して1注文のみとし,最大ポジションを超える新規注文は行わない.
        各時点で選ぶことのできない行動は選択肢から排除している.

        報酬
        r(t) = ∆PnL(t + 1) − α(∆Pos(t + 1))^2 − βFo(t)
        ∆PnL(t + 1) : (累積) 損益額の変化幅(円)
        ∆Pos(t + 1) : ポジション金額の変化幅 (円)
        Fo(t) : 1 (時刻 t でキャンセルが発生した場合に追加を行う。)
              : 0 (その他)
        損益額のみを報酬とする場合, ポジションが必要以上
        に増加する傾向にあり [3], また不必要な注文の送信, 
        取消を繰り返してしまう. そこで過度なポジショニングと
        注文取消を抑制するために調整項を加えている.
        '''
        before_profit_and_loss = copy.deepcopy(self._get_revenue())
        before_hold_a_position = copy.deepcopy(self.hold_a_position)
        before_now_price = copy.deepcopy(self._get_now_price())

        self.now_step += 1
        self.now_price = self._get_now_price()

        self._judge_the_execution() # 約定しているかを調べる。
        self._order(action) # 注文を出す。


        state = self._get_now_state()
        reward = self._get_reward(before_profit_and_loss, before_hold_a_position, before_now_price)
        done = (self.end_step == self.now_step)
        info = {'now_step': self.now_step}

        return state, reward, done, info

    def _get_now_state(self):
        state = np.empty(6)
        state[0] = self.hold_a_position
        state[1] = self.df['PRICE_ASK_0'][self.now_step] / self.df['PRICE_ASK_9'][self.now_step]
        state[2] = self.df['PRICE_BID_0'][self.now_step] / self.df['PRICE_BID_9'][self.now_step]
        state[3] = self.df['PRICE_ASK_0'][self.now_step] - self.df['PRICE_BID_0'][self.now_step]
        state[4] = self.df['SHORT_PREDICT'][self.now_step]
        state[5] = self.df['LONG_PREDICT'][self.now_step]
        return state

    def _get_reward(self, before_profit_and_loss, before_hold_a_position, before_now_price):
        pnl = self._get_revenue() - before_profit_and_loss
        # pnl = self._get_revenue()
        pos = self.hold_a_position * self.now_price - before_hold_a_position * before_now_price
        f0  = 1 if self.is_cancel else 0

        reward = pnl - self.alpha * (pos ** 2) - self.beta * f0
        return reward

    def _order(self, action):
        '''
        取引エージェントの行動タイミングは各銘柄のティック更新時に, 
        前回行動から 1 秒以上経過している場合に
        発注判断を行うものとする (各発注判断を 1 ステップとする).
        ある時点において市場に出すことのできる注文は買い,
        売りのそれぞれに対して1注文のみとし,最大ポジションを超える新規注文は行わない.
        各時点で選ぶことのできない行動は選択肢から排除している.
        '''
        self.is_cancel = False

        if self.action_space[0] == action: # 何もしない
            pass
        elif self.action_space[1] == action: # 最良気配への新規買注文送出/既存買注文訂正
            price = self.df['PRICE_ASK_0'][self.now_step]
            order_type = 'buy'
        elif self.action_space[2] == action: # 最良気配への新規売注文送出/既存売注文訂正
            price = self.df['PRICE_BID_0'][self.now_step]
            order_type = 'sell'
        elif self.action_space[3] == action: # 反対気配への新規買注文送出/既存買注文訂正
            price = self.df['PRICE_ASK_9'][self.now_step]
            order_type = 'buy'
        elif self.action_space[4] == action: # 反対気配への新規売注文送出/既存売注文訂正
            price = self.df['PRICE_BID_9'][self.now_step]
            order_type = 'sell'
        elif self.action_space[5] == action: # 既存買注文の取消
            price = 0
            order_type = 'buy'
            self.is_cancel = True
        else: # 既存売注文の取消
            price = 0
            order_type = 'sell'
            self.is_cancel = True

        if self.action_space[0] != action:
            self._reservation_change(price, order_type)

    def _reservation_change(self, price, order_type):
        if not self.is_cancel:
            self.reserve_price = price
            self.reserve_type = order_type    
        else:
            self.reserve_price = 0
            self.reserve_type = 'none'

    def _judge_the_execution(self):
        '''
        約定しているかを調べる。

        取引エージェントの行動タイミングは各銘柄のティック更新時に, 
        前回行動から 1 秒以上経過している場合に
        発注判断を行うものとする (各発注判断を 1 ステップとする).
        ある時点において市場に出すことのできる注文は買い,
        売りのそれぞれに対して1注文のみとし,最大ポジションを超える新規注文は行わない.
        各時点で選ぶことのできない行動は選択肢から排除している.

        self.minimum_number_of_shares = 10000 # 最低取得株数
        '''

        if self.have_a_position_type == 'buy':
            '''買いのポジションを持っている場合'''
            if self.reserve_price <= self.df['PRICE_ASK_0'][self.now_step] and self.reserve_type == 'sell':
                self.cash_in_hand += self._bear_position_calc()
                self._reserve_and_position_reset(True)
                self.price_old = 0
                self.Money_old = 0
        elif self.have_a_position_type == 'sell':
            '''売りのポジションを持っている場合'''
            if self.reserve_price >= self.df['PRICE_BID_0'][self.now_step] and self.reserve_type == 'buy':
                self.cash_in_hand += self.reserve_price * self.hold_a_position
                self._reserve_and_position_reset(True)
        else:
            '''ポジションを持っていない場合'''
            if self.reserve_type == 'buy':
                if self.reserve_price >= self.df['PRICE_BID_0'][self.now_step] and self.cash_in_hand >= 0:
                    buy_flag = True
                    while buy_flag:
                        if self.cash_in_hand > self.reserve_price:
                            self.hold_a_position += 1
                            self.cash_in_hand -= self.now_price
                        else:
                            buy_flag = False
                    self.have_a_position_type = 'buy'
                    self._reserve_and_position_reset()
            if self.reserve_type == 'sell':
                if self.reserve_price <= self.df['PRICE_ASK_0'][self.now_step] and self.cash_in_hand >= 0:
                    sell_flag = True
                    self.price_old = self.reserve_price
                    self.Money_old = self.cash_in_hand
                    while sell_flag:
                        if self.cash_in_hand > self.reserve_price:
                            self.hold_a_position += 1
                            self.cash_in_hand -= self.now_price
                        else:
                            sell_flag = False
                    self.have_a_position_type = 'sell'
                    self._reserve_and_position_reset()

    def _reserve_and_position_reset(self, position_reset = False):
            self.reserve_price = 0
            self.reserve_type == 'none'
            if position_reset:
                self.hold_a_position = 0
                self.have_a_position_type = 'none'

    def _get_revenue(self):
        if self.have_a_position_type == 'sell':
            return self._bear_position_calc() + self.cash_in_hand
        else:
            return self.hold_a_position * self.now_price + self.cash_in_hand

    def _bear_position_calc(self):
        return ((self.price_old - self.now_price) * self.hold_a_position
                + self.Money_old)
        
    def _get_now_price(self):
        return (self.df['PRICE_ASK_0'][self.now_step] + self.df['PRICE_BID_0'][self.now_step]) / 2

In [3]:
class NeuralNetwork(tf.keras.Model):
    def __init__(self, *args, **kwargs):
        super(NeuralNetwork, self).__init__(*args, **kwargs)

        self.action_space = 7

        self.input_layer = kl.Dense(20, activation="tanh", kernel_initializer="he_normal")
        self.dense1 = kl.Dense(10, activation="tanh", kernel_initializer="he_normal")
        self.value = kl.Dense(1, kernel_initializer="he_normal")
        self.dense2 = kl.Dense(10, activation="tanh", kernel_initializer="he_normal")
        self.advantages = kl.Dense(self.action_space, kernel_initializer="he_normal")

        self.optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

    @tf.function
    def call(self, x, training=None):
        x = self.input_layer(x)
        x1 = self.dense1(x)
        value = self.value(x1)
        x2 = self.dense2(x)
        advantages = self.advantages(x2)
        q = value + advantages - tf.reduce_mean(advantages, axis=1, keepdims=True)
        return q

    def save_model(self, name):
        self.save_weights(name)

    def load_model(self, name):
        self.load_weights(name)

    def update_model(self, value):
        self.set_weights(value)

    def get_model(self):
        return self.get_weights()

In [4]:
class ReplayBuffer:
    def __init__(self, max_size, batch_size):
        self.cntr = 0
        self.size = 0
        self.max_size = max_size
        self.batch_size = batch_size

        self.states_memory = np.zeros([self.max_size, 6], dtype=np.float32)
        self.acts_memory = np.zeros(self.max_size, dtype=np.uint8)
        self.rewards_memory = np.zeros(self.max_size, dtype=np.float32)
        self.next_states_memory = np.zeros([self.max_size, 6], dtype=np.float32)
        self.dones_memory = np.zeros(self.max_size, dtype=np.uint8)
        self.tderrors_memory = np.zeros(self.max_size, dtype=np.float32)

    def add(self, exp):
        self.states_memory[self.cntr] = exp.state
        self.acts_memory[self.cntr] = exp.act
        self.rewards_memory[self.cntr] = exp.reward
        self.next_states_memory[self.cntr] = exp.next_state
        self.dones_memory[self.cntr] = exp.done
        self.tderrors_memory[self.cntr] = exp.tderrors_memory
        self.cntr = (self.cntr+1) % self.max_size
        self.size = min(self.size+1, self.max_size)

    def sampling(self):
        td_errors = [self.tderrors_memory[index] for index in range(len(self.tderrors_memory))]
        p = self._make_prob(td_errors)
        batch_indexes = np.random.choice(len(self.tderrors_memory), size=self.batch_size, p=p).tolist()

        state      = np.array([self.states_memory[index] for index in batch_indexes])
        action     = np.array([self.acts_memory[index] for index in batch_indexes])
        reward     = np.array([self.rewards_memory[index] for index in batch_indexes])
        next_state = np.array([self.next_states_memory[index] for index in batch_indexes])
        done       = np.array([self.dones_memory[index] for index in batch_indexes])
        
        return {'state': state, 'next_state': next_state, 'reward': reward, 'action': action, 'done': done}

    def _make_prob(self, td_errors, alpha=0.5, eps=np.float32(0.001)):
        abs_td_errors = np.power(np.abs(td_errors) + eps, alpha)
        return abs_td_errors / np.sum(abs_td_errors)

    def set_transition(self, batch):
        states, actions = batch['state'], batch['action']
        rewards, next_states = batch['reward'], batch['next_state']
        dones, tderrors = batch['done'], batch['tderrors']
       
        for state, action, reward, next_state, done, tderror in zip(states, actions, 
                                                                    rewards,next_states, dones, tderrors):
            self.states_memory[self.cntr] = state
            self.acts_memory[self.cntr] = action
            self.rewards_memory[self.cntr] = reward
            self.next_states_memory[self.cntr] = next_state
            self.dones_memory[self.cntr] = done
            self.tderrors_memory[self.cntr] = tderror
            self.cntr = (self.cntr+1) % self.max_size
            self.size = min(self.size+1, self.max_size)

In [5]:
def train(episodes, q, qlist):
    n_actions = 7
    gamma = 9.9e-1
    max_size = 50_000
    batch_size = 2048
    episodes = episodes
    target_update_interval = 100
    train_interval_time = 10

    buffer = ReplayBuffer(max_size, batch_size)

    model = NeuralNetwork()
    model_target = NeuralNetwork()
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
    while True:
        if not q.empty():
            batch = q.get()
            buffer.set_transition(batch)
        sleep(train_interval_time)
        if buffer.size >= buffer.batch_size:
            break           

    for episode in range(episodes):
        batch = buffer.sampling()

        q_next = model(batch['next_state'])
        q_target_next = model_target(batch['next_state'])

        next_actions = tf.cast(tf.argmax(q_next, axis=1), tf.int32)
        next_actions_onehot = tf.one_hot(next_actions, n_actions)

        next_maxQ = tf.reduce_sum(
            q_target_next * next_actions_onehot, axis=1, keepdims=True)
        TQ = batch['reward'] + gamma * (1 - batch['done']) * next_maxQ

        with tf.GradientTape() as tape:
            qvalues = model(batch['state'])
            actions_onehot = tf.one_hot(batch['action'], n_actions)
            Q = tf.reduce_sum(qvalues * actions_onehot, axis=1, keepdims=True)
            td_errors = tf.square(TQ - Q)
            loss = tf.reduce_mean(td_errors)

        grads = tape.gradient(loss, model.trainable_variables)
        grads, _ = tf.clip_by_global_norm(grads, 40.0)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))

        # workerにデータ送信
        for ql in qlist:
            ql.put(model.get_model())

        if episode % target_update_interval == 0 and episode != 0:
            model_target.update_model(model.get_model())
            print(f'Learner training times: {str(episode)} / {str(episodes)}')
        
        for _ in range(4):
            if not q.empty():
                batch = q.get()
                buffer.set_transition(batch)

    # workerにデータ送信
    for ql in qlist:
        ql.put('final')

In [6]:
class Experiences:
    def __init__(self):
        '''
        データ転送用クラス
        agentのExperiencesを蓄える用
        '''
        self.state = np.zeros([0, 6], dtype=np.float32)
        self.action = np.zeros(0, dtype=np.uint8)
        self.reward = np.zeros(0, dtype=np.float32)
        self.next_state = np.zeros([0, 6], dtype=np.float32)
        self.done = np.zeros(0, dtype=np.uint8)
        self.tderrors = np.zeros(0, dtype=np.float32)

    def exp_reset(self):
        self.state = np.zeros([0, 6], dtype=np.float32)
        self.action = np.zeros(0, dtype=np.uint8)
        self.reward = np.zeros(0, dtype=np.float32)
        self.next_state = np.zeros([0, 6], dtype=np.float32)
        self.done = np.zeros(0, dtype=np.uint8)
        self.tderrors = np.zeros(0, dtype=np.float32)

    def exp_add(self, state, action, reward, next_state, done):
        state  = np.reshape(state, (1, 6))
        self.state = np.append(self.state, state, axis=0)
        self.action = np.append(self.action, np.array(action))
        self.reward = np.append(self.reward, np.array(reward))
        next_state  = np.reshape(next_state, (1, 6))
        self.next_state = np.append(self.next_state, next_state, axis=0)
        self.done =  np.append(self.done, np.array(done))

    def transition(self):
        key = ['state', 'action', 'reward', 'next_state', 'done', 'tderrors']
        value = [self.state, self.action, self.reward,
                 self.next_state, self.done, self.tderrors]
        return dict(zip(key,value))

In [7]:
class Agent(Experiences):
    def __init__(self, epsilon):
        self.model        = NeuralNetwork()
        self.model_target = NeuralNetwork()

        self.gamma = 0.99
        self.epsilon = epsilon
        self.n_actions = 7

        super().__init__()

    def act(self, state):
        ran_num = np.random.rand()
        if ran_num <= self.epsilon:
            return np.random.choice(7)
        else:
            return self._predict(state)

    def _predict(self, state):
        act = self.model(state)
        return np.argmax(act)

    def make_tderror(self):
        td_q = self.model_target(self.next_state)
        q = self.model_target(self.state)
        q_value = tf.reduce_sum(q * tf.one_hot(self.action, self.n_actions),
                                axis = 1)
        self.tderrors = self.reward + (1 - self.done) * self.gamma * np.max(td_q, axis=1) - q_value

In [8]:
def main(epsilon, agent_num, q, rq):
    os.environ["CUDA_VISIBLE_DEVICES"] = ""
    env = Environment(df)
    target_update_interval = 20
    agent = Agent(epsilon)
    terminal = False
    cnt = 0
    train_interval_time = env.df_total_steps // 5

    while not terminal:
        done = False
        total_reward = 0
        state = env.reset()

        while not done:
            action = agent.act(np.array([state]))
            next_state, reward, done, info = env.step(action)
            reward = reward_clipping(reward)
            agent.exp_add(state, action, reward, next_state, done)
            total_reward += reward

            if len(agent.action) == train_interval_time:
                agent.make_tderror()
                trans = agent.transition()
                q.put(trans)
                agent.exp_reset()

            if not rq.empty():
                w = rq.get()
                if w == 'final':
                    terminal = True
                    break
                else:
                    agent.model.update_model(w)
                    if cnt % target_update_interval == 0:
                        agent.model_target.update_model(agent.model.get_model())
                    cnt += 1

            state = next_state

        if agent_num == 0:
            print(f'Reward:{str(reward)} Total Reward:{str(total_reward)}')

In [9]:
def reward_clipping(val):
        result = 1 if val > 0 else 0 if val == 0 else -1
        return result

In [None]:
worker_num = 3
episodes = 10**4

q = Manager().Queue(4) # 最初に入れたデータを最初に取り出す。
qlist = [Manager().Queue(1) for _ in range(worker_num)]
epsilon_list = [0, 0.8, 0.5]

with ProcessPoolExecutor(max_workers=4) as executor:
    data = []
    for rq, e, i in zip(qlist, epsilon_list, range(worker_num)):
        print(f'start worker_{i}')
        data.append(executor.submit(main, e, i, q, rq))
    print('start train')
    data.append(executor.submit(train, episodes, q, qlist))

start worker_0
start worker_1
start worker_2
start train
Reward:0 Total Reward:-152
Reward:0 Total Reward:-147
Reward:0 Total Reward:-147
Reward:-1 Total Reward:-22765
Reward:-1 Total Reward:-153919
Reward:0 Total Reward:-131124
Reward:0 Total Reward:-145
Reward:0 Total Reward:-145
Reward:0 Total Reward:-145
