In [None]:
import numpy as np
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')

### Synthetic Indexes Simulation - Geometric Brownian Motion (GBM)

In [None]:
# --- Simulation parameters ---
s0 = 1000  # Initial price
mu = 0  # Drift zero
sigma = .75  # Volatility
T = 1 / 365  # Simulated horizon: one day, expressed as a fraction of a year
dt = T / 100  # Subdividing the horizon into 100 steps
# round() instead of a bare int(): T / dt is a floating-point quotient that
# may land marginally below 100, and int() alone would truncate it to 99.
steps = int(round(T / dt))
num_simulations = 1  # Just one path for the example
gbm_seed = 42  # Fixed seed so "Restart & Run All" reproduces the same path

# --- Simulation ---
# Exact GBM update: S_t = S_{t-1} * exp((mu - sigma^2 / 2) * dt + sigma * sqrt(dt) * Z),
# with Z ~ N(0, 1). Chaining the updates is the same as exponentiating the
# cumulative sum of the log-increments, which avoids a Python loop.
rng = np.random.default_rng(gbm_seed)
shocks = rng.standard_normal((steps - 1, num_simulations))  # N(0,1)
log_increments = (mu - .5 * sigma ** 2) * dt + sigma * np.sqrt(dt) * shocks

# Rows are time steps, columns are independent simulated paths.
prices_ = np.empty((steps, num_simulations))
prices_[0] = s0
prices_[1:] = s0 * np.exp(np.cumsum(log_increments, axis=0))

# Plot the simulated path(s): matplotlib draws one line per column of prices_.
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(prices_)
ax.set_title("Simulação de um Índice Sintético (Volatility 75)")
ax.set_xlabel("Ticks")
ax.set_ylabel("Preço")
ax.grid(True)
plt.show()

### Gym-like Environment

In [None]:
from enum import Enum

class PositionEnum(Enum):
    """Position held by the trading environment.

    The integer values are also used as action ids: ``TradingEnv.step``
    compares the incoming action against ``BUY.value`` / ``SELL.value``.
    """

    # Members are deliberately left unannotated: static type checkers treat
    # an annotated name in an Enum body as a non-member attribute.
    NEUTRAL = 0
    BUY = 1
    SELL = 2

class TradingEnv:
    """Minimal Gym-style trading environment over a fixed price series.

    The observation is the last ``window_size`` prices, z-score normalised,
    followed by the numeric value of the current position. Actions are the
    integer values of ``PositionEnum``.

    NOTE(review): once a position is opened, ``step`` never sets it back to
    NEUTRAL (only ``reset`` does), and ``balance`` is only ever assigned in
    ``reset``. Both look like unfinished features -- confirm intended design.
    """

    def __init__(self, prices: np.ndarray, window_size: int = 10, initial_balance: int = 1000):
        """Store the price series and reset the episode state.

        Args:
            prices: One-dimensional price series (any array-like of numbers).
            window_size: Number of past prices included in each observation.
            initial_balance: Balance restored at every ``reset``.

        Raises:
            ValueError: If ``prices`` is not one-dimensional, ``window_size``
                is smaller than 1, or the series is too short to take even
                one step after the first window.
        """
        prices = np.asarray(prices, dtype=float)
        if prices.ndim != 1:
            raise ValueError(f"prices must be one-dimensional, got shape {prices.shape}")
        if window_size < 1:
            raise ValueError(f"window_size must be >= 1, got {window_size}")
        if len(prices) <= window_size:
            # step() reads prices[window_size] right after reset().
            raise ValueError(
                f"prices must contain more than window_size={window_size} "
                f"elements, got {len(prices)}"
            )

        self.position: PositionEnum = PositionEnum.NEUTRAL
        self.entry_price: float = 0.0
        self.current_step: int = 0
        self.balance: float = 0
        self.prices: np.ndarray = prices
        self.window_size: int = window_size
        self.initial_balance: int = initial_balance
        self.reset()

    def reset(self):
        """Start a new episode and return the initial observation."""
        self.balance = self.initial_balance
        self.position = PositionEnum.NEUTRAL
        # Start at the first index that has a full window of history behind it.
        self.current_step = self.window_size
        self.entry_price = 0.0
        return self._get_state()

    def _get_state(self):
        """Return the normalised price window followed by the position value."""
        window = self.prices[self.current_step - self.window_size:self.current_step]
        # The epsilon belongs inside the denominator: it guards against a
        # zero standard deviation (flat window), which would otherwise
        # produce NaN/inf observations.
        norm_window = (window - np.mean(window)) / (np.std(window) + 1e-7)
        return np.concatenate((norm_window, [self.position.value]))

    def step(self, action: int):
        """Apply ``action`` at the current price and advance one tick.

        Args:
            action: A ``PositionEnum`` value. BUY / SELL open a position only
                while the environment is NEUTRAL; any other action is a no-op.

        Returns:
            Tuple ``(next_state, reward, done)``. ``reward`` is the current
            price difference versus the entry price of the open position
            (0.0 while NEUTRAL), reported on every step.
        """
        price: float = self.prices[self.current_step]
        done: bool = False
        reward: float = 0.0

        # A position can only be opened from NEUTRAL; repeated BUY/SELL
        # requests while already positioned are ignored.
        if self.position == PositionEnum.NEUTRAL:
            if action == PositionEnum.BUY.value:
                self.position = PositionEnum.BUY
                self.entry_price = price
            elif action == PositionEnum.SELL.value:
                self.position = PositionEnum.SELL
                self.entry_price = price

        # Calculate reward value
        if self.position == PositionEnum.BUY:
            reward = price - self.entry_price  # positive if the price rose since the buy
        elif self.position == PositionEnum.SELL:
            reward = self.entry_price - price  # positive if the price fell since the sell

        # Next tick
        self.current_step += 1
        if self.current_step >= len(self.prices) - 1:
            done = True

        next_state = self._get_state()

        return next_state, reward, done

