# Auditable FRED/ALFRED Balanced Snapshot (125 series)
This notebook builds a month-end balanced snapshot from the Fan et al. 125-series list with clear mapping, availability diagnostics, and source tracking.

## Outputs
- `snapshot_raw`: clean paper-mnemonic dataset (125 columns)
- `snapshot_tcode`: transformed dataset via McCracken-Ng tcodes
- `snapshot_obsdate`: last observation date used for each decision date/series
- `snapshot_source`: source used per cell (`ALFRED`, `FRED_FALLBACK`, `COMPUTED_*`, missing)
- Metadata/audit sheets: mapping, availability, coverage, revision diagnostics, issues


In [None]:
import os, time, subprocess
import numpy as np
import pandas as pd
import requests

try:
    from IPython.display import display
except Exception:
    def display(x):
        print(x)

# ---------- Config ----------
API_KEY = os.getenv("FRED_API_KEY") or "3c268be920acd9693b77b400b0a95cf2"
BASE = "https://api.stlouisfed.org/fred"
SERIES_CSV = "data/fan_et_al_FREDMD_series.csv"
OUT_XLSX = "data/fred_balanced_snapshot_auditable.xlsx"
START, END = "1999-01-01", "2025-12-31"
STRICT_ALFRED = True          # True: never use revised FRED fallback when ALFRED is missing
CHUNK_MONTHS, PAD_MONTHS = 24, 24
PROBE_VINTAGES = "2000-01-31,2000-02-29"
S = requests.Session()


def month_ends(start, end):
    try:
        return pd.date_range(start, end, freq="ME")
    except Exception:
        return pd.date_range(start, end, freq="M")


def jget(endpoint, **params):
    params = dict(params, api_key=API_KEY, file_type="json")
    backoff = 0.5
    for _ in range(8):
        r = S.get(f"{BASE}/{endpoint}", params=params, timeout=30)
        if r.status_code == 200:
            return r.json()
        if r.status_code in (429, 500, 502, 503, 504):
            time.sleep(backoff)
            backoff = min(backoff * 2, 20)
            continue
        raise RuntimeError(f"{r.status_code}: {r.text[:320]}")
    raise RuntimeError("retry_exhausted")


def try_jget(endpoint, **params):
    try:
        return jget(endpoint, **params), ""
    except RuntimeError as e:
        return None, str(e)


def month_start(ts):
    return ts.to_period("M").start_time.normalize()

print(f"API key loaded: {'yes' if API_KEY else 'no'}")


In [None]:
# ---------- 1) Load and resolve mapping ----------
a1 = pd.read_csv(SERIES_CSV)
a1.columns = [c.strip() for c in a1.columns]
a1["fred_raw"] = a1["fred_raw"].astype(str).str.strip()
a1["tcode"] = pd.to_numeric(a1["tcode"], errors="raise").astype(int)

OVERRIDES = {
    "S&P500": "SP500",
    "COMPAPFF": "CPFFM",
    "COMPAPFFx": "CPFFM",
}
CONSTRUCTED = {
    "CONSPI": ("NONREVSL", "PI"),
    "HWIURATIO": ("HWI", "UNEMPLOY"),
}

a1["series_id"] = a1["fred_raw"].replace(OVERRIDES)
xmask = a1["series_id"].str.endswith("x", na=False)
a1.loc[xmask, "series_id"] = a1.loc[xmask, "series_id"].str[:-1]
a1["is_constructed"] = a1["fred_raw"].isin(CONSTRUCTED)
a1.loc[a1["is_constructed"], "series_id"] = pd.NA

deps = sorted({d for ds in CONSTRUCTED.values() for d in ds if d != "HWI"})
series_ids = list(dict.fromkeys(a1["series_id"].dropna().tolist() + deps))

