In [36]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from typing import Optional, Callable
from dataclasses import dataclass

In [37]:
@dataclass
class JumpDiffusionParams:
    S0: float
    r: float
    sigma: float
    lambda_j: float
    mu_j: float
    sigma_j: float
    steps_per_year: int = 252
    
    @property
    def dt(self) -> float:
        return 1.0 / self.steps_per_year
    
    @property
    def jump_compensator(self) -> float:
        return np.exp(self.mu_j + 0.5 * self.sigma_j ** 2) - 1.0


class JumpDiffusionSimulator:
    """Efficient Merton jump-diffusion simulator with correct jump aggregation."""
    
    def __init__(self, params: JumpDiffusionParams):
        self.params = params
        
    def simulate_path(self, rng: np.random.Generator) -> np.ndarray:
        """Simulate one jump-diffusion path with correct jump aggregation."""
        p = self.params
        n_steps = p.steps_per_year
        
        # Brownian motion
        Z = rng.standard_normal(n_steps)
        
        # Jump counts per timestep
        N = rng.poisson(p.lambda_j * p.dt, size=n_steps)
        
        # CORRECT: Sum of N jumps ~ Normal(N*mu_j, sqrt(N)*sigma_j)
        jump_means = N * p.mu_j
        jump_stds = np.sqrt(N) * p.sigma_j
        jumps = rng.normal(loc=jump_means, scale=jump_stds)
        
        # Log returns
        drift = (p.r - p.lambda_j * p.jump_compensator - 0.5 * p.sigma ** 2) * p.dt
        diffusion = p.sigma * np.sqrt(p.dt) * Z
        log_returns = drift + diffusion + jumps
        
        # Convert to prices
        return p.S0 * np.exp(np.cumsum(log_returns))
    
    def generate_paths_with_sma(
        self,
        n_paths: int,
        short_window: int,
        long_window: int,
        seed: Optional[int] = None
    ) -> list:
        """Generate multiple paths with SMAs computed."""
        rng = np.random.default_rng(seed)
        child_seeds = rng.integers(0, 2**31 - 1, size=n_paths)
        dates = pd.date_range("2024-01-01", periods=self.params.steps_per_year + 1, freq="B")
        
        sma_dfs = []
        for i in range(n_paths):
            child_rng = np.random.default_rng(int(child_seeds[i]))
            future_prices = self.simulate_path(child_rng)
            prices = np.concatenate([[self.params.S0], future_prices])
            price_series = pd.Series(prices, index=dates)
            
            df = pd.DataFrame({
                "Price": price_series,
                f"SMA_{short_window}": price_series.rolling(short_window, min_periods=1).mean(),
                f"SMA_{long_window}": price_series.rolling(long_window, min_periods=1).mean()
            })
            sma_dfs.append(df)
        
        return sma_dfs


class DMACOPricer:
    """Pricer for Digital Moving Average Crossover Option."""
    
    @staticmethod
    def compute_price(
        sma_dfs: list,
        stopping_time: int,
        r: float,
        steps_per_year: int,
        short_window: int,
        long_window: int
    ) -> float:
        """Compute DMACO option price."""
        n_paths = len(sma_dfs)
        payoffs = np.zeros(n_paths)
        
        short_key = f"SMA_{short_window}"
        long_key = f"SMA_{long_window}"
        
        for i, df in enumerate(sma_dfs):
            short_sma = df[short_key].to_numpy()
            long_sma = df[long_key].to_numpy()
            
            diff = short_sma - long_sma
            prev_diff = np.concatenate([[diff[0]], diff[:-1]])
            crossovers = ((prev_diff < 0) & (diff > 0)) | ((prev_diff > 0) & (diff < 0))
            
            last_idx = min(stopping_time, len(crossovers) - 1)
            if np.any(crossovers[:last_idx + 1]):
                first_cross = np.argmax(crossovers[:last_idx + 1])
                t_years = first_cross / steps_per_year
                payoffs[i] = np.exp(-r * t_years)
        
        return float(payoffs.mean())


