In [None]:
from __future__ import annotations

from datetime import date
import json
import numpy as np
import pandas as pd
import requests
import matplotlib.pyplot as plt

import pymc as pm
import arviz as az



In [None]:
BASE = "https://markets.newyorkfed.org"

START_DATE = "2015-01-01"    # adjust
END_DATE   = str(date.today())

# Outlier threshold in "sigma units" for tail probability
TAIL_K = 3.0

# Feature engineering
ROLL_VOL_WINDOW = 20

# Backtest controls (PyMC sampling is expensive)
TRAIN_WINDOW_DAYS = 365 * 5
STEP_DAYS = 10               # evaluate every ~2 weeks for speed
DRAWS = 300
TUNE  = 300


In [None]:
def fetch_json(url: str, params: dict | None = None) -> dict:
    r = requests.get(url, params=params, timeout=60)
    r.raise_for_status()
    return r.json()

def find_records_list(obj: dict) -> list[dict]:
    """
    NY Fed responses usually contain a list of records somewhere.
    This tries common keys and falls back to first list-of-dicts found.
    """
    common_keys = ["refRates", "rates", "data", "results", "observations"]
    for k in common_keys:
        v = obj.get(k)
        if isinstance(v, list) and (len(v) == 0 or isinstance(v[0], dict)):
            return v
    for v in obj.values():
        if isinstance(v, list) and (len(v) == 0 or isinstance(v[0], dict)):
            return v
    raise ValueError(f"Could not find records list in response keys={list(obj.keys())}")

def records_to_series(records: list[dict], name: str) -> pd.Series:
    """
    Infer date+value fields for simple rate series.
    """
    if not records:
        return pd.Series(dtype=float, name=name)

    date_keys = ["effectiveDate", "date", "asOfDate", "timestamp"]
    val_keys  = ["percentRate", "rate", "value", "pctRate"]

    sample = records[0]
    dk = next((k for k in date_keys if k in sample), None)
    vk = next((k for k in val_keys  if k in sample), None)

    if dk is None or vk is None:
        raise ValueError(f"Unknown schema for {name}. Sample keys={list(sample.keys())}")

    df = pd.DataFrame(records)
    idx = pd.to_datetime(df[dk]).dt.tz_localize(None)
    s = pd.to_numeric(df[vk], errors="coerce")
    out = pd.Series(s.values, index=idx, name=name).sort_index()
    out = out[~out.index.duplicated(keep="last")]
    return out


In [None]:
# SOFR endpoint pattern
sofr_url = f"{BASE}/api/rates/secured/sofr/search.json"
sofr_json = fetch_json(sofr_url, params={"startDate": START_DATE, "endDate": END_DATE})
SOFR = records_to_series(find_records_list(sofr_json), "SOFR")

# EFFR endpoint pattern
effr_url = f"{BASE}/api/rates/unsecured/effr/search.json"
effr_json = fetch_json(effr_url, params={"startDate": START_DATE, "endDate": END_DATE})
EFFR = records_to_series(find_records_list(effr_json), "EFFR")

print("SOFR head:\n", SOFR.dropna().head(3))
print("\nEFFR head:\n", EFFR.dropna().head(3))


In [None]:
def fetch_rp_results(start_date: str, end_date: str, operation_types: str = "repo,reverserepo") -> list[dict]:
    """
    Pull repo + reverse repo operation results in one call.
    operationTypes examples: "repo", "reverserepo", or "repo,reverserepo"
    """
    url = f"{BASE}/api/rp/results/search.json"
    params = {
        "startDate": start_date,
        "endDate": end_date,
        "operationTypes": operation_types,
    }
    js = fetch_json(url, params=params)

    # Many RP responses look like: { "repo": { "operations": [...] } }
    # We'll try common locations:
    if "repo" in js and isinstance(js["repo"], dict):
        for k in ["operations", "results", "data"]:
            if k in js["repo"] and isinstance(js["repo"][k], list):
                return js["repo"][k]

    # fallback: search within response
    try:
        return find_records_list(js)
    except Exception:
        raise ValueError(f"Could not locate operations list in rp response. keys={list(js.keys())}")

def rp_records_to_daily_volume(records: list[dict], name: str = "RP_VOLUME") -> pd.Series:
    """
    Aggregate daily accepted amount across operations.
    Attempts to infer:
      - date field: operationDate / date / effectiveDate
      - amount field: totalAmtAccepted / amountAccepted / totalAmount / amount
    """
    if not records:
        return pd.Series(dtype=float, name=name)

    sample = records[0]
    date_keys = ["operationDate", "date", "effectiveDate", "asOfDate"]
    amt_keys  = ["totalAmtAccepted", "amountAccepted", "totalAmount", "amount", "amt"]

    dk = next((k for k in date_keys if k in sample), None)
    ak = next((k for k in amt_keys  if k in sample), None)

    if dk is None or ak is None:
        print("Sample RP record:\n", json.dumps(sample, indent=2)[:2000])
        raise ValueError(f"Unknown RP schema. Need date+amount keys. sample_keys={list(sample.keys())}")

    df = pd.DataFrame(records).copy()
    df[dk] = pd.to_datetime(df[dk]).dt.tz_localize(None)
    df[ak] = pd.to_numeric(df[ak], errors="coerce")

    # daily sum of accepted amounts (repo + reverse repo together)
    daily = df.groupby(df[dk].dt.normalize())[ak].sum().sort_index()
    daily.name = name
    return daily

