In [1]:
import QuantLib as ql
import math

def zero_curve_from_csv(valuation_date: ql.Date, rows, day_count=ql.Actual365Fixed(), cal=ql.UnitedStates(ql.UnitedStates.NYSE)):
    """
    rows: iterable of (T_years, r_cont_zero). Builds a ZeroCurve dated from valuation_date.
    """
    dates = [valuation_date] + [valuation_date + ql.Period(int(round(T*365)), ql.Days) for T, _ in rows]
    rates = [rows[0][1]] + [float(r) for _, r in rows]  # pad front; QL expects >= 2 nodes
    return ql.YieldTermStructureHandle(ql.ZeroCurve(dates, rates, day_count, cal))

class QLAmericanPricer:
    def __init__(self, valuation_date: ql.Date, r_curve: ql.YieldTermStructureHandle,
                 init_vol: float = 0.20, cal=ql.UnitedStates(ql.UnitedStates.NYSE), dc=ql.Actual365Fixed(),
                 t_grid: int = 400, x_grid: int = 200):
        ql.Settings.instance().evaluationDate = valuation_date
        self.val_date = valuation_date
        self.cal, self.dc = cal, dc
        self.t_grid, self.x_grid = t_grid, x_grid

        # live quotes you can bump
        self.spot_q = ql.SimpleQuote(0.0)
        self.vol_q  = ql.SimpleQuote(max(1e-8, init_vol))

        # handles
        self.u   = ql.QuoteHandle(self.spot_q)
        self.vts = ql.BlackVolTermStructureHandle(ql.BlackConstantVol(valuation_date, cal, ql.QuoteHandle(self.vol_q), dc))
        self.rts = r_curve
        self.dts = ql.YieldTermStructureHandle(ql.FlatForward(valuation_date, 0.0, dc, ql.Continuous))  # cont. q=0; use cash divs explicitly

        # stochastic process reused across prices
        self.process = ql.BlackScholesMertonProcess(self.u, self.dts, self.rts, self.vts)

    def _div_schedule(self, dividends):
        sched = []
        if not dividends: return sched
        for d, amt in dividends:
            dq = d if isinstance(d, ql.Date) else ql.Date(d.day, d.month, d.year)
            if dq > self.val_date:
                sched.append(ql.FixedDividend(float(amt), dq))
        return sched

    def price(self, S, K, maturity: ql.Date, cp: str,
              vol: float | None = None, dividends: list[tuple] | None = None,
              want_greeks: bool = True, vega_eps: float = 1e-4):
        # set live quotes
        self.spot_q.setValue(float(S))
        if vol is not None:
            self.vol_q.setValue(max(1e-8, float(vol)))

        payoff   = ql.PlainVanillaPayoff(ql.Option.Call if cp.upper().startswith('C') else ql.Option.Put, float(K))
        exercise = ql.AmericanExercise(self.val_date, maturity)
        option   = ql.VanillaOption(payoff, exercise)

        divs = self._div_schedule(dividends or [])
        # try FDM engine with explicit dividend schedule
        engine = None
        try:
            engine = ql.FdBlackScholesVanillaEngine(self.process, divs, self.t_grid, self.x_grid)
        except TypeError:
            # fallback: escrowed dividend approx (subtract PV of cash divs from spot)
            pv = sum(d.amount() * self.rts.discount(d.date()) for d in divs)
            self.spot_q.setValue(float(S) - pv)
            engine = ql.FdBlackScholesVanillaEngine(self.process, self.t_grid, self.x_grid)

        option.setPricingEngine(engine)

        out = {"theo": option.NPV()}
        if not want_greeks:
            # restore spot if we escrowed it
            self.spot_q.setValue(float(S))
            return out

        # Greks: ask engine; if missing, do light numeric where needed
        for g in ("delta","gamma","theta"):
            try:
                out[g] = getattr(option, g)()
            except RuntimeError:
                out[g] = None

        # vega: some PDE engines don't provide it → numeric
        try:
            out["vega"] = option.vega()
        except RuntimeError:
            v0 = self.vol_q.value()
            self.vol_q.setValue(v0 + vega_eps); p_up = ql.VanillaOption(payoff, exercise); p_up.setPricingEngine(engine); upv = p_up.NPV()
            self.vol_q.setValue(v0 - vega_eps); p_dn = ql.VanillaOption(payoff, exercise); p_dn.setPricingEngine(engine); dnv = p_dn.NPV()
            self.vol_q.setValue(v0)
            out["vega"] = ((upv - dnv) / (2*vega_eps)) * 0.01  # per 1 vol point

        # restore spot if escrow path
        self.spot_q.setValue(float(S))
        return out

