In [None]:
# 1) Ventas: limpieza + persistencia (desde RAW)
def clean_and_persist_ventas_from_raw(con: sqlite3.Connection) -> tuple[int, int, int]:
    df = pd.read_sql_query("SELECT * FROM raw_ventas", con)
    raw_rows = len(df)

    if df.empty:
        (QUALITY_DIR / "ventas_invalidas.csv").write_text("", encoding="utf-8")
        return 0, 0, 0

    df = strip_strings(df)
    for c in ["fecha", "id_cliente", "id_producto", "unidades", "precio_unitario", "_ingest_ts", "_source_file"]:
        if c not in df.columns:
            df[c] = None

    df["fecha"] = pd.to_datetime(df["fecha"], errors="coerce").dt.date
    df["unidades"] = pd.to_numeric(df["unidades"], errors="coerce")
    df["precio_unitario"] = df["precio_unitario"].apply(to_float_money)

    valid = (
        df["fecha"].notna()
        & df["unidades"].notna() & (df["unidades"] >= 0)
        & df["precio_unitario"].notna() & (df["precio_unitario"] >= 0)
        & df["id_cliente"].notna() & (df["id_cliente"] != "")
        & df["id_producto"].notna() & (df["id_producto"] != "")
    )
    quarantine = df.loc[~valid].copy()
    clean = df.loc[valid].copy()

    if not clean.empty:
        clean = (
            clean.sort_values("_ingest_ts")
                .drop_duplicates(subset=["fecha", "id_cliente", "id_producto"], keep="last")
        )
        clean["importe"] = clean["unidades"] * clean["precio_unitario"]

    quarantine.to_csv(QUALITY_DIR / "ventas_invalidas.csv", index=False)

    if not clean.empty:
        clean.to_parquet(PARQUET_DIR / "clean_ventas.parquet", index=False)

        upsert_sql = (ROOT / "sql" / "10_upserts.sql").read_text(encoding="utf-8")
        for _, r in clean.iterrows():
            con.execute(
                upsert_sql,
                {
                    "fecha": str(r["fecha"]),
                    "idc": r["id_cliente"],
                    "idp": r["id_producto"],
                    "u": float(r["unidades"]),
                    "p": float(r["precio_unitario"]),
                    "ts": r["_ingest_ts"],
                },
            )
        con.commit()

    return raw_rows, len(clean), len(quarantine)

# 2) Clientes: limpieza + persistencia (desde RAW)
def clean_and_persist_clientes_from_raw(con: sqlite3.Connection) -> tuple[int, int, int]:
    df = pd.read_sql_query("SELECT * FROM raw_clientes", con)
    raw_rows = len(df)

    if df.empty:
        (QUALITY_DIR / "clientes_invalidos.csv").write_text("", encoding="utf-8")
        return 0, 0, 0

    df = strip_strings(df)
    for c in ["fecha", "nombre", "apellido", "id_cliente", "_ingest_ts", "_source_file"]:
        if c not in df.columns:
            df[c] = None

    df["fecha"] = pd.to_datetime(df["fecha"], errors="coerce").dt.date
    valid = df["id_cliente"].notna() & (df["id_cliente"] != "")
    quarantine = df.loc[~valid].copy()
    clean = df.loc[valid].copy()

    if not clean.empty:
        clean = (
            clean.sort_values("_ingest_ts")
                .drop_duplicates(subset=["id_cliente"], keep="last")
        )

    quarantine.to_csv(QUALITY_DIR / "clientes_invalidos.csv", index=False)

    if not clean.empty:
        clean[["id_cliente", "nombre", "apellido", "fecha"]].to_parquet(
            PARQUET_DIR / "dim_clientes.parquet", index=False
        )
        clean[["id_cliente", "nombre", "apellido", "fecha"]].to_sql(
            "dim_clientes", con, if_exists="replace", index=False
        )

    return raw_rows, len(clean), len(quarantine)

# 3) Productos: limpieza + persistencia (desde RAW)
def clean_and_persist_productos_from_raw(con: sqlite3.Connection) -> tuple[int, int, int]:
    df = pd.read_sql_query("SELECT * FROM raw_productos", con)
    raw_rows = len(df)

    if df.empty:
        (QUALITY_DIR / "productos_invalidos.csv").write_text("", encoding="utf-8")
        return 0, 0, 0

    df = strip_strings(df)
    for c in ["fecha_entrada", "nombre_producto", "id_producto", "unidades", "precio_unitario", "categoria", "_ingest_ts", "_source_file"]:
        if c not in df.columns:
            df[c] = None

    df["fecha_entrada"] = pd.to_datetime(df["fecha_entrada"], errors="coerce").dt.date
    df["unidades"] = pd.to_numeric(df["unidades"], errors="coerce")
    df["precio_unitario"] = df["precio_unitario"].apply(to_float_money)

    valid = (
        df["id_producto"].notna() & (df["id_producto"] != "")
        & df["precio_unitario"].notna() & (df["precio_unitario"] >= 0)
        & df["unidades"].notna() & (df["unidades"] >= 0)
    )
    quarantine = df.loc[~valid].copy()
    clean = df.loc[valid].copy()

    if not clean.empty:
        clean = (
            clean.sort_values("_ingest_ts")
            .drop_duplicates(subset=["id_producto"], keep="last")
        )

    quarantine.to_csv(QUALITY_DIR / "productos_invalidos.csv", index=False)

    if not clean.empty:
        clean[["id_producto", "nombre_producto", "categoria", "precio_unitario", "unidades", "fecha_entrada"]].to_parquet(
            PARQUET_DIR / "dim_productos.parquet", index=False
        )
        clean[["id_producto", "nombre_producto", "categoria", "precio_unitario", "unidades", "fecha_entrada"]].to_sql(
            "dim_productos", con, if_exists="replace", index=False
        )

    return raw_rows, len(clean), len(quarantine)

# Runner: llama a las tres limpiezas desde RAW
if __name__ == "__main__":
    DB = OUT / "ut1.db"
    con = sqlite3.connect(DB)

    # Asegura el esquema (clean_ventas, etc.) y vistas si aplica
    # con.executescript((ROOT / "sql" / "00_schema.sql").read_text(encoding="utf-8"))

    print("RAW counts:",
            con.execute("SELECT COUNT(*) FROM raw_ventas").fetchone()[0],
            con.execute("SELECT COUNT(*) FROM raw_clientes").fetchone()[0],
            con.execute("SELECT COUNT(*) FROM raw_productos").fetchone()[0])

    rv = clean_and_persist_ventas_from_raw(con)
    rc = clean_and_persist_clientes_from_raw(con)
    rp = clean_and_persist_productos_from_raw(con)
    print("Ventas (raw, clean, quar):", rv)
    print("Clientes (raw, clean, quar):", rc)
    print("Productos (raw, clean, quar):", rp)

    con.close()