# tcode map in both namespaces (paper and mapped API id)
tcode_by_raw = dict(zip(a1["fred_raw"], a1["tcode"]))
tcode_by_series = (
    a1.dropna(subset=["series_id"]).drop_duplicates("series_id").set_index("series_id")["tcode"].to_dict()
)

mapping_cols = [c for c in ["no", "fred_raw", "series_id", "tcode", "description", "group", "is_constructed"] if c in a1.columns]
mapping_df = a1[mapping_cols].copy()

print("Rows in paper mapping:", len(a1))
print("Mapped API series ids:", len(series_ids))
print("Constructed series:", a1.loc[a1["is_constructed"], "fred_raw"].tolist())
print("Override examples:")
display(mapping_df.loc[mapping_df["fred_raw"] != mapping_df["series_id"], ["fred_raw", "series_id"]].head(20))


In [None]:
# ---------- 2) Availability diagnostics (FRED vs ALFRED) ----------
def classify_availability(series_id):
    _, e1 = try_jget("series", series_id=series_id)
    if e1:
        if "Invalid value for variable series_id" in e1:
            return "INVALID_SERIES_ID", e1
        if "does not exist" in e1:
            return "NOT_IN_FRED", e1
        return "SERIES_ERROR", e1

    _, e2 = try_jget(
        "series/observations",
        series_id=series_id,
        output_type=2,
        vintage_dates=PROBE_VINTAGES,
        observation_start="1999-01-01",
        observation_end="2000-02-29",
        sort_order="asc",
        limit=100000,
    )
    if not e2:
        return "ALFRED_OK", ""
    if "does not exist in ALFRED" in e2:
        return "FRED_ONLY_NO_ALFRED", e2
    return "OBS_ERROR", e2


rows = []
for i, sid in enumerate(series_ids, 1):
    status, detail = classify_availability(sid)
    rows.append((sid, status, detail))
    if i % 25 == 0:
        print(f"availability progress: {i}/{len(series_ids)}")
    time.sleep(0.05)

availability_df = pd.DataFrame(rows, columns=["series_id", "availability", "detail"])

# Attach availability to each paper mnemonic row
mapping_audit_df = mapping_df.merge(availability_df, how="left", on="series_id")
print("Availability counts:")
print(availability_df["availability"].value_counts())
display(mapping_audit_df.head(10))


In [None]:
# ---------- 3) Download balanced snapshot with source tracking ----------
idx = pd.to_datetime(month_ends(START, END))
X = pd.DataFrame(index=idx, columns=series_ids, dtype="float64")
X_obs = pd.DataFrame(index=idx, columns=series_ids, dtype="datetime64[ns]")
X_src = pd.DataFrame(index=idx, columns=series_ids, dtype="object")
issues = []


def fetch_series_chunk(sid, vints, obs_start, obs_end):
    js, err = try_jget(
        "series/observations",
        series_id=sid,
        output_type=2,
        vintage_dates=",".join(vints),
        observation_start=obs_start,
        observation_end=obs_end,
        sort_order="asc",
        limit=100000,
    )
    if err:
        if "does not exist in ALFRED" in err:
            if STRICT_ALFRED:
                return {}, {}, {}, "MISSING_ALFRED"
            js, err2 = try_jget(
                "series/observations",
                series_id=sid,
                observation_start=obs_start,
                observation_end=obs_end,
                sort_order="asc",
                limit=100000,
            )
            if err2:
                return {}, {}, {}, f"FRED_ERROR: {err2[:120]}"
            df = pd.DataFrame(js.get("observations", []) or [])
            if not {"date", "value"} <= set(df.columns):
                return {}, {}, {}, "FRED_BAD_SHAPE"
            df["date"] = pd.to_datetime(df["date"], errors="coerce")
            df["value"] = pd.to_numeric(df["value"].replace(".", np.nan), errors="coerce")
            df = df.dropna(subset=["date", "value"]).sort_values("date")
            vals, obs, src = {}, {}, {}
            for v in vints:
                d = pd.to_datetime(v)
                g = df[df["date"] <= d]
                if g.empty:
                    continue
                last = g.iloc[-1]
                vals[d], obs[d], src[d] = float(last["value"]), last["date"], "FRED_FALLBACK"
            return vals, obs, src, ""
        return {}, {}, {}, f"ALFRED_ERROR: {err[:120]}"

    # ALFRED output_type=2 => date + one column per vintage
    df = pd.DataFrame(js.get("observations", []) or [])
    if "date" not in df.columns:
        return {}, {}, {}, "ALFRED_BAD_SHAPE"

    df["date"] = pd.to_datetime(df["date"], errors="coerce")
    vals, obs, src = {}, {}, {}
    for v in vints:
        d = pd.to_datetime(v)
        col = f"{sid}_{d.strftime('%Y%m%d')}"
        if col not in df.columns:
            continue
        g = pd.DataFrame({
            "date": df["date"],
            "value": pd.to_numeric(df[col].replace(".", np.nan), errors="coerce"),
        }).dropna(subset=["date", "value"])
        if g.empty:
            continue
        last = g.iloc[-1]
        vals[d], obs[d], src[d] = float(last["value"]), last["date"], "ALFRED"
    return vals, obs, src, ""


