In [None]:
#!/usr/bin/env python
"""
STAGE 01 · DATA LOAD & PRE-PROCESSING
─────────────────────────────────────
Reads the raw CSV, cleans it, and saves either CSV **or** Parquet
(depending on the SAVE_FORMAT flag passed from the orchestrator).

Outputs
-------
<OUTPUT_ROOT>/event=<YEAR>/<RUN_TAG>/stage01/
    └─ stage01_cleaned_<YEAR>.(csv|parquet)
"""
from __future__ import annotations

import io, logging, os, re, sys
from datetime import datetime
from pathlib import Path
from typing import Literal

import numpy as np
import pandas as pd

from pipeline_utils import load_cfg

# ───────────────────────────────────────────────
# 0 · CONFIG  &  RUN FOLDER   (Stage-01 only)
# ───────────────────────────────────────────────
# Load the pipeline configuration once; every setting below derives from it.
CFG      = load_cfg()
# `or {}` also covers a key that is present in the config but empty/null.
EVENTS   = {str(k): v for k, v in (CFG.get("events") or {}).items()}
DEFAULTS = CFG.get("defaults") or {}

# Event year: environment override first, otherwise the first configured event.
# Fail with an actionable message instead of a bare StopIteration when
# neither source provides a year.
_default_year = next(iter(EVENTS), None)
SWAN_YEAR = str(os.getenv("SWAN_YEAR") or _default_year or "")
if not SWAN_YEAR:
    raise RuntimeError(
        "No event year available: set the SWAN_YEAR environment variable "
        "or define at least one entry under 'events' in the config."
    )
# Run tag: environment override first, otherwise today's date (YYYY-MM-DD).
RUN_TAG   = os.getenv("RUN_TAG") or datetime.today().strftime("%Y-%m-%d")

# ----- OUTPUT_ROOT -----
# Resolution order: environment variable, then the lower-case config key,
# then the upper-case config key, then the literal fallback folder.
_cfg_output_root = DEFAULTS.get(
    "output_root",
    DEFAULTS.get("OUTPUT_ROOT", "outputs_rff"),
)
OUTPUT_ROOT = Path(os.getenv("OUTPUT_ROOT", _cfg_output_root)).expanduser()

# <OUTPUT_ROOT>/event=<YEAR>/<RUN_TAG>, created together with any missing parents.
RUN_DIR = OUTPUT_ROOT.joinpath(f"event={SWAN_YEAR}", RUN_TAG)
RUN_DIR.mkdir(parents=True, exist_ok=True)

# The stage folder has to exist before the logger is configured, because the
# log file is opened inside it.
OUTPUT_DIR = RUN_DIR.joinpath("stage01")
OUTPUT_DIR.mkdir(exist_ok=True)

# Raw input file: environment override first, then the config default.
# An empty value is treated as "not set"; otherwise Path("") would silently
# resolve to the current directory and fail much later inside read_csv.
_input_csv_raw = os.getenv("INPUT_CSV") or DEFAULTS.get("INPUT_CSV") or ""
if not _input_csv_raw:
    raise ValueError(
        "No input file configured: set the INPUT_CSV environment variable "
        "or 'INPUT_CSV' under 'defaults' in the config."
    )
INPUT_CSV  = Path(_input_csv_raw).expanduser()

# Key columns and column-quality thresholds (config value or built-in default).
DATE_COL = DEFAULTS.get("DATE_COL", "ReportDate")
ID_COL   = DEFAULTS.get("ID_COL",   "Symbol")
FILTERS  = {
    "pct_non_na": DEFAULTS.get("PCT_NON_NA", 95),   # min % of non-missing values
    "pct_zero":   DEFAULTS.get("PCT_ZERO",   98),   # max % of zeros (exclusive)
    "min_unique": DEFAULTS.get("MIN_UNIQUE", 10),   # min number of distinct values
}

# Output format. Validated here because the save step writes CSV for anything
# that is not "parquet" while still using the raw value as the file extension.
_VALID_SAVE_FORMATS = ("csv", "parquet")
SAVE_FORMAT: Literal["csv", "parquet"] = (
    str(os.getenv("SAVE_FORMAT") or DEFAULTS.get("SAVE_FORMAT") or "csv")
    .strip()
    .lower()
)
if SAVE_FORMAT not in _VALID_SAVE_FORMATS:
    raise ValueError(
        f"Unsupported SAVE_FORMAT {SAVE_FORMAT!r}; "
        f"expected one of {_VALID_SAVE_FORMATS}."
    )


# ─────────────────────────── 1 · LOGGER ────────────────────────────
_LOG_FORMAT = "%(asctime)s | %(levelname)-7s | %(message)s"
_FILE_HANDLER_NAME = "stage01_file"