class GreeksCalculator:
    """Comprehensive Greeks calculator for DMACO options."""
    
    def __init__(
        self,
        base_params: JumpDiffusionParams,
        n_paths: int,
        short_window: int,
        long_window: int,
        seed: int = 1
    ):
        self.base_params = base_params
        self.n_paths = n_paths
        self.short_window = short_window
        self.long_window = long_window
        self.seed = seed
    
    def compute_sensitivity(
        self,
        param_name: str,
        param_values: np.ndarray,
        stopping_times: np.ndarray,
        greek_name: str
    ) -> tuple:
        """
        Compute price sensitivity to a parameter.
        
        Returns: (price_matrix, results_df)
        """
        price_matrix = np.zeros((len(param_values), len(stopping_times)))
        results_list = []
        
        total = len(param_values) * len(stopping_times)
        print(f"\n{greek_name}: Testing {len(param_values)} values × {len(stopping_times)} stopping times = {total} simulations")
        
        for i, param_val in enumerate(param_values):
            for j, stop_time in enumerate(stopping_times):
                # Create modified parameters
                params = JumpDiffusionParams(
                    S0=self.base_params.S0,
                    r=self.base_params.r,
                    sigma=self.base_params.sigma,
                    lambda_j=self.base_params.lambda_j,
                    mu_j=self.base_params.mu_j,
                    sigma_j=self.base_params.sigma_j,
                    steps_per_year=self.base_params.steps_per_year
                )
                setattr(params, param_name, param_val)
                
                # Simulate and price
                simulator = JumpDiffusionSimulator(params)
                sma_dfs = simulator.generate_paths_with_sma(
                    self.n_paths, self.short_window, self.long_window, self.seed
                )
                
                price = DMACOPricer.compute_price(
                    sma_dfs, int(stop_time), params.r,
                    params.steps_per_year, self.short_window, self.long_window
                )
                
                price_matrix[i, j] = price
                results_list.append({
                    'parameter': param_name,
                    'value': param_val,
                    'stopping_time': stop_time,
                    'price': price
                })
            
            progress = ((i + 1) / len(param_values)) * 100
            print(f"  Progress: {progress:.0f}%")
        
        results_df = pd.DataFrame(results_list)
        return price_matrix, results_df
    
    def compute_window_sensitivity(
        self,
        window_type: str,
        window_values: np.ndarray,
        stopping_times: np.ndarray
    ) -> tuple:
        """Compute sensitivity to SMA window sizes."""
        price_matrix = np.zeros((len(window_values), len(stopping_times)))
        results_list = []
        
        total = len(window_values) * len(stopping_times)
        print(f"\n{window_type.capitalize()} Window: Testing {len(window_values)} values × {len(stopping_times)} stopping times = {total} simulations")
        
        for i, window_val in enumerate(window_values):
            for j, stop_time in enumerate(stopping_times):
                if window_type == 'short':
                    short_w, long_w = int(window_val), self.long_window
                else:
                    short_w, long_w = self.short_window, int(window_val)
                
                simulator = JumpDiffusionSimulator(self.base_params)
                sma_dfs = simulator.generate_paths_with_sma(
                    self.n_paths, short_w, long_w, self.seed
                )
                
                price = DMACOPricer.compute_price(
                    sma_dfs, int(stop_time), self.base_params.r,
                    self.base_params.steps_per_year, short_w, long_w
                )
                
                price_matrix[i, j] = price
                results_list.append({
                    'parameter': f'{window_type}_window',
                    'value': window_val,
                    'stopping_time': stop_time,
                    'price': price
                })
            
            progress = ((i + 1) / len(window_values)) * 100
            print(f"  Progress: {progress:.0f}%")
        
        results_df = pd.DataFrame(results_list)
        return price_matrix, results_df


def plot_greek(
    price_matrix: np.ndarray,
    param_values: np.ndarray,
    stopping_times: np.ndarray,
    param_name: str,
    greek_name: str,
    save_path: str
):
    """Create line plot for a Greek."""
    plt.figure(figsize=(12, 7))
    colors = plt.cm.viridis(np.linspace(0, 1, len(param_values)))
    
    for i, param_val in enumerate(param_values):
        plt.plot(
            stopping_times, 
            price_matrix[i, :], 
            marker='o', 
            linewidth=2,
            markersize=6,
            color=colors[i],
            label=f'{param_name} = {param_val:.3f}'
        )
    
    plt.xlabel('Time Horizon (days)', fontsize=12, fontweight='bold')
    plt.ylabel('DMACO Option Price', fontsize=12, fontweight='bold')
    plt.title(f'{greek_name}: {param_name} Sensitivity\nOption Price vs Time Horizon', 
              fontsize=14, fontweight='bold', pad=20)
    plt.legend(loc='best', fontsize=9, framealpha=0.9, ncol=2)
    plt.grid(True, alpha=0.3, linestyle='--')
    plt.tight_layout()
    plt.savefig(save_path, dpi=300, bbox_inches='tight')
    print(f"  Saved: {save_path}")
    plt.close()