In [4]:
import pandas as pd
from datetime import date
from pandas.tseries.offsets import DateOffset

ALLOWED_FREQ_STEPS = {1:12, 2:6, 3:4, 4:3, 6:2, 12:1}

def _roll_to_business_day(ts):
    ts = pd.Timestamp(ts)
    while ts.weekday() >= 5:  # Sat/Sun -> next business day
        ts = ts + pd.Timedelta(days=1)
    return ts

def _fetch_div_history(client, ticker, start=None, end=None, limit=1000):
    kwargs = dict(ticker=ticker, order="asc", sort="ex_dividend_date", limit=limit)
    if start:
        kwargs["ex_dividend_date_gte"] = start
    if end:
        kwargs["ex_dividend_date_lte"] = end
    rows = [d for d in client.list_dividends(**kwargs)]
    cols = ["id","cash_amount","currency","declaration_date","dividend_type",
            "ex_dividend_date","frequency","pay_date","record_date","ticker"]
    return pd.DataFrame(rows) if rows else pd.DataFrame(columns=cols)

def _parse_mmdd_list(dates_like, base_year):
    """Accepts strings 'YYYY-MM-DD' or 'MM-DD', or pandas/py dates; returns sorted ['MM-DD', ...]."""
    out = []
    for x in dates_like:
        if isinstance(x, (pd.Timestamp, )):
            ts = x
        else:
            s = str(x)
            if len(s) == 5 and s[2] == "-":  # MM-DD
                ts = pd.to_datetime(f"{base_year}-{s}", errors="raise")
            else:  # try full date
                ts = pd.to_datetime(s, errors="raise")
        out.append(ts.strftime("%m-%d"))
    return sorted(out)

def _generate_template_from_first(mmdd_first, freq, base_year):
    step = ALLOWED_FREQ_STEPS.get(freq)
    if step is None:
        raise ValueError(f"Unsupported frequency {freq}. Use one of {sorted(ALLOWED_FREQ_STEPS)}.")
    start = pd.Timestamp(f"{base_year}-{mmdd_first}")
    res = []
    for k in range(freq):
        dt = start + DateOffset(months=step*k)
        res.append(dt.strftime("%m-%d"))
    return sorted(res)

def _complete_template(mmdd_list, freq, base_year):
    """If you provided < freq dates, fill the rest by stepping equal months."""
    step = ALLOWED_FREQ_STEPS.get(freq)
    if step is None:
        raise ValueError(f"Unsupported frequency {freq}. Use one of {sorted(ALLOWED_FREQ_STEPS)}.")
    if len(mmdd_list) >= freq:
        return sorted(mmdd_list)[:freq]
    if len(mmdd_list) == 1:
        return _generate_template_from_first(mmdd_list[0], freq, base_year)
    # 2..freq-1 seeds: continue stepping from the last provided
    seeds = sorted(mmdd_list)
    cur = pd.Timestamp(f"{base_year}-{seeds[-1]}")
    out = seeds[:]
    while len(out) < freq:
        cur = cur + DateOffset(months=step)
        out.append(cur.strftime("%m-%d"))
    return sorted(out)

