In [5]:
# === 00_audit: end-to-end data audit, exports CSVs into outputs/audit/ ===
import sys, yaml, warnings
from pathlib import Path
import pandas as pd
import numpy as np

# 0) Project root + src path
PROJECT_ROOT = Path.cwd()
if PROJECT_ROOT.name.lower() == "notebooks":
    PROJECT_ROOT = PROJECT_ROOT.parent
sys.path.insert(0, str(PROJECT_ROOT / "src"))

# Optional: autoreload
%load_ext autoreload
%autoreload 2

from utils import print_run_header, read_parquet
print_run_header("00_audit")

# 1) Config and dirs
CFG_PATH = PROJECT_ROOT / "configs" / "config.yaml"
assert CFG_PATH.exists(), f"Missing config at {CFG_PATH}"
cfg = yaml.safe_load(CFG_PATH.read_text())

PROC_DIR = PROJECT_ROOT / cfg["paths"]["processed_dir"]
OUT_DIR  = PROJECT_ROOT / cfg["paths"]["outputs_dir"] / "audit"
OUT_DIR.mkdir(parents=True, exist_ok=True)

# 2) Load available tables
def safe_read_parquet(p: Path):
    """Best-effort read of the parquet file at ``p``.

    Delegates to ``read_parquet``. If that call raises any ``Exception``, a
    warning naming the path and the error is emitted and ``None`` is returned
    instead of propagating the failure.
    """
    try:
        frame = read_parquet(p)
    except Exception as err:
        warnings.warn(f"Could not read {p}: {err}")
        frame = None
    return frame

# Locations of the processed tables inside PROC_DIR.
labels_path = PROC_DIR.joinpath("labels.parquet")
team_path = PROC_DIR.joinpath("team_features.parquet")
starter_path = PROC_DIR.joinpath("starter_features.parquet")
lineup_path = PROC_DIR.joinpath("lineup_features.parquet")
pitch_path = PROC_DIR.joinpath("pitch_features.parquet")
pw_path = PROC_DIR.joinpath("park_weather_features.parquet")  # may not exist if weather skipped

# Tables that are always attempted; an unreadable one is stored as None.
tables = {
    key: safe_read_parquet(path)
    for key, path in (
        ("labels", labels_path),
        ("team", team_path),
        ("starter", starter_path),
        ("lineup", lineup_path),
        ("pitch", pitch_path),
    )
}
# The park/weather table is optional: the read is only attempted when the file exists.
tables["pw"] = safe_read_parquet(pw_path) if pw_path.exists() else None

# 3) Helpers
def summary_stats(df: pd.DataFrame, name: str) -> dict:
    """Summarise one table as a flat dict.

    Keys: ``table`` (the given name), ``rows``, ``cols``, ``dup_game_id``
    (number of repeated ``game_id`` values) and ``null_game_id`` (number of
    missing ``game_id`` values). A ``None`` or zero-row frame reports 0 rows
    and 0 columns; both ``game_id`` counts are NaN in that case and whenever
    the frame has no ``game_id`` column.
    """
    if df is None or len(df) == 0:
        return {
            "table": name,
            "rows": 0,
            "cols": 0,
            "dup_game_id": np.nan,
            "null_game_id": np.nan,
        }

    n_rows, n_cols = df.shape
    if "game_id" in df.columns:
        ids = df["game_id"]
        dup_count = int(ids.duplicated().sum())
        null_count = int(ids.isna().sum())
    else:
        dup_count = null_count = np.nan

    return {
        "table": name,
        "rows": int(n_rows),
        "cols": int(n_cols),
        "dup_game_id": dup_count,
        "null_game_id": null_count,
    }

def null_rates(df: pd.DataFrame, name: str) -> pd.DataFrame:
    """Return one row per column of ``df`` with its share of missing values.

    The result has the columns ``table`` (the given name), ``column`` and
    ``null_rate``. A ``None`` or zero-row frame yields an empty DataFrame
    with no columns.
    """
    if df is None or len(df) == 0:
        return pd.DataFrame()

    missing_share = df.isna().mean().rename("null_rate")
    report = missing_share.reset_index()
    report = report.rename(columns={"index": "column"})
    report.insert(0, "table", name)
    return report

