In [26]:
import numpy as np
import gym
from gym import spaces
from gym.utils import seeding
from enum import Enum
import matplotlib.pyplot as plt
import pandas as pd
from collections import deque
import itertools
from typing import List
import pickle as pkl

class Actions(Enum):
    """Two-way trade decision; each member's value is its integer code."""

    Sell, Buy = 0, 1


class Positions(Enum):
    """Side of the market currently held: ``Short`` (0) or ``Long`` (1)."""

    Short = 0
    Long = 1

    def opposite(self):
        """Return the other member: ``Long`` for ``Short`` and vice versa."""
        if self is Positions.Long:
            return Positions.Short
        return Positions.Long

def safe_divide(a, b):
    """Element-wise ``a / b`` that yields 0 wherever ``b`` is 0.

    Args:
        a: Numerator (array-like or scalar).
        b: Denominator (array-like or scalar); broadcast against ``a``.

    Returns:
        np.ndarray: Floating-point quotient with the broadcast shape of
        ``a`` and ``b``; positions where ``b == 0`` hold ``0``.
    """
    a = np.asarray(a)
    b = np.asarray(b)

    # True division always produces floats, so the output buffer must be a
    # floating dtype; reusing an integer dtype from `a` makes np.divide raise
    # a casting error.
    dtype = np.result_type(a, b)
    if not np.issubdtype(dtype, np.inexact):
        dtype = np.float64

    # Allocate with the broadcast shape so a scalar/shorter `a` also works.
    out = np.zeros(np.broadcast(a, b).shape, dtype=dtype)
    return np.divide(a, b, out=out, where=b != 0)

def moving_average(iterable, n=3):
    """Yield the simple moving average of ``iterable`` over a window of ``n``.

    Example:
        moving_average([40, 30, 50, 46, 39, 44]) --> 40.0 42.0 45.0 43.0

    See http://en.wikipedia.org/wiki/Moving_average

    Args:
        iterable: Source of numeric values.
        n: Window length; must be at least 1.

    Yields:
        float: The mean of each consecutive run of ``n`` values. Nothing is
        yielded when the input has fewer than ``n`` items.

    Raises:
        ValueError: If ``n`` is smaller than 1 (raised on first iteration).
    """
    if n < 1:
        raise ValueError("n must be at least 1")

    it = iter(iterable)
    # Prime the window with the first n-1 values plus a leading 0 so the first
    # loop iteration pops the 0 and completes the first full window.
    window = deque(itertools.islice(it, n - 1))
    window.appendleft(0)
    running_sum = sum(window)
    for elem in it:
        running_sum += elem - window.popleft()
        window.append(elem)
        yield running_sum / n

In [9]:
# temp data
# NOTE(review): `price` is not defined in any cell visible in this notebook;
# presumably it comes from an earlier or deleted cell. Confirm it is defined
# before this cell, otherwise this fails on a fresh kernel.
df = pd.DataFrame()
# Both columns are filled from the same `price` object, so the two tickers
# carry identical placeholder series.
df['SSI'] = price
df['HPG'] = price

In [10]:
# Bounded buffer: with maxlen=5 the deque drops its oldest item once a sixth
# one is appended.
window = deque(maxlen=5)
# First five rows of `df` as a 2-D NumPy array (one array row per DataFrame row).
data = df.iloc[:5].to_numpy()

In [51]:
# Iterating the 2-D array yields its rows, so each row becomes one deque item.
window.extend(data)

In [53]:
# Empty the buffer in place (maxlen is kept).
window.clear()

In [54]:
# Bare expression: displays the current contents of the deque.
window

deque([])

In [None]:
# --- simulation parameters --------------------------------------------------
# NOTE(review): the notebook does not say what these stand for. Presumably
# n = number of time steps, h = half-life, pe = equilibrium price and
# sig = volatility -- TODO confirm.
n, h = 5000, 5
pe, sig = 50, 0.1

# --- trading parameters -----------------------------------------------------
tick_size, lot_size = 0.1, 100
n_action, M = 5, 10

# --- derived quantities -----------------------------------------------------
dt = 1 / n                      # one step as a fraction of the n steps
lmbda = np.log(2) / h           # ln(2) / h
# Trade sizes: every multiple of lot_size from -n_action to +n_action lots.
action_space = np.arange(-n_action, n_action + 1) * lot_size
# Integer grid from -M to +M inclusive.
holdings = np.arange(-M, M + 1)


In [None]:
# NOTE(review): this re-declares the `Positions` enum that the first cell of
# the notebook already defines with the same members; the later definition
# shadows the earlier one once this cell runs.
class Positions(Enum):
    """Held market side, encoded as ``Short = 0`` and ``Long = 1``."""

    Short = 0
    Long = 1

    def opposite(self):
        """Return the member whose value is the complement of this one."""
        # Values are 0 and 1, so 1 - value maps each member onto the other.
        return Positions(1 - self.value)