for i0 in range(0, len(idx), CHUNK_MONTHS):
    chunk = idx[i0:i0 + CHUNK_MONTHS]
    vints = [d.strftime("%Y-%m-%d") for d in chunk]
    obs_start = (month_start(chunk[0]) - pd.DateOffset(months=PAD_MONTHS)).strftime("%Y-%m-%d")
    obs_end = chunk[-1].strftime("%Y-%m-%d")
    print(f"chunk {i0 // CHUNK_MONTHS + 1}: {vints[0]}..{vints[-1]}")

    for sid in series_ids:
        vals, obsd, srcd, issue = fetch_series_chunk(sid, vints, obs_start, obs_end)
        if issue:
            issues.append({"series_id": sid, "stage": "fetch", "issue": issue})
        for d, v in vals.items():
            X.loc[d, sid] = v
            X_obs.loc[d, sid] = obsd[d]
            X_src.loc[d, sid] = srcd[d]

print("Built API-level matrices:", X.shape)
print("Cell source counts:")
print(X_src.stack().value_counts(dropna=False).head(10))


In [None]:
# ---------- 4) Build paper-level dataset (125 columns) ----------
X_paper = pd.DataFrame(index=idx)
X_paper_obs = pd.DataFrame(index=idx)
X_paper_src = pd.DataFrame(index=idx)

for _, r in a1.iterrows():
    raw, sid = r["fred_raw"], r["series_id"]
    if pd.notna(sid) and sid in X.columns:
        X_paper[raw] = X[sid]
        X_paper_obs[raw] = X_obs[sid]
        X_paper_src[raw] = X_src[sid]
    else:
        X_paper[raw] = np.nan
        X_paper_obs[raw] = pd.NaT
        X_paper_src[raw] = "MISSING_MAPPING"

# Constructed features (when dependencies exist)
if {"NONREVSL", "PI"} <= set(X.columns):
    X_paper["CONSPI"] = X["NONREVSL"] / X["PI"]
    X_paper_obs["CONSPI"] = pd.concat([X_obs["NONREVSL"], X_obs["PI"]], axis=1).max(axis=1)
    same = X_src["NONREVSL"].eq(X_src["PI"])
    X_paper_src["CONSPI"] = np.where(same, "COMPUTED_" + X_src["NONREVSL"].fillna("MISSING"), "COMPUTED_MIXED")
else:
    issues.append({"series_id": "CONSPI", "stage": "construct", "issue": "missing NONREVSL or PI"})

