# Trading Environment for Stock Trading

Gymnasium environment for reinforcement-learning-based trading of a single stock, with three discrete actions: hold (0), buy (1), and sell (2).


In [5]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import pandas as pd
from pathlib import Path
from typing import Optional, Tuple, Dict


## TradingEnv Class


In [6]:
class TradingEnv(gym.Env):
    """
    Gymnasium environment for trading a single stock with all-in/all-out positions.

    Actions: 0=Hold, 1=Buy (all-in), 2=Sell (all-out)
    Observation: float32 array of shape (lookback_window, 8) whose columns are
        normalized Close, Open, High, Low, Volume, raw Returns, the position
        flag and the cash ratio (balance / initial_balance).
    Reward: a scaled blend of the running Sharpe ratio (mean / std of the
        episode's step returns) and the latest step return, minus a small
        penalty for staying idle in cash (see ``_calculate_reward``).
    """

    metadata = {"render_modes": ["human"], "render_fps": 4}

    # Number of rows excluded from the end of the data when drawing a random
    # episode start, so an episode never begins right at the end of the series.
    MIN_EPISODE_STEPS = 100

    def __init__(
        self,
        data_path: str,
        initial_balance: float = 10000.0,
        lookback_window: int = 30,
        render_mode: Optional[str] = None
    ):
        """
        Load the price data and set up spaces and trading state.

        Args:
            data_path: Path to a comma-separated file with the columns Date,
                Open, High, Low, Close and Volume.
            initial_balance: Cash available at the start of every episode.
            lookback_window: Number of rows included in each observation.
            render_mode: "human" enables printing in ``render``; any other
                value makes ``render`` a no-op.
        """
        super().__init__()

        self.initial_balance = initial_balance
        self.lookback_window = lookback_window
        self.render_mode = render_mode

        # Load data
        self.data = self._load_data(data_path)
        self.data_length = len(self.data)

        # Trading state
        self.current_step = 0
        self.balance = initial_balance
        self.shares = 0
        self.position = 0  # 0: no position, 1: long position
        self.entry_price = 0.0

        # Track returns for the Sharpe-ratio part of the reward
        self.returns_history = []
        self.portfolio_values = []

        # Most recent actions (at most 10), used for the idle-in-cash penalty.
        # Initialised here so no method needs hasattr/getattr fallbacks.
        self._recent_actions = []

        # Action space: 0=hold, 1=buy, 2=sell
        self.action_space = spaces.Discrete(3)

        # Observation space: normalized prices, volume, returns, position info
        n_features = 8
        self.observation_space = spaces.Box(
            low=-np.inf,
            high=np.inf,
            shape=(lookback_window, n_features),
            dtype=np.float32
        )

        # Normalization parameters
        self.price_mean = None
        self.price_std = None
        self.volume_mean = None
        self.volume_std = None

        self._compute_normalization_params()

    def _load_data(self, data_path: str) -> pd.DataFrame:
        """Read the CSV, sort it by Date and add a 'Returns' column.

        'Returns' is the percentage change of 'Close' from the previous row;
        the first row has no predecessor and is filled with 0.
        """
        df = pd.read_csv(data_path, sep=',')
        df['Date'] = pd.to_datetime(df['Date'])
        df = df.sort_values('Date').reset_index(drop=True)
        df['Returns'] = df['Close'].pct_change().fillna(0)
        return df

    def _compute_normalization_params(self):
        """Compute normalization statistics over the whole loaded dataset.

        A single mean/std pair is shared by the four price columns (the
        average of the per-column statistics); Volume gets its own pair.
        """
        price_columns = self.data[['Close', 'Open', 'High', 'Low']]
        self.price_mean = price_columns.mean().mean()
        self.price_std = price_columns.std().mean()
        self.volume_mean = self.data['Volume'].mean()
        self.volume_std = self.data['Volume'].std()

    def reset(
        self,
        seed: Optional[int] = None,
        options: Optional[dict] = None
    ) -> Tuple[np.ndarray, Dict]:
        """Start a new episode at a random row and return (observation, info).

        Args:
            seed: Seed for the environment's random generator; passing the
                same seed reproduces the same starting row.
            options: Unused.

        Raises:
            ValueError: If the data has no more than
                ``lookback_window + MIN_EPISODE_STEPS`` rows, so no valid
                starting row exists.
        """
        super().reset(seed=seed)

        low = self.lookback_window
        high = self.data_length - self.MIN_EPISODE_STEPS
        if high <= low:
            raise ValueError(
                f"Not enough data to start an episode: need more than "
                f"{low + self.MIN_EPISODE_STEPS} rows, got {self.data_length}"
            )

        # Draw from the environment's own generator (seeded by super().reset)
        # rather than the global NumPy RNG, so `seed` actually takes effect.
        self.current_step = int(self.np_random.integers(low, high))

        # Reset trading state
        self.balance = self.initial_balance
        self.shares = 0
        self.position = 0
        self.entry_price = 0.0
        self.returns_history = []
        self.portfolio_values = [self.initial_balance]
        self._recent_actions = []

        observation = self._get_observation()
        info = self._get_info()

        return observation, info

    def step(self, action: int) -> Tuple[np.ndarray, float, bool, bool, Dict]:
        """Apply ``action`` at the current row, then advance one row.

        Returns:
            (observation, reward, terminated, truncated, info). ``terminated``
            becomes True once the last row of the data is reached;
            ``truncated`` is always False.
        """
        # Trade at the current row's close, then move to the next row so the
        # reward reflects the price change that followed the decision.
        self._execute_action(action)
        self.current_step += 1

        terminated = self.current_step >= self.data_length - 1
        truncated = False

        reward = self._calculate_reward()
        observation = self._get_observation()
        info = self._get_info()

        return observation, reward, terminated, truncated, info

    def _current_price(self) -> float:
        """Return the Close price at the current step, clamped to the last row."""
        row = min(self.current_step, self.data_length - 1)
        return self.data.iloc[row]['Close']

    def _execute_action(self, action: int):
        """Execute a trading action at the current Close price.

        Buying is only possible without a position and selling only with one;
        any other combination leaves the holdings untouched. Every action is
        still recorded in the recent-action history.
        """
        current_price = self._current_price()

        if action == 1:  # Buy: convert the whole balance into shares
            if self.position == 0:
                self.shares = self.balance / current_price
                self.balance = 0
                self.position = 1
                self.entry_price = current_price

        elif action == 2:  # Sell: convert all shares back into cash
            if self.position == 1:
                self.balance = self.shares * current_price
                self.shares = 0
                self.position = 0
                self.entry_price = 0.0

        # Track recent actions, keeping only the latest 10
        self._recent_actions.append(action)
        if len(self._recent_actions) > 10:
            self._recent_actions.pop(0)

    def _get_portfolio_value(self) -> float:
        """Return cash plus the value of held shares at the current price."""
        return self.balance + self.shares * self._current_price()

    def _calculate_reward(self) -> float:
        """Record the new portfolio value and compute the step reward.

        With fewer than two recorded returns the reward is 10x the first
        return (or 0.0 if there is none). With (near-)zero return volatility
        it is 100x the mean return. Otherwise it is
        ``(0.7 * sharpe + 0.3 * 10 * latest_return) * 0.1``, reduced by 0.001
        when the agent is in cash and its last five actions were all Hold.
        """
        current_value = self._get_portfolio_value()
        self.portfolio_values.append(current_value)

        # Step return relative to the previously recorded portfolio value.
        # The guard covers step() being called before reset(), when the
        # history holds only the value appended above.
        if len(self.portfolio_values) > 1:
            previous_value = self.portfolio_values[-2]
            self.returns_history.append((current_value - previous_value) / previous_value)

        # Early reward: a Sharpe ratio needs at least two returns
        if len(self.returns_history) < 2:
            if len(self.returns_history) == 1:
                return self.returns_history[0] * 10
            return 0.0

        returns_array = np.array(self.returns_history)
        mean_return = np.mean(returns_array)
        std_return = np.std(returns_array)

        # Avoid dividing by a vanishing standard deviation
        if std_return < 1e-8:
            return mean_return * 100

        # Sharpe ratio over all returns of the episode so far
        sharpe_ratio = mean_return / std_return
        recent_return = self.returns_history[-1]

        # Weighted reward: 70% Sharpe ratio, 30% recent return
        reward = (sharpe_ratio * 0.7 + recent_return * 10 * 0.3) * 0.1

        # Penalty for holding cash too long
        if self.position == 0 and len(self.returns_history) > 10:
            recent_actions = self._recent_actions
            if len(recent_actions) > 5 and all(a == 0 for a in recent_actions[-5:]):
                reward -= 0.001

        return reward

    def _get_observation(self) -> np.ndarray:
        """Build the (lookback_window, 8) float32 observation ending at the current step."""
        start_idx = max(0, self.current_step - self.lookback_window + 1)
        end_idx = self.current_step + 1

        window_data = self.data.iloc[start_idx:end_idx].copy()

        # Pad at the front with copies of the first available row when the
        # window holds fewer rows than lookback_window
        if len(window_data) < self.lookback_window:
            padding = self.lookback_window - len(window_data)
            first_row = window_data.iloc[0:1]
            padding_data = pd.concat([first_row] * padding, ignore_index=True)
            window_data = pd.concat([padding_data, window_data], ignore_index=True)

        # Normalize the price columns with the shared price statistics; the
        # epsilon guards against a zero standard deviation
        price_scale = self.price_std + 1e-8
        normalized_prices = [
            ((window_data[column] - self.price_mean) / price_scale).values
            for column in ('Close', 'Open', 'High', 'Low')
        ]
        normalized_volume = (window_data['Volume'] - self.volume_mean) / (self.volume_std + 1e-8)
        returns = window_data['Returns'].values

        # Account state is constant over the window, so it is broadcast to
        # every row
        position_indicator = np.full(self.lookback_window, self.position)
        cash_ratio = (self.balance / self.initial_balance) * np.ones(self.lookback_window)

        # Stack features column-wise: Close, Open, High, Low, Volume,
        # Returns, position, cash ratio
        observation = np.column_stack([
            *normalized_prices,
            normalized_volume.values,
            returns,
            position_indicator,
            cash_ratio
        ]).astype(np.float32)

        return observation

    def _get_info(self) -> Dict:
        """Return a dict describing the current step, holdings and price."""
        return {
            "step": self.current_step,
            "balance": self.balance,
            "shares": self.shares,
            "position": self.position,
            "portfolio_value": self._get_portfolio_value(),
            "current_price": self._current_price()
        }

    def render(self):
        """Print a one-line status summary when render_mode is "human"."""
        if self.render_mode == "human":
            info = self._get_info()
            print(f"Step: {info['step']}, "
                  f"Portfolio: ${info['portfolio_value']:.2f}, "
                  f"Position: {info['position']}, "
                  f"Price: ${info['current_price']:.2f}")

