<img src="http://hilpisch.com/tpq_logo.png" alt="The Python Quants" width="35%" align="right" border="0"><br>

# Reinforcement Learning for Finance

**Chapter 03 &mdash; Financial Q-Learning**

&copy; Dr. Yves J. Hilpisch

<a href="http://tpq.io" target="_blank">http://tpq.io</a> | <a href="http://twitter.com/dyjh" target="_blank">@dyjh</a> | <a href="mailto:team@tpq.io">team@tpq.io</a>

## Finance Environment

In [None]:
# Clone the companion repository and add its directory to the import path.
!git clone https://github.com/tpq-classes/rl_4_finance.git
import sys
sys.path.append('rl_4_finance')


In [None]:
import random

In [None]:
class ActionSpace:
    """Binary action space whose only actions are the integers 0 and 1."""

    def sample(self):
        """Return a uniformly drawn random action, either 0 or 1."""
        return random.randrange(2)

In [None]:
# Stand-alone action space instance for a quick sanity check.
action_space = ActionSpace()

In [None]:
# Draw ten random actions (each one is 0 or 1).
[action_space.sample() for _ in range(10)]

In [None]:
import numpy as np
import pandas as pd
from pylab import plt
plt.style.use('seaborn-v0_8')

In [None]:
# Location of the CSV data set used in this notebook.
url = 'https://certificate.tpq.io/rl4finance.csv'
# The first column becomes the index and is parsed as dates.
data = pd.read_csv(url, index_col=0, parse_dates=True)

In [None]:
# Summary of columns, dtypes and non-null counts.
data.info()

In [None]:
# Optional visual check of the EUR= series (disabled).
# data['EUR='].plot();

In [None]:
class Finance:
    """Financial prediction environment: configuration and data download.

    The constructor stores the settings, creates the action space, downloads
    the raw data and finally calls ``_prepare_data``. That method is not
    defined in this class body; it has to be supplied by an extension of the
    class before an instance can be created.

    Class attributes:
        url: Location of the CSV file read by ``_get_data``.
            (Alternative data set: https://certificate.tpq.io/findata.csv)
    """

    url = 'https://certificate.tpq.io/rl4finance.csv'

    def __init__(self, symbol, feature, min_accuracy=0.485, n_features=4):
        """Store the configuration and load the data.

        Args:
            symbol: Column of the raw data set to work with.
            feature: Column of the prepared data used to build states.
            min_accuracy: Accuracy threshold stored for use by the
                environment logic.
            n_features: Number of feature values stored for state
                construction.
        """
        self.symbol, self.feature = symbol, feature
        self.n_features = n_features
        self.min_accuracy = min_accuracy
        self.action_space = ActionSpace()
        # Download first, then derive the working data from the raw frame.
        self._get_data()
        self._prepare_data()

    def _get_data(self):
        """Read the CSV at ``self.url`` into ``self.raw``.

        The first column is used as the index and parsed as dates.
        """
        raw_frame = pd.read_csv(self.url, index_col=0, parse_dates=True)
        self.raw = raw_frame

In [None]:
class Finance(Finance):
    """Extension of ``Finance`` adding data preparation and episode reset."""

    def _prepare_data(self):
        """Build ``self.data`` and its normalized counterpart ``self.data_``.

        ``self.data`` holds the selected symbol column, the log returns
        ``'r'`` and the direction label ``'d'`` (1 for a positive return,
        0 otherwise). ``self.data_`` is ``self.data`` with every column
        shifted by its mean and scaled by its standard deviation, both
        computed over the full prepared data set.
        """
        frame = pd.DataFrame(self.raw[self.symbol]).dropna()
        prices = frame[self.symbol]
        frame['r'] = np.log(prices / prices.shift(1))
        frame['d'] = (frame['r'] > 0).astype(int)
        # The first row has no previous price and therefore no return.
        frame.dropna(inplace=True)
        self.data = frame
        self.data_ = (frame - frame.mean()) / frame.std()

    def reset(self):
        """Start a new episode.

        Returns:
            A tuple ``(state, info)`` where ``state`` holds the first
            ``n_features`` normalized values of the feature column and
            ``info`` is an empty dict.
        """
        self.treward = 0
        self.bar = self.n_features
        window = self.data_[self.feature].iloc[:self.bar]
        return window.values, {}