def forecast_dividends(
    client,
    ticker,
    years_ahead=3,
    increase_dollars=0.05,
    ref_year=None,
    date_field="ex_dividend_date",      # or "pay_date"
    use_business_day_roll=True,
    df_history=None,                    # pass your own history to avoid refetch
    frequency_override=None,            # 1,2,3,4,6,12
    first_dates_override=None           # ["MM-DD", ...] or ["YYYY-MM-DD", ...]
):
    """
    Build a future dividend forecast.

    Template dates:
      - DEFAULT: use the date pattern from `ref_year` (last full year).
      - OVERRIDE: if `first_dates_override` and/or `frequency_override` are given,
        auto-build the yearly template from those seeds.

    Increase logic:
      - If `ref_year` had an in-year *first* increase (cash_amount jumped),
        apply the annual increase on that (mapped) date each future year.
      - Else apply the increase on the first template date of the *second* forecast year.
    """
    # History
    if df_history is None:
        this_year = date.today().year
        df = _fetch_div_history(client, ticker, start=f"{this_year-3}-01-01")
    else:
        df = df_history.copy()

    if df.empty:
        raise ValueError("No dividend history available to build a forecast.")

    # Normalize types
    for col in ["ex_dividend_date", "pay_date", "declaration_date", "record_date"]:
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors="coerce")
    df = df.sort_values("ex_dividend_date").reset_index(drop=True)

    if ref_year is None:
        today = pd.Timestamp.today().tz_localize(None)
        ref_year = today.year - 1

    if date_field not in df.columns:
        raise ValueError(f"`date_field='{date_field}'` not in dataframe: {df.columns.tolist()}")

    # Base years
    last_known_year = int(df[date_field].max().year)
    first_forecast_year = last_known_year + 1

    # ---- Template (per-year schedule) ----
    template_mmdd = None

    if first_dates_override or frequency_override:
        # Build from overrides
        if first_dates_override:
            seeds = _parse_mmdd_list(first_dates_override, base_year=first_forecast_year)
        else:
            # No dates given; derive seeds from ref year’s actual pattern
            df_ref = df[df[date_field].dt.year == ref_year].dropna(subset=[date_field])
            if df_ref.empty:
                raise ValueError(f"No {date_field} rows in reference year {ref_year}.")
            seeds = sorted(df_ref[date_field].dt.strftime("%m-%d").tolist())

        if frequency_override is None:
            freq = len(seeds)
        else:
            freq = int(frequency_override)

        template_mmdd = _complete_template(seeds, freq, base_year=first_forecast_year)

    else:
        # Default: copy last full year's pattern
        df_ref = df[df[date_field].dt.year == ref_year].dropna(subset=[date_field])
        if df_ref.empty:
            raise ValueError(f"No {date_field} rows in reference year {ref_year}.")
        template_mmdd = sorted(df_ref[date_field].dt.strftime("%m-%d").tolist())
        if len(template_mmdd) < 1:
            raise ValueError("Reference year has too few dividends to infer a schedule.")

    # ---- Increase anchor detection (from history) ----
    df_all = df.sort_values(date_field).reset_index(drop=True)
    diffs = df_all["cash_amount"].diff()
    in_ref = df_all[date_field].dt.year == ref_year
    inc_rows = df_all[in_ref & (diffs > 0)]
    increase_anchor = None  # (month, day) from *history*
    if not inc_rows.empty:
        inc_dt = inc_rows.iloc[0][date_field]
        increase_anchor = (inc_dt.month, inc_dt.day)

    # Map anchor to our template (if it’s not an exact match, pick the first template date ON/AFTER it)
    mapped_anchor = None
    if increase_anchor is not None:
        anchor_mmdd = f"{increase_anchor[0]:02d}-{increase_anchor[1]:02d}"
        if anchor_mmdd in template_mmdd:
            mapped_anchor = anchor_mmdd
        else:
            # pick first >= anchor; if none, wrap to first
            later = [d for d in template_mmdd if d >= anchor_mmdd]
            mapped_anchor = later[0] if later else template_mmdd[0]

    # Latest amount → starting level
    current_amount = float(df_all.iloc[-1]["cash_amount"])

    # ---- Build forecast ----
    out = []
    for i in range(1, years_ahead + 1):
        year = last_known_year + i
        for mmdd in template_mmdd:
            month, day = map(int, mmdd.split("-"))
            dt = pd.Timestamp(year=year, month=month, day=day)
            if use_business_day_roll:
                dt = _roll_to_business_day(dt)

            inc_applied = False
            if mapped_anchor is not None and mmdd == mapped_anchor:
                current_amount = current_amount+ increase_dollars
                inc_applied = True
            elif mapped_anchor is None and i >= 2 and mmdd == template_mmdd[0]:
                # No historical anchor -> raise on first date of year 2 and onward
                current_amount = current_amount+ increase_dollars
                inc_applied = True

            out.append({
                "ticker": ticker,
                date_field: dt,
                "cash_amount": float(round(current_amount, 4)),
                "increase_applied": inc_applied,
                "source": "forecast"
            })

    forecast_df = pd.DataFrame(out).sort_values(date_field).reset_index(drop=True)
    return forecast_df

