# In [None]:
"""
DATA PIPELINE (Standardize + Validate + Output) — pathlib version
---------------------------------------------------------------------
Architecture:
load_data → standardize_data → run_validations
   ├─ if ERROR → export DQ report + STOP
   └─ if OK/WARN → export DQ report + export clean dataset
"""

from __future__ import annotations

import sys
from pathlib import Path

import pandas as pd


# ============================
# CONFIG
# ============================

# Input CSV (a relative path, resolved against the current working directory).
INPUT_FILE = Path("data", "finance_data.csv")

# Every output file lives here; the folder is created when this module is imported.
OUTPUT_FOLDER = Path("output")
OUTPUT_FOLDER.mkdir(parents=True, exist_ok=True)

# Data-quality report (written on every run).
DQ_REPORT_CSV = OUTPUT_FOLDER.joinpath("dq_report.csv")
DQ_REPORT_XLSX = OUTPUT_FOLDER.joinpath("dq_report.xlsx")

# Standardized dataset (written only when the DQ report contains no ERROR).
CLEAN_DATASET_CSV = OUTPUT_FOLDER.joinpath("finance_clean.csv")
CLEAN_DATASET_XLSX = OUTPUT_FOLDER.joinpath("finance_clean.xlsx")

# Business key: repeated combinations of these columns are reported as ERROR.
KEY_COLUMNS = ["date", "account", "cost_center", "entity", "scenario"]
# Required schema: the business key plus the measure column.
REQUIRED_COLUMNS = [*KEY_COLUMNS, "amount"]

# Canonical scenario names accepted by the validations.
ALLOWED_SCENARIOS = {"Actual", "Budget", "Forecast"}
# Relative Month-over-Month change above which a WARNING is raised (0.30 = 30%).
VARIANCE_THRESHOLD = 0.30


# ============================
# LOAD
# ============================

def load_data(file_path: Path) -> pd.DataFrame:
    """
    Load the input CSV into a DataFrame.

    When a 'date' column is present it is converted to datetime. Values that
    cannot be parsed become NaT (the same "coerce, then let the validations
    flag it" approach used for 'amount'), so the column is always datetime.
    A file without a 'date' column still loads, which lets the schema check
    report the missing column instead of the load itself crashing.

    Args:
        file_path: path to the CSV file (a ``str`` is accepted as well).

    Returns:
        The raw DataFrame with 'date' parsed when present.

    Raises:
        FileNotFoundError: if ``file_path`` does not exist.
    """
    file_path = Path(file_path)
    if not file_path.exists():
        raise FileNotFoundError(f"Input file not found: {file_path}")

    # parse_dates=["date"] in read_csv raises ValueError when the column is
    # absent, and silently leaves the column as text when parsing fails;
    # converting afterwards avoids both.
    df = pd.read_csv(file_path)
    if "date" in df.columns:
        df["date"] = pd.to_datetime(df["date"], errors="coerce")
    return df


# ============================
# STANDARDIZE (safe cleanup)
# ============================