In [None]:
class Finance(Finance):
    """Extension of ``Finance`` adding the ``step`` transition."""

    def step(self, action):
        """Evaluate ``action`` against the direction label of the current bar.

        Args:
            action: Predicted direction, compared with ``data['d']`` at the
                current bar.

        Returns:
            A tuple ``(next_state, reward, done, truncated, info)``.
            ``reward`` is 1 for a correct prediction and 0 otherwise,
            ``truncated`` is always ``False`` and ``info`` is an empty dict.
            ``done`` is ``True`` when the data is exhausted, or when a wrong
            prediction leaves the running accuracy below ``min_accuracy``
            after bar 15.
        """
        reward = 1 if action == self.data['d'].iloc[self.bar] else 0
        self.treward += reward
        self.bar += 1
        # Share of correct predictions since the episode started.
        self.accuracy = self.treward / (self.bar - self.n_features)
        out_of_data = self.bar >= len(self.data)
        failing = (reward != 1 and self.bar > 15
                   and self.accuracy < self.min_accuracy)
        done = bool(out_of_data or failing)
        start = self.bar - self.n_features
        next_state = self.data_[self.feature].iloc[start:self.bar].values
        return next_state, reward, done, False, {}

In [None]:
# Environment that uses the normalized EUR= price column as state feature.
fin = Finance(symbol='EUR=', feature='EUR=', n_features=4)

In [None]:
list(fin.raw.columns)  # column names of the raw data set

In [None]:
# Initial state: the first four normalized feature values plus an empty info dict.
fin.reset()

In [None]:
# fin.data_['EUR='].mean(), fin.data_['EUR='].std()

In [None]:
# fin.data_['EUR='].plot();

In [None]:
# A single random action.
fin.action_space.sample()

In [None]:
# One environment step with a random action.
fin.step(fin.action_space.sample())

In [None]:
fin = Finance('EUR=', 'r')  # normalized log returns as state feature

In [None]:
fin.reset()

In [None]:
class RandomAgent:
    """Baseline agent that plays ``Finance`` episodes with random actions.

    Attributes:
        env: ``Finance`` environment for symbol ``'EUR='`` with feature
            ``'r'``.
        trewards: List with one step count per episode, rebuilt on every
            call to ``play``.
    """

    def __init__(self):
        self.env = Finance('EUR=', 'r')

    def play(self, episodes=1, max_steps=99):
        """Play ``episodes`` episodes and record how long each one lasted.

        Every episode contributes exactly one entry to ``self.trewards``:
        the step at which the environment reported ``done``, or
        ``max_steps`` if the episode was still running when the step budget
        ran out. Recording the capped episodes keeps the list from ending up
        empty and keeps averages from being biased towards short episodes.

        Args:
            episodes: Number of episodes to play.
            max_steps: Maximum number of steps per episode (default 99).
        """
        self.trewards = []
        for _ in range(episodes):
            self.env.reset()
            steps = 0
            for steps in range(1, max_steps + 1):
                action = self.env.action_space.sample()
                _, _, done, _, _ = self.env.step(action)
                if done:
                    break
            # ``steps`` is the terminating step or, if never done, max_steps.
            self.trewards.append(steps)

In [None]:
ra = RandomAgent()

In [None]:
# Play 15 episodes with purely random actions.
ra.play(15)

In [None]:
# Recorded episode lengths (in steps).
ra.trewards

In [None]:
round(sum(ra.trewards) / len(ra.trewards), 2)  # average recorded episode length