if {"HWI", "UNEMPLOY"} <= set(X.columns):
    X_paper["HWIURATIO"] = X["HWI"] / X["UNEMPLOY"]
    X_paper_obs["HWIURATIO"] = pd.concat([X_obs["HWI"], X_obs["UNEMPLOY"]], axis=1).max(axis=1)
    same = X_src["HWI"].eq(X_src["UNEMPLOY"])
    X_paper_src["HWIURATIO"] = np.where(same, "COMPUTED_" + X_src["HWI"].fillna("MISSING"), "COMPUTED_MIXED")
else:
    issues.append({"series_id": "HWIURATIO", "stage": "construct", "issue": "missing HWI or UNEMPLOY"})


# Diagnostics
coverage_by_series = pd.DataFrame({
    "non_null_count": X_paper.notna().sum(),
    "coverage_share": X_paper.notna().mean(),
}).sort_values(["coverage_share", "non_null_count"])
coverage_by_date = pd.DataFrame({"coverage_share": X_paper.notna().mean(axis=1)})


def rev_new_counts(v, d):
    rev = d.eq(d.shift(1)) & v.ne(v.shift(1))
    new = d.gt(d.shift(1))
    return int(rev.sum()), int(new.sum())

rev_new_diag = pd.DataFrame(
    [rev_new_counts(X_paper[c], X_paper_obs[c]) for c in X_paper.columns],
    index=X_paper.columns,
    columns=["revision_events", "new_obs_events"],
)

source_by_series = X_paper_src.apply(lambda s: s.value_counts(dropna=False)).fillna(0).astype(int)

# Add high-impact edge cases to issue table
for sid in availability_df.loc[availability_df["availability"].isin(["INVALID_SERIES_ID", "NOT_IN_FRED", "FRED_ONLY_NO_ALFRED"]), "series_id"]:
    issues.append({"series_id": sid, "stage": "availability", "issue": availability_df.set_index("series_id").loc[sid, "availability"]})
for sid in coverage_by_series.index[coverage_by_series["coverage_share"] < 0.2]:
    issues.append({"series_id": sid, "stage": "coverage", "issue": "coverage_share < 0.20"})

issues_df = pd.DataFrame(issues).drop_duplicates().sort_values(["stage", "series_id"]).reset_index(drop=True)

print("Paper-level shape:", X_paper.shape)
print("Lowest coverage series:")
display(coverage_by_series.head(12))
print("Issue count:", len(issues_df))
display(issues_df.head(20))


## Edge Cases / Weaknesses to Watch
- `FRED_ONLY_NO_ALFRED`: series exists in FRED but has no historical vintages, so strict non-revised snapshots are structurally sparse.
- `INVALID_SERIES_ID`: legacy paper names that are not valid API IDs must be mapped manually.
- Constructed series (`CONSPI`, `HWIURATIO`) depend on component availability and can be partially missing.
- Even for `ALFRED_OK` series, some decision dates can miss a vintage column/value and reduce coverage.
- If `STRICT_ALFRED=False`, revised values enter through `FRED_FALLBACK` and must be filtered in downstream analysis when non-revised purity is required.


In [None]:
# ---------- 5) Transform + write auditable workbook ----------
def apply_tcodes(df, tcode_map):
    out = pd.DataFrame(index=df.index)
    for col, x in df.items():
        x = pd.to_numeric(x, errors="coerce")
        t = int(tcode_map.get(col, 1))
        if t == 1: y = x
        elif t == 2: y = x.diff()
        elif t == 3: y = x.diff().diff()
        elif t == 4: y = np.log(x.where(x > 0))
        elif t == 5: y = np.log(x.where(x > 0)).diff()
        elif t == 6: y = np.log(x.where(x > 0)).diff().diff()
        elif t == 7: y = (x / x.shift(1) - 1.0).diff()
        else:
            y = np.nan
            issues.append({"series_id": col, "stage": "tcode", "issue": f"unknown tcode {t}"})
        out[col] = y
    return out


X_t = apply_tcodes(X_paper, tcode_by_raw)