# Cell marker: printed only after the class statement above has executed.
print("TradingEnv class defined")


TradingEnv class defined


## Test Environment


In [7]:
# Smoke test: locate a data file, build the environment and run one
# buy / hold / sell round trip, printing the state after every call.
data_dir = Path('data')
data_files = list(data_dir.rglob('*.txt'))

if data_files:
    # Use the first file the recursive search returned
    test_file = data_files[0]
    print(f"Using: {test_file.name}")

    # Create environment
    env = TradingEnv(
        data_path=str(test_file),
        initial_balance=10000.0,
        lookback_window=30,
        render_mode="human"
    )

    print(
        "\nEnvironment created:",
        f"  Action space: {env.action_space}",
        f"  Observation space: {env.observation_space.shape}",
        f"  Data length: {env.data_length}",
        sep="\n"
    )

    # Test reset
    observation, info = env.reset()
    print(
        "\nReset:",
        f"  Observation shape: {observation.shape}",
        f"  Step: {info['step']}, Portfolio: ${info['portfolio_value']:.2f}",
        sep="\n"
    )

    # Test actions
    print("\nTesting actions:")

    # Action 1: buy with the whole balance
    observation, reward, _, _, info = env.step(1)
    print(
        f"  After BUY: Portfolio=${info['portfolio_value']:.2f}, "
        f"Position={info['position']}, Shares={info['shares']:.2f}"
    )

    # Action 0: hold the open position
    observation, reward, _, _, info = env.step(0)
    print(
        f"  After HOLD: Portfolio=${info['portfolio_value']:.2f}, "
        f"Position={info['position']}"
    )

    # Action 2: sell every share
    observation, reward, _, _, info = env.step(2)
    print(
        f"  After SELL: Portfolio=${info['portfolio_value']:.2f}, "
        f"Position={info['position']}, Balance=${info['balance']:.2f}"
    )

    print("\nEnvironment test completed!")
