
# ABS NAICS3 → CBSA (Step‑Through Notebook)

This notebook bakes in the same parameters as the CLI script, but lets you run the pipeline cell‑by‑cell:
- Load ABS county × NAICS3
- Reconcile each county's NAICS3 sums against its NAICS 00 (all-sector total) row
- Aggregate to CBSA
- Filter to “large” CBSAs
- Save outputs

> Update the **Parameters** cell below if your file paths or thresholds differ.


In [1]:

# --- Parameters (edit these) ---

# Input files
ABS_PATH = "abs_county_naics3.csv"          # ABS county × NAICS3 CSV
XWALK_PATH = "cbsa_county_crosswalk.csv"    # county → CBSA crosswalk CSV

# Output location
OUTDIR = "outputs"                           # folder that receives the CSV outputs

# Row selection and reconciliation
YEAR = 2022                                  # keep only this year; None disables the filter
RECON_ATOL = 1.0                             # absolute tolerance when comparing sums to totals

# "Large" CBSA definition
LARGE_BY = "firms"                           # "firms" or "population" (population requires a cbsa_pop column)
LARGE_THRESHOLD = 20000                      # firm count or population cut-off, depending on LARGE_BY


In [2]:

import pandas as pd
import numpy as np
from pathlib import Path

In [3]:
def zfill_series(s, n):
    """Zero-pad the first run of digits found in each value to at least ``n`` characters.

    Each value is converted to text, its first group of consecutive digits is
    kept, and the result is left-padded with zeros. Values that contain no
    digits become a string of ``n`` zeros; digit runs longer than ``n`` are
    returned untruncated.
    """
    as_text = s.astype(str)
    first_digits = as_text.str.extract(r"(\d+)", expand=False)
    # No-digit values come back missing; blank them so zfill yields all zeros.
    return first_digits.fillna("").str.zfill(n)

def normalize_abs_columns(df):
    """Return a copy of the ABS frame with canonical column names, FIPS codes and numerics.

    Column names are matched case-insensitively and renamed to:
      * upper case: ``NAICS2022``, ``FIRMPDEMP``, ``EMP``, ``PAYANN``, ``RCPPDEMP``
      * lower case: ``state``, ``county``, ``naics``, ``year``

    A ``naics3`` column is derived from the first three characters of
    ``NAICS2022`` (preferred) or ``naics``. The all-sector code ``"00"`` is
    mapped to ``"000"`` so that total rows carry the marker the downstream
    aggregation steps look for.

    Parameters
    ----------
    df : pandas.DataFrame
        Raw ABS table. Must contain ``state`` and ``county`` columns and one of
        ``NAICS2022`` / ``naics`` (any letter case). The input is not modified.

    Returns
    -------
    pandas.DataFrame
        New frame with renamed columns, zero-padded ``state`` (2) and
        ``county`` (3), numeric metric columns, ``PAYANN``/``RCPPDEMP`` scaled
        from thousands of dollars to dollars, and numeric ``year`` if present.

    Raises
    ------
    ValueError
        If neither a ``NAICS2022`` nor a ``naics`` column is present.
    KeyError
        If ``state`` or ``county`` is missing.
    """
    upper_names = {"naics2022", "firmpdemp", "emp", "payann", "rcppdemp"}
    lower_names = {"state", "county", "naics", "year"}

    # Build the rename map in a single pass over the actual columns.
    rename_map = {}
    for actual in df.columns:
        lowered = str(actual).lower()
        if lowered in upper_names:
            rename_map[actual] = lowered.upper()
        elif lowered in lower_names:
            rename_map[actual] = lowered
    # rename() returns a new frame, so the caller's frame is left untouched.
    df = df.rename(columns=rename_map)

    # Derive NAICS3 from whichever industry-code column is available.
    if "NAICS2022" in df.columns:
        code = df["NAICS2022"].astype(str)
    elif "naics" in df.columns:
        code = df["naics"].astype(str)
    else:
        raise ValueError("ABS file must have NAICS2022 or naics column")
    # "00" is the all-sector total; give it the three-character marker "000".
    df["naics3"] = code.str[:3].mask(code == "00", "000")

    # Standardize FIPS
    df["state"] = zfill_series(df["state"], 2)
    df["county"] = zfill_series(df["county"], 3)

    # Cast metrics once; unparseable cells become NaN.
    for col in ("FIRMPDEMP", "EMP", "PAYANN", "RCPPDEMP"):
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce")
    # Convert ABS $1,000s → dollars for PAYANN/RCPPDEMP
    for col in ("PAYANN", "RCPPDEMP"):
        if col in df.columns:
            df[col] = df[col] * 1000

    if "year" in df.columns:
        df["year"] = pd.to_numeric(df["year"], errors="coerce")
    return df