run_meta = pd.DataFrame([
    ("run_utc", pd.Timestamp.utcnow().isoformat()),
    ("branch", subprocess.check_output(["git", "branch", "--show-current"], text=True).strip()),
    ("series_csv", SERIES_CSV),
    ("start", START),
    ("end", END),
    ("strict_alfred", STRICT_ALFRED),
    ("chunk_months", CHUNK_MONTHS),
    ("pad_months", PAD_MONTHS),
    ("probe_vintages", PROBE_VINTAGES),
], columns=["key", "value"])

source_legend = pd.DataFrame([
    ("ALFRED", "Non-revised real-time vintage value"),
    ("FRED_FALLBACK", "Revised/latest fallback (only when STRICT_ALFRED=False)"),
    ("COMPUTED_*", "Constructed series from dependencies"),
    ("MISSING_MAPPING", "No mapped source series id"),
], columns=["source", "meaning"])

with pd.ExcelWriter(OUT_XLSX, engine="openpyxl") as w:
    run_meta.to_excel(w, sheet_name="run_metadata", index=False)
    source_legend.to_excel(w, sheet_name="source_legend", index=False)
    mapping_df.to_excel(w, sheet_name="series_mapping", index=False)
    availability_df.to_excel(w, sheet_name="series_availability", index=False)
    mapping_audit_df.to_excel(w, sheet_name="mapping_audit", index=False)
    issues_df.to_excel(w, sheet_name="issues", index=False)

    X_paper.reset_index(names="decision_date").to_excel(w, sheet_name="snapshot_raw", index=False)
    X_paper_obs.reset_index(names="decision_date").to_excel(w, sheet_name="snapshot_obsdate", index=False)
    X_paper_src.reset_index(names="decision_date").to_excel(w, sheet_name="snapshot_source", index=False)
    X_t.reset_index(names="decision_date").to_excel(w, sheet_name="snapshot_tcode", index=False)

    coverage_by_series.reset_index(names="series_id").to_excel(w, sheet_name="coverage_by_series", index=False)
    coverage_by_date.reset_index(names="decision_date").to_excel(w, sheet_name="coverage_by_date", index=False)
    rev_new_diag.reset_index(names="series_id").to_excel(w, sheet_name="rev_new_diag", index=False)
    source_by_series.reset_index(names="series_id").to_excel(w, sheet_name="source_by_series", index=False)

print("Wrote:", OUT_XLSX)
print("Final shape raw/tcode:", X_paper.shape, X_t.shape)
print("Availability summary:")
print(availability_df["availability"].value_counts())


## 6) Post-process Existing Workbook: Latest-Vintage Backfill
This section does **not** rerun the full ALFRED snapshot pull. It reads the existing Excel output and fills missing cells by:
- using the **latest ALFRED vintage** when vintages exist,
- otherwise using **latest FRED** for `FRED_ONLY_NO_ALFRED` series.

All filled cells are source-labeled and diagnostics are written to new sheets.


In [None]:
import os, time
import numpy as np
import pandas as pd
import requests

API_KEY = os.getenv("FRED_API_KEY") or "3c268be920acd9693b77b400b0a95cf2"
BASE = "https://api.stlouisfed.org/fred"
IN_XLSX = "data/fred_balanced_snapshot_auditable.xlsx"
OUT_XLSX = "data/fred_balanced_snapshot_auditable_latest_vintage_backfill.xlsx"
WRITE_IN_PLACE = False  # set True to overwrite IN_XLSX
S = requests.Session()


def jget(endpoint, **params):
    params = dict(params, api_key=API_KEY, file_type="json")
    backoff = 0.5
    for _ in range(8):
        r = S.get(f"{BASE}/{endpoint}", params=params, timeout=30)
        if r.status_code == 200:
            return r.json()
        if r.status_code in (429, 500, 502, 503, 504):
            time.sleep(backoff)
            backoff = min(backoff * 2, 20)
            continue
        raise RuntimeError(f"{r.status_code}: {r.text[:320]}")
    raise RuntimeError("retry_exhausted")


