In [None]:
import polars as pl
import duckdb
from pathlib import Path
from collections import Counter
import tabulate

In [None]:
def load_source_csv(cenyear: int, sourcepath: Path) -> pl.DataFrame:
    """Read the raw census CSV for one year and attach household identifiers.

    Reads ``<sourcepath>/cs<cenyear>.csv.gz`` and adds three string columns:

    * ``cenyear``    -- the census year, zero-padded to at least 4 characters;
    * ``serial_str`` -- the ``serial`` column cast to string, zero-padded to
      at least 6 characters;
    * ``hhid``       -- the padded year immediately followed by the padded serial.

    Args:
        cenyear: Census year whose source file should be loaded.
        sourcepath: Directory containing the ``cs<year>.csv.gz`` files.

    Returns:
        The CSV contents plus the three derived columns.
    """
    year_text = str(cenyear)
    # Build each padded piece once and reuse it for both its own column and hhid.
    padded_year = pl.lit(year_text).str.zfill(4)
    padded_serial = pl.col("serial").cast(str).str.zfill(6)

    # infer_schema_length: number of leading rows polars scans to infer dtypes.
    raw = pl.read_csv(sourcepath / f"cs{year_text}.csv.gz", infer_schema_length=10000)
    return raw.with_columns([
        padded_year.alias("cenyear"),
        padded_serial.alias("serial_str"),
        (padded_year + padded_serial).alias("hhid"),
    ])

In [None]:
# --- Load Bucketed Parquet Files ---
def load_bucket_data(cenyear: int, basepath: Path) -> pl.DataFrame:
    """Concatenate every bucketed Parquet file written for one census year.

    Files matching ``<basepath>/cp<cenyear>/bucket_*.parquet`` are read in
    sorted filename order and stacked vertically with ``pl.concat``.

    Args:
        cenyear: Census year whose buckets should be loaded.
        basepath: Root directory that contains the ``cp<year>`` folders.

    Returns:
        A single DataFrame holding the rows of all bucket files.

    Raises:
        FileNotFoundError: If no ``bucket_*.parquet`` file exists for the year
            (including when the ``cp<year>`` directory itself is missing, since
            ``Path.glob`` then yields nothing). Without this check
            ``pl.concat([])`` fails with an error that does not name the path.
    """
    bucket_dir = basepath / f"cp{cenyear}"
    bucket_files = sorted(bucket_dir.glob("bucket_*.parquet"))
    if not bucket_files:
        raise FileNotFoundError(f"No bucket_*.parquet files found in {bucket_dir}")
    return pl.concat([pl.read_parquet(f) for f in bucket_files])

# --- Load Manifest ---
def load_manifest(cenyear: int, manifest_db: Path) -> pl.DataFrame:
    """Return the ``manifest`` table rows recorded for one census year.

    Args:
        cenyear: Census year to filter on; bound as a query parameter.
        manifest_db: Path to the DuckDB database file holding ``manifest``.

    Returns:
        All columns of ``manifest`` whose ``cenyear`` equals the given year,
        converted from Arrow to a polars DataFrame.
    """
    # read_only=True: this is a verification step. Opening read-only means a
    # mistyped path fails instead of silently creating a new, empty database,
    # and no write lock is taken on the manifest file.
    with duckdb.connect(str(manifest_db), read_only=True) as con:
        return pl.from_arrow(con.execute(
            "SELECT * FROM manifest WHERE cenyear = ?", [cenyear]
        ).arrow())

# --- Load Search Index ---
def load_search_index(cenyear: int, index_db: Path) -> pl.DataFrame:
    """Return the search-index rows whose ``hhid`` starts with the given year.

    Args:
        cenyear: Census year; used as the ``hhid`` prefix.
        index_db: Path to the DuckDB database file holding the ``index`` table.

    Returns:
        All columns of ``index`` for rows whose ``hhid`` begins with the
        zero-padded year, as a polars DataFrame.
    """
    # load_source_csv builds hhid from the year zero-padded to 4 characters;
    # pad the same way so the prefix matches.
    hhid_prefix = str(cenyear).zfill(4)
    # read_only=True: never create an empty database on a bad path, and
    # don't take a write lock while verifying.
    with duckdb.connect(str(index_db), read_only=True) as con:
        # The table is named after the SQL keyword INDEX; quote it defensively.
        return pl.from_arrow(con.execute(
            'SELECT * FROM "index" WHERE hhid LIKE ?', [f"{hhid_prefix}%"]
        ).arrow())

