### Imports

In [1]:
import numpy as np
import pandas as pd
import gymnasium as gym
from gymnasium import spaces
import torch
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize
from stable_baselines3.common.callbacks import CheckpointCallback, EvalCallback
import matplotlib.pyplot as plt
import random
import os

In [2]:
# Single seed shared by every RNG this notebook touches, so runs are repeatable.
SEED = 42

# NOTE: the running interpreter fixed its hash randomization at startup; this
# variable is only picked up by Python processes launched afterwards.
os.environ["PYTHONHASHSEED"] = f"{SEED}"

# Seed the stdlib RNG, PyTorch's default generator and NumPy's global RNG.
random.seed(SEED)
torch.manual_seed(SEED)
np.random.seed(SEED)

### Create Environment

In [None]:
class TradingEnv(gym.Env):
    """
    Simple trading environment (long-only) using daily data.

    Actions: 0 = Hold (no change), 1 = Buy (go to long), 2 = Sell (go to flat/cash)
    Observation: window of last N days of features (z-scored per window, flattened)
        followed by the current position flag (0 or 1)
    Reward: relative change in portfolio value over the step, which already
        includes the transaction cost charged when the action changes position,
        multiplied by ``reward_scaling``

    Args:
        df: DataFrame holding a ``"Close"`` column plus every column named in
            ``feature_cols``. A private copy is stored on the environment.
        feature_cols: columns used as features; defaults to
            ``["log_return", "SMA_short", "SMA_long", "volatility"]``.
        window_size: number of rows in each observation window.
        transaction_cost: fraction of portfolio value charged per position
            change (e.g. 0.001 = 0.1%).
        initial_balance: portfolio value at the start of every episode.
        max_positions: stored on the instance; not used by the current logic.
        reward_scaling: multiplier applied to the per-step relative return.
        deterministic: if True every episode starts at ``window_size``;
            otherwise the start row is drawn from the environment's own RNG.

    Raises:
        ValueError: if ``window_size`` is below 1 or ``df`` has too few rows
            to run at least one step.
        KeyError: if ``df`` lacks ``"Close"`` or any feature column.
    """

    metadata = {"render_modes": ["human"], "render_fps": 1}

    def __init__(
        self,
        df,                  # DataFrame with datetime index and feature columns
        feature_cols=None,   # list of columns used as features (will be normalized per-window)
        window_size=50,
        transaction_cost=0.001,  # e.g., 0.1% per trade
        initial_balance=1.0,
        max_positions=1,     # 1 means long-only single position
        reward_scaling=1.0,
        deterministic=True,
    ):
        super().__init__()
        self.df = df.copy()
        self.feature_cols = feature_cols or ["log_return", "SMA_short", "SMA_long", "volatility"]
        self.window_size = window_size
        self.transaction_cost = transaction_cost
        self.initial_balance = initial_balance
        self.max_positions = max_positions
        self.reward_scaling = reward_scaling
        self.deterministic = deterministic

        # Fail early with a clear message instead of an obscure error later on.
        if self.window_size < 1:
            raise ValueError(f"window_size must be >= 1, got {self.window_size}")
        missing = [col for col in [*self.feature_cols, "Close"] if col not in self.df.columns]
        if missing:
            raise KeyError(f"df is missing required column(s): {missing}")

        # derive start/end indices for episodes
        self.start_index = self.window_size
        self.end_index = len(self.df) - 1

        # step() reads row current_step + 1, so a fixed start needs
        # window_size + 2 rows; the random start samples from
        # [start_index, end_index - 1) and needs one extra row to be non-empty.
        min_rows = self.window_size + (2 if self.deterministic else 3)
        if len(self.df) < min_rows:
            raise ValueError(
                f"df has {len(self.df)} rows but at least {min_rows} are required "
                f"for window_size={self.window_size}"
            )

        # action space: discrete 3 actions
        self.action_space = spaces.Discrete(3)

        # observation: window_size x n_features flattened + position flag
        self.n_features = len(self.feature_cols)
        obs_len = self.window_size * self.n_features + 1
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(obs_len,), dtype=np.float32)

        # internal state
        self.reset()

    def reset(self, seed=None, options=None):
        """Start a new episode and return ``(observation, info)``.

        ``seed`` is forwarded to ``gym.Env.reset`` so that it seeds
        ``self.np_random``; random episode starts are drawn from that
        generator rather than from NumPy's global state. ``options`` is
        accepted for API compatibility and ignored.
        """
        super().reset(seed=seed)

        # randomize start if not deterministic (useful for training)
        if self.deterministic:
            self.current_step = self.start_index
        else:
            # upper bound is exclusive: start lies in [start_index, end_index - 2]
            self.current_step = int(self.np_random.integers(self.start_index, self.end_index - 1))

        # portfolio state
        self.position = 0  # 0 = flat, 1 = long
        self.cash = self.initial_balance  # capital base at the start of the current interval
        self.position_price = 0.0  # entry price when long
        self.portfolio_value = self.initial_balance

        # return initial observation
        return self._get_obs(), {}

    def _get_obs(self):
        """Build the float32 observation for ``self.current_step``."""
        # get the window of features ending at current_step (inclusive)
        start = self.current_step - self.window_size + 1
        end = self.current_step + 1
        window = self.df.iloc[start:end][self.feature_cols].values  # shape (window_size, n_features)

        # normalize features by z-score along column to make learning easier
        # small epsilon to avoid divide-by-zero
        mean = window.mean(axis=0)
        std = window.std(axis=0) + 1e-9
        norm_window = (window - mean) / std

        # flatten and append current position
        obs = np.concatenate([norm_window.flatten(), np.array([self.position], dtype=np.float32)])
        return obs.astype(np.float32)

    def step(self, action):
        """Apply ``action`` and advance one row.

        Returns ``(obs, reward, terminated, truncated, info)``. ``terminated``
        becomes True once ``current_step`` reaches ``end_index``; ``truncated``
        is always False and ``info`` is an empty dict.
        """
        assert self.action_space.contains(action), f"invalid action: {action!r}"

        prev_portfolio = self.portfolio_value
        price = float(self.df["Close"].iloc[self.current_step])
        next_price = float(self.df["Close"].iloc[self.current_step + 1])

        # Capital carried into this interval. It always starts from the
        # current mark-to-market portfolio value so that returns compound
        # while a position is held across several steps.
        capital = prev_portfolio

        # interpret action:
        # 0 = hold; 1 = buy; 2 = sell (go flat)
        if action == 1 and self.position == 0:
            # open long position using all capital, net of the transaction cost
            self.position = 1
            self.position_price = price
            capital -= self.transaction_cost * prev_portfolio
        elif action == 2 and self.position == 1:
            # close position at the current price, net of the transaction cost
            self.position = 0
            self.position_price = 0.0
            capital -= self.transaction_cost * prev_portfolio
        # otherwise: hold or invalid trade (e.g., buy when already long) -> no change

        self.cash = capital

        # If we are long during the interval, the portfolio changes by the
        # simple return next_price/price - 1; when flat, cash doesn't grow.
        simple_return = (next_price - price) / price
        if self.position == 1:
            self.portfolio_value = capital * (1 + simple_return)
        else:
            self.portfolio_value = capital

        # reward = change in portfolio value (relative) scaled
        reward = (self.portfolio_value - prev_portfolio) / max(prev_portfolio, 1e-9)
        reward = reward * self.reward_scaling

        # advance step
        self.current_step += 1
        done = self.current_step >= self.end_index

        # gymnasium returns (obs, reward, terminated, truncated, info)
        return self._get_obs(), float(reward), done, False, {}

    def render(self, mode="human"):
        """Print the current step, position and portfolio value (debug aid)."""
        print(f"Step: {self.current_step}, Position: {self.position}, Portfolio: {self.portfolio_value:.6f}")

    def close(self):
        """No resources to release."""
        pass