def try_jget(endpoint, **params):
    try:
        return jget(endpoint, **params), ""
    except RuntimeError as e:
        return None, str(e)


def get_vintages(series_id):
    js, err = try_jget("series/vintagedates", series_id=series_id, limit=10000)
    if err:
        return [], err
    return js.get("vintage_dates", []) or [], ""


def get_history(series_id, obs_start, obs_end, realtime_date=None):
    p = dict(
        series_id=series_id,
        observation_start=obs_start,
        observation_end=obs_end,
        sort_order="asc",
        limit=100000,
    )
    if realtime_date is not None:
        p["realtime_start"] = realtime_date
        p["realtime_end"] = realtime_date

    js, err = try_jget("series/observations", **p)
    if err:
        return pd.DataFrame(columns=["date", "value"]), err

    df = pd.DataFrame(js.get("observations", []) or [])
    if not {"date", "value"} <= set(df.columns):
        return pd.DataFrame(columns=["date", "value"]), "bad_shape"

    df["date"] = pd.to_datetime(df["date"], errors="coerce")
    df["value"] = pd.to_numeric(df["value"].replace(".", np.nan), errors="coerce")
    df = df.dropna(subset=["date", "value"]).sort_values("date")
    return df[["date", "value"]], ""


# Load base workbook outputs
base_xls = pd.ExcelFile(IN_XLSX)
all_sheets = {s: pd.read_excel(IN_XLSX, sheet_name=s) for s in base_xls.sheet_names}

raw_b = all_sheets["snapshot_raw"].copy()
obs_b = all_sheets["snapshot_obsdate"].copy()
src_b = all_sheets["snapshot_source"].copy()
# source labels may load as float64 when mostly NaN; force object before writing strings
src_cols = [c for c in src_b.columns if c != "decision_date"]
src_b[src_cols] = src_b[src_cols].astype("object")

map_df = all_sheets["series_mapping"].copy()
avail_df = all_sheets.get("series_availability", pd.DataFrame(columns=["series_id", "availability"]))

raw_before = raw_b.copy()
obs_before = obs_b.copy()
src_before = src_b.copy()

decision = pd.to_datetime(raw_b["decision_date"])
obs_start = (decision.min() - pd.DateOffset(years=2)).strftime("%Y-%m-%d")
obs_end = decision.max().strftime("%Y-%m-%d")

series_cols = [c for c in raw_b.columns if c != "decision_date"]
raw_to_sid = map_df.drop_duplicates("fred_raw").set_index("fred_raw")["series_id"].to_dict()
avail_map = avail_df.drop_duplicates("series_id").set_index("series_id")["availability"].to_dict() if len(avail_df) else {}

for c in series_cols:
    obs_b[c] = pd.to_datetime(obs_b[c], errors="coerce")


In [12]:
diag_rows = []