def standardize_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Safe, mechanical standardization (works on a copy; the input is untouched):
    - trim text columns (account, cost_center, entity, scenario); cells that
      are blank after trimming become missing (NaN)
    - normalize scenario values (case-insensitive aliases -> canonical names)
    - coerce amount to numeric (invalid values become NaN)
    - drop fully empty rows
    Note: does NOT "fix" business issues like missing keys. Missing cells are
    kept as NaN so that the validations can report them.
    """
    df = df.copy()

    # Trim whitespace in key text columns (prevents false mismatches).
    # astype(str) on its own turns NaN into the literal string "nan". That
    # would hide missing keys from the missing-value check and stop
    # dropna(how="all") below from ever removing an empty row. So the
    # originally-missing cells are restored, and whitespace-only cells are
    # treated as missing as well.
    for col in ["account", "cost_center", "entity", "scenario"]:
        if col in df.columns:
            present = df[col].notna()
            text = df[col].astype(str).str.strip().where(present)
            df[col] = text.mask(text == "")

    # Normalize scenario variants -> canonical names. Unknown values are kept
    # as-is (the scenario check reports them); missing values stay missing.
    scenario_map = {
        "actual": "Actual", "act": "Actual",
        "budget": "Budget", "bud": "Budget",
        "forecast": "Forecast", "fc": "Forecast", "fcast": "Forecast",
    }
    if "scenario" in df.columns:
        df["scenario"] = df["scenario"].map(
            lambda value: (
                scenario_map.get(value.lower(), value) if isinstance(value, str) else value
            )
        )

    # Amount as numeric; invalid values become NaN (caught in validations)
    if "amount" in df.columns:
        df["amount"] = pd.to_numeric(df["amount"], errors="coerce")

    # Safe cleanup: remove fully empty rows
    df = df.dropna(how="all")

    return df


# ============================
# VALIDATIONS (quality gates)
# ============================

def _issue(check_type: str, severity: str, description: str, value) -> dict:
    """
    Build one row of the DQ report.

    Every check emits rows with the same four keys, in this order:
    check_type (machine-readable code, e.g. SCHEMA_MISSING_COLUMNS),
    severity (OK / WARNING / ERROR), description (message for humans) and
    value (a number such as a count or a ratio).
    """
    return dict(
        check_type=check_type,
        severity=severity,
        description=description,
        value=value,
    )


def check_schema(df: pd.DataFrame) -> pd.DataFrame:
    """
    Compare the DataFrame's columns against REQUIRED_COLUMNS.

    - required columns that are absent -> SCHEMA_MISSING_COLUMNS (ERROR)
    - columns not in REQUIRED_COLUMNS  -> SCHEMA_EXTRA_COLUMNS (WARNING)

    Each finding's 'value' is the number of offending columns. Returns an
    empty DataFrame when the schema matches exactly.
    """
    present = set(df.columns)
    required = set(REQUIRED_COLUMNS)
    rows = []

    absent = required - present
    if absent:
        listing = ", ".join(sorted(absent))
        rows.append(
            _issue("SCHEMA_MISSING_COLUMNS", "ERROR", f"Missing required columns: {listing}", len(absent))
        )

    surplus = present - required
    if surplus:
        listing = ", ".join(sorted(surplus))
        rows.append(
            _issue("SCHEMA_EXTRA_COLUMNS", "WARNING", f"Unexpected extra columns detected: {listing}", len(surplus))
        )

    return pd.DataFrame(rows)


def check_missing_values(df: pd.DataFrame) -> pd.DataFrame:
    """
    Report NaN/None cells in each required column (ERROR).

    Required columns that are absent are skipped here (the schema check
    covers them). One row per affected column; 'value' is its missing count.
    """
    missing_per_column = {
        column: int(df[column].isna().sum())
        for column in REQUIRED_COLUMNS
        if column in df.columns
    }
    findings = [
        _issue("MISSING_VALUES", "ERROR", f"Missing values in column '{column}'", count)
        for column, count in missing_per_column.items()
        if count > 0
    ]
    return pd.DataFrame(findings)


def check_scenarios(df: pd.DataFrame) -> pd.DataFrame:
    """
    Report scenario values outside ALLOWED_SCENARIOS (ERROR).

    Missing scenarios are ignored here (NaN is dropped before comparing).
    'value' is the number of distinct unknown values. Returns an empty
    DataFrame when there is no 'scenario' column or every value is allowed.
    """
    rows = []
    if "scenario" in df.columns:
        observed = set(df["scenario"].dropna().unique())
        unknown = observed.difference(ALLOWED_SCENARIOS)
        if unknown:
            listing = ", ".join(sorted(map(str, unknown)))
            rows.append(
                _issue("SCENARIO_UNKNOWN", "ERROR", f"Unknown scenario values: {listing}", len(unknown))
            )
    return pd.DataFrame(rows)


def check_duplicates(df: pd.DataFrame) -> pd.DataFrame:
    """
    Detect duplicated records.

    - DUPLICATES_IDENTICAL (WARNING): 'value' is the number of extra copies of
      rows that are identical in every column.
    - DUPLICATES_BY_KEY (ERROR): 'value' is the number of rows whose
      KEY_COLUMNS combination occurs more than once (every copy is counted).
      Only evaluated when all key columns exist.

    Identical rows necessarily share a key as well, so they are also counted
    in DUPLICATES_BY_KEY.
    """
    rows = []

    # Rows that are exact copies of an earlier row (informational).
    extra_copies = len(df) - len(df.drop_duplicates())
    if extra_copies > 0:
        rows.append(_issue(
            "DUPLICATES_IDENTICAL",
            "WARNING",
            "Fully identical duplicate rows detected (review upstream process)",
            int(extra_copies),
        ))

    # Same business key on several rows: which one is the truth is ambiguous.
    if set(KEY_COLUMNS).issubset(df.columns):
        rows_sharing_key = int(df.duplicated(subset=KEY_COLUMNS, keep=False).sum())
        if rows_sharing_key > 0:
            rows.append(_issue(
                "DUPLICATES_BY_KEY",
                "ERROR",
                "Duplicate records detected for the same business key (Date+Account+CostCenter+Entity+Scenario)",
                rows_sharing_key,
            ))

    return pd.DataFrame(rows)


def check_control_totals(df: pd.DataFrame) -> pd.DataFrame:
    """
    Basic control totals per scenario.

    Every scenario in ALLOWED_SCENARIOS is visited in sorted order, so the
    report rows come out in the same order on every run:
    - scenario absent from the data -> CONTROL_TOTAL_MISSING_SCENARIO (ERROR)
    - total is NaN, i.e. every amount of that scenario is NaN
      -> CONTROL_TOTAL_NAN (ERROR)
    - total == 0 -> CONTROL_TOTAL_ZERO (WARNING; might be valid, depends on context)

    Returns an empty DataFrame when 'scenario' or 'amount' is missing.
    """
    issues = []
    if "scenario" not in df.columns or "amount" not in df.columns:
        return pd.DataFrame(issues)

    # min_count=1: a scenario whose amounts are all NaN sums to NaN, not 0.
    totals = df.groupby("scenario")["amount"].sum(min_count=1)

    # ALLOWED_SCENARIOS is a set of strings. Its iteration order depends on
    # per-process string hash randomization, so iterating it directly would
    # order the report rows differently between runs.
    for scenario in sorted(ALLOWED_SCENARIOS):
        if scenario not in totals.index:
            issues.append(_issue(
                "CONTROL_TOTAL_MISSING_SCENARIO",
                "ERROR",
                f"Scenario '{scenario}' missing in dataset",
                1,
            ))
            continue

        total = totals.loc[scenario]
        if pd.isna(total):
            issues.append(_issue(
                "CONTROL_TOTAL_NAN",
                "ERROR",
                f"Scenario '{scenario}' total is NaN (likely invalid amounts)",
                1,
            ))
        elif float(total) == 0.0:
            issues.append(_issue(
                "CONTROL_TOTAL_ZERO",
                "WARNING",
                f"Scenario '{scenario}' total equals 0 (verify if expected)",
                0,
            ))

    return pd.DataFrame(issues)


def check_monthly_anomalies(df: pd.DataFrame) -> pd.DataFrame:
    """
    Flag large Month-over-Month changes per scenario (WARNING).

    Amounts are summed per month-end bin and scenario. Each monthly total is
    compared with the preceding monthly total of the same scenario. Rows
    whose absolute relative change exceeds VARIANCE_THRESHOLD become
    MONTHLY_ANOMALY issues, with 'value' set to the relative change rounded
    to 2 decimals. A preceding total of 0 followed by a non-zero total gives
    +/-inf, which is flagged too.

    Returns an empty DataFrame when 'date', 'scenario' or 'amount' is
    missing, or when no change exceeds the threshold.
    """
    if not {"date", "scenario", "amount"}.issubset(df.columns):
        return pd.DataFrame()

    # Month-end frequency alias. pandas >= 2.2 deprecates "M" in favour of
    # "ME" (FutureWarning). Older pandas does not know "ME" and raises
    # ValueError when the Grouper is constructed, so fall back to "M" there.
    try:
        month_bins = pd.Grouper(key="date", freq="ME")
    except ValueError:
        month_bins = pd.Grouper(key="date", freq="M")

    monthly = (
        df.sort_values("date")
          .groupby([month_bins, "scenario"])["amount"]
          .sum()
          .reset_index()
    )

    # Preceding monthly total within the same scenario (NaN for its first row).
    monthly["prev_amount"] = monthly.groupby("scenario")["amount"].shift(1)
    monthly["change_pct"] = (monthly["amount"] - monthly["prev_amount"]) / monthly["prev_amount"].abs()

    anomalies = monthly[monthly["change_pct"].abs() > VARIANCE_THRESHOLD]
    if anomalies.empty:
        return pd.DataFrame()

    return pd.DataFrame({
        "check_type": "MONTHLY_ANOMALY",
        "severity": "WARNING",
        "description": (
            "High MoM change in "
            + anomalies["scenario"].astype(str)
            + " for "
            + anomalies["date"].dt.strftime("%Y-%m")
        ),
        "value": anomalies["change_pct"].round(2),
    })


def run_validations(df: pd.DataFrame) -> pd.DataFrame:
    """
    Run every quality gate and merge the findings into a single DQ report.

    The schema check runs first. If it reports any ERROR, only the schema
    findings are returned and the remaining checks are skipped. Otherwise the
    scenario, missing-value, duplicate, control-total and monthly-anomaly
    checks run in that order. If no check reports anything, the report is a
    single ALL_CHECKS row with severity OK.
    """
    schema_issues = check_schema(df)
    schema_failed = (not schema_issues.empty) and schema_issues["severity"].eq("ERROR").any()
    if schema_failed:
        return pd.concat([schema_issues], ignore_index=True)

    deeper_checks = (
        check_scenarios,
        check_missing_values,
        check_duplicates,
        check_control_totals,
        check_monthly_anomalies,
    )
    findings = [schema_issues]
    findings.extend(check(df) for check in deeper_checks)

    report = pd.concat(findings, ignore_index=True)
    if not report.empty:
        return report
    return pd.DataFrame([_issue("ALL_CHECKS", "OK", "No data quality issues detected", 0)])


def has_errors(report_df: pd.DataFrame) -> bool:
    """
    Tell whether the DQ report contains at least one ERROR row.

    An empty report, or one without a 'severity' column, has no errors.
    """
    if report_df.empty or "severity" not in report_df.columns:
        return False
    return report_df["severity"].eq("ERROR").any()


# ============================
# OUTPUT
# ============================

def export_dq_report(report_df: pd.DataFrame) -> None:
    """Write the DQ report, without the index, to DQ_REPORT_CSV and then to DQ_REPORT_XLSX."""
    for write, destination in (
        (report_df.to_csv, DQ_REPORT_CSV),
        (report_df.to_excel, DQ_REPORT_XLSX),
    ):
        write(destination, index=False)


def export_clean_dataset(df: pd.DataFrame) -> None:
    """Write the standardized dataset, without the index, to CLEAN_DATASET_CSV and then to CLEAN_DATASET_XLSX."""
    for write, destination in (
        (df.to_csv, CLEAN_DATASET_CSV),
        (df.to_excel, CLEAN_DATASET_XLSX),
    ):
        write(destination, index=False)


# ============================
# ENTRY POINT
# ============================

def main() -> int:
    """
    Run the pipeline: load -> standardize -> validate -> export.

    The DQ report is exported on every run. The clean dataset is exported
    only when the report has no ERROR.

    Returns:
        Process exit code: 0 when the data passed (OK/WARNING only),
        1 when at least one ERROR was found.
    """
    standardized = standardize_data(load_data(INPUT_FILE))
    dq_report = run_validations(standardized)

    # Written unconditionally so failures are documented too.
    export_dq_report(dq_report)

    if not has_errors(dq_report):
        export_clean_dataset(standardized)
        print(f"DQ PASSED (OK/WARN). DQ report: {DQ_REPORT_XLSX}")
        print(f"Clean dataset exported to: {CLEAN_DATASET_CSV} and {CLEAN_DATASET_XLSX}")
        return 0

    print(f"DQ FAILED (ERROR). Report exported to: {DQ_REPORT_XLSX}")
    print("Clean dataset NOT exported.")
    return 1


if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    exit_code = main()
    sys.exit(exit_code)