def normalize_crosswalk(df):
    """Validate the county→CBSA crosswalk and standardize its key columns.

    The frame must contain ``state_fips``, ``county_fips``, ``cbsa_code`` and
    ``cbsa_title``. FIPS columns are zero-padded (state to 2, county to 3) and
    ``cbsa_pop``, when present, is coerced to numeric with unparseable values
    set to NaN.

    Note: the columns are overwritten on the frame that was passed in, and that
    same frame is returned.

    Raises
    ------
    ValueError
        If any required column is missing.
    """
    required = ("state_fips", "county_fips", "cbsa_code", "cbsa_title")
    missing = [col for col in required if col not in df.columns]
    if missing:
        raise ValueError(f"Crosswalk missing columns: {missing}")

    # (column, padded width) pairs for the FIPS keys
    for fips_col, width in (("state_fips", 2), ("county_fips", 3)):
        df[fips_col] = zfill_series(df[fips_col], width)

    if "cbsa_pop" in df.columns:
        df["cbsa_pop"] = pd.to_numeric(df["cbsa_pop"], errors="coerce")
    return df

def reconcile_county_totals(abs_df, year=None, atol=1.0):
    """Compare each county's summed industry rows with its all-sector total row.

    Rows are treated as "total" rows when ``naics3`` is ``"000"``/``"00"`` or
    when a raw industry-code column (``NAICS2022``, ``naics2022`` or ``naics``)
    equals ``"00"``. Only code columns that actually exist are consulted, so
    the function works whichever of them the input carries. All other rows are
    treated as parts and summed per (state, county).

    Parameters
    ----------
    abs_df : pandas.DataFrame
        Normalized ABS frame with ``state``, ``county`` and ``naics3`` columns.
        Metric columns (``FIRMPDEMP``, ``EMP``, ``PAYANN``, ``RCPPDEMP``) that
        are absent are treated as entirely missing. The input is not modified.
    year : optional
        If given and a ``year`` column exists, only rows with that year are used.
    atol : float
        A metric is flagged when ``|sum - total|`` exceeds this value.

    Returns
    -------
    pandas.DataFrame
        One row per (state, county), sorted by those keys, with for each metric
        ``sum_*``, ``tot_*``, ``delta_*`` (sum minus total), ``pct_delta_*``
        (delta / total, NaN when the total is zero or missing) and ``flag_*``,
        plus ``recon_ok`` which is True when no metric is flagged.

        NOTE: when a sum or total is missing the delta is NaN, the comparison
        with ``atol`` is False, and the metric is therefore *not* flagged.
    """
    metrics = ["FIRMPDEMP", "EMP", "PAYANN", "RCPPDEMP"]
    key = ["state", "county"]

    base = abs_df.copy()
    if year is not None and "year" in base.columns:
        base = base[base["year"] == year].copy()

    # Metrics are optional upstream; represent absent ones as all-NaN columns.
    for metric in metrics:
        if metric not in base.columns:
            base[metric] = np.nan

    # Identify total rows using only the code columns that are present.
    mask_all = base["naics3"].isin(["000", "00"])
    for code_col in ("NAICS2022", "naics2022", "naics"):
        if code_col in base.columns:
            mask_all = mask_all | base[code_col].astype(str).isin(["00"])

    # min_count=1 keeps an all-missing group as NaN instead of turning it into 0.
    sums = base[~mask_all].groupby(key, as_index=False)[metrics].sum(min_count=1)
    totals = base[mask_all].groupby(key, as_index=False)[metrics].sum(min_count=1)

    # Rename for clarity
    sums = sums.rename(columns={m: f"sum_{m.lower()}" for m in metrics})
    totals = totals.rename(columns={m: f"tot_{m.lower()}" for m in metrics})
    # Outer join so counties that have only parts or only a total still appear.
    rep = sums.merge(totals, on=key, how="outer")

    suffixes = [m.lower() for m in metrics]
    for c in suffixes:
        rep[f"delta_{c}"] = rep[f"sum_{c}"] - rep[f"tot_{c}"]
        rep[f"pct_delta_{c}"] = np.where(
            rep[f"tot_{c}"].abs() > 0,
            rep[f"delta_{c}"] / rep[f"tot_{c}"],
            np.nan,
        )
        rep[f"flag_{c}"] = rep[f"delta_{c}"].abs() > atol
    rep["recon_ok"] = ~(rep[[f"flag_{c}" for c in suffixes]].any(axis=1))
    return rep.sort_values(key)