for i, raw_col in enumerate(series_cols, 1):
    sid = raw_to_sid.get(raw_col)
    availability = avail_map.get(sid, "UNKNOWN")

    x_before = pd.to_numeric(raw_b[raw_col], errors="coerce")
    miss_before = x_before.isna()
    filled_cells = 0
    fill_method = "NONE"
    ref_date = pd.NaT
    first_vintage = pd.NaT
    last_vintage = pd.NaT
    vintage_count = np.nan
    hist_obs_count = 0
    err = ""

    if miss_before.any():
        if not isinstance(sid, str) or not sid.strip():
            fill_method = "SKIP_NO_MAPPING"
        elif availability in {"INVALID_SERIES_ID", "NOT_IN_FRED"}:
            fill_method = f"SKIP_{availability}"
        else:
            use_realtime = None
            if availability == "FRED_ONLY_NO_ALFRED":
                fill_method = "BACKFILL_LATEST_FRED"
            else:
                vintages, v_err = get_vintages(sid)
                if vintages:
                    fill_method = "BACKFILL_LATEST_VINTAGE"
                    vintage_count = len(vintages)
                    first_vintage = pd.to_datetime(vintages[0])
                    last_vintage = pd.to_datetime(vintages[-1])
                    ref_date = last_vintage
                    use_realtime = vintages[-1]
                else:
                    fill_method = "BACKFILL_LATEST_FRED_IF_NO_VINTAGE"
                    err = v_err

            hist, h_err = get_history(sid, obs_start, obs_end, realtime_date=use_realtime)
            hist_obs_count = len(hist)
            if h_err:
                err = (err + " | " + h_err).strip(" |") if err else h_err
            elif len(hist):
                asof = pd.merge_asof(
                    pd.DataFrame({"decision_date": decision}),
                    hist.rename(columns={"date": "obs_date"}),
                    left_on="decision_date",
                    right_on="obs_date",
                    direction="backward",
                )
                can_fill = miss_before & asof["value"].notna()
                filled_cells = int(can_fill.sum())
                if filled_cells:
                    raw_b.loc[can_fill, raw_col] = asof.loc[can_fill, "value"].values
                    obs_b.loc[can_fill, raw_col] = asof.loc[can_fill, "obs_date"].values
                    src_b.loc[can_fill, raw_col] = fill_method

        time.sleep(0.03)

    x_after = pd.to_numeric(raw_b[raw_col], errors="coerce")
    miss_after = x_after.isna()
    filled_idx = miss_before & x_after.notna()

    diag_rows.append({
        "fred_raw": raw_col,
        "series_id": sid,
        "availability": availability,
        "fill_method": fill_method,
        "reference_realtime_date": ref_date,
        "vintage_count": vintage_count,
        "first_vintage": first_vintage,
        "last_vintage": last_vintage,
        "hist_obs_count": hist_obs_count,
        "non_null_before": int(x_before.notna().sum()),
        "non_null_after": int(x_after.notna().sum()),
        "filled_cells": filled_cells,
        "still_missing_after": int(miss_after.sum()),
        "first_decision_with_data_before": decision[x_before.notna()].min() if x_before.notna().any() else pd.NaT,
        "first_decision_with_data_after": decision[x_after.notna()].min() if x_after.notna().any() else pd.NaT,
        "first_filled_decision_date": decision[filled_idx].min() if filled_idx.any() else pd.NaT,
        "last_filled_decision_date": decision[filled_idx].max() if filled_idx.any() else pd.NaT,
        "error": err,
    })

    if i % 25 == 0:
        print(f"backfill progress: {i}/{len(series_cols)}")


diag_df = pd.DataFrame(diag_rows).sort_values(["filled_cells", "still_missing_after"], ascending=[False, False])

summary_df = (
    diag_df.groupby("fill_method", dropna=False)
    .agg(
        series_count=("fred_raw", "count"),
        total_filled_cells=("filled_cells", "sum"),
        total_still_missing=("still_missing_after", "sum"),
        avg_vintage_count=("vintage_count", "mean"),
    )
    .reset_index()
    .sort_values("total_filled_cells", ascending=False)
)

# transformed output for backfilled raw
tcode_by_raw = map_df.drop_duplicates("fred_raw").set_index("fred_raw")["tcode"].to_dict()

def apply_tcodes(df, tcodes):
    out = pd.DataFrame(index=df.index)
    for col, x in df.items():
        x = pd.to_numeric(x, errors="coerce")
        t = int(tcodes.get(col, 1))
        if t == 1: y = x
        elif t == 2: y = x.diff()
        elif t == 3: y = x.diff().diff()
        elif t == 4: y = np.log(x.where(x > 0))
        elif t == 5: y = np.log(x.where(x > 0)).diff()
        elif t == 6: y = np.log(x.where(x > 0)).diff().diff()
        elif t == 7: y = (x / x.shift(1) - 1.0).diff()
        else: y = np.nan
        out[col] = y
    return out

