In [14]:
import gymnasium as gym
import numpy as np
from scipy.stats import norm
from stable_baselines3 import PPO

In [23]:
class HedgingEnv(gym.Env):
    def __init__(self, drift_unit_time, sigma_unit_time, max_dd_unit_init_prem=0.5, time_step=1/252, K = 0):
        super(HedgingEnv, self).__init__()
        self.action_space = gym.spaces.Box(low=-1, high=1, shape=(2,), dtype=np.float32)  # Actions: [s(t+1), l(t+1)]
        self.observation_space = gym.spaces.Box(low=-np.inf, high=np.inf, shape=(7,), dtype=np.float32)  # State
        self.mu = drift_unit_time  # Drift parameter
        self.sigma = sigma_unit_time # Volatility parameter
        self.dt = time_step  # Time step
        self.K = K
        self.stoploss_thresh = max_dd_unit_init_prem  # Stop-loss threshold
        # Initialize parameters
        self.reset()

    def reset(self, seed=None, **kwargs):
        # Initialize path and state
        self.p_t = 0  # Current price
        self.t = 0  # Current time step
        self.delta_pos = 0  # Hedge position
        self.delta_mismatch = 0  # Delta mismatch
        self.cummval_t = 0  # Cumulative reward
        self.max_cummval_t = 0  # Track maximum cumulative reward
        self.dd_t = 0  # Drawdown
        self.max_dd_t = 0  # Track minimum cumulative reward
        self.straddle_price_t = self.calculate_straddle_price()
        self.delta_t = self.calculate_delta()
        self.stoploss_thresh = self.stoploss_thresh * self.straddle_price_t
        
        obs = self.get_state()
        info = {"initial_price": self.straddle_price_t}
        print(obs, info, self.delta_t)
        return obs, info
    
        return obs, info
    
    def step(self, action):
        s_next, l_next = action  # Actions: boundaries for t+1
        s_next, l_next = min(s_next, l_next), max(s_next, l_next)  # Ensure s(t+1) < l(t+1)
        
        # Simulate next price step
        p_prev = self.p_t
        straddle_prev = self.straddle_price_t
        self.t += 1
        self.p_t = self.next_price()
        self.straddle_price_t = self.calculate_straddle_price()
        
        # Compute PnL and cumulative reward
        pnl = self.straddle_price_t - straddle_prev - self.delta_pos * (self.p_t - p_prev)
        self.cummval_t += pnl
        self.max_cummval_t = max(self.max_cummval_t, self.cummval_t)
        self.dd_t = self.max_cummval_t - self.cummval_t
        self.max_dd_t = max(self.max_dd_t, self.dd_t)
        
        # Calculate delta and adjust hedge
        self.delta_t = self.calculate_delta()
        if not (s_next <= self.delta_t <= l_next):
            self.delta_pos = self.delta_t
            self.delta_mismatch = 0
        else:
            self.delta_mismatch = self.delta_t - self.delta_pos
            
        # Check termination
        done = (self.t == 252) or (self.max_dd_t > self.stoploss_thresh)
        truncated = self.t >= 252
        reward = self.calculate_reward(done) if done else 0
        
        return self.get_state(), reward, done, truncated, {}

    def calculate_straddle_price(self):
        """
        Computes the price of a straddle in the Bachelier model.

        Returns:
        - float: Price of the straddle.
        """
        F = self.p_t
        t = (252 - self.t) / 252
        
        if t == 0:
            # At expiry, straddle price is intrinsic value
            return abs(F - self.K)
        
        # Compute d
        d = (F - self.K) / (self.sigma * np.sqrt(t))
        
        # Compute PDF of d
        phi_d = norm.pdf(d)
        
        # Compute straddle price
        return self.sigma * np.sqrt(t) * 2 * phi_d + abs(F - self.K) * norm.cdf(d)
    
    def calculate_delta(self):
        # Compute delta using Bachelier model
        F = self.p_t
        K = self.K
        t = (252 - self.t) / 252
        d = (F - K) / (self.sigma * np.sqrt(t))
        if t > 0:
            return 2 * norm.cdf(d) - 1
        else:
             # At expiry, delta is -1 if F < K, 1 if F > K, 0 if F == K
            return 1 if F > K else -1 if F < K else 0
        
    def calculate_pnl(self, p_prev):
        return self.calculate_straddle_price() - self.delta_pos * (self.p_t - p_prev)

    def calculate_reward(self, done):
        if done:
            print("cumulative reward of episode: ", self.cummval_t)
            print("terminating state: ", self.get_state(), self.straddle_price_t, self.delta_t)
            return self.cummval_t
        else:
            0

    def get_state(self):
        return np.array([
            self.delta_pos,
            self.delta_mismatch,
            self.cummval_t,
            self.max_dd_t,
            self.dd_t,
            self.t / 252,  # Normalize time
            self.p_t,
        ], dtype=np.float32)

    def next_price(self):
        Z = np.random.normal(0, 1)  # Random normal variable
        drift = (self.mu - 0.5 * self.sigma**2) * self.dt
        diffusion = self.sigma * np.sqrt(self.dt) * Z
        p_t_next = self.p_t * np.exp(drift + diffusion)  # Update price using GBM
        return p_t_next

In [24]:
env = HedgingEnv(drift_unit_time=1, sigma_unit_time=1, max_dd_unit_init_prem=0.2, time_step=1/252, K=0)
model = PPO("MlpPolicy", env, verbose=1)
model.learn(total_timesteps=10000)

