In [1]:
import os
import random
import numpy as np
import pandas as pd

random.seed(100)
os.environ['PYTHONHASHSEED'] = '0'

class ActionSpace:
    def sample(self):
        return random.randint(0, 1)
    
action_space = ActionSpace()
print([action_space.sample() for _ in range(10)])

class Finance:
    url = 'https://certificate.tpq.io/rl4finance.csv'
    def __init__(self, symbol, feature, min_accuracy=0.485, n_features=4):
        self.symbol = symbol
        self.feature = feature
        self.n_features = n_features
        self.action_space = ActionSpace()
        self.min_accuracy = min_accuracy
        self._get_data()
        self._prepare_data()
    def _get_data(self):
        self.raw = pd.read_csv(self.url, index_col=0, parse_dates=True)

class Finance(Finance):
    def _prepare_data(self):
        self.data = pd.DataFrame(self.raw[self.symbol]).dropna()
        self.data['r'] = np.log(self.data / self.data.shift(1))
        self.data['d'] = np.where(self.data['r'] > 0, 1, 0)
        self.data.dropna(inplace=True)
        self.data_ = (self.data - self.data.mean()) / self.data.std()
    def reset(self):
        self.bar = self.n_features
        self.treward = 0
        state = self.data_[self.feature].iloc[self.bar - self.n_features:self.bar].values
        return state, {}
    
class Finance(Finance):
    def step(self, action):
        if action == self.data['d'].iloc[self.bar]:
            correct = True
        else:
            correct = False
        reward = 1 if correct else 0
        self.treward += reward
        self.bar += 1
        self.accuracy = self.treward / (self.bar - self.n_features)
        if self.bar >= len(self.data):
            done = True
        elif reward == 1:
            done = False
        elif (self.accuracy < self.min_accuracy) and (self.bar > 15):
            done = True
        else:
            done = False
        next_state = self.data_[self.feature].iloc[self.bar - self.n_features:self.bar].values
        return next_state, reward, done, False, {}
    
fin = Finance(symbol='EUR=', feature='EUR=')
print(list(fin.raw.columns))
fin.reset()
print(fin.action_space.sample())
print(fin.step(fin.action_space.sample()))

[0, 1, 1, 0, 1, 1, 1, 0, 0, 0]
['AAPL.O', 'MSFT.O', 'INTC.O', 'AMZN.O', 'GS.N', '.SPX', '.VIX', 'SPY', 'EUR=', 'XAU=', 'GDX', 'GLD']
1
(array([2.64643904, 2.69560062, 2.68085214, 2.63046153]), 0, False, False, {})


In [2]:
fin = Finance('EUR=', 'r')
fin.reset()

(array([-1.19130476, -1.21344494,  0.61099805, -0.16094865]), {})

In [None]:
class RandomAgent:
     def __init__(self):
        self.env = Finance('EUR=', 'r')
     def play(self, episodes=1):
        self.trewards = list()
        for e in range(episodes):
            self.env.reset()
            for step in range(1, 100):
                a = self.env.action_space.sample()
                state, reward, done, trunc, info = self.env.step(a)
                if done:
                    self.trewards.append(step)
                    break

ra = RandomAgent()
ra.play(15)
print(ra.trewards)
print(round(sum(ra.trewards) / len(ra.trewards), 2))
print(len(fin.data))

In [3]:
import os
import random
import warnings
import numpy as np
import tensorflow as tf
from tensorflow import keras
from collections import deque
from keras.layers import Dense
from keras.models import Sequential

warnings.simplefilter('ignore')
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

opt = keras.optimizers.Adam(learning_rate=0.0001)

class DQLAgent:
    def __init__(self, symbol, feature, min_accuracy, n_features=4):
        self.epsilon = 1.0
        self.epsilon_decay = 0.9975
        self.epsilon_min = 0.1
        self.memory = deque(maxlen=2000)
        self.batch_size = 32
        self.gamma = 0.5
        self.trewards = list()
        self.max_treward = 0
        self.n_features = n_features
        self._create_model()
        self.env = Finance(symbol, feature,
                     min_accuracy, n_features)
    def _create_model(self):
         self.model = Sequential()
         self.model.add(Dense(24, activation='relu',
                              input_dim=self.n_features))
         self.model.add(Dense(24, activation='relu'))
         self.model.add(Dense(2, activation='linear'))
         self.model.compile(loss='mse', optimizer=opt)
    def act(self, state):
        if random.random() < self.epsilon:
            return self.env.action_space.sample()
        return np.argmax(self.model.predict(state)[0])
    def replay(self):
        batch = random.sample(self.memory, self.batch_size)
        for state, action, next_state, reward, done in batch:
            if not done:
                reward += self.gamma * np.amax(
                    self.model.predict(next_state)[0])
            target = self.model.predict(state)
            target[0, action] = reward
            self.model.fit(state, target, epochs=1, verbose=False)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
    def learn(self, episodes):
        for e in range(1, episodes + 1):
            state, _ = self.env.reset()
            state = np.reshape(state, [1, self.n_features])
            for f in range(1, 5000):
                action = self.act(state)
                next_state, reward, done, trunc, _ = \
                    self.env.step(action)
                next_state = np.reshape(next_state,
                                        [1, self.n_features])
                self.memory.append(
                    [state, action, next_state, reward, done])
                state = next_state
                if done:
                    self.trewards.append(f)
                    self.max_treward = max(self.max_treward, f)
                    templ = f'episode={e:4d} | treward={f:4d}'
                    templ += f' | max={self.max_treward:4d}'
                    print(templ, end='\r')
                    break
            if len(self.memory) > self.batch_size:
                self.replay()
        print()
    def test(self, episodes):
        ma = self.env.min_accuracy
        self.env.min_accuracy = 0.5
        for e in range(1, episodes + 1):
            state, _ = self.env.reset()
            state = np.reshape(state, [1, self.n_features])
            for f in range(1, 5001):
                action = np.argmax(self.model.predict(state)[0])
                state, reward, done, trunc, _ = self.env.step(action)
                state = np.reshape(state, [1, self.n_features])
                if done:
                    tmpl = f'total reward={f} | '
                    tmpl += f'accuracy={self.env.accuracy:.3f}'
                    print(tmpl)
                    break
        self.env.min_accuracy = ma

In [6]:
random.seed(250)
tf.random.set_seed(250)

agent = DQLAgent('EUR=', 'r', 0.495, 4)

%time agent.learn(250)

agent.test(5)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 93ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 40ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 16ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 15ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 16ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 14ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 16ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18