def numeric_ranges(df: pd.DataFrame, name: str) -> pd.DataFrame:
    """Describe the numeric and boolean columns of ``df``.

    Returns one row per selected column with ``table`` (the given name),
    ``column`` and the ``describe()`` statistics, including the 1st, 5th,
    95th and 99th percentiles. A ``None`` or zero-row frame, or one without
    numeric/boolean columns, yields an empty DataFrame with no columns.
    """
    if df is None or len(df) == 0:
        return pd.DataFrame()
    num = df.select_dtypes(include=["number", "bool"])
    if num.empty:
        return pd.DataFrame()

    # describe() drops bool columns when numeric ones are present and gives
    # count/unique/top/freq when bools are the only columns. Cast them to
    # float so each flag gets the same statistics as the numeric columns
    # (its mean is the share of True values).
    bool_cols = num.select_dtypes(include="bool").columns
    if len(bool_cols) > 0:
        num = num.astype({col: "float64" for col in bool_cols})

    desc = (
        num.describe(percentiles=[0.01, 0.05, 0.95, 0.99])
        .T.reset_index()
        .rename(columns={"index": "column"})
    )
    desc.insert(0, "table", name)
    return desc

def expected_range_flags(df: pd.DataFrame, name: str) -> pd.DataFrame:
    """List values that fall outside the expected range of known feature columns.

    Each rule is ``(column, lo, hi)``. Non-null values below ``lo`` or above
    ``hi`` are reported, capped at the first 200 offending rows per column.
    Columns that are absent from ``df`` are skipped.

    Returns a DataFrame with the columns ``table``, ``column``, ``value``,
    ``lo``, ``hi`` and ``game_id`` (``None`` when ``df`` has no ``game_id``
    column). The same columns are returned when nothing is flagged, so the
    exported CSV always carries a header.
    """
    flag_columns = ["table", "column", "value", "lo", "hi", "game_id"]
    if df is None or len(df) == 0:
        return pd.DataFrame(columns=flag_columns)

    rules = [
        ("team_fi_rate_sdt",          0.00, 1.00),
        ("starter_fi_allow_rate_sdt", 0.00, 1.00),
        ("lineup_top4_ops_prior",     0.50, 1.20),
        ("park_factor_runs",          0.70, 1.40),
        ("air_density_proxy",         0.50, 1.50),
    ]
    has_game_id = "game_id" in df.columns

    flags = []
    for col, lo, hi in rules:
        if col not in df.columns:
            continue
        values = df[col]
        bad = df[values.notna() & ((values < lo) | (values > hi))].head(200)
        if bad.empty:
            continue
        game_ids = bad["game_id"] if has_game_id else [None] * len(bad)
        flags.extend(
            dict(table=name, column=col, value=value, lo=lo, hi=hi, game_id=game_id)
            for value, game_id in zip(bad[col], game_ids)
        )
    # Passing the columns explicitly keeps the schema stable when `flags` is empty.
    return pd.DataFrame(flags, columns=flag_columns)

# Robust leakage helpers
def _pick_label_time_col(labels: pd.DataFrame) -> str | None:
    for c in ["game_datetime_utc", "game_time_utc"]:
        if c in labels.columns: return c
    if "date" in labels.columns: return "date"
    return None

def _pick_feature_time_col(df: pd.DataFrame) -> str | None:
    if df is None or df.empty: return None
    dt_cols = [c for c in df.columns if str(df[c].dtype).startswith("datetime64")]
    if dt_cols: return dt_cols[0]
    name_hits = [c for c in df.columns if ("time" in c.lower() or "datetime" in c.lower())]
    return name_hits[0] if name_hits else None

def _to_utc(s: pd.Series) -> pd.Series:
    return pd.to_datetime(s, utc=True, errors="coerce")