# Console + level setup. basicConfig() does nothing when the root logger
# already has handlers (for example when something configured logging before
# this script ran), so the stage log file is attached explicitly below rather
# than through basicConfig's `handlers` argument.
logging.basicConfig(
    level=logging.INFO,
    format=_LOG_FORMAT,
    handlers=[logging.StreamHandler(sys.stdout)],
)

_root_logger = logging.getLogger()
# Remove a file handler left behind by an earlier run in the same interpreter
# (e.g. re-executing a notebook cell) so lines are not written twice.
for _stale in [h for h in _root_logger.handlers if h.get_name() == _FILE_HANDLER_NAME]:
    _root_logger.removeHandler(_stale)
    _stale.close()

# Mode "w": each run starts with a fresh stage01.log.
_file_handler = logging.FileHandler(OUTPUT_DIR / "stage01.log", "w", "utf-8")
_file_handler.set_name(_FILE_HANDLER_NAME)
_file_handler.setFormatter(logging.Formatter(_LOG_FORMAT))
_root_logger.addHandler(_file_handler)

log = logging.getLogger(__name__)
# Record the effective run settings at the top of the log.
log.info("========== STAGE 01 ==========")
log.info("Run dir      : %s", RUN_DIR)
log.info("Input CSV    : %s", INPUT_CSV)
log.info("Save format  : %s", SAVE_FORMAT)
log.info("DATE / ID    : %s / %s", DATE_COL, ID_COL)
log.info("Filters      : %s", FILTERS)

# ─────────────────────────── 2 · LOAD RAW DATA ─────────────────────
# Read the whole file in one pass (low_memory=False) rather than in
# chunks with per-chunk dtype inference.
df = pd.read_csv(INPUT_CSV, low_memory=False)
# Row count is logged with thousands separators.
log.info("Rows loaded  : %s", format(len(df), ","))

# ─────────────────────────── 3 · DATE & ID CLEAN-UP ────────────────
# Parse dates day-first; anything that fails is retried month-first.
# Values that fail both attempts end up as NaT.
_raw_dates = df[DATE_COL]
df[DATE_COL] = (
    pd.to_datetime(_raw_dates, errors="coerce", dayfirst=True)
      .fillna(pd.to_datetime(_raw_dates, errors="coerce", dayfirst=False))
)

# Normalise identifiers (trim + upper-case) while keeping missing ones missing.
# A plain astype(str) turns a missing value into the literal text "nan", which
# then becomes "NAN" and is never removed by the later dropna on the ID
# column. Blank identifiers are treated as missing as well.
_raw_ids = df[ID_COL]
_clean_ids = _raw_ids.astype(str).str.strip().str.upper()
df[ID_COL] = _clean_ids.where(_raw_ids.notna() & (_clean_ids != ""))

# Keep an audit copy of every row whose date could not be parsed.
bad_dates = df[df[DATE_COL].isna()]
if not bad_dates.empty:
    bad_dates.to_csv(OUTPUT_DIR / "bad_dates.csv", index=False)
    log.warning("Bad dates → %d rows written to bad_dates.csv", len(bad_dates))

# ─────────────────────────── 4 · COERCE TEXT-NUMBERS ───────────────
# Characters stripped before numeric parsing: currency signs, thousands
# separators and percent signs. Percent values are NOT rescaled.
_num_rx = re.compile(r"[$€£,%]")


def to_num(s: pd.Series) -> pd.Series:
    """Convert a text column to numbers when most of its values are numeric.

    Parameters
    ----------
    s : pd.Series
        One column. Anything that is not of ``object`` dtype is returned
        unchanged.

    Returns
    -------
    pd.Series
        The numeric version of the column when at least 50 % of its
        *populated* cells parse as numbers after stripping ``$ € £ , %``;
        otherwise the original column, untouched. In a converted column,
        cells that do not parse become NaN.
    """
    if s.dtype != "object":
        return s

    populated = s.notna()
    if not populated.any():
        # Nothing to judge the column by; leave it alone.
        return s

    out = pd.to_numeric(s.str.replace(_num_rx, "", regex=True), errors="coerce")
    # Measure the success rate against populated cells only, so a sparse but
    # purely numeric column is still recognised as numeric.
    parsed_share = out[populated].notna().mean()
    return out if parsed_share >= 0.50 else s

# Coerce text-numbers column by column, but never touch the key columns:
# identifiers that merely look numeric (e.g. "0700") must stay text, or they
# lose leading zeros and non-numeric identifiers in the same column become NaN.
_key_cols = {ID_COL, DATE_COL}
df = df.apply(lambda col: col if col.name in _key_cols else to_num(col))

