### Upload Data

In [1]:
"""
Process Indonesian 10Y government bond yield history from:
  /mnt/data/Data Historis Imbal Hasil Obligasi Indonesia 10 Tahun (7).xlsx

What this script does:
1) Loads the Excel, auto-detects the date and yield columns (robust to header names).
2) Cleans the series (parses dates, converts yield to numeric, sorts, drops NA).
3) Computes rate-change shocks:
   - Daily (or native frequency) Δy
   - Monthly Δy (end-of-month)
   - Percentile shocks (e.g., 95%, 99% up; 5%, 1% down)
   - Worst window shocks (e.g., worst 1M / 3M change)
4) Prints a concise summary for use in stress-test design.
"""

from __future__ import annotations

import re
import numpy as np
import pandas as pd


FILEPATH = "/mnt/data/Data Historis Imbal Hasil Obligasi Indonesia 10 Tahun (7).xlsx"


def _read_first_nonempty_sheet(path: str) -> pd.DataFrame:
    xls = pd.ExcelFile(path)
    for sh in xls.sheet_names:
        df = pd.read_excel(path, sheet_name=sh)
        # consider "nonempty" if at least a few non-null cells exist
        if df.size > 0 and df.notna().sum().sum() > 10:
            df["_sheet_name"] = sh
            return df
    raise ValueError("No non-empty sheet found in the Excel file.")


def _guess_date_col(df: pd.DataFrame) -> str:
    # Prefer columns with typical date keywords
    candidates = []
    for c in df.columns:
        if not isinstance(c, str):
            continue
        cl = c.lower()
        if any(k in cl for k in ["date", "tanggal", "time", "periode", "period"]):
            candidates.append(c)
    if candidates:
        return candidates[0]

    # Otherwise, choose the column with the highest date-parsing success rate
    best_col = None
    best_score = -1
    for c in df.columns:
        if c == "_sheet_name":
            continue
        parsed = pd.to_datetime(df[c], errors="coerce", dayfirst=True)
        score = parsed.notna().mean()
        if score > best_score:
            best_score = score
            best_col = c
    if best_col is None or best_score < 0.2:
        raise ValueError("Could not confidently detect a date column.")
    return best_col


def _guess_yield_col(df: pd.DataFrame, date_col: str) -> str:
    # Prefer columns with yield keywords
    yield_kw = ["yield", "imbal", "hasil", "yld", "rate", "kupon", "return"]
    candidates = []
    for c in df.columns:
        if c in [date_col, "_sheet_name"]:
            continue
        if isinstance(c, str) and any(k in c.lower() for k in yield_kw):
            candidates.append(c)
    if candidates:
        return candidates[0]

    # Otherwise, choose the "most numeric" column (after coercion)
    best_col = None
    best_score = -1
    for c in df.columns:
        if c in [date_col, "_sheet_name"]:
            continue
        s = pd.to_numeric(df[c], errors="coerce")
        score = s.notna().mean()
        if score > best_score:
            best_score = score
            best_col = c
    if best_col is None or best_score < 0.2:
        raise ValueError("Could not confidently detect a yield column.")
    return best_col


def _to_numeric_yield(series: pd.Series) -> pd.Series:
    """
    Robust numeric conversion:
    - Handles '7,25' vs '7.25'
    - Handles percent strings like '7.25%' or '7,25 %'
    """
    s = series.astype(str).str.strip()
    s = s.replace({"": np.nan, "nan": np.nan, "None": np.nan, "-": np.nan})

    # Remove percent signs and spaces
    s = s.str.replace("%", "", regex=False).str.replace(" ", "", regex=False)

    # Convert comma decimal to dot decimal where appropriate
    # e.g., '7,25' -> '7.25'
    s = s.str.replace(r"(?<=\d),(?=\d)", ".", regex=True)

    out = pd.to_numeric(s, errors="coerce")
    return out


def prepare_yield_series(path: str) -> pd.DataFrame:
    raw = _read_first_nonempty_sheet(path)
    sheet = raw["_sheet_name"].iloc[0] if "_sheet_name" in raw.columns else "unknown"

    date_col = _guess_date_col(raw)
    y_col = _guess_yield_col(raw, date_col)

    df = raw[[date_col, y_col]].copy()
    df.columns = ["date", "yield_raw"]

    df["date"] = pd.to_datetime(df["date"], errors="coerce", dayfirst=True)
    df["y"] = _to_numeric_yield(df["yield_raw"])

    df = df.dropna(subset=["date", "y"]).sort_values("date").reset_index(drop=True)

    # Heuristic: if yields look like 0.07 rather than 7.0, scale to percent.
    # We'll keep both: y_pct (in %) and y_dec (in decimal)
    y_med = df["y"].median()
    if y_med < 1.0:
        df["y_dec"] = df["y"]
        df["y_pct"] = df["y"] * 100.0
    else:
        df["y_pct"] = df["y"]
        df["y_dec"] = df["y"] / 100.0

    df.attrs["sheet_name"] = sheet
    df.attrs["date_col_detected"] = date_col
    df.attrs["yield_col_detected"] = y_col
    return df