def plot_heatmap(
    price_matrix: np.ndarray,
    param_values: np.ndarray,
    stopping_times: np.ndarray,
    param_name: str,
    greek_name: str,
    save_path: str
):
    """Create heatmap for a Greek."""
    plt.figure(figsize=(10, 8))
    im = plt.imshow(price_matrix, aspect='auto', cmap='YlOrRd', origin='lower')
    plt.colorbar(im, label='Option Price')
    plt.xlabel('Time Horizon (days)', fontsize=12, fontweight='bold')
    plt.ylabel(param_name, fontsize=12, fontweight='bold')
    plt.title(f'{greek_name} Heatmap: {param_name} Sensitivity', 
              fontsize=14, fontweight='bold', pad=20)
    
    plt.xticks(range(len(stopping_times)), stopping_times)
    plt.yticks(range(len(param_values)), [f'{v:.3f}' for v in param_values])
    plt.tight_layout()
    plt.savefig(save_path, dpi=300, bbox_inches='tight')
    print(f"  Saved: {save_path}")
    plt.close()


def create_summary_plot(all_results: dict, stopping_times: np.ndarray, save_path: str):
    """Create multi-panel summary plot of all Greeks."""
    fig, axes = plt.subplots(3, 2, figsize=(16, 14))
    fig.suptitle('DMACO Greeks Summary: Comprehensive Sensitivity Analysis', 
                 fontsize=16, fontweight='bold', y=0.995)
    
    greeks_info = [
        ('Delta', 'S0', axes[0, 0]),
        ('Vega', 'sigma', axes[0, 1]),
        ('Rho', 'r', axes[1, 0]),
        ('Lambda', 'lambda_j', axes[1, 1]),
        ('Short Window', 'short_window', axes[2, 0]),
        ('Long Window', 'long_window', axes[2, 1])
    ]
    
    for greek_name, param_key, ax in greeks_info:
        if param_key in all_results:
            price_matrix, param_values = all_results[param_key]
            colors = plt.cm.viridis(np.linspace(0, 1, len(param_values)))
            
            for i, param_val in enumerate(param_values):
                ax.plot(stopping_times, price_matrix[i, :], 
                       marker='o', linewidth=1.5, markersize=4,
                       color=colors[i], label=f'{param_val:.3f}')
            
            ax.set_xlabel('Time Horizon (days)', fontsize=10)
            ax.set_ylabel('Price', fontsize=10)
            ax.set_title(f'{greek_name}', fontsize=11, fontweight='bold')
            ax.legend(fontsize=7, ncol=2, loc='best')
            ax.grid(True, alpha=0.3, linestyle='--')
    
    plt.tight_layout()
    plt.savefig(save_path, dpi=300, bbox_inches='tight')
    print(f"\n  Saved summary plot: {save_path}")
    plt.close()




In [38]:
# ============================================================================
# Main Execution: Complete Greeks Analysis
# ============================================================================