else:
    print("No data files found!")


Using: iba.us.txt

Environment created:
  Action space: Discrete(3)
  Observation space: (30, 8)
  Data length: 3199

Reset:
  Observation shape: (30, 8)
  Step: 483, Portfolio: $10000.00

Testing actions:
  After BUY: Portfolio=$9816.75, Position=1, Shares=424.20
  After HOLD: Portfolio=$9899.47, Position=1
  After SELL: Portfolio=$9899.47, Position=0, Balance=$9899.47

Environment test completed!


## Example: Random Agent


In [8]:
# Create environment, reusing the one from the test cell when it exists
if 'env' not in locals():
    # The test cell may have been skipped; discover the data files here so
    # this cell can run on its own instead of failing on an undefined name.
    if 'data_files' not in locals():
        data_files = list(Path('data').rglob('*.txt'))
    # Fail with a clear message rather than an IndexError on data_files[0]
    if not data_files:
        raise FileNotFoundError("No data files found under 'data/'; cannot create TradingEnv")
    data_file = data_files[0]
    env = TradingEnv(
        data_path=str(data_file),
        initial_balance=10000.0,
        lookback_window=30
    )

# Run random agent: sample uniformly from the action space for up to 200 steps
observation, info = env.reset()
total_reward = 0.0

print("Running random agent for 200 steps...")
print(f"After reset: Portfolio=${info['portfolio_value']:.2f}, Position={info['position']}\n")