In [11]:
from polygon import RESTClient
import os
import dotenv
dotenv.load_dotenv()
polygon_key = os.getenv("AWS_SECRET")

client = RESTClient(polygon_key, pagination=False)

In [19]:




fc = forecast_dividends(
    client,
    "MSFT",
    years_ahead=3,          # project 3 years
    increase_dollars=0.08,      # e.g., 8% yearly increase
    date_field="ex_dividend_date"   # or "pay_date" if you want to anchor on pay dates
)

# Combine with your actuals if you want one table:
fc
fc


def convert_fc_to_ql_divs(fc):
    ql_divs=[]
    cash_amounts=fc["cash_amount"].tolist()
    ex_dates=fc["ex_dividend_date"].tolist()
    for pair in zip(ex_dates, cash_amounts):
        #print(pair)
        ql_date=ql.Date(pair[0].day, pair[0].month, pair[0].year)
        ql_divs.append((ql_date, pair[1]))

    ql_divs
    return ql_divs

ql_divs=convert_fc_to_ql_divs(fc)

df_yc=pd.read_csv("yc_snap.csv")

yrs=df_yc["yrs"].tolist()
rates=df_yc["rate"].tolist()
yield_rows=list(zip(yrs, rates))
dt_today=ql.Date(25, 9, 2025)
val=ql.Date(25, 9, 2025)
ql_yield_curve=zero_curve_from_csv(val, yield_rows)


pricer  = QLAmericanPricer(val, ql_yield_curve, init_vol=0.25)

# 2) Price legs quickly by bumping spot/vol only:
mat = ql.Date(17, 1, 2026)
divs = [(ql.Date(15,11,2025), 0.24)]  # cash dividends ≤ maturity
print(pricer.price(S=190, K=200, maturity=mat, cp='P', vol=0.25, dividends=ql_divs))





{'theo': 15.429007422961808, 'delta': -0.6043173887936704, 'gamma': 0.015645801366940666, 'theta': -12.924214414809462, 'vega': 0.4042295145172403}


In [23]:
df_options=pd.DataFrame()
df_options["strike"]=[100,120]
df_options["maturity"]=[pd.Timestamp(year=2026,day=15,month=11), pd.Timestamp(year=2027,day=15,month=11)]
df_options["cp"]=["C", "P"]
df_options["vol"]=[0.25, 0.25]
df_options["divs"]=[ql_divs]*2
df_options["S"]=[190,190]


In [None]:
from typing import Iterable, Tuple, Any


def to_ql_date(x: Any) -> ql.Date:
    """Accept ql.Date / pandas.Timestamp / datetime.date / 'YYYY-MM-DD'."""
    if isinstance(x, ql.Date):
        return x
    if isinstance(x, str):
        x = pd.to_datetime(x)
    if isinstance(x, pd.Timestamp):
        return ql.Date(int(x.day), int(x.month), int(x.year))
    # datetime.date or similar
    try:
        return ql.Date(int(x.day), int(x.month), int(x.year))
    except Exception as e:
        raise TypeError(f"Can't convert {x!r} to ql.Date") from e


def normalize_divs(divs: Iterable[Tuple[Any, float]] | None) -> list[Tuple[ql.Date, float]]:
    """[(date_like, cash_amount)] -> [(ql.Date, float)]"""
    if not divs:
        return []
    out = []
    for dt, amt in divs:
        out.append((to_ql_date(dt), float(amt)))
    return out



