In [94]:

import gymnasium as gym  # ✅ Use gymnasium instead of gym
import numpy as np
import pandas as pd
from gymnasium import spaces
from copy import deepcopy
import py_vollib_vectorized

class SimEnv(gym.Env):
    """
    Custom Options Trading Environment for Reinforcement Learning.
    """
    def __init__(self, df):
        super(SimEnv, self).__init__()
        
        # Market Data
        self.df = df
        self.df_today = None
        
        # Index Tracking
        self.global_index = 0
        self.daily_index = 0
        self.start_index = 0
        self.max_steps = 62  # Max steps per episode

        # Trading Variables
        self.position = 0
        self.entry_price = 0
        self.position_open_time = None

        # Capital & PnL
        self.capital = 100
        self.pnl = 0
        self.position_value = 0

        # Episode State
        self.done = False
        self.current_row = None

        # Action & Observation Space
        self.action_space = spaces.Discrete(3)  # 0: Hold, 1: Open, 2: Close
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(11,), dtype=np.float32)

    def __init__(self, env_config):
        super(SimEnv, self).__init__()
        
        # ✅ Read `df` from `env_config`
        self.df = env_config.get("df")
        
        # ✅ Ensure df is provided
        if self.df is None:
            raise ValueError("Error: `df` must be provided in env_config!")
        
        # ✅ Initialize other attributes
        self.df_today = None
        self.global_index = 0
        self.max_steps = 62  # Max steps per episode
        self.capital=100

        self.position = 0
        self.entry_price = 0
        self.pnl = 0
        self.done = False
        self.current_row = None
        
        self.action_space = spaces.Discrete(3)  # 0: Hold, 1: Open, 2: Close
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(11,), dtype=np.float32)

    def reset(self, seed=None):
        """ Reset environment and start a new episode. """
        self.global_index = self.pick_episode_start()
        self.daily_index = self.df_today.index.get_loc(self.global_index)
        self.start_index = self.global_index
        self.done = False
        self.position = 0
        self.pnl = 0

        # Compute daily straddle prices before trading starts
        straddle_prices = self.compute_daily_atm_straddle_prices()
        self.df_today["daily_straddle_prices"] = straddle_prices
        self.df_today["open_straddle_pnl"] = 0

        obs = self._get_state()  # Observation (state)
        action_mask = self.compute_action_mask()  # ✅ Compute action mask for valid actions
        info = {"action_mask": action_mask}  # ✅ Include action mask in `info`

        return obs, info  # ✅ Must return a tuple (obs, info)

    def _get_state(self):
        """ Returns the current state as a NumPy array. """
        row = self.df.iloc[self.global_index]
        steps_taken = self.global_index - self.start_index
        steps_remaining = self.max_steps - steps_taken
        
        state = np.array([
            row['implied_spot'],  # Current spot price
            row['atm_vol'],  # ATM implied volatility
            row['scaled_slope'],  # Volatility skew slope
            row['scaled_quadratic'],  # Volatility skew curvature
            steps_taken,  # How many steps taken in this episode
            steps_remaining,  # Steps remaining before timeout
            self.position,  # Position status (0: no position, >0: position held)
            int(self.position>0),  # binary state to make it sumpler
            self.pnl,  # Cumulative PnL
            self.df_today["daily_straddle_prices"].loc[self.global_index],  # Current straddle price
            self.df_today["open_straddle_pnl"].loc[self.global_index]  # PnL from position
        ], dtype=np.float32)

        return state

    def compute_action_mask(self):
        """ Computes an action mask where invalid actions are marked as 0. """
        action_mask = np.array([1, 1, 1])  # Default: all actions allowed
        
        if self.position == 0:
            action_mask[2] = 0  # Can't close if no position is open
        else:
            action_mask[0] = 0  # Can't open a new position if one is already open
        
        return action_mask  # ✅ Masked actions for Rllib

    def step(self, action):
        """ Execute the selected action. """
        reward = 0.0
        allowed_actions = self.valid_actions()

        if action not in allowed_actions:
            reward = -1000  # Strong penalty for invalid action
            truncated = True
            self.done = True
            return self._get_state(), reward, self.done, truncated, {"action_mask": self.compute_action_mask()}  # ✅ Return action mask

        if action == 0 and self.position == 0:  # Open position
            self.open_position()
            self.update_time_step(60)

        elif action == 1 and self.position > 0:  # Close position
            reward = self.df_today["open_straddle_pnl"].loc[self.global_index] - self.pnl
            self.pnl = self.df_today["open_straddle_pnl"].loc[self.global_index]
            self.position = 0
            self.done = True

        elif action == 2:  # Hold position
            if self.position > 0:
                reward = self.df_today["open_straddle_pnl"].loc[self.global_index] - self.pnl
                self.pnl = self.df_today["open_straddle_pnl"].loc[self.global_index]
            
            self.update_time_step(1)

        # End episode if time exceeds max steps
        if self.global_index - self.start_index >= self.max_steps:
            self.done = True

        return self._get_state(), reward, self.done, False, {"action_mask": self.compute_action_mask()}  # ✅ Return action mask

    def valid_actions(self):
        if self.position == 0:
            return [0, 2]
        else:
            return [1, 2]

    def render(self, mode="human"):
        """ Optional: Print state information for debugging. """
        print(f"Time: {self.df.iloc[self.global_index]['minute']}, Position: {self.position}, PnL: {self.pnl}")

    def close(self):
        pass
    def open_position(self):

        ivol = self.get_current_row()['implied_spot']
        texp = self.get_current_row()['years_to_maturity']
        spot=self.get_current_row()['implied_spot']
        #straddle_price_1 = self.price_one_day_straddle(texp, ivol)
        straddle_price=self.df_today['daily_straddle_prices'].loc[self.global_index]
        #print(f"straddle_price={straddle_price}")
        #print(f"straddle_price_1={straddle_price_1}")
        if (straddle_price == 0):
            print(f"eror: straddle_price={straddle_price}. at time={self.get_current_time()}")
        self.position = self.capital / straddle_price
        #self.position_value = self.position * straddle_price
        self.strike=spot
        #spot_vols=self.compute_spot_vols(self.strike)
        self.straddle_prices=self.compute_straddle_prices(self.strike)
        self.df_today["open_straddle_prices"]=self.straddle_prices
        self.df_today["open_straddle_pnl"]=(self.df_today["open_straddle_prices"]- straddle_price)*self.position
        self.position_open_time = self.global_index
        return self.position*straddle_price


    def pick_random_day(self, burn_days=5):
        all_days = self.df['date'].unique()
        all_days = sorted(all_days)
        #print(f"all_days={all_days}")
        start_day = np.random.choice(all_days[burn_days:-1])
        return start_day

    def pick_random_timestep(self,df):
        all_times = df['minute'].apply(lambda x: x.time()).unique()
        all_times = sorted(all_times)
        latest_time = pd.Timestamp('12:45').time()
        earliest_time = pd.Timestamp('9:30').time()
        all_times = [x for x in all_times if x >= earliest_time and x <= latest_time]
        #print(f"all_times={all_times}")
        start_time = np.random.choice(all_times)
        return start_time

    def pick_episode_start(self):
        start_day = self.pick_random_day()
        self.df_today = self.df[self.df['date'] == start_day]
        self.df_today=deepcopy(self.df_today)
        start_time=self.pick_random_timestep(self.df_today)
        episode_start_index = self.df_today [(self.df_today['minute'].apply(lambda x: x.time()) == start_time)].index[0]
        #print(f"episode_start_index={episode_start_index}")
        
        #self.current_row = self.df.iloc[self.global_index]
        #self.df_today=self.select_todays_data()
        return episode_start_index
    
    

    
    def compute_spot_vols(self,strike):
        """
        Compute fitted volatilities for a range of strikes.
        
        Parameters:
            spot (float): Spot price.
            atm_vol (float): At-the-money
            slope (float): Slope of the linear term.
            quadratic_term (float): Coefficient of the quadratic term.
            texp_years (float): Time to expiration in years.    

        Returns:
            array-like: Fitted volatilities for a range of strikes.
        """
        spots=self.df_today['implied_spot']
        atm_vol=self.df_today['atm_vol']
        texp_years = self.df_today['years_to_maturity']
        slope=self.df_today['slope']
        quadratic_term=self.df_today['quadratic_term']
        #print(f"variable sizes: texp={texp_years.shape}, spot={spots.shape}, atm_vol={atm_vol.shape}, slope={slope.shape}, quadratic_term={quadratic_term.shape},strike={strike.shape}")
        vols = apply_quadratic_volatility_model(strike, spots, atm_vol, slope, quadratic_term, texp_years)
        #print(f"vols size={vols.shape}")
        return vols


    def compute_daily_atm_straddle_prices(self):
        """
        Compute straddle prices for a range of strikes.
        
        Parameters:
            spot (float): Spot price.
            atm_vol (float): At-the-money
            slope (float): Slope of the linear term.
            quadratic_term (float): Coefficient of the quadratic term.
            texp_years (float): Time to expiration in years.    

        Returns:
            array-like: Fitted volatilities for a range of strikes.
        """
        texp = self.df_today['years_to_maturity']
        spot = self.df_today['implied_spot']
        texp = self.df_today['years_to_maturity']
        vol=self.df_today['atm_vol']
        #print("variable sizes: ",texp.shape,spot.shape,vol.shape)
        straddle_prices = self.price_instrument('c', spot, spot, texp, vol) + self.price_instrument('p', spot, spot, texp, vol)

        return straddle_prices

    
    def compute_straddle_prices(self, strike):
        """
        Compute straddle prices for a range of strikes.
        
        Parameters:
            spot (float): Spot price.
            atm_vol (float): At-the-money
            slope (float): Slope of the linear term.
            quadratic_term (float): Coefficient of the quadratic term.
            texp_years (float): Time to expiration in years.    

        Returns:
            array-like: Fitted volatilities for a range of strikes.
        """
    
        texp = self.df_today['years_to_maturity']
        spot = self.df_today['implied_spot']
        vols=self.compute_spot_vols(strike)
        #print(f"variable sizes: texp={texp.shape}, spot={spot.shape}, vols={vols.shape}")
        #vols=apply_apply_quadratic_volatility_model(strike, spot, atm_vols, slopes, quadratic_terms, texp)
        straddle_prices = self.price_instrument('c', strike, spot, texp, vols) + self.price_instrument('p', strike, spot, texp, vols) 
        #print(f"straddle_prices={straddle_prices}")

        df_output=pd.DataFrame()
    
        df_output["spot"]=spot
        df_output["texp"]=texp
        df_output["vols"]=vols
        df_output["strike"]=strike
        df_output["straddle_prices"]=straddle_prices
        df_output.to_csv("straddle_prices.csv")

        return straddle_prices

    
    def update_time_step(self, minutes=1):
        self.global_index = min(self.global_index + minutes, self.df_today.index.max())

    
    def price_instrument(self, cp, strike, spot, texp, vol):
        #if self.debug:
        #    print(f"cp={cp}\n, strike={strike}\n, spot={spot}\n, texp={texp}\n, vol={vol}\n")
        #print(f"pricing_insturment sizes: cp={cp}, strike={strike.shape}, spot={spot.shape}, texp={texp.shape}, vol={vol.shape}")
        return py_vollib_vectorized.models.vectorized_black_scholes(cp, spot, strike, texp, 0, vol,return_as="numpy")

    
    def get_current_time(self):
        return self.df.iloc[self.global_index]['minute']
    

    def get_current_row(self):
        return self.df.iloc[self.global_index]

