In [15]:
import numpy as np
import pandas as pd
from tqdm import tqdm

The state space is constructed based on the trajectory of price movements over a recent sequence of time points. At any particular moment $t$, the state, denoted $S_t$, is encapsulated by the vector of length $l$
$$
S_t=\left[d_{t-l+1}, d_{t-l+2}, \ldots, d_t\right] \text {, }
$$
where each $d_i$ characterizes the direction and magnitude of the price change at time $i$. A non-negative value of $d_i$ signifies a price increase relative to time $i-1$, and conversely, a negative value indicates a decline. The value of $d_i$ is determined by the relative price change $\pi_i$ as follows:
$$
d_i= \begin{cases}+1 & \text { if } \pi_i>k \\ 0 & \text { if } 0<\pi_i \leq k \\ -1 & \text { if }-k \leq \pi_i<0 \\ -2 & \text { if } \pi_i<-k\end{cases}
$$
Therefore, +1 indicates a significant increase, 0 a moderate increase, -1 a moderate decrease, and -2 a significant decrease. $k$ is an adjustable sensitivity threshold.    
In this way we construct a state space of $4^l$ states. 

In [None]:
def compute_relative_price_changes(price_data):
    """Return the step-to-step relative change of a price series.

    Each output element is ``(p[i + 1] - p[i]) / p[i]``, so the result has one
    element fewer than ``price_data``.

    Args:
        price_data: One-dimensional sequence of prices that supports slicing
            (for example a NumPy array).

    Returns:
        The relative changes between consecutive prices.
    """
    previous_prices = price_data[:-1]
    absolute_moves = np.diff(price_data)
    return absolute_moves / previous_prices

def get_state(price_changes, k=0.03):
    """Discretise relative price changes into the four-level state encoding.

    Each change is mapped to one of four levels:

    * ``1``  -- increase larger than ``k``
    * ``0``  -- change in ``[0, k]`` (small increase or no move)
    * ``-1`` -- change in ``[-k, 0)`` (small decrease)
    * ``-2`` -- decrease larger than ``k``

    An unchanged price (change of exactly 0) is placed in the ``0`` bucket so
    that a flat price is not reported as a significant decrease.

    Args:
        price_changes: Iterable of relative price changes.
        k: Threshold separating small moves from significant ones.

    Returns:
        A tuple of levels, one per input change, in the same order.
    """
    state = []
    for change in price_changes:
        if change > k:
            level = 1
        elif change >= 0:
            level = 0
        elif change >= -k:
            level = -1
        else:
            # Anything that compares below -k ends up here; so does NaN,
            # because every comparison above is false for it.
            level = -2
        state.append(level)
    return tuple(state)

The action space is composed of three possible actions: selling one share is represented by −1, taking no action is denoted by 0, and buying one share is indicated by +1.    
If no position is held, the sell action (-1) is, of course, not permitted.

In [None]:
????/???????/???????/???????/???????/???????/???????/???????/???

The immediate reward, $R_{t+1}$, earned by the agent for taking action $A_t$ under prevailing environmental conditions, is mathematically defined as:
$$
R_{t+1}=A_t \cdot\left(\theta-X_t\right)-c \cdot\left|A_t\right|,
$$
where $X_t$ denotes the current price of the spread, $\theta$ represents the true global mean of $X_t$, and $c$ is the transaction cost per trade.

In [None]:
def calculate_reward(action, current_price, mean_price, transaction_cost):
    """Compute the immediate reward ``A * (theta - X) - c * |A|``.

    Args:
        action: Trade taken (-1 sell, 0 hold, +1 buy).
        current_price: Current price ``X`` of the traded series.
        mean_price: Reference mean ``theta`` the price is compared against.
        transaction_cost: Cost ``c`` charged per unit traded.

    Returns:
        The gain from trading towards the mean, net of transaction cost.
    """
    mean_reversion_gain = action * (mean_price - current_price)
    trading_fee = transaction_cost * abs(action)
    return mean_reversion_gain - trading_fee


In [16]:
class QLearningAgent:
    """Tabular epsilon-greedy Q-learning agent over the actions sell/hold/buy.

    The Q-table has one row per encoded state and one column per action; the
    column order follows ``self.actions``.
    """

    def __init__(self, state_space_size, action_space_size, alpha=0.1, gamma=0.9, epsilon=0.1):
        """Create a zero-initialised Q-table and store the hyper-parameters.

        Args:
            state_space_size: Number of rows in the Q-table.
            action_space_size: Number of columns in the Q-table.
            alpha: Learning rate of the temporal-difference update.
            gamma: Discount factor applied to the next state's best value.
            epsilon: Probability of picking a random action.
        """
        self.actions = [-1, 0, 1]  # sell, hold, buy
        self.epsilon = epsilon
        self.gamma = gamma
        self.alpha = alpha
        self.q_table = np.zeros((state_space_size, action_space_size))

    def encode_state(self, state, base=4):
        """Map a state tuple to a Q-table row index.

        Every element is shifted by +2 and used as one digit of a base-``base``
        number, with the first element as the least significant digit.
        """
        return sum(
            (digit + 2) * (base ** position)
            for position, digit in enumerate(state)
        )

    def choose_action(self, state):
        """Pick an action for ``state`` with an epsilon-greedy policy."""
        row = self.encode_state(state)
        explore = np.random.uniform(0, 1) < self.epsilon
        if not explore:
            # Exploitation: action with the highest current Q-value.
            return self.actions[np.argmax(self.q_table[row])]
        # Exploration: uniformly random action.
        return np.random.choice(self.actions)

    def update_q_table(self, state, action, reward, next_state):
        """Apply one Q-learning update for the observed transition."""
        row = self.encode_state(state)
        next_row = self.encode_state(next_state)

        column = self.actions.index(action)
        bootstrap_value = np.max(self.q_table[next_row])
        current_value = self.q_table[row, column]
        # Move the stored value a fraction alpha towards the TD target.
        self.q_table[row, column] = current_value + self.alpha * (
            reward + self.gamma * bootstrap_value - current_value
        )