def compute_shocks(df: pd.DataFrame) -> dict:
    """
    Returns shocks in basis points (bps) for convenience.
    - Δy computed on y_dec (decimal), then converted to bps: 1bp = 0.0001
    """
    out = {}

    # Native frequency changes
    df = df.copy()
    df["dy"] = df["y_dec"].diff()
    dy = df["dy"].dropna()

    out["native"] = {
        "count": int(dy.shape[0]),
        "p95_up_bps": float(dy.quantile(0.95) / 1e-4),
        "p99_up_bps": float(dy.quantile(0.99) / 1e-4),
        "p05_down_bps": float(dy.quantile(0.05) / 1e-4),
        "p01_down_bps": float(dy.quantile(0.01) / 1e-4),
        "max_up_bps": float(dy.max() / 1e-4),
        "max_down_bps": float(dy.min() / 1e-4),
    }

    # Monthly end-of-month series and changes
    m = (
        df.set_index("date")[["y_dec"]]
        .resample("M")
        .last()
        .dropna()
        .rename(columns={"y_dec": "y_dec_m"})
    )
    m["dy_m"] = m["y_dec_m"].diff()
    dy_m = m["dy_m"].dropna()

    out["monthly"] = {
        "count": int(dy_m.shape[0]),
        "p95_up_bps": float(dy_m.quantile(0.95) / 1e-4),
        "p99_up_bps": float(dy_m.quantile(0.99) / 1e-4),
        "p05_down_bps": float(dy_m.quantile(0.05) / 1e-4),
        "p01_down_bps": float(dy_m.quantile(0.01) / 1e-4),
        "max_up_bps": float(dy_m.max() / 1e-4),
        "max_down_bps": float(dy_m.min() / 1e-4),
    }

    # Worst window shocks (e.g., 1M and 3M) using monthly data
    # Δ over k months: y(t) - y(t-k)
    for k in [1, 3, 6, 12]:
        dk = m["y_dec_m"].diff(k).dropna()
        out[f"worst_{k}m"] = {
            "worst_up_bps": float(dk.max() / 1e-4),
            "worst_down_bps": float(dk.min() / 1e-4),
        }

    return out


def print_summary(df: pd.DataFrame, shocks: dict) -> None:
    sheet = df.attrs.get("sheet_name", "unknown")
    print("=== DATASET SUMMARY ===")
    print(f"Sheet used           : {sheet}")
    print(f"Detected date col    : {df.attrs.get('date_col_detected')}")
    print(f"Detected yield col   : {df.attrs.get('yield_col_detected')}")
    print(f"Date range           : {df['date'].min().date()} to {df['date'].max().date()}")
    print(f"Observations         : {len(df):,}")
    print(f"Yield (median, %)    : {df['y_pct'].median():.3f}")

    print("\n=== SHOCKS (BASIS POINTS) ===")
    for block in ["native", "monthly"]:
        b = shocks[block]
        print(f"\n[{block.upper()} Δy]")
        print(f"  Count        : {b['count']:,}")
        print(f"  Up 95% / 99% : {b['p95_up_bps']:.1f} bps / {b['p99_up_bps']:.1f} bps")
        print(f"  Dn 5% / 1%   : {b['p05_down_bps']:.1f} bps / {b['p01_down_bps']:.1f} bps")
        print(f"  Max up/down  : {b['max_up_bps']:.1f} bps / {b['max_down_bps']:.1f} bps")

    for k in [1, 3, 6, 12]:
        b = shocks[f"worst_{k}m"]
        print(f"\n[WORST {k}M WINDOW (monthly)]")
        print(f"  Worst up     : {b['worst_up_bps']:.1f} bps")
        print(f"  Worst down   : {b['worst_down_bps']:.1f} bps")


if __name__ == "__main__":
    df = prepare_yield_series(FILEPATH)
    shocks = compute_shocks(df)
    print_summary(df, shocks)

    # Optional: export cleaned series for audit trail
    out_csv = "/mnt/data/indo_10y_yield_clean.csv"
    df[["date", "y_pct", "y_dec"]].to_csv(out_csv, index=False)
    print(f"\nSaved cleaned series to: {out_csv}")


ModuleNotFoundError: No module named 'numpy'