rp_records = fetch_rp_results(START_DATE, END_DATE, operation_types="repo,reverserepo")
RP_VOL = rp_records_to_daily_volume(rp_records, name="RP_VOLUME_TOTAL")

print("RP_VOL head:\n", RP_VOL.dropna().head(3))
print("RP_VOL tail:\n", RP_VOL.dropna().tail(3))


In [None]:
df = pd.concat([SOFR, EFFR, RP_VOL], axis=1).dropna()

# Price feature
df["SPREAD_SOFR_EFFR"] = df["SOFR"] - df["EFFR"]
df["SPREAD_VOL_20D"] = df["SPREAD_SOFR_EFFR"].rolling(ROLL_VOL_WINDOW).std()

# Quantity feature (stabilize with log; volumes can be huge / regime-shifting)
df["RP_LOG"] = np.log1p(df["RP_VOLUME_TOTAL"])
df["RP_LOG_CHG_5D"] = df["RP_LOG"].diff(5)
df["RP_LOG_VOL_20D"] = df["RP_LOG"].rolling(ROLL_VOL_WINDOW).std()

df = df.dropna()
df.tail()


In [None]:
plt.figure()
df[["SOFR", "EFFR"]].plot(title="SOFR vs EFFR")
plt.show()

plt.figure()
df["SPREAD_SOFR_EFFR"].plot(title="Spread: SOFR - EFFR")
plt.axhline(0.0)
plt.show()

plt.figure()
df["RP_LOG"].plot(title="Repo/Reverse Repo Volume (log1p aggregated)")
plt.show()


In [None]:
def fit_student_t_outlier_prob(
    y_train: np.ndarray,
    y_today: float,
    tail_k: float = 3.0,
    draws: int = 300,
    tune: int = 300,
):
    # Robust prior scale
    med = float(np.median(y_train))
    mad = float(np.median(np.abs(y_train - med))) + 1e-6
    scale = 1.4826 * mad

    with pm.Model() as m:
        mu = pm.Normal("mu", mu=med, sigma=5 * scale)
        sigma = pm.HalfNormal("sigma", sigma=5 * scale)
        nu = pm.Exponential("nu", lam=1/10) + 2  # heavy tails

        pm.StudentT("obs", nu=nu, mu=mu, sigma=sigma, observed=y_train)
        idata = pm.sample(draws=draws, tune=tune, chains=4, target_accept=0.9, progressbar=True)

    mu_s = idata.posterior["mu"].values.reshape(-1)
    sig_s = idata.posterior["sigma"].values.reshape(-1)

    z = (y_today - mu_s) / sig_s
    p_tail = float(np.mean(np.abs(z) > tail_k))
    return p_tail, idata


In [None]:
def walk_forward_p_tail(
    y: pd.Series,
    tail_k: float,
    train_window_days: int = 365*5,
    step: int = 10,
    draws: int = 300,
    tune: int = 300,
) -> pd.Series:
    y = y.dropna()
    out = []

    for i in range(train_window_days, len(y), step):
        window = y.iloc[i-train_window_days : i+1]
        y_train = window.iloc[:-1].values.astype(float)
        y_today = float(window.iloc[-1])

        try:
            p_tail, _ = fit_student_t_outlier_prob(y_train, y_today, tail_k=tail_k, draws=draws, tune=tune)
        except Exception:
            p_tail = np.nan

        out.append((y.index[i], p_tail))

    return pd.Series([v for _, v in out], index=[d for d, _ in out], name=f"p_tail_{y.name}")

# Two separate models
p_spread = walk_forward_p_tail(df["SPREAD_SOFR_EFFR"], TAIL_K, TRAIN_WINDOW_DAYS, STEP_DAYS, DRAWS, TUNE)
p_repo   = walk_forward_p_tail(df["RP_LOG_CHG_5D"], TAIL_K, TRAIN_WINDOW_DAYS, STEP_DAYS, DRAWS, TUNE)

p_spread.dropna().tail(), p_repo.dropna().tail()


In [None]:
p = pd.concat([p_spread, p_repo], axis=1).dropna()
p.columns = ["p_spread", "p_repo"]

p["p_stress_indep"] = 1.0 - (1.0 - p["p_spread"]) * (1.0 - p["p_repo"])
p["p_stress_max"]   = np.maximum(p["p_spread"], p["p_repo"])

p.tail()


In [None]:
plt.figure()
p[["p_spread", "p_repo"]].plot(title="Outlier probabilities (walk-forward)")
plt.axhline(0.10, linestyle="--")
plt.axhline(0.25, linestyle="--")
plt.show()

plt.figure()
p[["p_stress_indep", "p_stress_max"]].plot(title="Combined stress probability")
plt.axhline(0.25, linestyle="--")
plt.axhline(0.50, linestyle="--")
plt.axhline(0.80, linestyle="--")
plt.show()


In [None]:
stress_windows = [
    ("2019-09-10", "2019-10-15", "Repo spike"),
    ("2020-02-20", "2020-05-15", "COVID"),
    ("2023-03-08", "2023-05-01", "Regional banks"),
]

fig = plt.figure()
ax = p["p_stress_indep"].plot(title="Combined stress probability with stress windows")

for s, e, label in stress_windows:
    ax.axvspan(pd.to_datetime(s), pd.to_datetime(e), alpha=0.2)
ax.axhline(0.80, linestyle="--")
plt.show()