# Training the Q-learning agent
def train_agent(price_data, mean_price, transaction_cost, episodes=1000, window_size=10):
    """Train a tabular Q-learning agent by repeatedly sweeping a price series.

    A state is the discretised sequence of the ``window_size`` most recent
    relative price changes. At every time step the agent picks an action, is
    rewarded according to ``calculate_reward`` using the price at the end of
    the state window, and its Q-table is updated with the state that follows.

    Args:
        price_data: One-dimensional price series supporting slicing and
            integer indexing (for example a NumPy array).
        mean_price: Reference mean passed to ``calculate_reward``.
        transaction_cost: Per-trade cost passed to ``calculate_reward``.
        episodes: Number of full passes over ``price_data``.
        window_size: Number of price changes that make up one state.

    Returns:
        The trained ``QLearningAgent``.
    """
    state_space_size = 4 ** window_size
    action_space_size = len([-1, 0, 1])
    agent = QLearningAgent(state_space_size, action_space_size)

    # States depend only on the data, not on the episode, so discretise every
    # window once up front instead of recomputing it on each pass.
    changes = compute_relative_price_changes(price_data)
    n_steps = len(price_data) - window_size - 1
    states = [get_state(changes[t:t + window_size]) for t in range(n_steps + 1)]

    for _ in tqdm(range(episodes)):
        for t in range(n_steps):
            state = states[t]
            next_state = states[t + 1]

            # The state window covers prices t .. t + window_size, so the
            # price the agent acts on is the last one of that window.
            current_price = price_data[t + window_size]
            action = agent.choose_action(state)
            reward = calculate_reward(action, current_price, mean_price, transaction_cost)

            agent.update_q_table(state, action, reward, next_state)

    return agent

# Example usage: train on synthetic prices drawn from a standard normal
# distribution. Replace price_data with an actual price series.
transaction_cost = 0.01
price_data = np.random.randn(100)
mean_price = price_data.mean()
agent = train_agent(
    price_data,
    mean_price,
    transaction_cost,
)


100%|██████████| 1000/1000 [00:03<00:00, 292.29it/s]


An alternative is to model the Q-function with a neural network. Below is an example. Note that we also simplify the reward function and do not restrict the action according to the current position.

In [None]:
import gymnasium as gym
from gymnasium import spaces
import torch
import torch.nn as nn
import torch.optim as optim

# Define the trading environment
class TradingEnv(gym.Env):
    """Minimal trading environment over a fixed price series.

    An observation is the ``window_size`` prices preceding the current step.
    There are three discrete actions (0: sell, 1: hold, 2: buy); selling and
    buying are rewarded with the signed one-step price change, holding yields
    a reward of 0.

    NOTE: ``reset`` returns only the observation and ``step`` returns a
    4-tuple ``(obs, reward, done, info)``. This is the older Gym calling
    convention, not Gymnasium's ``(obs, info)`` / 5-tuple one, and is kept
    because the training loop in this file relies on it.

    Args:
        prices: Sequence of prices supporting slicing and integer indexing.
        window_size: Number of past prices in each observation.
        end_margin: The episode is flagged as done once the current step is
            within this many entries of the end of ``prices``. It must be at
            least 1 so that the price read for the reward is still inside the
            series on the step that ends the episode.

    Raises:
        ValueError: If ``end_margin`` is smaller than 1.
    """

    def __init__(self, prices, window_size, end_margin=150):
        super().__init__()
        if end_margin < 1:
            raise ValueError("end_margin must be at least 1")
        self.prices = prices
        self.window_size = window_size
        self.end_margin = end_margin
        self.n_features = window_size
        self.action_space = spaces.Discrete(3)  # 0: Sell, 1: Hold, 2: Buy
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(self.n_features,), dtype=np.float32)
        self.reset()

    def reset(self):
        """Rewind to the first step with a full window and return its observation."""
        self.current_step = self.window_size
        self.done = False
        return self._next_observation()

    def _next_observation(self):
        """Return the ``window_size`` prices before ``current_step`` as float32."""
        window = self.prices[self.current_step - self.window_size:self.current_step]
        # Cast so the observation matches the dtype declared in observation_space.
        return np.asarray(window, dtype=np.float32)

    def step(self, action):
        """Advance one step and return ``(obs, reward, done, info)``."""
        self.current_step += 1

        if self.current_step >= len(self.prices) - self.end_margin:
            self.done = True

        obs = self._next_observation()

        if action == 0:  # Sell
            reward = self._take_action(-1)
        elif action == 2:  # Buy
            reward = self._take_action(1)
        else:  # Hold (or any other value): no reward
            reward = 0

        return obs, reward, self.done, {}

    def _take_action(self, action):
        """Return ``action`` times the price change into the current step."""
        prev_price = self.prices[self.current_step - 1]
        curr_price = self.prices[self.current_step]
        return action * (curr_price - prev_price)