[0. 0. 0. 0. 0. 0. 0.] {'initial_price': 0.7978845608028654} 0.0
Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
[0. 0. 0. 0. 0. 0. 0.] {'initial_price': 0.7978845608028654} 0.0
cumulative reward of episode:  -0.12919210382425783
terminating state:  [ 0.          0.         -0.1291921   0.1291921   0.1291921   0.29761904
  0.        ] 0.6686924569786076 0.0
[0. 0. 0. 0. 0. 0. 0.] {'initial_price': 0.7978845608028654} 0.0
cumulative reward of episode:  -0.10324986029358396
terminating state:  [ 0.          0.         -0.10324986  0.10324986  0.10324986  0.24206349
  0.        ] 0.6946347005092814 0.0
[0. 0. 0. 0. 0. 0. 0.] {'initial_price': 0.7978845608028654} 0.0
cumulative reward of episode:  -0.08176125130427192
terminating state:  [ 0.          0.         -0.08176125  0.08176125  0.08176125  0.19444445
  0.        ] 0.7161233094985935 0.0
[0. 0. 0. 0. 0. 0. 0.] {'initial_price': 0.7978845608028654} 0.0
cumulative reward of episode:  -0.0

<stable_baselines3.ppo.ppo.PPO at 0x16e65182170>

In [19]:
from stable_baselines3.common.evaluation import evaluate_policy

# Evaluate the model
mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=10, render=False)
print(f"Mean Reward: {mean_reward}, Std Reward: {std_reward}")

Mean Reward: -0.0015846779569983482, Std Reward: 0.0




In [20]:
obs, info = env.reset()
done = False

while not done:
    action, _ = model.predict(obs)
    obs, reward, done, truncated, info = env.step(action)
    env.render()  # Render the environment (if supported)


NotImplementedError: 

In [None]:
import numpy as np
from scipy.optimize import minimize
import matplotlib.pyplot as plt

def solve_pde(r, x_grid, dx, dt, T_max):
    """Solve the heat equation with absorption exactly at t = r(x)."""
    n_x = len(x_grid)
    absorption = np.zeros(n_x)
    u = np.zeros(n_x)
    center_index = n_x // 2
    u[center_index] = 1.0 / dx  # Dirac delta at x=0
    
    n_steps = int(T_max / dt)
    time_grid = np.arange(0, T_max + dt, dt)
    
    # Precompute absorption times for each x
    absorption_time = r.copy()
    
    for step in range(n_steps):
        t = step * dt
        
        # Absorb density where t == absorption_time[j] (within dt tolerance)
        for j in range(n_x):
            if abs(t - absorption_time[j]) < dt/2:
                absorption[j] += u[j] * dx
                u[j] = 0.0
        
        # Update using Crank-Nicolson scheme (stable for larger dt)
        alpha = dt / (2 * dx**2)
        A = np.eye(n_x) * (1 + 2*alpha) + np.eye(n_x, k=1)*(-alpha) + np.eye(n_x, k=-1)*(-alpha)
        B = np.eye(n_x) * (1 - 2*alpha) + np.eye(n_x, k=1)*(alpha) + np.eye(n_x, k=-1)*(alpha)
        u = np.linalg.solve(A, B @ u)
        
        # Apply Dirichlet boundary conditions
        u[0] = 0.0
        u[-1] = 0.0
    
    # Normalize absorption
    absorption_total = np.sum(absorption)
    if absorption_total > 0:
        absorption /= absorption_total
    return absorption

def loss(r, target, x_grid, dx, dt, T_max):
    """Compute loss between absorption and target distribution."""
    absorption = solve_pde(r, x_grid, dx, dt, T_max)
    return np.sum((absorption - target)**2)

# Parameters
n_x = 201  # Increased spatial resolution
x_max = 5.0
x_grid = np.linspace(-x_max, x_max, n_x)
dx = x_grid[1] - x_grid[0]
dt = 0.01 * dx**2  # Smaller dt for Crank-Nicolson
T_max = 20.0  # Extended time horizon

# Target distribution (standard normal)
target_pdf = np.exp(-0.5 * x_grid**2) / np.sqrt(2 * np.pi)
target_pdf /= np.sum(target_pdf)  # Discrete normalization

# Initial guess for r(x) (quadratic barrier)
r_initial = x_grid**2

# Optimize r(x) using L-BFGS-B with gradients (faster convergence)
result = minimize(loss, r_initial, args=(target_pdf, x_grid, dx, dt, T_max),
                  method='L-BFGS-B', jac='3-point', 
                  options={'maxiter': 100, 'disp': True})
r_optimized = result.x

# Plot results
plt.figure(figsize=(10, 6))
plt.plot(x_grid, r_optimized, label='Optimized Barrier')
plt.plot(x_grid, x_grid**2, '--', label='Initial Guess (Quadratic)')
plt.xlabel('x')
plt.ylabel('r(x)')
plt.legend()
plt.title('Root Barrier for Skorokhod Embedding')
plt.show()

# Compare absorbed distribution with target
absorption = solve_pde(r_optimized, x_grid, dx, dt, T_max)
plt.figure(figsize=(10, 6))
plt.plot(x_grid, absorption, label='Absorbed Distribution')
plt.plot(x_grid, target_pdf, '--', label='Target Distribution')
plt.xlabel('x')
plt.ylabel('Density')
plt.legend()
plt.title('Absorbed vs Target Distribution')
plt.show()