# Trading Agent Development

In [2]:
import gym
from gym import spaces
import numpy as np
import pandas as pd
import random
import enum

## Environment

In [None]:
@enum.unique
class TradingActions(enum.Enum):
    """
    Discrete actions available to the trading agent.

    Trades are all-or-nothing: buying or selling always concerns the whole
    position, an amount cannot be specified.
    """
    Hold = 0  # do nothing
    Buy = 1   # buy all
    Sell = 2  # sell all

class RLTradingEnv(gym.Env):
    """
    A single-asset, all-in/all-out trading environment for OpenAI gym.

    Every episode walks through the rows of one data frame taken from
    ``data_frame_dict``. At each step the agent picks one of
    :class:`TradingActions`; the reward is the change of the total balance
    (cash plus value of the held shares) caused by moving one row forward and
    executing that action.
    """
    metadata = {'render.modes': ['human']}

    def __init__(self,
                 data_frame_dict: dict,
                 price_column='Close',
                 indicator_columns=('MA5', 'MA30', 'MA90'),
                 starting_cash=100000,
                 commission_rate=0,
                 sample_days=30,
                 random_on_reset=True,
                 verbose=True
                 ):
        """
        Set up the trading environment.

        :param data_frame_dict: A dictionary where keys represent identifiers for
            trading instruments or assets, and values are Pandas DataFrames
            containing time-series market data for each respective instrument.
            Each episode, one stock is trained.
        :param price_column: Name of the column in the DataFrame that contains
            the price information. This is typically the closing price of the
            instrument.
        :param indicator_columns: Names of the columns in the DataFrame that
            contain the metrics the agent bases its decision on. Any iterable
            of column names (or a single column name) is accepted; it is copied
            into a list.
        :param starting_cash: The initial amount of cash available in the environment
            for trading (at beginning of every episode).
        :param commission_rate: The commission or transaction fee rate applied to
            each trade. Represented as a decimal value, where 0.01 corresponds to
            a 1% transaction cost.
        :param sample_days: Number of most recent rows (days) of indicators that
            are part of every observation.
        :param random_on_reset: Whether each episode randomly selects a stock to
            train. If False, the stocks are visited in dictionary order.
        :param verbose: Whether to print episode starts and executed trades.
        """
        super().__init__()  # initialise base class
        self.data_frame_dict = data_frame_dict
        self.price_column = price_column

        # Columns fed to the agent. Copy into a list so that neither a shared
        # default nor the caller's object is aliased, and so that pandas treats
        # it as a column selection (a tuple would be read as a single key).
        if isinstance(indicator_columns, str):
            indicator_columns = [indicator_columns]
        self.indicator_columns = list(indicator_columns)

        self.starting_cash = starting_cash
        self.commission_rate = commission_rate
        self.sample_days = sample_days
        self.random_on_reset = random_on_reset
        self.verbose = verbose

        # current episode (index of the data frame that is traded)
        self.cur_episode = self.next_episode() if random_on_reset else 0

        # current step (row index); starts where a full window of
        # `sample_days` rows is available
        self.cur_step = self.sample_days

        # set of possible actions as discrete values:
        self.action_space = spaces.Discrete(len(TradingActions))

        # Flat vector: `sample_days` rows of indicators followed by three more
        # entries (cash, value of held shares, last action).
        # float32 rather than float16: float16 tops out at about 65504 and
        # therefore cannot even hold the default starting cash.
        self.observation_space = spaces.Box(low=-np.inf,
                                            high=np.inf,
                                            shape=(len(self.indicator_columns) * self.sample_days + 3,),
                                            dtype=np.float32)

        # possible range of rewards for actions:
        self.reward_range = (-np.inf, np.inf)

        # initial conditions:
        self.cash = self.starting_cash
        self.shares = 0

    def step(self, action):
        """
        Advance one row, execute ``action`` there and return the outcome.

        :param action: Value of one of the :class:`TradingActions` members.
        :return: Tuple ``(observation, reward, done, info)`` where
            ``observation`` is the vector built by :meth:`next_observation`,
            ``reward`` is the change of the total balance, ``done`` tells
            whether the last row of the current data frame has been reached
            and ``info`` is a dict describing the new state.
        """
        # balance before the clock advances; the reward is measured against it
        balance = self.cur_balance

        self.cur_step += 1
        if self.cur_step >= self.total_steps:
            # Fallback for callers that keep stepping after `done` without
            # calling reset(): move on to the next data frame.
            # NOTE: cash and shares are carried over unchanged in that case.
            self.cur_episode = self.next_episode()
            self.cur_step = self.sample_days

        # take action:
        self.take_action(action)

        # compute status and calculate reward:
        obs = self.next_observation(action)
        reward = self.cur_balance - balance  # equals change of balance
        done = self.cur_step >= self.total_steps - 1  # last row reached

        # construct info dictionary:
        info = {'Step': self.cur_step,
                'Reward': round(reward, 2),
                'Symbol': self.cur_symbol,
                'Action': TradingActions(action).name,
                'Shares': self.shares,
                'Close': round(self.cur_close_price, 2),
                'Cash': round(self.cash, 2),
                'Total': round(self.cur_balance, 2)}

        return obs, reward, done, info

    def take_action(self, action):
        """
        Execute ``action`` at the current close price.

        Buy spends as much cash as possible on whole shares, but only while no
        shares are held. Sell liquidates the whole position, but only while
        shares are held. The commission is applied to the price in both
        directions. Every other case (including Hold) changes nothing.
        """
        if action == TradingActions.Buy.value:
            if self.shares != 0:  # already invested
                return
            price = self.cur_close_price * (1 + self.commission_rate)
            self.shares = int(self.cash / price)  # whole shares only
            self.cash -= self.shares * price
            if self.verbose:
                print(f"[STEP {self.cur_step}] Bought {self.cur_symbol} at {self.cur_close_price}")
                print(f"    Cash: {self.cash}, Shares: {self.shares}")
        elif action == TradingActions.Sell.value:
            if self.shares <= 0:  # nothing to sell
                return
            price = self.cur_close_price * (1 - self.commission_rate)
            self.cash += self.shares * price
            self.shares = 0
            if self.verbose:
                print(f"[STEP {self.cur_step}] Sold {self.cur_symbol} at {self.cur_close_price}")
                print(f"    Cash: {self.cash}, Shares: {self.shares}")

    def next_observation(self, action):
        """
        Build the observation for the current step.

        The vector consists of the indicator values of the last
        ``sample_days`` rows up to and including the current one (oldest row
        first, row by row), followed by the cash, the value of the held shares
        at the current close price and ``action``.

        :param action: Action value to store as the last entry.
        :return: One-dimensional ``float32`` array matching
            ``observation_space``.
        """
        first_row = self.cur_step - self.sample_days + 1
        window = self.cur_indicators.iloc[first_row:self.cur_step + 1].to_numpy()
        portfolio = [self.cash, self.shares * self.cur_close_price, action]
        return np.concatenate((window.ravel(), portfolio)).astype(np.float32)

    def next_episode(self):
        """
        Return the index of the data frame to trade next.

        A random index if ``random_on_reset`` is set, otherwise the index
        following the current one (wrapping around at the end).
        """
        if self.random_on_reset:
            return random.randrange(0, self.total_episodes)
        return (self.cur_episode + 1) % self.total_episodes

    def reset(self):
        """
        Start a new episode and return its first observation.

        If the current data frame has been played to its last row, the next
        one is selected and the step pointer is rewound, so that the returned
        observation belongs to the data the agent is about to trade. Otherwise
        the current position in the data is kept. Cash and shares are always
        restored to their initial values.
        """
        if self.cur_step >= self.total_steps - 1:
            self.cur_episode = self.next_episode()
            self.cur_step = self.sample_days
        if self.verbose:  # info statement
            print(f"Starting episode {self.cur_episode + 1}", (f"/{self.total_episodes}" if not self.random_on_reset else ""))
        self.cash = self.starting_cash
        self.shares = 0
        return self.next_observation(TradingActions.Hold.value)

    ################ properties ################
    @property
    def total_episodes(self):
        """ Total episodes equals amount of provided stock data frames. """
        return len(self.data_frame_dict)

    @property
    def cur_symbol(self):
        """ Current symbol is the key of the current episode's data frame. """
        return list(self.data_frame_dict)[self.cur_episode]

    @property
    def cur_data(self):
        """ Current data is the current episode's data frame. """
        return self.data_frame_dict[self.cur_symbol]

    @property
    def cur_indicators(self):
        """ Current indicators are the indicator columns of the current data. """
        return self.cur_data[self.indicator_columns]

    @property
    def total_steps(self):
        """ Total steps equals length of current stock data frame. """
        return len(self.cur_data)

    @property
    def cur_close_price(self):
        """ Current close price is the price of the current symbol at the current step. """
        return self.cur_data[self.price_column].iloc[self.cur_step].item()

    @property
    def cur_balance(self):
        """ Current balance equals cash plus current shares times current price. """
        return self.cash + (self.shares * self.cur_close_price)