def aggregate_to_cbsa(abs_df, xwalk_df, year=None):
    """Aggregate county × NAICS3 metrics to CBSA × NAICS3 and attach CBSA-wide totals.

    County rows are joined to the crosswalk on (state, county) FIPS, industry
    rows are summed per (cbsa_code, cbsa_title, naics3), and the all-sector
    total rows are summed per CBSA and attached as ``cbsa_tot_firms``,
    ``cbsa_tot_emp``, ``cbsa_tot_payroll`` and ``cbsa_tot_receipts``.

    Total rows are recognized when ``naics3`` is ``"000"``/``"00"`` or when a
    raw industry-code column (``NAICS2022``, ``naics2022`` or ``naics``)
    present in the data equals ``"00"`` — the same rule the county
    reconciliation uses — so totals are never summed in as if they were an
    industry.

    Parameters
    ----------
    abs_df : pandas.DataFrame
        Normalized ABS frame with ``state``, ``county`` and ``naics3``. Metric
        columns that are absent are treated as entirely missing. Not modified.
    xwalk_df : pandas.DataFrame
        Crosswalk with ``state_fips``, ``county_fips``, ``cbsa_code`` and
        ``cbsa_title``; must have at most one row per county.
    year : optional
        If given and a ``year`` column exists, only rows with that year are used.

    Returns
    -------
    pandas.DataFrame
        One row per (cbsa_code, cbsa_title, naics3) with summed metrics and the
        ``cbsa_tot_*`` columns. Counties without a crosswalk match have a
        missing ``cbsa_code`` and drop out, because groupby skips missing keys
        by default.

    Raises
    ------
    pandas.errors.MergeError
        If the crosswalk lists the same county more than once (``validate="m:1"``).
    """
    metrics = ["FIRMPDEMP", "EMP", "PAYANN", "RCPPDEMP"]
    cbsa_cols = ["cbsa_code", "cbsa_title"]

    base = abs_df.copy()
    if year is not None and "year" in base.columns:
        # Copy the filtered slice so the column assignments below are unambiguous.
        base = base[base["year"] == year].copy()

    # Metrics are optional upstream; represent absent ones as all-NaN columns.
    for metric in metrics:
        if metric not in base.columns:
            base[metric] = np.nan

    base["state_fips"] = base["state"]
    base["county_fips"] = base["county"]
    merged = base.merge(xwalk_df, on=["state_fips", "county_fips"], how="left", validate="m:1")

    # Separate all-sector total rows from industry rows.
    is_total = merged["naics3"].isin(["000", "00"])
    for code_col in ("NAICS2022", "naics2022", "naics"):
        if code_col in merged.columns:
            is_total = is_total | merged[code_col].astype(str).isin(["00"])

    # min_count=1 keeps an all-missing group as NaN instead of turning it into 0.
    out = (merged[~is_total]
           .groupby(cbsa_cols + ["naics3"], as_index=False)[metrics]
           .sum(min_count=1))
    all_cbsa = (merged[is_total]
                .groupby(cbsa_cols, as_index=False)[metrics]
                .sum(min_count=1)
                .rename(columns={
                    "FIRMPDEMP": "cbsa_tot_firms",
                    "EMP": "cbsa_tot_emp",
                    "PAYANN": "cbsa_tot_payroll",
                    "RCPPDEMP": "cbsa_tot_receipts",
                }))
    return out.merge(all_cbsa, on=cbsa_cols, how="left")