def price_dataframe(
    df: pd.DataFrame,
    pricer,  # your QLAmericanPricer instance
    *,
    s_col="S", k_col="strike", mat_col="maturity", cp_col="cp", vol_col="vol", divs_col="divs",
    compute_greeks=False,
    dS_pct=0.01,           # +1% spot bump for delta
    dvol_abs=0.01          # +1 vol point for vega (e.g., 0.25 -> 0.26)
) -> pd.DataFrame:
    """
    Apply QuantLib pricer row-wise and return a NEW dataframe with inputs + outputs.
    If compute_greeks=True, adds delta & vega via bump-and-reprice.
    """
    price_fn = pricer.price

    results = []
    for row in df.itertuples(index=False, name="Opt"):
        S   = getattr(row, s_col)
        K   = getattr(row, k_col)
        vol = getattr(row, vol_col)
        cp  = str(getattr(row, cp_col)).upper()
        mat = to_ql_date(getattr(row, mat_col))
        divs_raw = getattr(row, divs_col, None)
        divs = normalize_divs(divs_raw)
        #print(pricer.price(S=S, K=K, maturity=mat, cp=cp, vol=vol, dividends=ql_divs))
        base = price_fn(S=S, K=K, maturity=mat, cp=cp, vol=vol, dividends=divs)

        rec = {
            s_col: S, k_col: K, vol_col: vol, cp_col: cp, mat_col: getattr(row, mat_col),
            divs_col: divs_raw,           # keep the original as you provided it
            "price": base["theo"],
            "delta": base["delta"],
            "vega": base["vega"],
            "theta": base["theta"],
            "gamma": base["gamma"]
        }

        results.append(rec)

    return pd.DataFrame(results)


In [55]:
results=price_dataframe(df_options,pricer)
results

Unnamed: 0,S,strike,vol,cp,maturity,divs,price,delta,vega,theta,gamma
0,190,100,0.25,C,2026-11-15,"[(February 16th, 2026, 0.91), (May 15th, 2026,...",91.496674,0.995811,0.024831,-3.853898,0.000255
1,190,120,0.25,P,2027-11-15,"[(February 16th, 2026, 0.91), (May 15th, 2026,...",2.308611,-0.066732,0.352726,-1.604859,0.001908


In [25]:
def ql_price_options(df,ql_pricer):
    df_out=pd.DataFrame()
    for index, row in df.iterrows():
        df_out=df_out.append(ql_pricer.price(S=row["S"], K=row["strike"], maturity=row["maturity"], cp=row["cp"], vol=row["vol"], dividends=row["divs"]))
    return df_out



In [2]:
# pip install scipy
import numpy as np
from scipy.linalg import solve_banded

