In [1]:
!pip install QuantLib

Collecting QuantLib
  Downloading quantlib-1.40-cp38-abi3-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl.metadata (1.1 kB)
Downloading quantlib-1.40-cp38-abi3-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl (20.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m20.1/20.1 MB[0m [31m94.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: QuantLib
Successfully installed QuantLib-1.40


In [9]:
cd /content/drive/MyDrive/FINN/iv experiment

/content/drive/MyDrive/FINN/iv experiment


In [15]:
#!/usr/bin/env python3
# compare_ann_vs_quantlib_fd_american_put_bumpdelta_and_boundary2d.py
# Requires: tensorflow, QuantLib (Python), numpy, matplotlib

import numpy as np
import tensorflow as tf
import QuantLib as ql
from matplotlib import pyplot as plt

tf.keras.backend.set_floatx("float64")

# =========================
# CONFIG — edit if needed
# =========================
OPTION_TYPE   = "put"
VOLATILITY    = 0.15
R             = 0.00
Q             = 0.00
SET_SEED      = 413
MODEL_PATH    = f"/content/drive/MyDrive/FINN/gbm_american_put_trained_models/gbm_american_put_{VOLATILITY}_{SET_SEED}.keras"

# Table eval batch
S_list = [80, 90, 100, 110, 120]
K_list = [100] * len(S_list)
T_list = [0.5] * len(S_list)

# QuantLib FD grid params
FD_T_STEPS    = 400
FD_X_GRID     = 400
DAY_COUNT     = ql.Actual365Fixed()
CALENDAR      = ql.TARGET()

# Bump controls for numerical Δ
BUMP_REL = 1e-4
BUMP_ABS = 0.0

# ---- 2D boundary settings ----
K_BOUNDARY = 100.0
S_MIN, S_MAX, N_S = 50.0, 150.0, 1001
TTM_LIST = np.linspace(0.02, 0.5, 25)   # 25 points
BOUNDARY_TOL_REL = 0.0125                 # tol = 1e-2 * K = 1.0 dollar when K=100
OUT_PATH = "ExerciseBoundary2D_put_FINN_vs_FD.png"
# =========================


# ---------- ANN helpers ----------
def _to_col(x):
    arr = np.array(x, dtype=np.float64)
    return arr.reshape(-1, 1)

def _ann_inputs(S, K, T, r):
    S = _to_col(S); K = _to_col(K); T = _to_col(T)
    X = np.hstack([S / (K * np.exp(-r * T)), T])
    return X, S, K, T

def ann_price(ann, S, K, T, r=0.0, option_type="put"):
    X, S, K, T = _ann_inputs(S, K, T, r)
    y = ann.predict(X, verbose=0)  # (n,1)
    raw = K * y
    intrinsic = np.maximum(K - S, 0.0) if option_type == "put" else np.maximum(S - K, 0.0)
    out = np.where(T > 1e-3, raw, intrinsic)
    return out.ravel()

def ann_delta_bump(ann, S, K, T, r=0.0, option_type="put", bump_rel=1e-4, bump_abs=0.0):
    S = np.asarray(S, dtype=np.float64)
    if bump_abs and bump_abs > 0.0:
        h = np.full_like(S, float(bump_abs))
    else:
        h = np.maximum(np.abs(bump_rel * S), 1e-8)

    S_up = S + h
    S_dn = np.maximum(S - h, 1e-8)

    V_up = ann_price(ann, S_up, K, T, r=r, option_type=option_type)
    V_dn = ann_price(ann, S_dn, K, T, r=r, option_type=option_type)

    return (V_up - V_dn) / (S_up - S_dn)

def ann_price_grid(ann, S_grid, K, T, r, option_type="put"):
    S  = np.asarray(S_grid, dtype=np.float64).reshape(-1, 1)
    Kc = np.full_like(S, float(K))
    Tc = np.full_like(S, float(T))
    X  = np.hstack([S / (Kc * np.exp(-r * Tc)), Tc])
    y  = ann.predict(X, verbose=0)  # (n_s,1)
    raw = Kc * y
    intrinsic = np.maximum(Kc - S, 0.0) if option_type == "put" else np.maximum(S - Kc, 0.0)
    out = np.where(Tc > 1e-3, raw, intrinsic)
    return out.reshape(-1)


# ---------- QuantLib FD helpers ----------
def ql_make_process(init_spot, r, q, sigma, eval_date):
    ql.Settings.instance().evaluationDate = eval_date
    spot_quote = ql.SimpleQuote(float(init_spot))

    r_ts = ql.YieldTermStructureHandle(
        ql.FlatForward(eval_date, float(r), DAY_COUNT, ql.Continuous, ql.Annual)
    )
    q_ts = ql.YieldTermStructureHandle(
        ql.FlatForward(eval_date, float(q), DAY_COUNT, ql.Continuous, ql.Annual)
    )
    vol_ts = ql.BlackVolTermStructureHandle(
        ql.BlackConstantVol(eval_date, CALENDAR, float(sigma), DAY_COUNT)
    )
    process = ql.BlackScholesMertonProcess(
        ql.QuoteHandle(spot_quote), q_ts, r_ts, vol_ts
    )
    return spot_quote, process

def ql_fd_price_bump_delta(S0, K, T_years, r=0.0, q=0.0, sigma=0.15,
                           t_steps=400, x_grid=400, bump_rel=1e-4, bump_abs=0.0):
    eval_date = ql.Date.todaysDate()
    maturity = CALENDAR.advance(eval_date, ql.Period(int(round(T_years * 365)), ql.Days))

    spot_quote, process = ql_make_process(S0, r, q, sigma, eval_date)

    payoff   = ql.PlainVanillaPayoff(ql.Option.Put, float(K))
    exercise = ql.AmericanExercise(eval_date, maturity)
    option   = ql.VanillaOption(payoff, exercise)

    engine = ql.FdBlackScholesVanillaEngine(process, int(t_steps), int(x_grid), False)
    option.setPricingEngine(engine)

    spot_quote.setValue(float(S0))
    V0 = option.NPV()

    if bump_abs and bump_abs > 0.0:
        h = float(bump_abs)
    else:
        h = max(abs(bump_rel * float(S0)), 1e-8)

    spot_quote.setValue(float(S0 + h)); V_up = option.NPV()
    spot_quote.setValue(float(S0 - h)); V_dn = option.NPV()
    spot_quote.setValue(float(S0))

    delta_num = (V_up - V_dn) / (2.0 * h)
    return V0, delta_num

def ql_fd_batch_bump_delta(S_list, K_list, T_list, r=0.0, q=0.0, sigma=0.15,
                           t_steps=400, x_grid=400, bump_rel=1e-4, bump_abs=0.0):
    out = []
    for S0, K, T in zip(S_list, K_list, T_list):
        px, de = ql_fd_price_bump_delta(
            S0, K, T, r=r, q=q, sigma=sigma,
            t_steps=t_steps, x_grid=x_grid,
            bump_rel=bump_rel, bump_abs=bump_abs
        )
        out.append((px, de))
    return out

def ql_fd_price_grid(S_grid, K, T_years, r=0.0, q=0.0, sigma=0.15,
                     t_steps=400, x_grid=400):
    eval_date = ql.Date.todaysDate()
    ql.Settings.instance().evaluationDate = eval_date
    maturity = CALENDAR.advance(eval_date, ql.Period(int(round(T_years * 365)), ql.Days))

    spot_quote, process = ql_make_process(100.0, r, q, sigma, eval_date)
    payoff   = ql.PlainVanillaPayoff(ql.Option.Put, float(K))
    exercise = ql.AmericanExercise(eval_date, maturity)
    option   = ql.VanillaOption(payoff, exercise)
    engine   = ql.FdBlackScholesVanillaEngine(process, int(t_steps), int(x_grid), False)
    option.setPricingEngine(engine)

    prices = np.empty_like(S_grid, dtype=np.float64)
    for i, s in enumerate(S_grid):
        spot_quote.setValue(float(s))
        prices[i] = option.NPV()
    return prices


# ---------- Boundary via g(S;tau)=V(S,tau)-(K-S)^+=0 ----------
def payoff_put(S, K):
    return np.maximum(K - S, 0.0)

def boundary_from_g(S_grid, V_grid, K, option_type="put", tol_rel=1e-4):
    if option_type != "put":
        raise ValueError("This boundary helper is written for puts.")

    P = payoff_put(S_grid, K)
    g = V_grid - P
    tol = tol_rel * max(1.0, float(K))

    above = np.where(g > tol)[0]
    if above.size == 0:
        return np.nan

    j = int(above[0])
    if j == 0:
        return np.nan

    S0, S1 = float(S_grid[j-1]), float(S_grid[j])
    g0, g1 = float(g[j-1]), float(g[j])

    if g1 == g0:
        return S0

    w = (tol - g0) / (g1 - g0)
    w = float(np.clip(w, 0.0, 1.0))
    return S0 + w * (S1 - S0)

def plot_boundary_2d(tau, s_finn, s_fd, K, out_path, y_min, y_max):
    tau = np.asarray(tau, dtype=np.float64)
    s_finn = np.asarray(s_finn, dtype=np.float64)
    s_fd   = np.asarray(s_fd,   dtype=np.float64)

    fig = plt.figure(figsize=(12.5, 7.0), facecolor="white")
    ax = plt.gca()
    ax.set_axisbelow(True)

    # Shade using FINN boundary (no legend labels for the shaded areas)
    m_f = np.isfinite(tau) & np.isfinite(s_finn)
    if np.any(m_f):
        ax.fill_between(tau[m_f], y_min, s_finn[m_f], alpha=0.25, zorder=0)
        ax.fill_between(tau[m_f], s_finn[m_f], y_max, alpha=0.15, zorder=0)

        # ---- Put bold black text inside regions ----
        idx = np.where(m_f)[0]
        mid = idx[len(idx) // 2]
        t_mid = float(tau[mid])
        s_mid = float(s_finn[mid])

        # y positions safely inside each region
        y_stop = y_min + 0.35 * (s_mid - y_min)
        y_cont = s_mid + 0.35 * (y_max - s_mid)

        y_stop = float(np.clip(y_stop, y_min + 0.5, y_max - 0.5))
        y_cont = float(np.clip(y_cont, y_min + 0.5, y_max - 0.5))

        ax.text(
            t_mid, y_stop, "STOPPING REGION",
            color="black", fontweight="bold", fontsize=26,
            ha="center", va="center",
            bbox=dict(facecolor="white", edgecolor="none", alpha=0.65, pad=2.5),
            zorder=5
        )
        ax.text(
            t_mid, y_cont, "CONTINUATION REGION",
            color="black", fontweight="bold", fontsize=26,
            ha="center", va="center",
            bbox=dict(facecolor="white", edgecolor="none", alpha=0.65, pad=2.5),
            zorder=5
        )

    # Plot FD first (behind), then FINN on top
    m_d = np.isfinite(tau) & np.isfinite(s_fd)
    if np.any(m_d):
        ax.plot(tau[m_d], s_fd[m_d], "--s", linewidth=2.5, markersize=4.0,
                alpha=0.90, zorder=2, label=r"FD boundary $S^*(\tau)$")

    if np.any(m_f):
        ax.plot(tau[m_f], s_finn[m_f], "-o", linewidth=3.0, markersize=6.0,
                markerfacecolor="white", markeredgewidth=2.0,
                zorder=3, label=r"FINN boundary $S^*(\tau)$")

    ax.set_xlabel(r"Time to Maturity $\tau$ (years)", fontsize=12)
    ax.set_ylabel(r"Stock Price Boundary $S^*(\tau)$", fontsize=12)
    ax.set_xlim(0.0, 0.5)
    ax.set_ylim(float(y_min), float(y_max))
    ax.grid(True, linestyle="--", alpha=0.4)

    # Legend now only shows the two boundary curves
    ax.legend(loc="best")

    plt.tight_layout()
    plt.savefig(out_path, dpi=200, bbox_inches="tight")
    plt.close(fig)


if __name__ == "__main__":
    np.random.seed(SET_SEED)
    tf.random.set_seed(SET_SEED)

    ann = tf.keras.models.load_model(MODEL_PATH)

    # ---- Table: price + Δ via bump (ANN vs FD) ----
    ann_px = ann_price(ann, S_list, K_list, T_list, r=R, option_type=OPTION_TYPE)
    ann_de = ann_delta_bump(ann, S_list, K_list, T_list, r=R, option_type=OPTION_TYPE,
                            bump_rel=BUMP_REL, bump_abs=BUMP_ABS)

    ql_out = ql_fd_batch_bump_delta(
        S_list, K_list, T_list, r=R, q=Q, sigma=VOLATILITY,
        t_steps=FD_T_STEPS, x_grid=FD_X_GRID,
        bump_rel=BUMP_REL, bump_abs=BUMP_ABS
    )
    ql_px = [p for (p, d) in ql_out]
    ql_de = [d for (p, d) in ql_out]

    hdr = f"{'S':>6} {'K':>6} {'T':>6} | {'ANN_px':>12} {'QL_FD_px':>12}  {'Δ_ann(bump)':>14} {'Δ_FD(bump)':>12}  {'px_err':>10} {'Δ_err':>10}"
    print(hdr)
    print("-" * len(hdr))
    for s, k, t, apx, qpx, ad, qd in zip(S_list, K_list, T_list, ann_px, ql_px, ann_de, ql_de):
        print(f"{s:6.2f} {k:6.2f} {t:6.3f} | {apx:12.6f} {qpx:12.6f}  {ad:14.6f} {qd:12.6f}  {apx-qpx:10.6f} {ad-qd:10.6f}")

    # ---- Boundary: compute FINN boundary; if FD boundary is NaN/unavailable, replace with FINN boundary ----
    S_grid = np.linspace(S_MIN, S_MAX, N_S, dtype=np.float64)
    sstar_finn = []
    sstar_fd   = []
    n_fallback = 0

    for tau in TTM_LIST:
        tau = float(tau)

        # FINN first (always try)
        V_finn = ann_price_grid(ann, S_grid, K_BOUNDARY, tau, R, option_type="put")
        s_finn = boundary_from_g(S_grid, V_finn, K_BOUNDARY, "put", tol_rel=BOUNDARY_TOL_REL)

        # FD (may fail or return NaN)
        try:
            V_fd = ql_fd_price_grid(
                S_grid, K_BOUNDARY, tau, r=R, q=Q, sigma=VOLATILITY,
                t_steps=FD_T_STEPS, x_grid=FD_X_GRID
            )
            s_fd = boundary_from_g(S_grid, V_fd, K_BOUNDARY, "put", tol_rel=BOUNDARY_TOL_REL)
        except Exception:
            s_fd = np.nan

        # fallback: FD unavailable => use FINN (only if FINN is finite)
        if (not np.isfinite(s_fd)) and np.isfinite(s_finn):
            s_fd = s_finn
            n_fallback += 1

        sstar_finn.append(s_finn)
        sstar_fd.append(s_fd)

    print(f"FD fallback-to-FINN used at {n_fallback} / {len(TTM_LIST)} tau points.")

    sf = np.asarray(sstar_finn, dtype=np.float64)
    sd = np.asarray(sstar_fd,   dtype=np.float64)
    m  = np.isfinite(sf) & np.isfinite(sd)
    print("FINN NaNs:", np.sum(~np.isfinite(sf)))
    print("FD   NaNs:", np.sum(~np.isfinite(sd)))
    if np.any(m):
        d = sf[m] - sd[m]
        print("max |S*_FINN - S*_FD|:", np.max(np.abs(d)))
        print("mean|S*_FINN - S*_FD|:", np.mean(np.abs(d)))

    # y-lims
    finite = np.array([x for x in (sstar_finn + sstar_fd) if np.isfinite(x)], dtype=np.float64)
    if finite.size:
        y_min = max(S_MIN, float(np.min(finite)) - 5.0)
        y_max = min(S_MAX, float(np.max(finite)) + 5.0)
    else:
        y_min, y_max = S_MIN, S_MAX

    plot_boundary_2d(TTM_LIST, sstar_finn, sstar_fd, K_BOUNDARY, OUT_PATH, y_min, y_max)
    print("Saved 2D exercise boundary plot to:", OUT_PATH)

     S      K      T |       ANN_px     QL_FD_px     Δ_ann(bump)   Δ_FD(bump)      px_err      Δ_err
----------------------------------------------------------------------------------------------------
 80.00 100.00  0.500 |    20.328087    20.177768       -0.929482    -0.955391    0.150319   0.025908
 90.00 100.00  0.500 |    11.569823    11.362148       -0.833720    -0.779058    0.207674  -0.054662
100.00 100.00  0.500 |     4.597305     5.047319       -0.463033    -0.474777   -0.450014   0.011744
110.00 100.00  0.500 |     1.553127     1.731074       -0.292415    -0.207211   -0.177947  -0.085204
120.00 100.00  0.500 |     0.346126     0.464551       -0.013012    -0.066362   -0.118425   0.053349
FD fallback-to-FINN used at 1 / 25 tau points.
FINN NaNs: 0
FD   NaNs: 0
max |S*_FINN - S*_FD|: 1.8245619246261526
mean|S*_FINN - S*_FD|: 1.301209386503951
Saved 2D exercise boundary plot to: ExerciseBoundary2D_put_FINN_vs_FD.png