In [None]:
len(fin.data)  # number of rows in the prepared data set

In [None]:
import os
import warnings
# Ignore all Python warnings from here on.
warnings.simplefilter('ignore')
# Set before TensorFlow is imported below; presumably reduces its native
# log output (value '3') -- confirm against the TensorFlow documentation.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

In [None]:
import random
import numpy as np
import tensorflow as tf
from tensorflow import keras
from collections import deque
from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Sequential

In [None]:
# Module-level Adam optimizer instance with a learning rate of 1e-4.
opt = keras.optimizers.Adam(learning_rate=0.0001)

In [None]:
class DQLAgent:
    """Deep Q-learning agent for the ``Finance`` environment.

    The agent approximates Q-values for the two actions with a small dense
    network, explores epsilon-greedily and learns from uniformly sampled
    past transitions.

    Attributes:
        epsilon: Current exploration rate; starts at 1.0 and is multiplied
            by ``epsilon_decay`` after each replay until it is no longer
            above ``epsilon_min``.
        memory: Plain list (no size cap) of transitions
            ``[state, action, next_state, reward, done]``.
        batch_size: Number of transitions sampled per replay.
        gamma: Discount factor applied to the best next-state Q-value.
        trewards: Episode lengths of the most recent 2000 finished episodes.
        max_treward: Longest episode seen during learning.
        model: Keras model mapping a ``(1, n_features)`` state to two
            Q-values.
        env: The ``Finance`` environment the agent interacts with.
    """

    def __init__(self, symbol, feature, min_accuracy, n_features=4):
        """Create the model and the environment.

        Args:
            symbol: Symbol passed on to ``Finance``.
            feature: Feature column passed on to ``Finance``.
            min_accuracy: Minimum accuracy passed on to ``Finance``.
            n_features: Length of the state vector / model input.
        """
        self.epsilon = 1.0
        self.epsilon_decay = 0.9975
        self.epsilon_min = 0.1
        self.memory = list()
        self.batch_size = 32
        self.gamma = 0.5
        self.trewards = deque(maxlen=2000)
        self.max_treward = 0
        self.n_features = n_features
        self._create_model()
        self.env = Finance(symbol, feature, min_accuracy, n_features)

    def _create_model(self):
        """Build and compile the Q-network (two hidden layers of 24 units)."""
        self.model = Sequential()
        self.model.add(Dense(24, activation='relu',
                             input_dim=self.n_features))
        self.model.add(Dense(24, activation='relu'))
        self.model.add(Dense(2, activation='linear'))
        # Give every model its own optimizer, configured like the
        # module-level ``opt``. Sharing one optimizer instance between
        # several models breaks as soon as a second agent is trained,
        # because the optimizer is tied to the first model's variables.
        optimizer = type(opt).from_config(opt.get_config())
        self.model.compile(loss='mse', optimizer=optimizer)

    def act(self, state):
        """Choose an action epsilon-greedily for ``state``.

        Args:
            state: Array of shape ``(1, n_features)``.

        Returns:
            A random action with probability ``epsilon``, otherwise the
            index of the larger predicted Q-value as ``int``.
        """
        if random.random() < self.epsilon:
            return self.env.action_space.sample()
        q = self.model(tf.convert_to_tensor(state, dtype=tf.float32),
                       training=False)
        return int(tf.argmax(q[0]).numpy())

    def replay(self):
        """Run one training update on a random batch from ``memory``.

        The target for the taken action is
        ``reward + gamma * max_a' Q(s', a')``, with the bootstrap term
        dropped for terminal transitions; the targets of the other action
        equal the current predictions, so they contribute no error.
        Afterwards ``epsilon`` is decayed while it is above ``epsilon_min``.
        """
        batch = random.sample(self.memory, self.batch_size)

        states = np.vstack([b[0] for b in batch]).astype(np.float32)
        actions = np.array([b[1] for b in batch], dtype=np.int32)
        next_states = np.vstack([b[2] for b in batch]).astype(np.float32)
        rewards = np.array([b[3] for b in batch], dtype=np.float32)
        dones = np.array([b[4] for b in batch], dtype=np.bool_)

        # Q(s, :) and max_a' Q(s', a'), each from one batched forward pass.
        q_states = self.model(states, training=False).numpy()
        q_next = self.model(next_states, training=False).numpy()
        max_q_next = np.max(q_next, axis=1)

        targets = q_states.copy()
        targets[np.arange(self.batch_size), actions] = (
            rewards + self.gamma * max_q_next * (~dones))

        # A single gradient step for the whole batch.
        self.model.train_on_batch(states, targets)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def learn(self, episodes):
        """Interact with the environment for ``episodes`` episodes.

        Every transition is stored in ``memory``. After each episode one
        replay update is performed once more than ``batch_size``
        transitions are available. An episode runs for at most 4999 steps;
        its length is only recorded if the environment reports ``done``.

        Args:
            episodes: Number of training episodes.
        """
        for e in range(1, episodes + 1):
            state, _ = self.env.reset()
            state = np.reshape(state, [1, self.n_features])
            for f in range(1, 5000):
                action = self.act(state)
                next_state, reward, done, _, _ = self.env.step(action)
                next_state = np.reshape(next_state, [1, self.n_features])
                self.memory.append(
                    [state, action, next_state, reward, done])
                state = next_state
                if done:
                    self.trewards.append(f)
                    self.max_treward = max(self.max_treward, f)
                    templ = f'episode={e:4d} | treward={f:4d}'
                    templ += f' | max={self.max_treward:4d}'
                    print(templ, end='\r')
                    break
            if len(self.memory) > self.batch_size:
                self.replay()
        print()

    def test(self, episodes):
        """Run greedy evaluation episodes and print their results.

        During evaluation the environment's ``min_accuracy`` is set to 0.5.
        The previous value is restored afterwards, also when an episode
        raises an exception.

        Args:
            episodes: Number of evaluation episodes.
        """
        ma = self.env.min_accuracy
        self.env.min_accuracy = 0.5
        try:
            for e in range(1, episodes + 1):
                state, _ = self.env.reset()
                state = np.reshape(state, [1, self.n_features])
                for f in range(1, 5001):
                    q = self.model(
                        tf.convert_to_tensor(state, dtype=tf.float32),
                        training=False).numpy()
                    action = np.argmax(q[0])
                    state, reward, done, _, _ = self.env.step(action)
                    state = np.reshape(state, [1, self.n_features])
                    if done:
                        print(f'total reward={f} | accuracy={self.env.accuracy:.3f}')
                        break
        finally:
            # Leave the environment configured as it was before the test.
            self.env.min_accuracy = ma

In [None]:
agent = DQLAgent('EUR=', 'r', 0.495, 4)  # normalized returns as features

In [None]:
# agent = DQLAgent('EUR=', 'EUR=', 0.45, 4)  # normalized price data as features

In [None]:
# agent = DQLAgent('AAPL.O', 'r', 0.495, 4)  # normalized returns as features

In [None]:
# agent = DQLAgent('AAPL.O', 'AAPL.O', 0.495, 4)  # normalized price data as features

In [None]:
# agent = DQLAgent('AAPL.O', 'r', 0.495, 8)  # normalized returns data as features (8 instead of 4)

In [None]:
# Train for 333 episodes; %time reports how long the call takes.
%time agent.learn(333)

In [None]:
agent.test(5)  # five evaluation episodes with greedy (argmax) actions

<img src="http://hilpisch.com/tpq_logo.png" alt="The Python Quants" width="35%" align="right" border="0"><br>

<a href="http://tpq.io" target="_blank">http://tpq.io</a> | <a href="http://twitter.com/dyjh" target="_blank">@dyjh</a> | <a href="mailto:team@tpq.io">team@tpq.io</a>