Xb_t = apply_tcodes(raw_b.drop(columns=["decision_date"]), tcode_by_raw)
Xb_t.insert(0, "decision_date", decision)

run_meta = pd.DataFrame([
    ("input_workbook", IN_XLSX),
    ("output_workbook", IN_XLSX if WRITE_IN_PLACE else OUT_XLSX),
    ("strategy", "latest_vintage_else_latest_fred"),
    ("api_obs_start", obs_start),
    ("api_obs_end", obs_end),
    ("run_utc", pd.Timestamp.utcnow().isoformat()),
], columns=["key", "value"])

source_legend_new = pd.DataFrame([
    ("BACKFILL_LATEST_VINTAGE", "Filled missing cells using historical path at latest ALFRED vintage"),
    ("BACKFILL_LATEST_FRED", "Filled missing cells using latest revised FRED path (no ALFRED history)"),
    ("BACKFILL_LATEST_FRED_IF_NO_VINTAGE", "Fallback when ALFRED vintage listing failed/empty"),
], columns=["source", "meaning"])

out_path = IN_XLSX if WRITE_IN_PLACE else OUT_XLSX
with pd.ExcelWriter(out_path, engine="openpyxl") as w:
    # keep original sheets
    for name, df in all_sheets.items():
        df.to_excel(w, sheet_name=name[:31], index=False)

    # add backfilled datasets and diagnostics
    raw_b.to_excel(w, sheet_name="snapshot_raw_bfill_latest", index=False)
    obs_b.to_excel(w, sheet_name="snapshot_obs_bfill_latest", index=False)
    src_b.to_excel(w, sheet_name="snapshot_src_bfill_latest", index=False)
    Xb_t.to_excel(w, sheet_name="snapshot_tcode_bfill_latest", index=False)

    diag_df.to_excel(w, sheet_name="bfill_diag_by_series", index=False)
    summary_df.to_excel(w, sheet_name="bfill_summary", index=False)
    diag_df[[
        "fred_raw", "series_id", "availability", "vintage_count", "first_vintage", "last_vintage",
        "fill_method", "reference_realtime_date", "hist_obs_count"
    ]].to_excel(w, sheet_name="vintage_diag", index=False)
    run_meta.to_excel(w, sheet_name="bfill_run_meta", index=False)
    source_legend_new.to_excel(w, sheet_name="bfill_source_legend", index=False)

print("Wrote backfill workbook:", out_path)
print("Rows x cols:", raw_b.shape)
print("Filled cells total:", int(diag_df["filled_cells"].sum()))
print("Still missing total:", int(diag_df["still_missing_after"].sum()))
print("Top methods:")
print(summary_df.to_string(index=False))


backfill progress: 25/125
backfill progress: 50/125
backfill progress: 75/125
backfill progress: 100/125
backfill progress: 125/125


  out[col] = y
  Xb_t.insert(0, "decision_date", decision)
  ("run_utc", pd.Timestamp.utcnow().isoformat()),


Wrote backfill workbook: fred_balanced_snapshot_auditable_latest_vintage_backfill.xlsx
Rows x cols: (744, 126)
Filled cells total: 24759
Still missing total: 9060
Top methods:
            fill_method  series_count  total_filled_cells  total_still_missing  avg_vintage_count
   BACKFILL_LATEST_FRED            36               17443                 2331                NaN
BACKFILL_LATEST_VINTAGE            21                7316                 1095         472.952381
                   NONE            60                   0                    0                NaN
 SKIP_INVALID_SERIES_ID             3                   0                 2232                NaN
       SKIP_NOT_IN_FRED             3                   0                 2232                NaN
        SKIP_NO_MAPPING             2                   0                 1170                NaN