def leakage_check(df: pd.DataFrame, labels: pd.DataFrame, name: str) -> pd.DataFrame:
    if df is None or df.empty or labels is None or labels.empty: return pd.DataFrame()
    lab_t = _pick_label_time_col(labels)
    feat_t = _pick_feature_time_col(df)
    if lab_t is None or feat_t is None: return pd.DataFrame()
    L = labels[["game_id", lab_t]].copy()
    F = df[["game_id", feat_t]].copy()
    if lab_t == "date":  # coarse fallback
        L[lab_t] = pd.to_datetime(L[lab_t], utc=True, errors="coerce")
    L["label_time"]  = _to_utc(L[lab_t])
    F["feature_time"]= _to_utc(F[feat_t])
    merged = L.merge(F, on="game_id", how="left")
    mask = merged["feature_time"].notna() & merged["label_time"].notna() & (merged["feature_time"] >= merged["label_time"])
    out = merged.loc[mask, ["game_id","feature_time","label_time"]].copy()
    out.insert(0, "table", name)
    return out.head(1000)

def join_coverage(labels: pd.DataFrame, parts: dict) -> pd.DataFrame:
    """Report, per feature table, the share of distinct label game_ids it covers.

    ``parts`` maps table name to DataFrame. The entry named "labels" is
    skipped, as is any table that is ``None``, empty or has no ``game_id``
    column. Matching is read from the merge indicator, so no column suffixes
    are involved. Returns the columns ``table`` and ``join_coverage``, the
    latter rounded to 4 decimals.
    """
    if labels is None or labels.empty:
        return pd.DataFrame(columns=["table", "join_coverage"])

    label_ids = labels[["game_id"]].drop_duplicates()
    records = []
    for table_name, frame in parts.items():
        usable = (
            table_name != "labels"
            and frame is not None
            and not frame.empty
            and "game_id" in frame.columns
        )
        if not usable:
            continue
        feature_ids = frame[["game_id"]].drop_duplicates()
        joined = label_ids.merge(feature_ids, on="game_id", how="left", indicator=True)
        matched_share = (joined["_merge"] == "both").mean()
        records.append({"table": table_name, "join_coverage": round(float(matched_share), 4)})
    return pd.DataFrame(records)

def left_join_health(labels: pd.DataFrame, pieces: list[pd.DataFrame], names: list[str]) -> pd.DataFrame:
    """Track row counts through successive left joins on ``game_id``.

    Starts from the distinct label ``game_id`` values and left-joins each
    feature table's ``game_id`` column in the given order. Each step reports
    ``rows_before``, ``rows_after`` and ``delta``. A table that is ``None``,
    empty or lacks ``game_id`` is reported with delta 0 and not joined.

    Feature keys are deliberately NOT de-duplicated. A left join against
    unique keys can never change the row count, so a positive ``delta`` is
    exactly the fan-out caused by repeated ``game_id`` rows in that table.

    Returns a DataFrame with the columns ``step``, ``rows_before``,
    ``rows_after`` and ``delta``.
    """
    report_columns = ["step", "rows_before", "rows_after", "delta"]
    if labels is None or labels.empty:
        return pd.DataFrame(columns=report_columns)

    base = labels[["game_id"]].drop_duplicates()
    out = []
    for nm, df in zip(names, pieces):
        before = int(len(base))
        if df is None or df.empty or "game_id" not in df.columns:
            out.append(dict(step=nm, rows_before=before, rows_after=before, delta=0))
            continue
        # Later steps join onto the already fanned-out frame, so each delta is
        # the additional growth contributed by that table.
        base = base.merge(df[["game_id"]], on="game_id", how="left")
        after = int(len(base))
        out.append(dict(step=nm, rows_before=before, rows_after=after, delta=after - before))
    return pd.DataFrame(out, columns=report_columns)

