In [30]:
import numpy as np
from dataclasses import dataclass
import numpy as np

In [31]:
@dataclass
class CIRParams:
    kappa: float
    theta: float
    sigma: float
    r0: float

In [32]:
def cir_A_B(params: CIRParams, t: float, T: float):
    """
    Closed-form CIR bond price coefficients: P(t,T) = A * exp(-B * r_t)
    """
    kappa, theta, sigma = params.kappa, params.theta, params.sigma
    tau = max(T - t, 0)
    if tau == 0:
        return 1, 0
    gamma = np.sqrt(kappa**2 + 2.0 * sigma**2)
    # B(t,T)
    numerator = 2 * (np.exp(gamma * tau) - 1)
    denominator = (gamma + kappa) * (np.exp(gamma * tau) - 1) + 2 * gamma
    B = numerator / denominator * 2
    
    B = 2 * (np.exp(gamma * tau) - 1) / ((gamma + kappa) * (np.exp(gamma * tau) - 1) + 2 * gamma)
    logA_num = 2 * gamma * np.exp((kappa + gamma) * tau / 2)
    logA_den = (gamma + kappa) * (np.exp(gamma * tau) - 1) + 2 * gamma
    A = (logA_num / logA_den) ** (2 * kappa * theta / (sigma**2))
    return A, B

In [33]:
def P0T(params: CIRParams, T: float) -> float:
    A, B = cir_A_B(params, 0, T)
    return A * np.exp(-B * params.r0)

In [34]:
def simulate_CIR_paths(params: CIRParams, T: float, n_steps: int, n_paths: int):
    """
    Exact transitions for CIR on a time grid
    """
    kappa, theta, sigma, r0 = params.kappa, params.theta, params.sigma, params.r0
    dt = T / n_steps
    r = np.full(n_paths, r0, dtype=float)
    int_r = np.zeros(n_paths, dtype=float)  # trapezoidal approx
    
    exp_kdt = np.exp(-kappa * dt)
    c = (sigma**2) * (1.0 - exp_kdt) / (4.0 * kappa)  # for noncentral chi-square scale
    
    for _ in range(n_steps):
        d = 4.0 * kappa * theta / (sigma**2)
        lam = (4.0 * kappa * exp_kdt / (sigma**2 * (1.0 - exp_kdt))) * r
        X = np.random.noncentral_chisquare(df=d, nonc=lam, size=n_paths)
        r_new = c * X
        int_r += 0.5 * (r + r_new) * dt
        r = r_new
    
    return r, int_r

In [35]:
def swap_analytics(params: CIRParams, K: float, T_exp: float, payment_times):
    """
    Deterministic (no MC) value at t=0 of the forward-start payer swap (starting at T_exp)
    """
    P0_Texp = P0T(params, T_exp)
    P0_Tend = P0T(params, payment_times[-1])
    deltas = np.diff([T_exp] + list(payment_times))
    P0_payments = np.array([P0T(params, t) for t in payment_times])
    A0 = np.sum(deltas * P0_payments)
    V_swap0 = P0_Texp - P0_Tend - K * A0
    return V_swap0, A0, P0_Texp, P0_Tend

In [36]:
def price_swaptions_MC(params: CIRParams, K: float, T_exp: float, payment_times, n_paths=1000, n_steps=400):
    """
    Monte Carlo under risk-neutral measure. Price payer/receiver swaptions with expiry T_exp
    """
    r_T, int_r = simulate_CIR_paths(params, T_exp, n_steps, n_paths)
    D0T = np.exp(-int_r)  # discount factor to T_exp
    
    def P_T_U(U):
        A, B = cir_A_B(params, T_exp, U)
        return A * np.exp(-B * r_T)
    
    P_T_end = P_T_U(payment_times[-1])
    deltas = np.diff([T_exp] + list(payment_times))
    P_T_payments = np.column_stack([P_T_U(t) for t in payment_times])
    V_T = 1.0 - P_T_end - K * np.sum(deltas * P_T_payments, axis=1)
    payer_payoff = np.maximum(V_T, 0.0)
    receiver_payoff = np.maximum(-V_T, 0.0)
    payer_price = np.mean(D0T * payer_payoff)
    receiver_price = np.mean(D0T * receiver_payoff)
    return payer_price, receiver_price, V_T.mean(), D0T.mean()

In [37]:
params = CIRParams(
    kappa=0.6,   
    theta=0.03,  
    sigma=0.12,  
    r0=0.02      
)
T_exp = 1     
tenor_years = 5 
freq = 4           
n_payments = int(tenor_years * freq)
payment_times = [T_exp + (i+1)/freq for i in range(n_payments)]

In [38]:
_, A0, P0_Texp, P0_Tend = swap_analytics(params, 0, T_exp, payment_times)
S_fwd = (P0_Texp - P0_Tend) / A0
K = S_fwd  # atm
V_swap0, A0, P0_Texp, P0_Tend = swap_analytics(params, K, T_exp, payment_times)
parity_rhs = V_swap0 
payer_px, receiver_px, avg_VT, avg_D0T = price_swaptions_MC(params, K, T_exp, payment_times, n_paths=20000, n_steps=400)
parity_lhs = payer_px - receiver_px

In [39]:
result = {
    "Forward par rate S_fwd": S_fwd,
    "Strike K": K,
    "PVBP A0": A0,
    "P(0, T_exp)": P0_Texp,
    "P(0, T_end)": P0_Tend,
    "Swap PV at t=0 (payer)": V_swap0,
    "Parity RHS (PVBP*(S_fwd - K))": parity_rhs,
    "Payer swaption (MC)": payer_px,
    "Receiver swaption (MC)": receiver_px,
    "Parity LHS (payer - receiver)": parity_lhs,
    "Parity error (LHS - RHS)": parity_lhs - parity_rhs,
    "Avg discount to expiry E[D(0,T)] (MC)": avg_D0T,
    "Avg swap value at expiry E[V_T] (MC, raw)": avg_VT
}

for k, v in result.items():
    print(f"{k:40s}: {v:.10f}")

Forward par rate S_fwd                  : 0.0279479911
Strike K                                : 0.0279479911
PVBP A0                                 : 4.5553469387
P(0, T_exp)                             : 0.9778034026
P(0, T_end)                             : 0.8504906067
Swap PV at t=0 (payer)                  : 0.0000000000
Parity RHS (PVBP*(S_fwd - K))           : 0.0000000000
Payer swaption (MC)                     : 0.0079375248
Receiver swaption (MC)                  : 0.0078681257
Parity LHS (payer - receiver)           : 0.0000693992
Parity error (LHS - RHS)                : 0.0000693992
Avg discount to expiry E[D(0,T)] (MC)   : 0.9777429125
Avg swap value at expiry E[V_T] (MC, raw): 0.0002035842