# ─────────────────────────── 5 · BASIC FILTERS ─────────────────────
# Drop rows that lack an identifier or a parsed date.
before = len(df)
df = df.dropna(subset=[ID_COL, DATE_COL])
# Guard the percentage against an empty input (zero rows loaded).
_kept_pct = len(df) / before * 100 if before else 0.0
log.info("After ID/date filter: %d rows (%.1f%% kept)", len(df), _kept_pct)

# Per-column quality statistics for every numeric column.
num_cols = df.select_dtypes(include=[np.number]).columns
_numeric = df[num_cols]
meta = pd.DataFrame({
    "pct_non_na": _numeric.notna().mean() * 100,      # % of populated cells
    "pct_zero":   _numeric.eq(0).mean() * 100,        # % of cells equal to 0
    "n_unique":   _numeric.nunique(dropna=True),      # distinct non-missing values
})

# A column is kept only when it passes all three thresholds.
_enough_data   = meta["pct_non_na"].ge(FILTERS["pct_non_na"])
_not_all_zero  = meta["pct_zero"].lt(FILTERS["pct_zero"])
_enough_values = meta["n_unique"].ge(FILTERS["min_unique"])
good = _enough_data & _not_all_zero & _enough_values

drop_cols = meta.index[~good].tolist()
if drop_cols:
    df = df.drop(columns=drop_cols)
    log.info("Dropped %d noisy numeric columns", len(drop_cols))

# ─────────────────────────── 6 · ONE REPORT PER FIRM-YEAR ──────────
# Keep exactly one row, the most recent report, per (firm, calendar year).
#
# GroupBy.last() is deliberately NOT used here: it returns the last
# *non-null value of each column* within a group, so the resulting row could
# mix values from several different reports. drop_duplicates(keep="last")
# keeps one real row intact.
df["Year"] = df[DATE_COL].dt.year
_firm_year = [ID_COL, "Year"]
df = (
    df.sort_values(DATE_COL, kind="stable")          # stable: ties keep file order
      .drop_duplicates(subset=_firm_year, keep="last")
      .sort_values(_firm_year, kind="stable")        # same row order as the groupby produced
      .reset_index(drop=True)
)
# Same column layout as before: key columns first, the rest in original order.
df = df[_firm_year + [c for c in df.columns if c not in _firm_year]]

# ─────────────────────────── 7 · SAVE RESULT ───────────────────────
# Target file: stage01_cleaned_<YEAR>.<SAVE_FORMAT> inside the stage folder.
out_file = OUTPUT_DIR / f"stage01_cleaned_{SWAN_YEAR}.{SAVE_FORMAT}"

# Parquet only when explicitly requested; every other value is written as CSV.
_write = df.to_parquet if SAVE_FORMAT == "parquet" else df.to_csv
_write(out_file, index=False)

# DataFrame.info() prints to a stream, so capture it in a buffer for the log.
buf = io.StringIO()
df.info(buf=buf)
log.info("Final DataFrame info:\n%s", buf.getvalue())
log.info("Saved cleaned file → %s", out_file.name)
log.info("✅ STAGE 01 complete")


2025-06-18 11:36:57,958 | INFO    | [utils] Run dir      : C:\Users\Jason Pohl\OneDrive - Bond University\PhD\rff\outputs_rff\event=2008\2025-06-18
2025-06-18 11:36:57,960 | INFO    | [utils] Input CSV    : C:\Users\Jason Pohl\OneDrive - Bond University\PhD\rff\NEW_DATA.csv
2025-06-18 11:36:57,961 | INFO    | [utils] Save format  : csv
2025-06-18 11:36:57,962 | INFO    | [utils] DATE / ID    : ReportDate / Symbol
2025-06-18 11:36:57,963 | INFO    | [utils] Filters      : {'pct_non_na': 95, 'pct_zero': 98, 'min_unique': 10}
2025-06-18 11:37:00,336 | INFO    | [utils] Rows loaded  : 55,800


  pd.to_datetime(df[DATE_COL], errors="coerce", dayfirst=True)


2025-06-18 11:37:01,086 | INFO    | [utils] After ID/date filter: 55800 rows (100.0% kept)
2025-06-18 11:37:01,617 | INFO    | [utils] Dropped 94 noisy numeric columns
2025-06-18 11:37:05,152 | INFO    | [utils] Final DataFrame info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 34862 entries, 0 to 34861
Columns: 164 entries, Symbol to ReportDate
dtypes: datetime64[ns](1), float64(148), int32(1), int64(4), object(10)
memory usage: 43.5+ MB

2025-06-18 11:37:05,152 | INFO    | [utils] Saved cleaned file → stage01_cleaned_2008.csv
2025-06-18 11:37:05,153 | INFO    | [utils] ✅ STAGE 01 complete