# Define the Q-learning agent
class QLearningAgent:
    """Q-learning agent whose Q-function is a small fully connected network.

    The network maps a state vector of length ``state_size`` to one Q-value
    per action and is trained online, one transition at a time, with an MSE
    loss and the Adam optimiser.
    """

    def __init__(self, state_size, action_size):
        """Build the network, loss and optimiser.

        Args:
            state_size: Length of the state vector fed to the network.
            action_size: Number of discrete actions (network outputs).
        """
        self.state_size = state_size
        self.action_size = action_size
        self.model = self._build_model()
        self.criterion = nn.MSELoss()
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)

    def _build_model(self):
        """Return a two-hidden-layer MLP (24 units each, ReLU activations)."""
        model = nn.Sequential(
            nn.Linear(self.state_size, 24),
            nn.ReLU(),
            nn.Linear(24, 24),
            nn.ReLU(),
            nn.Linear(24, self.action_size)
        )
        return model

    def train(self, env, episodes=1000, gamma=0.95, epsilon=1.0, epsilon_decay=0.995, epsilon_min=0.01,
              max_steps=450):
        """Train the network with epsilon-greedy one-step Q-learning.

        Args:
            env: Environment exposing ``reset() -> obs``,
                ``step(action) -> (obs, reward, done, info)`` and
                ``action_space.n``.
            episodes: Number of episodes to run.
            gamma: Discount factor for the bootstrapped target.
            epsilon: Initial exploration probability.
            epsilon_decay: Factor applied to ``epsilon`` after each episode
                while it is above ``epsilon_min``.
            epsilon_min: Threshold below which ``epsilon`` stops decaying.
            max_steps: Upper bound on the number of steps per episode.
        """
        for e in range(episodes):
            state = env.reset()
            state = torch.FloatTensor(state).unsqueeze(0)
            for _step in range(max_steps):
                if np.random.rand() <= epsilon:
                    action = np.random.choice(env.action_space.n)
                else:
                    # Action selection needs values only, not gradients.
                    with torch.no_grad():
                        action = torch.argmax(self.model(state)).item()

                next_state, reward, done, _info = env.step(action)
                next_state = torch.FloatTensor(next_state).unsqueeze(0)

                target = reward
                if not done:
                    with torch.no_grad():
                        target = reward + gamma * torch.max(self.model(next_state)).item()

                # One forward pass for the prediction. The target is a
                # detached copy that differs only in the entry of the action
                # taken, so only that entry contributes to the loss.
                q_pred = self.model(state)
                target_f = q_pred.detach().clone()
                target_f[0][action] = target

                self.optimizer.zero_grad()
                loss = self.criterion(q_pred, target_f)
                loss.backward()
                self.optimizer.step()
                state = next_state

                # Always stop at the end of an episode; logging is what
                # happens only on every tenth episode.
                if done:
                    if e % 10 == 0:
                        print(f"Episode: {e}/{episodes}, loss: {loss}, epsilon: {epsilon:.2}")
                    break
            if epsilon > epsilon_min:
                epsilon *= epsilon_decay

# Load data
def load_data(stock_symbol, start_date, end_date):
    """Load adjusted closing prices for a symbol from ``<stock_symbol>.csv``.

    The CSV must contain a ``Date`` column, which becomes the parsed date
    index, and an ``Adj Close`` column.

    Args:
        stock_symbol: File name stem; ``.csv`` is appended to it.
        start_date: First date label of the label-based slice.
        end_date: Last date label of the label-based slice.

    Returns:
        The selected ``Adj Close`` values as a NumPy array.
    """
    csv_path = f"{stock_symbol}.csv"
    frame = pd.read_csv(csv_path, index_col='Date', parse_dates=True)
    adjusted_close = frame['Adj Close']
    selected = adjusted_close.loc[start_date:end_date]
    return selected.values

# Training the model
if __name__ == "__main__":
    # Experiment configuration.
    window_size = 10
    stock_symbol = 'AAPL'  # Example stock
    start_date, end_date = '2021-01-01', '2022-12-31'

    # Build the environment from the price history in the chosen date range.
    prices = load_data(stock_symbol, start_date, end_date)
    env = TradingEnv(prices, window_size)

    # The network's input and output sizes are read off the environment's spaces.
    state_size = env.observation_space.shape[0]
    action_size = env.action_space.n

    agent = QLearningAgent(state_size, action_size)
    agent.train(env, episodes=200)
