<img src='https://certificate.tpq.io/quantsdev_banner_color.png' width="250px" align="right">

# Reinforcement Learning

&copy; Dr Yves J Hilpisch | The Python Quants GmbH

[quants@dev Discord Server](https://discord.gg/uJPtp9Awaj) | [@quants_dev](https://twitter.com/quants_dev) | <a href="mailto:qd@tpq.io">qd@tpq.io</a>

<img src="https://hilpisch.com/aiif_cover_shadow.png" width="300px" align="left">

## Simulated Financial Processes

In [None]:
import os
import math
import random
import numpy as np
import pandas as pd
from pylab import plt
from collections import deque
plt.style.use('seaborn')
os.environ['PYTHONHASHSEED'] = '0'
np.set_printoptions(precision=4, suppress=True)

In [None]:
import warnings
warnings.simplefilter('ignore')

In [None]:
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '4'

In [None]:
from tensorflow.python.framework.ops import disable_eager_execution
disable_eager_execution()

In [None]:
import tensorflow as tf
from tensorflow import keras
from keras.layers import Dense, Dropout
from keras.models import Sequential

In [None]:
def set_seeds(seed=100):
    """Seed the Python, NumPy and TensorFlow random number generators.

    Parameters
    ----------
    seed: int
        value handed to each of the three seeding functions
    """
    for seeder in (random.seed, np.random.seed, tf.random.set_seed):
        seeder(seed)

## Simulation Environment (2)

In [None]:
class observation_space:
    """Minimal observation-space object exposing only a ``shape`` tuple."""

    def __init__(self, n):
        # one-dimensional shape holding the number of observed values
        shape = (n,)
        self.shape = shape

In [None]:
class action_space:
    """Minimal discrete action space with ``n`` actions labelled 0..n-1."""

    def __init__(self, n):
        self.n = n

    def sample(self):
        """Return a random action drawn uniformly from 0..n-1."""
        highest_action = self.n - 1
        return random.randint(0, highest_action)

In [None]:
class Simul:
    """Simulated financial market environment with a Gym-like interface.

    A price path is simulated from a mean-reverting process (drift
    ``kappa * (theta - s)``, volatility proportional to the price level),
    turned into features, and then served bar by bar through
    ``reset()`` / ``step()``.  An action is a prediction of the market
    direction (1 = upward move, 0 = otherwise) for the current bar.

    Parameters
    ----------
    symbol: str
        column name used for the simulated price series
    features: list
        column names that make up the state
    window: int
        rolling window for the additional features (0 = no such features)
    lags: int
        number of bars that make up a single state
    steps: int
        number of simulated time steps (one year is split into `steps`)
    x0, kappa, theta, sigma: float
        initial value, mean-reversion speed, mean-reversion level and
        volatility factor of the simulated process
    leverage: float
        multiplier applied to the log return of a bar
    min_accuracy: float
        minimum required prediction accuracy
    min_performance: float
        minimum required gross performance
    start, end: int
        positional bounds used to select a sub-set of the data
    mu, std:
        statistics used for the normalization; computed from the data
        when either of them is None
    normalize: bool
        whether the state data is normalized with `mu` and `std`
    renew: bool
        whether a new path is simulated on every `reset()`
    """

    def __init__(self, symbol, features, window, lags, steps,
                 x0=100, kappa=1, theta=100, sigma=0.2,
                 leverage=1, min_accuracy=0.525, min_performance=0.85,
                 start=0, end=None, mu=None, std=None,
                 normalize=True, renew=False):
        self.symbol = symbol
        self.features = features
        self.n_features = len(features)
        self.window = window
        self.lags = lags
        self.steps = steps
        self.x0 = x0
        self.kappa = kappa
        self.theta = theta
        self.sigma = sigma
        self.leverage = leverage
        # minimum required prediction accuracy
        self.min_accuracy = min_accuracy
        # minimum required financial performance
        self.min_performance = min_performance
        self.start = start
        self.end = end
        self.mu = mu
        self.std = std
        self.normalize = normalize
        self.renew = renew
        self.observation_space = observation_space(self.lags)
        self.action_space = action_space(2)
        self._simulate_data()
        self._prepare_data()

    def _simulate_data(self):
        """Simulate a price path with `steps` + 1 values into `self.data`."""
        dt = 1 / self.steps
        sqrt_dt = math.sqrt(dt)
        s = [self.x0]
        for _ in range(self.steps):
            prev = s[-1]
            s.append(prev + self.kappa * (self.theta - prev) * dt +
                     prev * self.sigma * sqrt_dt * random.gauss(0, 1))
        self.data = pd.DataFrame(s, columns=[self.symbol],
                                 index=pd.date_range(start='2022-1-1',
                                                     end='2023-1-1',
                                                     periods=self.steps + 1))

    def _prepare_data(self):
        """Derive returns, features, normalized data and direction labels."""
        # work on an explicit copy so that the new columns are never
        # written to a view of the simulated frame
        self.data = self.data.iloc[self.start:].copy()
        price = self.data[self.symbol]
        self.data['r'] = np.log(price / price.shift(1))
        self.data.dropna(inplace=True)
        # additional features
        if self.window > 0:
            price = self.data[self.symbol]
            rolling = price.rolling(self.window)
            self.data['sma'] = rolling.mean()
            self.data['dif'] = price - self.data['sma']
            self.data['min'] = rolling.min()
            self.data['max'] = rolling.max()
            self.data['mom'] = self.data['r'].rolling(self.window).mean()
            # add your own features
            self.data.dropna(inplace=True)
        if self.normalize:
            # statistics passed in by the caller (or kept from an earlier
            # call) are re-used; they are only computed when missing
            if self.mu is None or self.std is None:
                self.mu = self.data.mean()
                self.std = self.data.std()
            self.data_ = (self.data - self.mu) / self.std
        else:
            self.data_ = self.data.copy()
        # direction label is added after `data_` is built, so it is never
        # part of the state data
        self.data['d'] = (self.data['r'] > 0).astype(int)
        if self.end is not None:
            n_rows = self.end - self.start
            self.data = self.data.iloc[:n_rows]
            self.data_ = self.data_.iloc[:n_rows]

    def _get_state(self):
        """Return the `lags` feature rows that precede the current bar."""
        return self.data_[self.features].iloc[self.bar - self.lags:self.bar]

    def seed(self, seed):
        """Seed the Python, NumPy and TensorFlow random number generators."""
        random.seed(seed)
        np.random.seed(seed)
        # TF2 API, the same call that set_seeds() uses
        tf.random.set_seed(seed)

    def reset(self):
        """Start a new episode and return the initial state as an array.

        With `renew` set, a new price path is simulated and prepared first.
        """
        if self.renew:
            self._simulate_data()
            self._prepare_data()
        self.treward = 0
        self.accuracy = 0
        self.performance = 1
        self.bar = self.lags
        state = self._get_state()
        return state.values

    def step(self, action):
        """Evaluate a prediction for the current bar and move one bar ahead.

        Parameters
        ----------
        action: int
            predicted direction, compared with the label in column 'd'

        Returns
        -------
        tuple
            (state values, reward, done, info) where `info` is an empty dict
        """
        correct = action == self.data['d'].iloc[self.bar]  # prediction correct?
        ret = self.data['r'].iloc[self.bar] * self.leverage  # return incl. leverage
        reward_ = 1 if correct else 0  # reward of 1 if prediction correct, 0 otherwise
        pl = abs(ret) if correct else -abs(ret)  # strategy performance
        reward = reward_ + 100 * pl  # reward used for learning
        # alternatives:
        # reward = reward_  # correct prediction only
        # reward = pl  # strategy return only
        self.treward += reward_  # total reward
        self.bar += 1
        self.accuracy = self.treward / (self.bar - self.lags)  # prediction accuracy
        self.performance *= math.exp(pl)  # absolute/gross performance
        # the minimum requirements are only enforced after a grace period
        # of 10 bars and only on a wrong prediction
        past_grace_period = self.bar > self.lags + 10
        if self.bar >= len(self.data):
            done = True
        elif reward_ == 1:
            done = False
        elif past_grace_period and self.accuracy < self.min_accuracy:
            done = True
        elif past_grace_period and self.performance < self.min_performance:
            done = True
        else:
            done = False
        state = self._get_state()
        info = {}
        return state.values, reward, done, info

In [None]:
sym = 'EUR='

In [None]:
# set_seeds(100)
# Demo environment: the state consists of the price and the log return 'r';
# data is left unnormalized and, because renew=True, every reset() simulates
# a new path that drifts from x0=100 towards the level theta=300.
env = Simul(sym, [sym, 'r'], window=20, lags=4, steps=365,
            x0=100, kappa=2, theta=300, sigma=0.2,
            normalize=False, renew=True)

In [None]:
env.reset()

In [None]:
env.data[sym].plot(figsize=(10, 6));

In [None]:
a = env.action_space.sample()
a

In [None]:
env.reset()

In [None]:
env.step(a)

## FQL Agent

In [None]:
class FQLAgent:
    """Q-learning agent with a dense neural network and experience replay.

    Parameters
    ----------
    hidden_units: int
        number of units in each of the two hidden layers
    learning_rate: float
        learning rate of the Adam optimizer
    env: Simul
        environment the agent learns on
    gamma: float
        discount factor for the delayed reward
    ed: float
        factor by which epsilon is multiplied after every replay
    dropout: bool
        whether a dropout layer follows each hidden layer
    """

    def __init__(self, hidden_units, learning_rate, env,
                 gamma=0.95, ed=0.99, dropout=False):
        self.learn_env = env
        self.epsilon = 1.0  # initial exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = ed
        self.learning_rate = learning_rate
        self.gamma = gamma
        self.batch_size = 32
        self.max_treward = 0
        self.averages = list()  # moving average of the total rewards
        self.performances = list()  # gross performance per episode
        self.aperformances = list()  # moving average of the performances
        self.memory = deque(maxlen=2000)
        self.trewards = []  # total reward per episode
        self.dropout = dropout
        self.model = self._build_model(hidden_units, learning_rate)

    def _build_model(self, hu, lr):
        """Build and compile the network with two hidden layers of `hu` units."""
        model = Sequential()
        model.add(Dense(hu, input_shape=(
            self.learn_env.lags, self.learn_env.n_features),
                        activation='relu'))
        if self.dropout:
            model.add(Dropout(0.3, seed=100))
        model.add(Dense(hu, activation='relu'))
        if self.dropout:
            model.add(Dropout(0.3, seed=100))
        model.add(Dense(2, activation='linear'))
        model.compile(
            loss='mse',
            optimizer=keras.optimizers.Adam(learning_rate=lr)
        )
        return model

    def act(self, state):
        """Return a random action (exploration) or the model's best action."""
        if random.random() <= self.epsilon:
            return self.learn_env.action_space.sample()
        # only the values at index [0, 0] of the prediction are used
        action = self.model.predict(state)[0, 0]
        return np.argmax(action)

    def replay(self):
        """Fit the model on a random batch from memory and decay epsilon."""
        batch = random.sample(self.memory, self.batch_size)
        for state, action, reward, next_state, done in batch:
            if not done:
                # add the discounted value of the best next action
                reward += self.gamma * np.amax(
                    self.model.predict(next_state)[0, 0])
            target = self.model.predict(state)
            target[0, 0, action] = reward
            self.model.fit(state, target, epochs=1,
                           verbose=False)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def learn(self, episodes):
        """Train the agent for the given number of episodes.

        Per-episode statistics are appended to `trewards`, `averages`,
        `performances` and `aperformances`.
        """
        for e in range(1, episodes + 1):
            state = self.learn_env.reset()
            state = np.reshape(state, [1, self.learn_env.lags,
                                       self.learn_env.n_features])
            for step_idx in range(10000):
                action = self.act(state)
                next_state, reward, done, info = self.learn_env.step(action)
                next_state = np.reshape(next_state,
                                        [1, self.learn_env.lags,
                                         self.learn_env.n_features])
                self.memory.append([state, action, reward,
                                    next_state, done])
                state = next_state
                if done:
                    treward = step_idx + 1
                    self.trewards.append(treward)
                    # fixed divisor of 25: the averages are understated
                    # until 25 episodes have been recorded
                    av = sum(self.trewards[-25:]) / 25
                    perf = self.learn_env.performance
                    self.averages.append(av)
                    self.performances.append(perf)
                    # moving average of the performances (not of the
                    # total rewards), same window as for `av`
                    self.aperformances.append(
                        sum(self.performances[-25:]) / 25)
                    self.max_treward = max(self.max_treward, treward)
                    templ = 'episode: {:2d}/{} | treward: {:4d} | '
                    templ += 'perf: {:5.3f} | av: {:5.1f} | max: {:4d}'
                    print(templ.format(e, episodes, treward, perf,
                                       av, self.max_treward), end='\r')
                    break
            if len(self.memory) > self.batch_size:
                self.replay()
        print()

    def test(self, episodes):
        """Run the trained model on newly simulated environments.

        A new `Simul` environment with the parameters of the learning
        environment is created for every episode; actions are always the
        model's best action (no exploration).

        Returns
        -------
        tuple
            (total rewards, accuracies, performances, last environment);
            the last element is None when `episodes` is 0
        """
        env = self.learn_env
        trewards = []
        performances = []
        accuracies = []
        env_ = None  # stays None when no episode is run
        for e in range(1, episodes + 1):
            env_ = Simul(env.symbol, env.features, env.window, env.lags,
                         env.steps, x0=env.x0, kappa=env.kappa, theta=env.theta,
                         leverage=env.leverage, min_accuracy=env.min_accuracy,
                         min_performance=env.min_performance,
                         sigma=env.sigma, mu=env.mu, std=env.std,
                         normalize=env.normalize)
            state = env_.reset()
            for step_idx in range(10001):
                state = np.reshape(state, [1, env_.lags,
                                           env_.n_features])
                action = np.argmax(self.model.predict(state)[0, 0])
                next_state, reward, done, info = env_.step(action)
                state = next_state
                if done:
                    treward = step_idx + 1
                    trewards.append(treward)
                    perf = env_.performance
                    performances.append(perf)
                    acc = env_.accuracy
                    accuracies.append(acc)
                    print('episode: {:4d}/{} | treward: {:4d} | acc: {:.3f} | perf: {:.3f}'
                          .format(e, episodes, treward, acc, perf), end='\r')
                    break
        return trewards, accuracies, performances, env_

## Baseline Scenario

### Environment

In [None]:
# baseline features
features = [sym]

In [None]:
# no risk
# sigma=0.0 removes the random term of the simulation, so the path moves
# deterministically from x0=1 towards theta=1.1; no rolling features
# (window=0) and no normalization are used.
env = Simul(sym, features, window=0, lags=4, steps=365,
            x0=1, kappa=1, theta=1.1, sigma=0.0,
            leverage=1, min_accuracy=0.5, min_performance=0.85,
            start=0, end=None, mu=None, std=None,
            normalize=False)

In [None]:
env.reset()

In [None]:
math.exp(env.data['r'].sum())  # passive benchmark performance

In [None]:
env.data[sym].diff().apply(np.sign).value_counts()

In [None]:
ax = env.data['r'].cumsum().apply(np.exp).plot(figsize=(10, 6))
env.data[sym].plot(ax=ax, style='r--', alpha=0.5);

### Training the Agent

In [None]:
set_seeds(100)
agent = FQLAgent(24, 0.001, env)

In [None]:
episodes = 60

In [None]:
%time agent.learn(episodes)

In [None]:
agent.epsilon

In [None]:
# Learning curve: moving average of the total reward with a cubic trend.
plt.figure(figsize=(10, 6))
x = range(1, len(agent.averages) + 1)  # 1-based episode numbers
# cubic polynomial fit evaluated at the episode numbers
y = np.polyval(np.polyfit(x, agent.averages, deg=3), x)
# plot against x so that both curves share the same episode axis
plt.plot(x, agent.averages, label='moving average')
plt.plot(x, y, 'r--', label='regression')
plt.xlabel('episodes')
plt.ylabel('total reward')
plt.legend();

In [None]:
# Gross performance per training episode with a cubic trend.
plt.figure(figsize=(10, 6))
x = range(1, len(agent.performances) + 1)  # 1-based episode numbers
# cubic polynomial fit evaluated at the episode numbers
y = np.polyval(np.polyfit(x, agent.performances, deg=3), x)
# raw per-episode values (not a moving average), plotted against x so
# that both curves share the same episode axis
plt.plot(x, agent.performances, label='performance')
plt.plot(x, y, 'r--', label='regression')
plt.xlabel('episodes')
plt.ylabel('performance')
plt.legend();

### Testing the Agent 

In [None]:
rewards, accuracies, performances, env_ = agent.test(5)

In [None]:
rewards

In [None]:
accuracies

In [None]:
performances

In [None]:
ax = env.data[sym].plot(figsize=(10, 6))
env_.data[sym].plot(ax=ax);

## Trend Scenario

### Environment

In [None]:
# trend features
features = [sym]

In [None]:
# strong trend
# the level theta=500 lies far above x0=100, so the drift term pulls the
# simulated path upwards; seeds are fixed first so that the path is
# reproducible, and the state data is normalized.
set_seeds(100)
env = Simul(sym, features, window=20, lags=4, steps=365,
            x0=100, kappa=1, theta=500, sigma=0.25,
            leverage=1, min_accuracy=0.525, min_performance=0.5,
            start=0, end=None, mu=None, std=None,
            normalize=True)

In [None]:
env.reset()

In [None]:
env.data[sym].diff().apply(np.sign).value_counts()

In [None]:
math.exp(env.data['r'].sum())  # passive benchmark performance

In [None]:
math.exp(np.abs(env.data['r']).sum())  # maximum benchmark performance (= 100% correct predictions)

In [None]:
ax = env.data['r'].cumsum().apply(np.exp).plot(figsize=(10, 6))
env.data[sym].plot(ax=ax, style='r--', alpha=0.5, secondary_y=sym);

### Training the Agent

In [None]:
set_seeds(100)
agent = FQLAgent(24, 0.001, env)

In [None]:
episodes = 100

In [None]:
%time agent.learn(episodes)

In [None]:
agent.epsilon

In [None]:
# Learning curve: moving average of the total reward with a cubic trend.
plt.figure(figsize=(10, 6))
x = range(1, len(agent.averages) + 1)  # 1-based episode numbers
# cubic polynomial fit evaluated at the episode numbers
y = np.polyval(np.polyfit(x, agent.averages, deg=3), x)
# plot against x so that both curves share the same episode axis
plt.plot(x, agent.averages, label='moving average')
plt.plot(x, y, 'r--', label='regression')
plt.xlabel('episodes')
plt.ylabel('total reward')
plt.legend();

In [None]:
# Gross performance per training episode with a cubic trend.
plt.figure(figsize=(10, 6))
x = range(1, len(agent.performances) + 1)  # 1-based episode numbers
# cubic polynomial fit evaluated at the episode numbers
y = np.polyval(np.polyfit(x, agent.performances, deg=3), x)
# raw per-episode values (not a moving average), plotted against x so
# that both curves share the same episode axis
plt.plot(x, agent.performances, label='performance')
plt.plot(x, y, 'r--', label='regression')
plt.xlabel('episodes')
plt.ylabel('performance')
plt.legend();

### Testing the Agent 

In [None]:
rewards, accuracies, performances, env_ = agent.test(5)

In [None]:
rewards

In [None]:
accuracies

In [None]:
performances

In [None]:
ax = env.data[sym].plot(figsize=(10, 6))
env_.data[sym].plot(ax=ax);

In [None]:
np.corrcoef(env.data[sym], env_.data[sym])

## Mean Reversion Scenario

### Environment

In [None]:
# reversion features
features = ['r', 'dif']

In [None]:
# mean reversion
# the path starts at its mean-reversion level (x0 = theta = 100) and is
# pulled back to it with kappa=7.5; seeds are fixed first so that the path
# is reproducible, and renew=False keeps the same path across resets.
set_seeds(1000)
env = Simul(sym, features, window=20, lags=4, steps=365,
            x0=100, kappa=7.5, theta=100, sigma=0.1,
            leverage=1, min_accuracy=0.475, min_performance=0.85,
            start=0, end=None, mu=None, std=None,
            normalize=True, renew=False)

In [None]:
env.data.columns

In [None]:
env.reset()

In [None]:
env.data[sym].diff().apply(np.sign).value_counts()

In [None]:
math.exp(env.data['r'].sum())  # passive benchmark performance

In [None]:
ax = env.data['r'].cumsum().apply(np.exp).plot(figsize=(10, 6))
env.data[sym].plot(ax=ax, style='r--', alpha=0.5, secondary_y=sym);

### Training the Agent

In [None]:
set_seeds(100)
agent = FQLAgent(24, 0.001, env)

In [None]:
episodes = 250

In [None]:
%time agent.learn(episodes)

In [None]:
agent.epsilon

In [None]:
# Learning curve: moving average of the total reward with a cubic trend.
plt.figure(figsize=(10, 6))
x = range(1, len(agent.averages) + 1)  # 1-based episode numbers
# cubic polynomial fit evaluated at the episode numbers
y = np.polyval(np.polyfit(x, agent.averages, deg=3), x)
# plot against x so that both curves share the same episode axis
plt.plot(x, agent.averages, label='moving average')
plt.plot(x, y, 'r--', label='regression')
plt.xlabel('episodes')
plt.ylabel('total reward')
plt.legend();

In [None]:
# Gross performance per training episode with a cubic trend.
plt.figure(figsize=(10, 6))
x = range(1, len(agent.performances) + 1)  # 1-based episode numbers
# cubic polynomial fit evaluated at the episode numbers
y = np.polyval(np.polyfit(x, agent.performances, deg=3), x)
# raw per-episode values (not a moving average), plotted against x so
# that both curves share the same episode axis
plt.plot(x, agent.performances, label='performance')
plt.plot(x, y, 'r--', label='regression')
plt.xlabel('episodes')
plt.ylabel('performance')
plt.legend();

### Testing the Agent 

In [None]:
rewards, accuracies, performances, env_ = agent.test(5)

In [None]:
rewards

In [None]:
accuracies

In [None]:
performances

In [None]:
ax = env.data[sym].plot(figsize=(10, 6))
env_.data[sym].plot(ax=ax);

In [None]:
np.corrcoef(env.data[sym], env_.data[sym])

<img src="https://certificate.tpq.io/quantsdev_banner_color.png" alt="quants@dev" width="35%" align="right" border="0"><br>

[quants@dev Discord Server](https://discord.gg/uJPtp9Awaj) | [@quants_dev](https://twitter.com/quants_dev) | <a href="mailto:qd@tpq.io">qd@tpq.io</a>