for step in range(200):
    action = env.action_space.sample()
    observation, reward, terminated, truncated, info = env.step(action)
    total_reward += reward

    # Progress report every 50 steps
    if step % 50 == 0:
        print(f"Step {step}: Portfolio=${info['portfolio_value']:.2f}, "
              f"Position={info['position']}, Reward={reward:.4f}")

    # Stop early when the environment signals the end of the episode
    if terminated or truncated:
        print(f"\nEpisode ended at step {step}")
        break

# `info` holds the state after the last executed step
print("\nResults:")
print(f"  Initial balance: ${env.initial_balance:.2f}")
print(f"  Final portfolio value: ${info['portfolio_value']:.2f}")
print(f"  Total return: {(info['portfolio_value'] / env.initial_balance - 1) * 100:.2f}%")
print(f"  Total reward: {total_reward:.4f}")


Running random agent for 200 steps...
After reset: Portfolio=$10000.00, Position=0

Step 0: Portfolio=$10000.00, Position=0, Reward=0.0000
Step 50: Portfolio=$9642.79, Position=0, Reward=-0.0045
Step 100: Portfolio=$8609.11, Position=0, Reward=-0.0086
Step 150: Portfolio=$4902.87, Position=1, Reward=-0.0153

Results:
  Initial balance: $10000.00
  Final portfolio value: $5748.23
  Total return: -42.52%
  Total reward: -1.9015