if __name__ == "__main__":
    # Base parameters
    base_params = JumpDiffusionParams(
        S0=100,
        r=0.05,
        sigma=0.15,
        lambda_j=1.0,
        mu_j=-0.1,
        sigma_j=0.25,
        steps_per_year=252
    )
    
    # Simulation settings
    n_paths = 1000  # Paths per simulation (balanced for speed/accuracy)
    short_window = 5
    long_window = 30
    seed = 42
    stopping_times = np.arange(10, 36, 5)
    
    print("="*80)
    print(" "*20 + "DMACO COMPREHENSIVE GREEKS ANALYSIS")
    print("="*80)
    print(f"\nBase Parameters:")
    print(f"  S0 = {base_params.S0}, r = {base_params.r}, σ = {base_params.sigma}")
    print(f"  λ = {base_params.lambda_j}, μⱼ = {base_params.mu_j}, σⱼ = {base_params.sigma_j}")
    print(f"\nSimulation: {n_paths:,} paths, SMA({short_window},{long_window}), seed={seed}")
    print("="*80)
    
    calc = GreeksCalculator(base_params, n_paths, short_window, long_window, seed)
    all_results = {}
    all_dataframes = []
    
    # 1. DELTA - Sensitivity to S0 (spot price)
    print("\n[1/6] Computing DELTA...")
    S0_range = np.linspace(90, 110, 6)
    delta_matrix, delta_df = calc.compute_sensitivity(
        'S0', S0_range, stopping_times, 'Delta'
    )
    all_results['S0'] = (delta_matrix, S0_range)
    all_dataframes.append(delta_df)
    plot_greek(delta_matrix, S0_range, stopping_times, 'S0', 'Delta', 'greek_delta.png')
    plot_heatmap(delta_matrix, S0_range, stopping_times, 'S0', 'Delta', 'heatmap_delta.png')
    
    # 2. VEGA - Sensitivity to sigma (volatility)
    print("\n[2/6] Computing VEGA...")
    sigma_range = np.linspace(0.05, 0.35, 7)
    vega_matrix, vega_df = calc.compute_sensitivity(
        'sigma', sigma_range, stopping_times, 'Vega'
    )
    all_results['sigma'] = (vega_matrix, sigma_range)
    all_dataframes.append(vega_df)
    plot_greek(vega_matrix, sigma_range, stopping_times, 'σ', 'Vega', 'greek_vega.png')
    plot_heatmap(vega_matrix, sigma_range, stopping_times, 'σ', 'Vega', 'heatmap_vega.png')
    
    # 3. RHO - Sensitivity to r (risk-free rate)
    print("\n[3/6] Computing RHO...")
    r_range = np.linspace(-0.05, 0.15, 6)
    rho_matrix, rho_df = calc.compute_sensitivity(
        'r', r_range, stopping_times, 'Rho'
    )
    all_results['r'] = (rho_matrix, r_range)
    all_dataframes.append(rho_df)
    plot_greek(rho_matrix, r_range, stopping_times, 'r', 'Rho', 'greek_rho.png')
    plot_heatmap(rho_matrix, r_range, stopping_times, 'r', 'Rho', 'heatmap_rho.png')
    
    # 4. LAMBDA - Sensitivity to jump intensity
    print("\n[4/6] Computing LAMBDA (Jump Intensity)...")
    lambda_range = np.linspace(0.5, 3.0, 6)
    lambda_matrix, lambda_df = calc.compute_sensitivity(
        'lambda_j', lambda_range, stopping_times, 'Lambda'
    )
    all_results['lambda_j'] = (lambda_matrix, lambda_range)
    all_dataframes.append(lambda_df)
    plot_greek(lambda_matrix, lambda_range, stopping_times, 'λ', 'Lambda', 'greek_lambda.png')
    plot_heatmap(lambda_matrix, lambda_range, stopping_times, 'λ', 'Lambda', 'heatmap_lambda.png')
    
    # 5. SHORT WINDOW - Sensitivity to short MA window
    print("\n[5/6] Computing SHORT WINDOW Sensitivity...")
    short_range = np.linspace(1, 15, 6, dtype=int)
    short_matrix, short_df = calc.compute_window_sensitivity(
        'short', short_range, stopping_times
    )
    all_results['short_window'] = (short_matrix, short_range)
    all_dataframes.append(short_df)
    plot_greek(short_matrix, short_range, stopping_times, 'Short Window', 
               'Short MA Window', 'greek_short_window.png')
    plot_heatmap(short_matrix, short_range, stopping_times, 'Short Window',
                 'Short MA Window', 'heatmap_short_window.png')
    
    # 6. LONG WINDOW - Sensitivity to long MA window
    print("\n[6/6] Computing LONG WINDOW Sensitivity...")
    long_range = np.linspace(10, 50, 6, dtype=int)
    long_matrix, long_df = calc.compute_window_sensitivity(
        'long', long_range, stopping_times
    )
    all_results['long_window'] = (long_matrix, long_range)
    all_dataframes.append(long_df)
    plot_greek(long_matrix, long_range, stopping_times, 'Long Window',
               'Long MA Window', 'greek_long_window.png')
    plot_heatmap(long_matrix, long_range, stopping_times, 'Long Window',
                 'Long MA Window', 'heatmap_long_window.png')
    
    # Create summary visualization
    print("\nCreating summary visualization...")
    create_summary_plot(all_results, stopping_times, 'greeks_summary.png')
    
    # Save combined results
    combined_df = pd.concat(all_dataframes, ignore_index=True)
    combined_df.to_csv('greeks_all_results.csv', index=False)
    print(f"\n  Saved all results: greeks_all_results.csv")
    
    # Print summary statistics
    print("\n" + "="*80)
    print(" "*30 + "SUMMARY STATISTICS")
    print("="*80)
    
    for name, (matrix, values) in all_results.items():
        print(f"\n{name.upper()}:")
        print(f"  Value range: {values.min():.3f} to {values.max():.3f}")
        print(f"  Price range: {matrix.min():.6f} to {matrix.max():.6f}")
        print(f"  Mean price: {matrix.mean():.6f} (±{matrix.std():.6f})")
        
        # Calculate sensitivity (approximate derivative)
        if len(values) > 1:
            sensitivity = (matrix[-1, :].mean() - matrix[0, :].mean()) / (values[-1] - values[0])
            print(f"  Avg sensitivity: {sensitivity:.6f} per unit change")
    
    print("\n" + "="*80)
    print(" "*25 + "ANALYSIS COMPLETE!")
    print("="*80)
    print("\nGenerated files:")
    print("  • 6 Greek line plots (greek_*.png)")
    print("  • 6 Greek heatmaps (heatmap_*.png)")
    print("  • 1 Summary plot (greeks_summary.png)")
    print("  • 1 CSV with all results (greeks_all_results.csv)")
    print("="*80)

                    DMACO COMPREHENSIVE GREEKS ANALYSIS