def american_pde_price_banded(
    S0, K, T, sigma,
    r_func, q_func=lambda t: 0.0,
    dividends=None,                 # [(t_years, cash), ...]
    cp='C',
    NS=400, NT=800,                 # you can often use NT ~ NS with Rannacher
    theta=0.5,                      # 0.5 = Crank–Nicolson
    rannacher=2,                    # first 2 steps implicit Euler to smooth payoff kink
    Smax=None,
    return_grid=False
):
    """FDM PDE with discrete cash dividends, early exercise, banded solver."""
    dividends = sorted(dividends or [], key=lambda x: x[0])

    # spot grid
    if Smax is None:
        D_sum = sum(d for _, d in dividends)
        base = max(S0, K)
        Smax = max(4*base, 2*base + 4*D_sum)
    S = np.linspace(0.0, Smax, NS+1).astype(np.float64)
    dS = S[1] - S[0]

    # payoff at maturity
    V = (S - K).clip(min=0.0) if cp.upper().startswith('C') else (K - S).clip(min=0.0)

    # time grid (include dividend times exactly)
    times = list(np.linspace(0.0, T, NT+1))
    for t, _ in dividends:
        if 0 < t < T: times.append(t)
    times = np.array(sorted(set(times)), dtype=np.float64)  # ascending
    div_amt = {t: 0.0 for t, _ in dividends if 0 < t <= T}
    for t, a in dividends:
        if 0 < t <= T: div_amt[t] = div_amt.get(t, 0.0) + float(a)
    div_times = set(div_amt.keys())

    # optional full grid storage
    V_grid = None
    if return_grid:
        V_grid = np.empty((len(times), NS+1), dtype=np.float64)
        V_grid[-1] = V

    # prealloc work arrays
    N = NS - 1                         # number of interior nodes
    Si = S[1:-1]
    lower = np.empty(N, dtype=np.float64)
    diag  = np.empty(N, dtype=np.float64)
    upper = np.empty(N, dtype=np.float64)
    rhs   = np.empty(N, dtype=np.float64)
    # banded matrix container (3, N): [upper; diag; lower]
    ab = np.zeros((3, N), dtype=np.float64)

    # backward time stepping
    steps_done = 0
    for j in range(len(times)-1, 0, -1):
        t_cur, t_prev = times[j], times[j-1]
        dt = t_cur - t_prev
        th = 1.0 if steps_done < rannacher else theta
        t_mid = t_prev + th*dt

        r = float(r_func(t_mid)); q = float(q_func(t_mid))
        sig2 = float(sigma)*float(sigma)

        A = 0.5 * sig2 * (Si**2) / (dS**2)
        B = (r - q) * Si / (2.0*dS)

        # LHS (theta part)
        lower[:] = -th * dt * (A - B)   # length N
        diag[:]  =  1.0 + th * dt * (2.0*A + r)
        upper[:] = -th * dt * (A + B)

        # RHS ((1-theta) part)
        lower_r = (1.0 - th) * dt * (A - B)
        diag_r  = 1.0 - (1.0 - th) * dt * (2.0*A + r)
        upper_r = (1.0 - th) * dt * (A + B)

        # boundaries
        if cp.upper().startswith('C'):
            V0, VN = 0.0, S[-1]
        else:
            V0, VN = K, 0.0

        rhs[:] = lower_r * V[:-2] + diag_r * V[1:-1] + upper_r * V[2:]
        rhs[0]  -= lower[0] * V0
        rhs[-1] -= upper[-1] * VN

        # build banded tri-diagonal:
        # ab[0,1:] = upper[:-1], ab[1,:] = diag, ab[2,:-1] = lower[1:]
        ab[0, 1:] = upper[:-1]
        ab[1, :]  = diag
        ab[2, :-1]= lower[1:]
        # solve
        V_inner = solve_banded((1, 1), ab, rhs, overwrite_ab=True, overwrite_b=True, check_finite=False)

        V[0], V[-1] = V0, VN
        V[1:-1] = V_inner

        # early exercise
        if cp.upper().startswith('C'):
            np.maximum(V, S - K, out=V)
        else:
            np.maximum(V, K - S, out=V)

        # discrete dividend jump at t_prev
        if t_prev in div_times:
            D = div_amt[t_prev]
            S_shift = np.clip(S - D, 0.0, S[-1])
            V[:] = np.interp(S_shift, S, V)
            # re-enforce exercise right before ex-div
            if cp.upper().startswith('C'):
                np.maximum(V, S - K, out=V)
            else:
                np.maximum(V, K - S, out=V)

        if return_grid:
            V_grid[j-1] = V

        steps_done += 1

    price = float(np.interp(S0, S, V))
    debug = {"S_grid": S, "times": times}
    if return_grid:
        debug["V_grid"] = V_grid
    else:
        debug["V0"] = V
    return price, debug

In [3]:
# Basic parameters
S0 = 150.0         # Current stock price
K = 155.0          # Strike price
T = 0.5            # Time to expiration (6 months)
sigma = 0.30       # Volatility (30%)

# Rate functions
r_func = lambda t: 0.04    # 4% continuous risk-free rate
q_func = lambda t: 0.02    # 2% continuous dividend yield

# Optional parameters
dividends = None   # No discrete dividends
cp = 'C'          # Call option
NS = 600          # Higher space resolution
NT = 1200         # Higher time resolution
theta = 0.5       # Crank-Nicolson scheme
rannacher = 4     # More Rannacher steps for stability
Smax = None       # Auto-calculate max stock price
return_grid = True # Return full grid for analysis