In [None]:
# --- Check Record Totals ---
def check_record_totals(df_src, df_buckets, df_manifest, df_index):
    """Print per-stage row totals for one census year's ingestion.

    Args:
        df_src: Source CSV frame; must contain an ``hhid`` column.
        df_buckets: Concatenated bucket frame; must contain ``hhid``.
        df_manifest: Manifest rows; must contain a ``record_count`` column.
        df_index: Search-index rows; must contain ``hhid``.
    """

    def summed_by_hhid(df) -> int:
        # Sum of the sizes of every hhid group. Uses group_by (the old
        # `groupby` spelling was removed in polars 1.0) and Expr.len, which
        # counts nulls, so no version-dependent "count" column name is needed.
        # NOTE(review): group_by keeps null keys as their own group, so this
        # always equals the row count; a distinct-household count
        # (df["hhid"].n_unique()) may be what was intended.
        return df.group_by("hhid").agg(pl.col("hhid").len().alias("n"))["n"].sum()

    src_total = df_src.height
    src_by_hhid = summed_by_hhid(df_src)

    bucket_total = df_buckets.height
    bucket_by_hhid = summed_by_hhid(df_buckets)

    manifest_total = df_manifest["record_count"].sum()

    index_total = df_index.height
    index_by_hhid = summed_by_hhid(df_index)

    print(f"📄 Source CSV: {src_total} rows total, {src_by_hhid} summed by hhid")
    print(f"📦 Buckets:    {bucket_total} rows total, {bucket_by_hhid} summed by hhid")
    print(f"🗂️  Manifest:   {manifest_total} rows total (sum of record_count)")
    print(f"🔎 Index:      {index_total} rows total, {index_by_hhid} summed by hhid")

In [None]:
# --- Check cper Uniqueness ---
def check_cper_uniqueness(df_buckets):
    """Report ``cper`` values that occur more than once in the bucket data.

    If the ``cper`` column is absent, a warning is printed and the function
    returns. Null ``cper`` values are counted and reported separately rather
    than being listed as a duplicated ``None`` value. At most the first 10
    duplicated values, in order of first appearance, are printed.

    Args:
        df_buckets: Concatenated bucket frame. Only ``.columns`` and
            ``df_buckets["cper"].to_list()`` are used.
    """
    if "cper" not in df_buckets.columns:
        print("⚠️  'cper' column not found in bucket files.")
        return

    values = df_buckets["cper"].to_list()
    # Missing ids are a separate problem from duplicated ids; keep them out of
    # the duplicate count so they are not reported as a repeated "None".
    n_null = sum(1 for v in values if v is None)
    counts = Counter(v for v in values if v is not None)
    dupes = [c for c, n in counts.items() if n > 1]

    if n_null:
        print(f"⚠️  {n_null} rows have a null cper value.")

    if dupes:
        print(f"\n!Found {len(dupes)} duplicated cper values:")
        for c in dupes[:10]:
            print(f"  - {c}")
        if len(dupes) > 10:
            print("  ... (truncated)")
    else:
        print("✅ All cper values are unique.")

In [None]:
def print_ingestion_report(cenyear: int, df_src, df_buckets, df_manifest, df_index):
    """Print a one-row GitHub-markdown table of per-stage record totals.

    Args:
        cenyear: Census year shown in the ``Year`` column.
        df_src: Source CSV frame; must contain ``hhid``.
        df_buckets: Concatenated bucket frame; must contain ``hhid``.
        df_manifest: Manifest rows; must contain ``record_count``.
        df_index: Search-index rows; must contain ``hhid``.
    """

    def summed_by_hhid(df) -> int:
        # Sum of hhid group sizes via group_by (polars 1.0 removed `groupby`).
        # NOTE(review): null keys form their own group, so this always equals
        # the row count.
        return df.group_by("hhid").agg(pl.col("hhid").len().alias("n"))["n"].sum()

    table = [{
        "Year": cenyear,
        "Source Rows": df_src.height,
        "Source by hhid": summed_by_hhid(df_src),
        "Bucket Rows": df_buckets.height,
        "Bucket by hhid": summed_by_hhid(df_buckets),
        "Manifest Rows": df_manifest["record_count"].sum(),
        "Index Rows": df_index.height,
        "Index by hhid": summed_by_hhid(df_index),
    }]
    print("\n📊 Ingestion Summary:")
    # The notebook does `import tabulate` (the module), so the formatting
    # function must be reached as tabulate.tabulate; calling the module fails.
    print(tabulate.tabulate(table, headers="keys", tablefmt="github"))

In [None]:
def verify_ingestion(cenyear: int, basepath: Path, sourcepath: Path):
    """Load every ingestion artifact for one census year and print checks.

    Loads the source CSV, the bucket Parquet files, the manifest rows
    (``<basepath>/manifests/manifest.duckdb``) and the search-index rows
    (``<basepath>/index/search_index.duckdb``). It then prints the record
    totals, the ``cper`` uniqueness check and a summary table.

    Args:
        cenyear: Census year to verify.
        basepath: Root of the ingestion output (buckets, manifests, index).
        sourcepath: Directory containing the raw ``cs<year>.csv.gz`` files.
    """
    print(f"\n🔍 Verifying census year {cenyear}...\n")

    df_src = load_source_csv(cenyear, sourcepath)
    df_buckets = load_bucket_data(cenyear, basepath)
    df_manifest = load_manifest(cenyear, basepath / "manifests" / "manifest.duckdb")
    df_index = load_search_index(cenyear, basepath / "index" / "search_index.duckdb")

    check_record_totals(df_src, df_buckets, df_manifest, df_index)
    check_cper_uniqueness(df_buckets)
    print_ingestion_report(cenyear, df_src, df_buckets, df_manifest, df_index)