In [13]:

def apply_quadratic_volatility_model(strikes, spot, atm_vol, slope, quadratic_term, texp_years):
    """
    Apply the quadratic volatility model to new data points.
    
    Parameters:
        strikes (array-like): Array of strike prices.
        spot (float): Spot price.
        atm_vol (float): At-the-money volatility.
        slope (float): Slope of the linear term.
        quadratic_term (float): Coefficient of the quadratic term.
        texp_years (float): Time to expiration in years.
    
    Returns:
        array-like: Fitted volatilities for the given strikes.
    """
    #print(f"apply_quadratic_vol input sizes: strikes={strikes}, spot={len(spot)}, atm_vol={len(atm_vol)}, slope={len(slope)}, quadratic_term={len(quadratic_term)}, texp_years={len(texp_years)}")
    log_strikes = np.log(strikes) - np.log(spot)
    fitted_vols = atm_vol + slope * log_strikes + quadratic_term * log_strikes**2
    #fitted_vols = atm_vol + (slope / np.sqrt(texp_years)) * log_strikes + quadratic_term * log_strikes**2
    fitted_vols= np.clip(fitted_vols, .05,.4)
    return fitted_vols


In [40]:

df=pd.read_csv("./algo_data/vol_surfaces2.csv")
df['minute'] = pd.to_datetime(df['minute'])
df['minute'].apply(lambda x: x.tz).unique()
#for each row find 16:17:00 and compute years to maturity where maturity is 16:17:00 for each row