Base Parameters:
  S0 = 100, r = 0.05, σ = 0.15
  λ = 1.0, μⱼ = -0.1, σⱼ = 0.25

Simulation: 1,000 paths, SMA(5,30), seed=42

[1/6] Computing DELTA...

Delta: Testing 6 values × 6 stopping times = 36 simulations
  Progress: 17%
  Progress: 33%
  Progress: 50%
  Progress: 67%
  Progress: 83%
  Progress: 100%
  Saved: greek_delta.png
  Saved: heatmap_delta.png

[2/6] Computing VEGA...

Vega: Testing 7 values × 6 stopping times = 42 simulations
  Progress: 14%
  Progress: 29%
  Progress: 43%
  Progress: 57%
  Progress: 71%
  Progress: 86%
  Progress: 100%
  Saved: greek_vega.png
  Saved: heatmap_vega.png

[3/6] Computing RHO...

Rho: Testing 6 values × 6 stopping times = 36 simulations
  Progress: 17%
  Progress: 33%
  Progress: 50%
  Progress: 67%
  Progress: 83%
  Progress: 100%
  Saved: greek_rho.png
  Saved: heatmap_rho.png

[4/6] Computing LAMBDA (Jump Intensity)...

Lambda: Testing 6 values × 6 stopping times = 36 simulations


In [39]:
# Base parameters for the jump-diffusion model
base_params = JumpDiffusionParams(
    S0=100,
    r=0.02,
    sigma=0.15,
    lambda_j=1,
    mu_j=-0.1,
    sigma_j=0.25,
    steps_per_year=252
)

# Table of parameter scenarios
scenarios = [
    {"Description": "Base",            "short_ma": 5,  "long_ma": 18, "T": 30},
    {"Description": "Longer T",        "short_ma": 5,  "long_ma": 18, "T": 35},
    {"Description": "Longer Long MA",  "short_ma": 5,  "long_ma": 26, "T": 30},
    {"Description": "Longer Short MA", "short_ma": 7,  "long_ma": 18, "T": 30},
]

# Monte Carlo settings
n_paths = 3000  # number of paths
seed = 42      # random seed for reproducibility

# Create results list
results = []

for s in scenarios:
    simulator = JumpDiffusionSimulator(base_params)
    sma_dfs = simulator.generate_paths_with_sma(
        n_paths=n_paths,
        short_window=s["short_ma"],
        long_window=s["long_ma"],
        seed=seed
    )
    
    price = DMACOPricer.compute_price(
        sma_dfs=sma_dfs,
        stopping_time=s["T"],
        r=base_params.r,
        steps_per_year=base_params.steps_per_year,
        short_window=s["short_ma"],
        long_window=s["long_ma"]
    )
    
    results.append({
        "Description": s["Description"],
        "S0": base_params.S0,
        "r (%)": base_params.r * 100,
        "Volatility (%)": base_params.sigma * 100,
        "Jump Intensity λ": base_params.lambda_j,
        "μ_j": base_params.mu_j,
        "σ_j": base_params.sigma_j,
        "Short MA (days)": s["short_ma"],
        "Long MA (days)": s["long_ma"],
        "Time Horizon T (days)": s["T"],
        "DMACO Price": price
    })

# Convert to DataFrame
df_prices = pd.DataFrame(results)

# Display the table
print(df_prices)


       Description   S0  r (%)  Volatility (%)  Jump Intensity λ  μ_j   σ_j  \
0             Base  100    2.0            15.0                 1 -0.1  0.25   
1         Longer T  100    2.0            15.0                 1 -0.1  0.25   
2   Longer Long MA  100    2.0            15.0                 1 -0.1  0.25   
3  Longer Short MA  100    2.0            15.0                 1 -0.1  0.25   

   Short MA (days)  Long MA (days)  Time Horizon T (days)  DMACO Price  
0                5              18                     30     0.876428  
1                5              18                     35     0.917654  
2                5              26                     30     0.824179  
3                7              18                     30     0.828989  