# 4) Core summaries
def _concat_or_empty(frames, columns):
    """Stack the non-empty frames; return an empty frame with ``columns`` when none remain.

    pd.concat raises ValueError on an empty list, which is what happens when
    every table failed to load. The fallback keeps the audit running and
    still writes a CSV with a header row.
    """
    kept = [frame for frame in frames if frame is not None and not frame.empty]
    if not kept:
        return pd.DataFrame(columns=columns)
    return pd.concat(kept, ignore_index=True)

summaries = pd.DataFrame([summary_stats(df, k) for k, df in tables.items()])
summaries.to_csv(OUT_DIR / "table_summaries.csv", index=False)

nulls = _concat_or_empty(
    [null_rates(df, k) for k, df in tables.items() if df is not None],
    ["table", "column", "null_rate"],
)
nulls.to_csv(OUT_DIR / "null_rates.csv", index=False)

ranges = _concat_or_empty(
    [numeric_ranges(df, k) for k, df in tables.items() if df is not None],
    ["table", "column"],
)
ranges.to_csv(OUT_DIR / "numeric_ranges.csv", index=False)

flags = _concat_or_empty(
    [expected_range_flags(df, k) for k, df in tables.items() if df is not None],
    ["table", "column", "value", "lo", "hi", "game_id"],
)
flags.to_csv(OUT_DIR / "range_flags.csv", index=False)

# 5) Leakage (robust)
# Run the check per feature table, keep only results that flagged something,
# and fall back to an empty frame with the expected header when nothing was flagged.
leaks_list = [leakage_check(tables.get(n), tables["labels"], n) for n in ["team","starter","lineup","pitch","pw"]]
_flagged_leaks = [x for x in leaks_list if x is not None and not x.empty]
if _flagged_leaks:
    leaks = pd.concat(_flagged_leaks, ignore_index=True)
else:
    leaks = pd.DataFrame(columns=["table","game_id","feature_time","label_time"])
leaks.to_csv(OUT_DIR / "leakage_flags.csv", index=False)

# 6) Join coverage & left-join health
# Coverage is computed over every loaded table; join health walks the feature
# tables in a fixed order.
coverage = join_coverage(tables["labels"], tables)
coverage.to_csv(OUT_DIR / "join_coverage.csv", index=False)

_join_order = ["team","starter","lineup","pitch","pw"]
_join_frames = [tables.get(nm) for nm in _join_order]
health = left_join_health(tables["labels"], _join_frames, _join_order)
health.to_csv(OUT_DIR / "left_join_health.csv", index=False)

# 7) Label sanity
# One-row summary of the labels table: size, positive rate, date span and
# duplicate ids. Placeholder values are written when the labels table is
# missing or empty.
_labels_df = tables["labels"]
if _labels_df is None or _labels_df.empty:
    label_sanity = {"rows":0,"yrfi_rate":np.nan,"date_min":"","date_max":"","game_id_dupes":np.nan}
else:
    _label_dates = pd.to_datetime(_labels_df["date"])
    label_sanity = {
        "rows": int(len(_labels_df)),
        "yrfi_rate": float(_labels_df["yrfi"].mean()),
        "date_min": str(_label_dates.min()),
        "date_max": str(_label_dates.max()),
        "game_id_dupes": int(_labels_df["game_id"].duplicated().sum()),
    }
pd.DataFrame([label_sanity]).to_csv(OUT_DIR / "label_sanity.csv", index=False)

# List every CSV now present in the audit folder.
print("\n=== Audit files written to:", OUT_DIR, "===\n")
for csv_path in sorted(OUT_DIR.glob("*.csv")):
    print(" -", csv_path.name)


The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload
=== 00_audit ===
Python 3.12.7 | pandas 2.3.1 | numpy 2.2.6 | sklearn 1.7.1 | xgboost 3.0.4
Platform: Windows 11 | Time: 2025-08-20 13:54:33

=== Audit files written to: C:\Users\alex\Desktop\nrfi\outputs\audit ===

 - join_coverage.csv
 - label_sanity.csv
 - leakage_flags.csv
 - left_join_health.csv
 - null_rates.csv
 - numeric_ranges.csv
 - range_flags.csv
 - table_summaries.csv