def get_years_to_maturity(row):
    maturity = pd.Timestamp(row['minute'].date(), tz=row['minute'].tz) + pd.Timedelta(hours=16, minutes=17)
    return (maturity - row['minute']).seconds / (365.25 * 24 * 60 * 60)

df['years_to_maturity'] = df.apply(get_years_to_maturity, axis=1)
df.loc[df['implied_spot'] <= .07, ['implied_spot', 'atm_vol', 'slope', 'quadratic_term', 'scaled_slope', 'scaled_quadratic']] = np.nan
df.loc[df['atm_vol'] <= .03, ['implied_spot', 'atm_vol', 'slope', 'quadratic_term', 'scaled_slope', 'scaled_quadratic']] = np.nan
# Forward fill the NaN values
df=df.ffill().infer_objects(copy=False)

  df['minute'] = pd.to_datetime(df['minute'])


In [54]:
env=SimEnv({"df":df})
env.reset()
for i in range(0,70):
    action=env.action_space.sample()
    (obs, reward, done, truncate,info) = env.step(action)
    print(f"action={action}, obs={obs}, reward={reward}, done={done}, info={info}")
    if done:
        break

action=0, obs=[ 5.7182129e+02  1.8609144e-01 -1.4202131e-02  3.0216920e-01
  6.0000000e+01  2.0000000e+00  5.0383991e+01  1.0000000e+00
  0.0000000e+00  1.7754730e+00 -1.0060093e+01], reward=0.0, done=False, info={'action_mask': array([0, 1, 1])}