class TradingEnv(gym.Env):
    """Gym environment that trades several assets from a price DataFrame.

    Each row of ``df`` is one tick and each column one asset. An action is a
    vector with one integer per asset; every integer indexes ``self.shares``
    and selects the number of shares to hold in that asset. The per-step
    reward is the change in portfolio value net of trading costs, penalised
    by ``kappa`` times its square.

    NOTE(review): ``observation_space`` is declared with shape
    ``(window_size, n_columns)`` while ``reset``/``step`` return only the most
    recent price row -- confirm which one downstream agents expect.
    NOTE(review): ``max_asset`` sets the length of the action vector and has
    to equal ``df.shape[1]`` for the P&L arithmetic to broadcast -- confirm.
    """

    metadata = {'render.modes': ['human']}

    def __init__(
        self,
        df: pd.DataFrame,
        window_size: int = 1,
        n_action: int = 5,
        max_asset: int = 5,
        tick_size: float = 0.1,
        lot_size: int = 100,
        start_nav: float = 1e6,
        kappa: float = 0.02,
    ):
        """Store configuration and build the action/observation spaces.

        Args:
            df: Price table, one row per tick and one column per asset.
            window_size: Number of most recent price rows kept in ``window``.
            n_action: Number of discrete share levels available per asset.
            max_asset: Length of the action vector.
            tick_size: Price increment used by the cost model.
            lot_size: Shares per lot; spacing between adjacent share levels.
            start_nav: Stored on the instance; not used by any method here.
            kappa: Weight of the squared-P&L penalty in the reward.
        """
        self.seed()
        self.df = df
        self.window_size = window_size
        self.window = deque(maxlen=self.window_size)
        self.shape = (window_size, self.df.shape[1])
        self.tick_size = tick_size
        self.lot_size = lot_size
        self.kappa = kappa
        self.start_nav = start_nav

        # Share level selected by each action index, centred on zero. With the
        # defaults (n_action=5, lot_size=100) this is [-200, -100, 0, 100, 200].
        self.n_action = n_action
        self.shares = lot_size * (np.arange(n_action) - n_action // 2)

        # spaces
        # MultiDiscrete takes the NUMBER of choices per dimension, so every
        # entry is n_action (valid indices 0 .. n_action - 1).
        self.action_space = spaces.MultiDiscrete(
            np.full(max_asset, n_action, dtype=np.int64)
        )
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=self.shape, dtype=np.float32
        )

        # episode
        self._start_tick = self.window_size - 1
        self._end_tick = self.df.shape[0] - 1
        self._current_tick = None
        self._last_trade_tick = None
        self._first_rendering = None
        self.done = None
        self.total_reward = None
        self.total_profit = None
        self.history = None

    def seed(self, seed=None):
        """Create the environment RNG and return the seed in a list."""
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def reset(self):
        """Start a new episode and return the first observation.

        The window is filled with the first ``window_size`` rows of
        ``self.df`` and the current tick is set to the last of those rows, so
        the first ``step`` moves on to the first row outside the window.
        """
        self._current_tick = self._start_tick
        self._first_rendering = True
        self.window.clear()
        self.window.extend(self.df.iloc[:self.window_size].to_numpy())
        self.done = False
        self.total_reward = 0.
        self.total_profit = 1.  # unit
        # One placeholder per warm-up tick keeps history index == tick index.
        self.history = {
            'actions': self.window_size * [None],
            'delta_vt': self.window_size * [0],
            'total_reward': self.window_size * [self.total_reward],
            'total_profit': self.window_size * [self.total_profit]
        }
        return self._get_observation()

    def step(self, action: List[int]):
        """Advance one tick and apply ``action``.

        Args:
            action: One index into ``self.shares`` per asset.

        Returns:
            tuple: ``(observation, reward, done, info)`` where ``info`` holds
            the action, the step's ``delta_vt`` and the running totals.
        """
        self._current_tick += 1
        new_prices = self.df.iloc[self._current_tick].to_numpy()

        self.done = self._current_tick >= self._end_tick

        # P&L is measured against the last row in the window, so the window
        # may only be advanced after delta_vt has been computed.
        delta_vt = self.delta_vt(action, new_prices)
        self.window.append(new_prices)

        step_reward = self._calculate_reward(delta_vt)
        self.total_reward += step_reward
        self.total_profit += delta_vt

        # always update history last
        info = dict(
            actions = action,
            delta_vt = delta_vt,
            total_reward = self.total_reward,
            total_profit = self.total_profit,
        )
        self._update_history(info)

        return self._get_observation(), step_reward, self.done, info

    def _get_observation(self):
        """Return the most recent price row held in the window."""
        # process window
        return self.window[-1]

    def _update_history(self, info):
        """Append every value of ``info`` to the history list of its key."""
        if not self.history:
            self.history = {key: [] for key in info.keys()}

        for key, value in info.items():
            self.history[key].append(value)

    def spread_cost(self, dn: np.ndarray) -> float:
        """Cost proportional to the number of shares traded, buy or sell."""
        # abs(): a sale (negative dn) must cost money, not earn a rebate.
        return np.sum(np.abs(dn) * self.tick_size)

    def impact_cost(self, dn: np.ndarray) -> float:
        """Cost growing with the square of the trade size."""
        return np.sum(dn ** 2 * self.tick_size / self.lot_size)

    def total_cost(self, dn: np.ndarray) -> float:
        """Sum of spread cost and impact cost for the trade ``dn``."""
        return self.spread_cost(dn) + self.impact_cost(dn)

    def delta_vt(
        self,
        action: np.ndarray,
        prices: np.ndarray,
    ):
        """Change in portfolio value for this step, net of trading costs.

        Args:
            action: Indices into ``self.shares`` chosen for this step.
            prices: Price row of the tick that was just reached.

        Returns:
            Previously held shares times their price move since the last row
            of the window, minus the cost of trading from the previous
            holdings to the new ones.
        """
        shares = self._decode_action(action)

        # Warm-up entries are None (or the list is empty): nothing held yet.
        past_actions = self.history['actions']
        prev_action = past_actions[-1] if past_actions else None
        if prev_action is None:
            prev_shares = np.zeros_like(shares)
        else:
            prev_shares = self._decode_action(prev_action)

        dn = shares - prev_shares
        # Cast to float so the division helper never has to write a
        # fractional result into an integer buffer.
        prev_prices = np.asarray(self.window[-1], dtype=float)
        prices = np.asarray(prices, dtype=float)
        rate = safe_divide(prices, prev_prices) - 1
        return np.sum(prev_shares * prev_prices * rate) - self.total_cost(dn)

    def _decode_action(self, action: np.ndarray) -> np.ndarray:
        """Map action indices to share levels via ``self.shares``."""
        return np.take(self.shares, action)

    def _render_title(self):
        """Figure title showing the running reward and profit."""
        return (
            "Total Reward: %.6f" % self.total_reward + ' ~ ' +
            "Total Profit: %.6f" % self.total_profit
        )

    def _holding_markers(self, prices, tick, action):
        """Split the holdings of ``action`` at ``tick`` into plot points.

        Returns:
            tuple: ``(short_points, long_points)``, each a list of
            ``(tick, price)`` pairs for assets held short / long.
        """
        short_points, long_points = [], []
        if action is None:
            return short_points, long_points
        held = np.atleast_1d(self._decode_action(action))
        # zip() stops at the shorter sequence, so an action vector longer than
        # the number of price columns cannot index out of range.
        for asset, n_shares in zip(range(prices.shape[1]), held):
            point = (tick, prices[tick, asset])
            if n_shares < 0:
                short_points.append(point)
            elif n_shares > 0:
                long_points.append(point)
        return short_points, long_points

    def render(self, mode='human'):
        """Draw the price series and mark the holdings at the current tick.

        Short holdings are drawn in red and long holdings in green on the
        corresponding asset's price line.
        """
        prices = self.df.to_numpy()

        if self._first_rendering:
            self._first_rendering = False
            plt.cla()
            plt.plot(prices)

        past_actions = self.history['actions']
        last_action = past_actions[-1] if past_actions else None
        short_points, long_points = self._holding_markers(
            prices, self._current_tick, last_action
        )
        for points, color in ((short_points, 'red'), (long_points, 'green')):
            for tick, price in points:
                plt.scatter(tick, price, color=color)

        plt.suptitle(self._render_title())

        plt.pause(0.01)

    def render_all(self, mode='human'):
        """Draw the price series with every recorded holding marked on it."""
        prices = self.df.to_numpy()
        plt.plot(prices)

        short_points, long_points = [], []
        for tick, action in enumerate(self.history['actions']):
            shorts, longs = self._holding_markers(prices, tick, action)
            short_points.extend(shorts)
            long_points.extend(longs)

        if short_points:
            plt.plot(*zip(*short_points), 'ro')
        if long_points:
            plt.plot(*zip(*long_points), 'go')

        plt.suptitle(self._render_title())

    def close(self):
        """Close the current matplotlib figure."""
        plt.close()

    def save_rendering(self, filepath):
        """Save the current matplotlib figure to ``filepath``."""
        plt.savefig(filepath)

    def pause_rendering(self):
        """Show the figure via ``plt.show()``."""
        plt.show()

    def _calculate_reward(self, delta_vt):
        """Reward: ``delta_vt`` minus ``kappa`` times ``delta_vt`` squared."""
        return delta_vt - self.kappa * (delta_vt ** 2)

    def max_possible_profit(self):  # trade fees are ignored
        """Not implemented."""
        raise NotImplementedError


In [49]:
# Scratch check of the action encoding used by TradingEnv.
# NOTE(review): MultiDiscrete's argument is the number of choices per
# dimension, so np.ones(5) * (5 - 1) allows indices 0..3 only -- confirm
# whether five choices (0..4) were intended.
action_space = spaces.MultiDiscrete(np.ones(5) * (5 - 1))
# All-zero action with the same shape and dtype as a sampled action.
action = np.zeros_like(action_space.sample())

In [50]:
# Share levels -200, -100, 0, 100, 200 (the stop value 300 is excluded).
shares = np.arange(-200, 300, 100)
# Look up the share level for each action index; index 0 maps to -200.
np.take(shares, action)

array([-200, -200, -200, -200, -200])

In [56]:
# Scratch check of the ** (power) operator.
2**2

4