def filter_large_cbsa(cbsa_df, xwalk_df, large_by="firms", threshold=20000):
    """Keep only the rows of ``cbsa_df`` that belong to "large" CBSAs.

    Parameters
    ----------
    cbsa_df : pandas.DataFrame
        CBSA × NAICS3 table with ``cbsa_code`` and ``cbsa_title``; must also
        have ``cbsa_tot_firms`` when ``large_by="firms"``.
    xwalk_df : pandas.DataFrame
        Crosswalk; must have ``cbsa_code``, ``cbsa_title`` and ``cbsa_pop``
        when ``large_by="population"``.
    large_by : {"firms", "population"}
        Size measure. ``"firms"`` uses the per-CBSA maximum of
        ``cbsa_tot_firms``; ``"population"`` uses the first ``cbsa_pop`` listed
        for each CBSA in the crosswalk.
    threshold : number
        A CBSA is kept when its size measure is ``>= threshold``.

    Returns
    -------
    pandas.DataFrame
        Rows of ``cbsa_df`` whose CBSA meets the threshold.

    Raises
    ------
    ValueError
        If ``large_by`` is not a supported value, or if ``"population"`` is
        requested but the crosswalk has no ``cbsa_pop`` column. Falling back
        to firm counts in those cases would apply a threshold meant for one
        measure to a different one.
    """
    id_cols = ["cbsa_code", "cbsa_title"]

    if large_by == "population":
        if "cbsa_pop" not in xwalk_df.columns:
            raise ValueError('large_by="population" requires a cbsa_pop column in the crosswalk')
        pop = xwalk_df.drop_duplicates(subset=id_cols)[id_cols + ["cbsa_pop"]]
        candidates = (cbsa_df.drop_duplicates(subset=id_cols)[id_cols]
                      .merge(pop, on=id_cols, how="left"))
        # Coerce so a crosswalk whose population was read as text still compares numerically.
        pop_values = pd.to_numeric(candidates["cbsa_pop"], errors="coerce")
        big_codes = candidates.loc[pop_values >= threshold, id_cols]
    elif large_by == "firms":
        totals = cbsa_df.groupby(id_cols, as_index=False)["cbsa_tot_firms"].max()
        big_codes = totals.loc[totals["cbsa_tot_firms"] >= threshold, id_cols]
    else:
        raise ValueError(f'large_by must be "firms" or "population", got {large_by!r}')

    return cbsa_df.merge(big_codes, on=id_cols, how="inner")

In [4]:

# Read every column as text, then let the normalize_* helpers standardize
# column names, FIPS codes and numeric fields.
abs_df = normalize_abs_columns(pd.read_csv(ABS_PATH, dtype=str))
xwalk_df = normalize_crosswalk(pd.read_csv(XWALK_PATH, dtype=str))

# Quick size check, followed by a preview of the normalized ABS rows.
print("ABS rows:", len(abs_df), "| Crosswalk rows:", len(xwalk_df))
abs_df.head(3)


FileNotFoundError: [Errno 2] No such file or directory: 'abs_county_naics3.csv'

In [None]:

# Build the county-level reconciliation report for the configured year and tolerance.
recon = reconcile_county_totals(abs_df, year=YEAR, atol=RECON_ATOL)

# Preview the firm-count comparison columns plus the overall pass/fail flag.
display_cols = [
    "state",
    "county",
    "sum_firmpdemp",
    "tot_firmpdemp",
    "delta_firmpdemp",
    "pct_delta_firmpdemp",
    "recon_ok",
]
recon_head = recon.loc[:, display_cols].head(10)
recon_head


In [None]:

# Roll the county rows up to CBSA × NAICS3 for the configured year, then preview.
cbsa = aggregate_to_cbsa(abs_df=abs_df, xwalk_df=xwalk_df, year=YEAR)
cbsa.head(10)


In [None]:

# Keep only CBSAs that meet the configured size rule, then preview.
cbsa_large = filter_large_cbsa(
    cbsa_df=cbsa,
    xwalk_df=xwalk_df,
    large_by=LARGE_BY,
    threshold=LARGE_THRESHOLD,
)
cbsa_large.head(10)


In [None]:

# Create the output folder (and any missing parents) before writing.
outdir = Path(OUTDIR)
outdir.mkdir(parents=True, exist_ok=True)

# Main outputs: reconciliation report, full CBSA table, large-CBSA subset.
recon.to_csv(outdir.joinpath("abs_county_naics3_recon_report.csv"), index=False)
cbsa.to_csv(outdir.joinpath("abs_cbsa_naics3.csv"), index=False)
cbsa_large.to_csv(outdir.joinpath("abs_cbsa_naics3_large.csv"), index=False)

# Also: counties that failed reconciliation, joined to their CBSA for context.
# The crosswalk keys are renamed so they line up with the report's state/county columns.
cw_small = xwalk_df.rename(columns={"state_fips":"state", "county_fips":"county"})
bad = recon.loc[~recon["recon_ok"]].merge(cw_small, on=["state","county"], how="left")
bad.to_csv(outdir.joinpath("abs_cbsa_naics3_discrepancies.csv"), index=False)

print("Wrote outputs to:", outdir.resolve())