action=1, obs=[ 5.7182129e+02  1.8609144e-01 -1.4202131e-02  3.0216920e-01
  6.0000000e+01  2.0000000e+00  0.0000000e+00  0.0000000e+00
 -1.0060093e+01  1.7754730e+00 -1.0060093e+01], reward=-10.060092453241982, done=True, info={'action_mask': array([1, 1, 0])}


In [59]:
import ray
from ray.rllib.algorithms.dqn import DQNConfig

# Initialize Ray
ray.init(ignore_reinit_error=True)




2025-03-12 16:39:08,591	INFO worker.py:1672 -- Calling ray.init() again after it has already been called.


0,1
Python version:,3.11.11
Ray version:,2.43.0


In [93]:
config = DQNConfig()  # ✅ Ensure config is initialized correctly
"""
config = (
    DQNConfig()
    .training(
        gamma=0.99,  # Discount factor
        lr=0.001,  # Learning rate
        train_batch_size=1000  # ✅ Process 1000 timesteps per training step
    )
    .rollouts(
        #num_env_runners=2,  # ✅ Run 2 environments in parallel (was `num_rollout_workers`)
        rollout_fragment_length=500  # ✅ Collect 500 steps per worker before training
    )
    .environment(
        env=SimEnv,  # ✅ Use your custom environment
        env_config={"df": df}  # ✅ Pass `df` inside env_config
    )
    .resources(
        num_gpus=0  # Set to 1 if using GPU
    )
    .env_runners(
        num_env_runners=1  # ✅ Replaces deprecated `num_rollout_workers`
    )
    .debugging(
        log_level="INFO",  # ✅ Enables logging for TensorBoard
        logger_config={
            "type": "ray.tune.logger.TBXLogger"  # ✅ Enables TensorBoard logs
        }
    )
    
)
"""

config = (
    DQNConfig()
    .training(
        gamma=0.99,  # ✅ Discount factor
        lr=0.001,  # ✅ Learning rate
        train_batch_size=1000  # ✅ Process 1000 timesteps per training step
    )
    .rollouts(
        num_rollout_workers=1,  # ✅ Set number of parallel environments
        rollout_fragment_length=500  # ✅ Collect 500 steps per worker before training
    )
    .environment(
        env=SimEnv,  # ✅ Use your custom environment
        env_config={"df": df}  # ✅ Pass dataset into the environment
    )
    .resources(
        num_gpus=0  # ✅ Set to 1 if using GPU
    )
    .debugging(
        log_level="INFO",  # ✅ Enables logging for TensorBoard
        logger_config={
            "type": "ray.tune.logger.TBXLogger"  # ✅ Enables TensorBoard logs
        }
    )
)


dqn_agent = config.build_algo()

AttributeError: 'NoneType' object has no attribute 'environment'

In [70]:
results=[]
# Train the agent for a number of iterations
result = dqn_agent.train()
results.append(result)
print(f"Iteration {i}: Reward = {result['episode_reward_mean']}")

2025-03-12 17:09:50,816	ERROR actor_manager.py:833 -- Ray error (The actor ff58d755492f103acdef39f101000000 is unavailable: The actor is temporarily unavailable: IOError: The actor is restarting.. The task may or maynot have been executed on the actor.), taking actor 1 out of service.
2025-03-12 17:09:50,817	ERROR actor_manager.py:651 -- The actor ff58d755492f103acdef39f101000000 is unavailable: The actor is temporarily unavailable: IOError: The actor is restarting.. The task may or maynot have been executed on the actor.
NoneType: None
2025-03-12 17:10:54,899	ERROR actor_manager.py:833 -- Ray error (The actor ff58d755492f103acdef39f101000000 is unavailable: The actor is temporarily unavailable: IOError: The actor is restarting.. The task may or maynot have been executed on the actor.), taking actor 1 out of service.
2025-03-12 17:10:54,899	ERROR actor_manager.py:651 -- The actor ff58d755492f103acdef39f101000000 is unavailable: The actor is temporarily unavailable: IOError: The actor i

KeyboardInterrupt: 