### Creating a DQN (Deep Q-Network) Agent with Keras

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
import random
from collections import deque

class DQNAgent:
    """Epsilon-greedy Deep Q-Network agent with an experience-replay buffer.

    Transitions are stored in a bounded deque; ``replay`` samples a random
    minibatch from it and fits the Keras model towards one-step TD targets.
    """

    def __init__(self, state_size, action_size):
        """Create the agent.

        Args:
            state_size: Length of the observation vector fed to the network.
            action_size: Number of discrete actions (network outputs).
        """
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=2000)  # oldest transitions are dropped first
        self.gamma = .95  # Discount rate
        self.epsilon = 1.0  # Exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.learning_rate = 0.001
        self.model = self._build_model()

    def _build_model(self):
        """Build and compile the Q-network: two hidden ReLU layers, linear output."""
        model = Sequential()
        model.add(Dense(64, input_dim=self.state_size, activation='relu'))
        model.add(Dense(64, activation='relu'))
        # Linear output: one unbounded Q-value estimate per action.
        model.add(Dense(self.action_size, activation='linear'))
        model.compile(loss='mse', optimizer=Adam(learning_rate=self.learning_rate))
        return model

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay buffer."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Choose an action epsilon-greedily.

        Returns:
            int: A random action with probability ``epsilon``, otherwise the
            action with the highest predicted Q-value for ``state``.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        q_values = self.model.predict(np.asarray(state)[np.newaxis, :], verbose=0)
        # Cast so both branches return a plain Python int.
        return int(np.argmax(q_values[0]))

    def replay(self, batch_size):
        """Train on a random minibatch of stored transitions.

        Does nothing when the buffer holds fewer than ``batch_size``
        transitions. After a training step, ``epsilon`` is decayed once
        (down to ``epsilon_min``).

        Args:
            batch_size: Number of transitions to sample.
        """
        if batch_size <= 0 or len(self.memory) < batch_size:
            return

        minibatch = random.sample(self.memory, batch_size)
        states = np.array([t[0] for t in minibatch], dtype=float)
        actions = np.array([t[1] for t in minibatch], dtype=int)
        rewards = np.array([t[2] for t in minibatch], dtype=float)
        next_states = np.array([t[3] for t in minibatch], dtype=float)
        dones = np.array([t[4] for t in minibatch], dtype=bool)

        # One batched forward pass per array instead of one Keras call per
        # sample: same targets, far fewer calls.
        q_next = np.asarray(self.model.predict(next_states, verbose=0))
        q_target = np.array(self.model.predict(states, verbose=0))

        # One-step TD target. Terminal transitions have no future value, so
        # the bootstrap term is applied only when the episode did NOT end.
        bootstrapped = rewards + self.gamma * np.max(q_next, axis=1)
        td_target = np.where(dones, rewards, bootstrapped)

        # Only the Q-value of the action actually taken is moved; the other
        # outputs keep their current predictions (zero error).
        q_target[np.arange(batch_size), actions] = td_target

        # verbose=0: replay runs on nearly every environment step, so a
        # progress bar per call would flood the notebook output.
        self.model.fit(states, q_target, batch_size=batch_size, epochs=1, verbose=0)

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

### Training the Model

In [None]:
# Train on the first (only) simulated path.
env = TradingEnv(prices_[:, 0])
state_size_ = env.window_size + 1  # normalised price window + position value
action_size_ = len(PositionEnum)

agent = DQNAgent(state_size_, action_size_)

episodes = 50
batch_size_ = 32
episode_rewards_ = []  # total reward per episode, kept for later inspection

for e in range(episodes):
    state_ = env.reset()
    total_reward = 0
    done_ = False

    while not done_:
        action_ = agent.act(state_)
        next_state_, reward_, done_ = env.step(action_)
        total_reward += reward_

        agent.remember(state_, action_, reward_, next_state_, done_)
        state_ = next_state_

        # Train only once the buffer can supply a full minibatch, and skip
        # the update on the step that ends the episode.
        if not done_ and len(agent.memory) > batch_size_:
            agent.replay(batch_size_)

    episode_rewards_.append(total_reward)
    print(f"Episode: {e+1}/{episodes} - Total reward: {total_reward}")
