In [2]:
import pandas as pd
import numpy as np
import re
import config
from pathlib import Path
import snowflake.connector as snow

In [3]:
pip install XlsxWriter

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [4]:
# Set up your connection parameters
user = config.credentials['USERNAME']
password = config.credentials['PASSWORD']
account = 'PCA67849'
warehouse = config.credentials['WAREHOUSE']

# Connect to Snowflake
conn = snow.connect(
    user=user,
    password=password,
    account=account,
    warehouse=warehouse,
    database='',
    schema=''
)

In [5]:
# ============================================================================
#  LOAD CROSSWALK ONCE AT STARTUP
# ============================================================================
# Query NAICS 2017 and 2022 crosswalk
combined_query = f"""
SELECT*
FROM
CROSSWALKS.RAW.NAICS_2017_NAICS_2022
"""
# Execute query and get data
NAICS_CROSSWALK = pd.read_sql(combined_query, conn)


  NAICS_CROSSWALK = pd.read_sql(combined_query, conn)


In [6]:
# ============================================================================
#  LOAD CROSSWALK ONCE AT STARTUP
# ============================================================================
# Query NAICS 2017 and 2022 crosswalk
combined_query = f"""
SELECT*
FROM
CROSSWALKS.CUSTOM.NAICS_2017_LIST
"""
# Execute query and get data
NAICS_CROSSWALK_2017 = pd.read_sql(combined_query, conn)

  NAICS_CROSSWALK_2017 = pd.read_sql(combined_query, conn)


In [7]:
# ============================================================================
#  LOAD CROSSWALK ONCE AT STARTUP
# ============================================================================
# Query NAICS 2017 and 2022 crosswalk
combined_query = f"""
SELECT*
FROM
CROSSWALKS.CUSTOM.NAICS_2022_LIST
"""
# Execute query and get data
NAICS_CROSSWALK_2022 = pd.read_sql(combined_query, conn)

  NAICS_CROSSWALK_2022 = pd.read_sql(combined_query, conn)


In [8]:
NAICS_CROSSWALK

Unnamed: 0,NAICS_2022,NAICS_Title_2022,NAICS_2017,NAICS_Title_2017
0,111110.0,Soybean Farming,111110.0,Soybean Farming
1,928120.0,International Affairs,928120.0,International Affairs
2,928110.0,National Security,928110.0,National Security
3,927110.0,Space Research and Technology,927110.0,Space Research and Technology
4,926150.0,"Regulation, Licensing, and Inspection of Misce...",926150.0,"Regulation, Licensing, and Inspection of Misce..."
...,...,...,...,...
1145,111160.0,Rice Farming,111160.0,Rice Farming
1146,111150.0,Corn Farming,111150.0,Corn Farming
1147,111140.0,Wheat Farming,111140.0,Wheat Farming
1148,111130.0,Dry Pea and Bean Farming,111130.0,Dry Pea and Bean Farming


In [9]:
# ============================================================================
#  LOAD FULL COUNTY COUNT
# ============================================================================
# Load County from Scott's database
combined_query = f"""
SELECT*
FROM 
temporary_data.sspitze.county_population_2020_2023
"""
# Execute query and get data
county_population_df  = pd.read_sql(combined_query, conn)

  county_population_df  = pd.read_sql(combined_query, conn)


In [10]:
county_population_df 

Unnamed: 0,county_fips_code,county_name,cbsa_status,county_population_2020,county_population_2021,county_population_2022,county_population_2023
0,01001,"Autauga County, Alabama",1.0,58915.0,59203.0,59726.0,60342.0
1,56045,"Weston County, Wyoming",0.0,6816.0,6746.0,6858.0,6808.0
2,56043,"Washakie County, Wyoming",0.0,7657.0,7719.0,7724.0,7710.0
3,56041,"Uinta County, Wyoming",1.0,20457.0,20681.0,20727.0,20745.0
4,56039,"Teton County, Wyoming",1.0,23379.0,23605.0,23297.0,23232.0
...,...,...,...,...,...,...,...
3139,01011,"Bullock County, Alabama",0.0,10229.0,10143.0,10143.0,9897.0
3140,01009,"Blount County, Alabama",1.0,59107.0,59079.0,59516.0,59816.0
3141,01007,"Bibb County, Alabama",1.0,22188.0,22359.0,21986.0,21868.0
3142,01005,"Barbour County, Alabama",0.0,24969.0,24533.0,24700.0,24585.0


In [11]:
def create_standardized_county_fips(employed_data):
    """
    Create standardized county FIPS codes, handling historical discrepancies
    between IPUMS COUNTYFIP codes and standard FIPS codes.
    """
    # Known discrepancies between IPUMS COUNTYFIP and standard FIPS codes
    # fips_mapping = {
    #     # Miami-Dade County, Florida: IPUMS uses 025, standard FIPS uses 086
    #     '12025': '12086'
    # }
    
    # Create IPUMS-style FIPS codes first
    employed_data['county_fips_ipums'] = (
        employed_data['STATEFIP'].astype(int).astype(str).str.zfill(2) + 
        employed_data['COUNTYFIP'].astype(int).astype(str).str.zfill(3)
    )
    
    # Map to standard FIPS codes
    # employed_data['county_fips'] = employed_data['county_fips_ipums'].map(
    #     lambda x: fips_mapping.get(x, x)  # Use mapping if exists, otherwise keep original
    # )
    
    # Just use the IPUMS codes as final codes (no mapping needed)
    employed_data['county_fips'] = employed_data['county_fips_ipums']
    
    # Log the mapping results
    # mapped_counties = employed_data[employed_data['county_fips'] != employed_data['county_fips_ipums']]
    # if len(mapped_counties) > 0:
    #     print(f"FIPS mapping applied to {len(mapped_counties)} records:")
    #     for ipums_fips, standard_fips in fips_mapping.items():
    #         count = len(mapped_counties[mapped_counties['county_fips_ipums'] == ipums_fips])
    #         if count > 0:
    #             print(f"  {ipums_fips} → {standard_fips}: {count} records")
    
    print("FIPS codes created - no mapping applied")
    
    return employed_data

In [12]:
def create_full_county_universe(county_population_df):
    """
    Create complete county universe with population buckets from Snowflake data.
    
    Args:
        county_population_df: DataFrame from Snowflake with columns:
            - county_fips_code: 5-digit FIPS (string)
            - county_name: County name  
            - county_population_2023: Most recent population
    
    Returns:
        DataFrame with county_fips, county_name, pop_label for ALL counties
    """
    print("Creating full county universe from Snowflake data...")
    
    # Population buckets (same as your ACS analysis)
    POPULATION_BUCKETS = [
        (0, 25000, '0-25K'),
        (25000, 50000, '25K-50K'), 
        (50000, 75000, '50K-75K'),
        (75000, 100000, '75K-100K'),
        (100000, 1000000, '100K-1M'),
        (1000000, float('inf'), '1M+')
    ]
    
    # Use 2023 population for bucketing
    df = county_population_df[['county_fips_code', 'county_name', 'county_population_2023']].copy()
    df = df.rename(columns={
        'county_fips_code': 'county_fips',
        'county_population_2023': 'total_population'
    })
    
    # Assign population buckets
    conditions = [df['total_population'] < bucket[1] for bucket in POPULATION_BUCKETS[:-1]]
    choices = [bucket[2] for bucket in POPULATION_BUCKETS[:-1]]
    df['pop_label'] = np.select(conditions, choices, default=POPULATION_BUCKETS[-1][2])
    
    print(f"✓ Full county universe created: {len(df)} counties")
    print("Population bucket distribution (all US counties):")
    bucket_dist = df['pop_label'].value_counts().sort_index()
    for bucket, count in bucket_dist.items():
        print(f"  {bucket}: {count} counties")
    
    return df[['county_fips', 'county_name', 'pop_label']]



In [13]:
def add_pop_label_metro_only(df):
    """Create pop_label_metro for downstream analysis (e.g., Step 10)."""
    if 'metro_status' not in df.columns or 'pop_label' not in df.columns:
        raise ValueError("Both 'metro_status' and 'pop_label' must exist to create 'pop_label_metro'")
    
    df['pop_label_metro'] = df['pop_label'].astype(str) + ' ' + df['metro_status'].astype(str)
    return df


In [14]:
# --------------------------------------------
# HELPER FUNCTIONS FOR REPRESENTATION ANALYSIS
# --------------------------------------------

def create_geographic_presence_matrix_stacked(df, unit_col, code_levels, title_levels, is_soc=False):
    """
    Final: Calculate % of geographic units (PUMA or County) where each code is present,
    stacked across levels, and reshaped into wide format by pop+metro group.
    """

    # Ensure required grouping field exists
    assert 'pop_label' in df.columns and 'metro_status' in df.columns, \
        "Missing 'pop_label' or 'metro_status' columns"

    df_valid = df[df[unit_col].notna()].copy()

    # Combined group for reshaping: e.g. "50K-75K Metro:1"
    df_valid['group'] = df_valid['pop_label'] + ' Metro:' + df_valid['metro_status'].map({
        'Metropolitan': '1',
        'Non-metro/Rural': '0'
    })

    results = []

    for code_col, title_col in zip(code_levels, title_levels):
        level_df = df_valid[df_valid[code_col].notna() & (df_valid[code_col] != 'Unclassified')].copy()

        # Count number of unique units where the code appears in each group
        unit_counts = (
            level_df.groupby([code_col, title_col, 'group'])[unit_col]
            .nunique()
            .reset_index(name='num_units')
        )

        # Total number of unique units per group
        total_units = (
            df_valid.groupby('group')[unit_col]
            .nunique()
            .reset_index(name='total_units')
        )

        # Merge + compute % presence
        merged = unit_counts.merge(total_units, on='group', how='left')
        merged['representation_pct'] = round(100 * merged['num_units'] / merged['total_units'], 1)

        # Pivot to wide format
        wide = merged.pivot_table(index=[code_col, title_col], columns='group', values='representation_pct').reset_index()

        # Rename columns for SOC vs NAICS
        if is_soc:
            wide = wide.rename(columns={
                code_col: 'occupation_code',
                title_col: 'occupation_title'
            })
        else:
            wide = wide.rename(columns={
                code_col: 'industry_code',
                title_col: 'industry_title'
            })

        results.append(wide)

    # Combine all levels (already stacked, no level col needed)
    final = pd.concat(results, axis=0, ignore_index=True)

    # Order columns: ID + pop-metro groups in population ascending order
    ordered_groups = [
        '0-25K Metro:0', '0-25K Metro:1',
        '25K-50K Metro:0', '25K-50K Metro:1',
        '50K-75K Metro:0', '50K-75K Metro:1',
        '75K-100K Metro:0', '75K-100K Metro:1',
        '100K-1M Metro:0', '100K-1M Metro:1',
        '1M+ Metro:0', '1M+ Metro:1',
        'Unknown County Metro:0', 'Unknown County Metro:1'
    ]
    id_cols = ['occupation_code', 'occupation_title'] if is_soc else ['industry_code', 'industry_title']
    existing_groups = [col for col in ordered_groups if col in final.columns]
    final = final[id_cols + existing_groups]

    return final

def create_naics_representation_matrix(employed_data, unit='PUMA', full_county_universe=None):
    """
    VECTORIZED version: Create NAICS representation matrix with pure ACS denominators.
    """
    print(f"Creating NAICS representation matrix for {unit} (vectorized)...")
    
    # All expected population buckets and levels
    all_buckets = ['0-25K', '25K-50K', '50K-75K', '75K-100K', '100K-1M', '1M+']
    metro_statuses = ['Rural', 'Urban']
    naics_levels = ['naics_2digit', 'naics_3digit', 'naics_4digit', 'naics_5digit', 'naics_6digit']
    
    # Create geographic identifier
    if unit == 'County':
        if 'county_fips' not in employed_data.columns:
            employed_data['county_fips'] = (
                employed_data['STATEFIP'].astype(int).astype(str).str.zfill(2) + 
                employed_data['COUNTYFIP'].astype(int).astype(str).str.zfill(3)
            )
        geo_id_col = 'county_fips'
    else:
        geo_id_col = 'geo_id'
    
    # Create metro label mapping
    employed_data['metro_label'] = employed_data['metro_status'].map({
        'Non-metro/Rural': 'Rural',
        'Metropolitan': 'Urban'
    })
    
    # Always use pure ACS denominators - no merge, no filtering
    print("Using pure ACS denominators (all counties with employment data)...")
    total_counts = employed_data.groupby(['pop_label', 'metro_label'])[geo_id_col].nunique().reset_index(name='total_units')
    print(f"Total ACS counties: {len(employed_data[geo_id_col].unique())}")
    
    # Show denominator breakdown
    print("ACS denominator breakdown:")
    for _, row in total_counts.iterrows():
        print(f"  {row['pop_label']} {row['metro_label']}: {row['total_units']} counties")
    
    # Create lookup for total counts
    total_counts_dict = {}
    for _, row in total_counts.iterrows():
        key = (row['pop_label'], row['metro_label'])
        total_counts_dict[key] = row['total_units']
    
    all_results = []
    
    # Process each NAICS level
    for level in naics_levels:
        code_col = level
        title_col = f"{level}_title"
        
        if code_col not in employed_data.columns:
            continue
            
        # Filter to classified data for this level
        level_data = employed_data[employed_data[code_col] != 'Unclassified'].copy()
        if len(level_data) == 0:
            continue
        
        # Count unique units with each code by bucket+metro
        industry_counts = level_data.groupby([code_col, 'pop_label', 'metro_label'])[geo_id_col].nunique().reset_index()
        industry_counts = industry_counts.rename(columns={geo_id_col: 'units_with_industry'})
        
        # Get titles
        title_map = level_data[[code_col, title_col]].drop_duplicates().set_index(code_col)[title_col].to_dict()
        
        # Create all combinations of codes and bucket+metro
        unique_codes = level_data[code_col].unique()
        combinations = []
        for code in unique_codes:
            for bucket in all_buckets:
                for metro in metro_statuses:
                    combinations.append({
                        code_col: code,
                        'pop_label': bucket,
                        'metro_label': metro
                    })
        
        combo_df = pd.DataFrame(combinations)
        
        # Merge with counts (left join to keep all combinations)
        merged = combo_df.merge(industry_counts, on=[code_col, 'pop_label', 'metro_label'], how='left')
        merged['units_with_industry'] = merged['units_with_industry'].fillna(0)
        
        # Add total counts and calculate percentages
        merged['total_units'] = merged.apply(lambda row: total_counts_dict.get((row['pop_label'], row['metro_label']), 0), axis=1)
        merged['percentage'] = (merged['units_with_industry'] / merged['total_units'] * 100).fillna(0).round(1)
        
        # Reshape to wide format
        pivot = merged.pivot_table(
            index=code_col,
            columns=['pop_label', 'metro_label'],
            values='percentage',
            fill_value=0
        ).reset_index()
        
        # Flatten MultiIndex columns properly
        if isinstance(pivot.columns, pd.MultiIndex):
            # Flatten the MultiIndex columns
            new_cols = []
            for col in pivot.columns:
                if col[0] == code_col:  # This is the index column
                    new_cols.append(code_col)
                else:
                    # This is a data column, format as "bucket metro"
                    new_cols.append(f"{col[0]} {col[1]}")
            pivot.columns = new_cols
        
        # Ensure all expected columns exist
        expected_cols = [code_col]
        for bucket in all_buckets:
            for metro in metro_statuses:
                col_name = f"{bucket} {metro}"
                expected_cols.append(col_name)
                if col_name not in pivot.columns:
                    pivot[col_name] = 0.0
        
        # Add titles and rename columns
        pivot['industry_title'] = pivot[code_col].map(title_map)
        pivot = pivot.rename(columns={code_col: 'industry_code'})
        
        # Reorder columns properly
        final_columns = ['industry_code', 'industry_title']
        for bucket in all_buckets:
            for metro in metro_statuses:
                col_name = f"{bucket} {metro}"
                if col_name in pivot.columns:
                    final_columns.append(col_name)
        
        pivot = pivot[final_columns]
        
        all_results.append(pivot)
        print(f"  {level}: {len(pivot)} codes processed")
    
    if not all_results:
        return pd.DataFrame()
    
    # Combine all levels
    result_df = pd.concat(all_results, ignore_index=True)
    result_df = result_df.sort_values('industry_code').reset_index(drop=True)
    
    print(f"✓ NAICS representation matrix (pure ACS denominators): {len(result_df)} total codes")
    return result_df

def create_soc_representation_matrix(employed_data, unit='PUMA', full_county_universe=None):
    """
    VECTORIZED version: Create SOC representation matrix with pure ACS denominators.
    """
    print(f"Creating SOC representation matrix for {unit} (vectorized)...")
    
    # All expected population buckets and levels
    all_buckets = ['0-25K', '25K-50K', '50K-75K', '75K-100K', '100K-1M', '1M+']
    metro_statuses = ['Rural', 'Urban']
    soc_levels = ['soc_2digit', 'soc_3digit', 'soc_4digit', 'soc_5digit', 'soc_6digit']
    
    # Create geographic identifier
    if unit == 'County':
        if 'county_fips' not in employed_data.columns:
            employed_data['county_fips'] = (
                employed_data['STATEFIP'].astype(int).astype(str).str.zfill(2) + 
                employed_data['COUNTYFIP'].astype(int).astype(str).str.zfill(3)
            )
        geo_id_col = 'county_fips'
    else:
        geo_id_col = 'geo_id'
    
    # Create metro label mapping
    employed_data['metro_label'] = employed_data['metro_status'].map({
        'Non-metro/Rural': 'Rural',
        'Metropolitan': 'Urban'
    })
    
    # Always use pure ACS denominators - no merge, no filtering
    print("Using pure ACS denominators (all counties with employment data)...")
    total_counts = employed_data.groupby(['pop_label', 'metro_label'])[geo_id_col].nunique().reset_index(name='total_units')
    
    # Create lookup for total counts
    total_counts_dict = {}
    for _, row in total_counts.iterrows():
        key = (row['pop_label'], row['metro_label'])
        total_counts_dict[key] = row['total_units']
    
    all_results = []
    
    # Process each SOC level
    for level in soc_levels:
        code_col = level
        title_col = f"{level}_title"
        
        if code_col not in employed_data.columns:
            continue
            
        # Filter to classified data for this level
        level_data = employed_data[employed_data[code_col] != 'Unclassified'].copy()
        if len(level_data) == 0:
            continue
        
        # Count unique units with each code by bucket+metro
        occupation_counts = level_data.groupby([code_col, 'pop_label', 'metro_label'])[geo_id_col].nunique().reset_index()
        occupation_counts = occupation_counts.rename(columns={geo_id_col: 'units_with_occupation'})
        
        # Get titles
        title_map = level_data[[code_col, title_col]].drop_duplicates().set_index(code_col)[title_col].to_dict()
        
        # Create all combinations of codes and bucket+metro
        unique_codes = level_data[code_col].unique()
        combinations = []
        for code in unique_codes:
            for bucket in all_buckets:
                for metro in metro_statuses:
                    combinations.append({
                        code_col: code,
                        'pop_label': bucket,
                        'metro_label': metro
                    })
        
        combo_df = pd.DataFrame(combinations)
        
        # Merge with counts (left join to keep all combinations)
        merged = combo_df.merge(occupation_counts, on=[code_col, 'pop_label', 'metro_label'], how='left')
        merged['units_with_occupation'] = merged['units_with_occupation'].fillna(0)
        
        # Add total counts and calculate percentages
        merged['total_units'] = merged.apply(lambda row: total_counts_dict.get((row['pop_label'], row['metro_label']), 0), axis=1)
        merged['percentage'] = (merged['units_with_occupation'] / merged['total_units'] * 100).fillna(0).round(1)
        
        # Reshape to wide format
        pivot = merged.pivot_table(
            index=code_col,
            columns=['pop_label', 'metro_label'],
            values='percentage',
            fill_value=0
        ).reset_index()
        
        # Flatten MultiIndex columns properly
        if isinstance(pivot.columns, pd.MultiIndex):
            # Flatten the MultiIndex columns
            new_cols = []
            for col in pivot.columns:
                if col[0] == code_col:  # This is the index column
                    new_cols.append(code_col)
                else:
                    # This is a data column, format as "bucket metro"
                    new_cols.append(f"{col[0]} {col[1]}")
            pivot.columns = new_cols
        
        # Ensure all expected columns exist
        expected_cols = [code_col]
        for bucket in all_buckets:
            for metro in metro_statuses:
                col_name = f"{bucket} {metro}"
                expected_cols.append(col_name)
                if col_name not in pivot.columns:
                    pivot[col_name] = 0.0
        
        # Add titles and rename columns
        pivot['occupation_title'] = pivot[code_col].map(title_map)
        pivot = pivot.rename(columns={code_col: 'occupation_code'})
        
        # Reorder columns properly
        final_columns = ['occupation_code', 'occupation_title']
        for bucket in all_buckets:
            for metro in metro_statuses:
                col_name = f"{bucket} {metro}"
                if col_name in pivot.columns:
                    final_columns.append(col_name)
        
        pivot = pivot[final_columns]
        
        all_results.append(pivot)
        print(f"  {level}: {len(pivot)} codes processed")
    
    if not all_results:
        return pd.DataFrame()
    
    # Combine all levels
    result_df = pd.concat(all_results, ignore_index=True)
    result_df = result_df.sort_values('occupation_code').reset_index(drop=True)
    
    print(f"✓ SOC representation matrix (pure ACS denominators): {len(result_df)} total codes")
    return result_df

In [15]:
# ============================================================================
# OPTIMIZED NAICS CROSSWALK WITH CONSOLIDATION RULES
# ============================================================================

def load_excel_naics_crosswalk(crosswalk_path):
    """OPTIMIZED: Load and process the NAICS crosswalk Excel file using vectorized operations"""
    
    try:
        print(f"Loading Excel crosswalk from {crosswalk_path}...")
        crosswalk_df = pd.read_excel(crosswalk_path)
        
        # Use the correct columns for 2023-2027 data
        naics_col = '2023-2027 ACS/PRCS INDNAICS CODE'  
        title_col = 'Industry Title'  
        
        # OPTIMIZATION: Vectorized data cleaning instead of iterrows()
        # Filter out invalid rows first
        valid_mask = (
            crosswalk_df[naics_col].notna() & 
            crosswalk_df[title_col].notna() &
            (crosswalk_df[naics_col].astype(str).str.strip() != '0') &
            (crosswalk_df[naics_col].astype(str).str.strip() != 'nan')
        )
        
        valid_df = crosswalk_df[valid_mask].copy()
        
        if len(valid_df) == 0:
            print("No valid NAICS codes found in Excel file")
            return {}
        
        # OPTIMIZATION: Vectorized string cleaning
        valid_df['naics_clean'] = valid_df[naics_col].astype(str).str.strip()
        valid_df['title_clean'] = valid_df[title_col].astype(str).str.strip()
        
        # OPTIMIZATION: Use to_dict() instead of iterrows()
        excel_mapping = dict(zip(valid_df['naics_clean'], valid_df['title_clean']))
        
        print(f"Excel NAICS crosswalk loaded: {len(excel_mapping)} codes")
        return excel_mapping
        
    except Exception as e:
        print(f"Error loading Excel crosswalk: {e}")
        return {}

def create_combined_naics_mapping(naics_crosswalk_df, excel_crosswalk_path):
    """OPTIMIZED: Create NAICS mapping from Snowflake with Excel fallback and consolidation rules"""
    
    print("Creating combined NAICS mapping...")
    
    # OPTIMIZATION: Vectorized cleaning instead of iterrows()
    snowflake_df = naics_crosswalk_df.copy()
    
    # Clean all columns at once
    for col in ['NAICS_2017', 'NAICS_2022', 'NAICS_Title_2017', 'NAICS_Title_2022']:
        if col in snowflake_df.columns:
            snowflake_df[f'{col}_clean'] = (
                snowflake_df[col]
                .astype(str)
                .str.strip()
                .replace(['0', 'nan', 'NaN', ''], None)
            )
    
    # OPTIMIZATION: Process 2017 codes vectorized
    snowflake_mapping = {}
    
    # 2017 mappings
    valid_2017_mask = (
        snowflake_df['NAICS_2017_clean'].notna() & 
        snowflake_df['NAICS_Title_2017_clean'].notna()
    )
    
    if valid_2017_mask.any():
        valid_2017 = snowflake_df[valid_2017_mask]
        mapping_2017 = dict(zip(valid_2017['NAICS_2017_clean'], valid_2017['NAICS_Title_2017_clean']))
        snowflake_mapping.update(mapping_2017)
    
    # 2022 mappings (only add if not already present)
    valid_2022_mask = (
        snowflake_df['NAICS_2022_clean'].notna() & 
        snowflake_df['NAICS_Title_2022_clean'].notna()
    )
    
    if valid_2022_mask.any():
        valid_2022 = snowflake_df[valid_2022_mask]
        
        # Only add codes not already in snowflake_mapping
        new_2022_codes = ~valid_2022['NAICS_2022_clean'].isin(snowflake_mapping.keys())
        if new_2022_codes.any():
            new_2022_df = valid_2022[new_2022_codes]
            mapping_2022 = dict(zip(new_2022_df['NAICS_2022_clean'], new_2022_df['NAICS_Title_2022_clean']))
            snowflake_mapping.update(mapping_2022)
    
    # Load Excel fallback mapping
    excel_mapping = load_excel_naics_crosswalk(excel_crosswalk_path)
    
    # OPTIMIZATION: Efficient combination using set operations
    combined_mapping = snowflake_mapping.copy()
    
    # Add Excel codes that aren't in Snowflake
    snowflake_codes = set(combined_mapping.keys())
    excel_only_codes = set(excel_mapping.keys()) - snowflake_codes
    excel_only_count = len(excel_only_codes)
    
    for code in excel_only_codes:
        combined_mapping[code] = excel_mapping[code]
    
    # OPTIMIZATION: Pre-defined sector mappings with consolidation rules
    SECTOR_MAPPINGS = {
        '11': 'Agriculture, Forestry, Fishing and Hunting',
        '21': 'Mining, Quarrying, and Oil and Gas Extraction',
        '22': 'Utilities',
        '23': 'Construction',
        '31': 'Manufacturing',  # Consolidated: includes 31, 32, 33, 3M
        '42': 'Wholesale Trade',
        '44': 'Retail Trade',   # Consolidated: includes 44, 45
        '48': 'Transportation and Warehousing',  # Consolidated: includes 48, 49
        '51': 'Information',
        '52': 'Finance and Insurance',
        '53': 'Real Estate and Rental and Leasing',
        '54': 'Professional, Scientific, and Technical Services',
        '55': 'Management of Companies and Enterprises',
        '56': 'Administrative and Support and Waste Management Services',
        '61': 'Educational Services',
        '62': 'Health Care and Social Assistance',
        '71': 'Arts, Entertainment, and Recreation',
        '72': 'Accommodation and Food Services',
        '81': 'Other Services (except Public Administration)',
        '92': 'Public Administration'
    }
    
    # Add sector mappings only if not already present
    existing_codes = set(combined_mapping.keys())
    missing_sectors = set(SECTOR_MAPPINGS.keys()) - existing_codes
    sector_added_count = len(missing_sectors)
    
    for code in missing_sectors:
        combined_mapping[code] = SECTOR_MAPPINGS[code]
    
    # CONSOLIDATION: Map the variants to consolidated codes
    consolidation_mappings = {
        '32': ('31', 'Manufacturing'),
        '33': ('31', 'Manufacturing'), 
        '3M': ('31', 'Manufacturing'),
        '45': ('44', 'Retail Trade'),
        '49': ('48', 'Transportation and Warehousing')
    }
    
    consolidation_added_count = 0
    for old_code, (new_code, title) in consolidation_mappings.items():
        if old_code not in combined_mapping:
            combined_mapping[old_code] = title
            consolidation_added_count += 1
    
    # Print summary
    print(f"Combined NAICS crosswalk created: {len(combined_mapping)} total codes")
    print(f"  - From Snowflake: {len(snowflake_mapping)} codes")
    print(f"  - From Excel (additional): {excel_only_count} codes")
    print(f"  - From 2-digit sectors (additional): {sector_added_count} codes")
    print(f"  - Consolidation mappings added: {consolidation_added_count} codes")
    print("  - Consolidation rules: 32/33/3M→31, 45→44, 49→48")
    
    return combined_mapping

def get_naics_info_with_fallback(indnaics_code, combined_naics_mapping):
    """OPTIMIZED: Get NAICS title with efficient string operations and sector consolidation"""
    
    # OPTIMIZATION: Early returns for common cases
    if not indnaics_code or pd.isna(indnaics_code):
        return 'Unclassified', 'Unclassified'
    
    # Convert to string once
    code_str = str(indnaics_code).strip()
    
    # OPTIMIZATION: Fast check for unclassified codes
    if not code_str or code_str in {'0', 'nan', 'NaN'} or code_str.startswith('99'):
        return 'Unclassified', 'Unclassified'
    
    # SECTOR CONSOLIDATION: Apply 2-digit consolidation rules (3+ digits unaffected)
    if len(code_str) == 2:
        if code_str in ['32', '33', '3M']:  # Manufacturing consolidation
            code_str = '31'
        elif code_str == '45':  # Retail consolidation
            code_str = '44'
        elif code_str == '49':  # Transportation consolidation
            code_str = '48'
    
    # OPTIMIZATION: Direct lookup first (most common case)
    if code_str in combined_naics_mapping:
        return code_str, combined_naics_mapping[code_str]
    
    # OPTIMIZATION: Generate test codes more efficiently
    test_codes = []
    
    # Handle decimal points
    if '.' in code_str:
        clean_code = code_str.split('.')[0]
        if clean_code != code_str:
            test_codes.append(clean_code)
    
    # Handle numeric codes with leading zeros
    if code_str.isdigit() and len(code_str) > 1:
        int_code = str(int(code_str))
        if int_code != code_str:
            test_codes.append(int_code)
    
    # Generate truncated versions efficiently
    if len(code_str) > 2:
        for length in [5, 4, 3, 2]:
            if len(code_str) > length:
                truncated = code_str[:length]
                # Apply consolidation to truncated 2-digit codes
                if len(truncated) == 2:
                    if truncated in ['32', '33', '3M']:
                        truncated = '31'
                    elif truncated == '45':
                        truncated = '44'
                    elif truncated == '49':
                        truncated = '48'
                
                if truncated not in test_codes:
                    test_codes.append(truncated)
    
    # OPTIMIZATION: Check all test codes efficiently
    for test_code in test_codes:
        if test_code in combined_naics_mapping:
            return code_str, combined_naics_mapping[test_code]
    
    # If not found, return with ACS prefix
    return code_str, f'ACS INDNAICS {code_str}'

def initialize_naics_mapping(naics_crosswalk_df, excel_crosswalk_path):
    """Initialize the NAICS mapping with error handling and validation"""
    
    try:
        print("Initializing NAICS mapping...")
        
        # Validate inputs
        if naics_crosswalk_df is None or len(naics_crosswalk_df) == 0:
            print("Warning: No Snowflake crosswalk data provided")
            naics_crosswalk_df = pd.DataFrame()  # Empty DataFrame
        
        # Create the mapping
        naics_mapping = create_combined_naics_mapping(naics_crosswalk_df, excel_crosswalk_path)
        
        if len(naics_mapping) == 0:
            print("Error: No NAICS mappings created. Check your input files.")
            return {}
        
        print(f"NAICS mapping initialized successfully: {len(naics_mapping)} codes available")
        return naics_mapping
        
    except Exception as e:
        print(f"Error initializing NAICS mapping: {e}")
        return {}



In [16]:
# ============================================================================
# Occupation Crosswalk
# ============================================================================

def create_occupation_crosswalk(acs_onet, onet_soc2019):
    """Create ACS OCCSOC → Full SOC 2019 hierarchy mapping using ONET join"""
    print("Creating occupation crosswalk...")

    # Validate input columns
    required_acs_cols = ['SOC_2019_5_ACS', 'ONET_2019']
    required_onet_cols = ['ONET_2019', 'SOC_2019_2', 'SOC_2019_2_NAME', 'SOC_2019_3',
                          'SOC_2019_3_NAME', 'SOC_2019_4', 'SOC_2019_4_NAME',
                          'SOC_2019_5', 'SOC_2019_5_NAME']
    
    for col in required_acs_cols:
        if col not in acs_onet.columns:
            raise ValueError(f"Missing required column in acs_onet: {col}")
    for col in required_onet_cols:
        if col not in onet_soc2019.columns:
            raise ValueError(f"Missing required column in onet_soc2019: {col}")
    
    # Clean ONET_2019 columns
    acs_onet['ONET_2019'] = acs_onet['ONET_2019'].astype(str).str.strip()
    onet_soc2019['ONET_2019'] = onet_soc2019['ONET_2019'].astype(str).str.strip()

    # Merge to get full SOC hierarchy
    merged = acs_onet.merge(
        onet_soc2019,
        on='ONET_2019',
        how='left',
        validate='many_to_one'
    )

    # Clean ACS code key
    merged['SOC_2019_5_ACS'] = merged['SOC_2019_5_ACS'].astype(str).str.strip()

    # Build mapping from ACS 6-digit code to full SOC hierarchy
    soc_mapping = {}
    for _, row in merged.iterrows():
        acs_code = row['SOC_2019_5_ACS']
        if pd.isna(acs_code) or not acs_code:
            continue
        
        soc5 = str(row['SOC_2019_5']).strip()

        if '-' in soc5 and len(soc5) == 7:
            # Correct 7-character SOC code, like "17-2011"
            soc2 = soc5[:2]
            soc3 = soc5[:4]
            soc4 = soc5[:5] + '0'  # Always pad SOC-4 with final 0
        else:
            soc2, soc3, soc4 = 'Unclassified', 'Unclassified', 'Unclassified'
        
        soc_mapping[acs_code] = {
            'soc2_code': soc2,
            'soc2_title': row['SOC_2019_2_NAME'],
            'soc3_code': soc3,
            'soc3_title': row['SOC_2019_3_NAME'],
            'soc4_code': soc4,
            'soc4_title': row['SOC_2019_4_NAME'],
            'soc5_code': soc5,
            'soc5_title': row['SOC_2019_5_NAME']
        }

    print(f"✓ Created {len(soc_mapping):,} ACS SOC → SOC 2019 mappings")
    return soc_mapping


def create_soc_level_summary(occ_mapping):
    """Create summary of available SOC levels"""
    
    if not occ_mapping:
        return {}
    
    soc_levels = ['soc2_code', 'soc3_code', 'soc4_code', 'soc5_code', 'soc6_code']
    level_summary = {}
    
    for level in soc_levels:
        unique_codes = set()
        for soc_hierarchy in occ_mapping.values():
            code = soc_hierarchy.get(level)
            if code and code != 'Unclassified':
                unique_codes.add(code)
        
        level_summary[level] = {
            'unique_codes': len(unique_codes),
            'sample_codes': list(unique_codes)[:5]
        }
    
    return level_summary



In [17]:
# Query ACS crosswalk to ONET
combined_query = f"""
SELECT*
FROM
CROSSWALKS.CUSTOM.ONET_2019_ACS_SOC_CROSSWALK
"""
# Execute query and get data
acs_onet = pd.read_sql(combined_query, conn)

  acs_onet = pd.read_sql(combined_query, conn)


In [18]:
acs_onet

Unnamed: 0,ONET_2019,ONET_2019_NAME,SOC_2019_5_OEWS,SOC_2019_5_OEWS_NAME,SOC_2019_5_ACS,SOC_2019_5_ACS_NAME
0,43-4051.00,Customer Service Representatives,43-4051,Customer Service Representatives,434051,Customer Service Representatives
1,43-4199.00,"Information and Record Clerks, All Other",43-4199,"Information and Record Clerks, All Other",434XXX,Correspondent clerks and order clerks
2,43-4151.00,Order Clerks,43-4151,Order Clerks,434XXX,Correspondent clerks and order clerks
3,43-4021.00,Correspondence Clerks,43-4021,Correspondence Clerks,434XXX,Correspondent clerks and order clerks
4,43-4011.00,Brokerage Clerks,43-4011,Brokerage Clerks,434XXX,Correspondent clerks and order clerks
...,...,...,...,...,...,...
1040,53-7062.00,"Laborers and Freight, Stock, and Material Move...",53-7062,"Laborers and Freight, Stock, and Material Move...",537062,"Laborers and Freight, Stock, and Material Move..."
1041,47-2061.00,Construction Laborers,47-2061,Construction Laborers,472061,Construction Laborers
1042,25-3041.00,Tutors,25-3041,Tutors,253041,Tutors
1043,35-9031.00,"Hosts and Hostesses, Restaurant, Lounge, and C...",35-9031,"Hosts and Hostesses, Restaurant, Lounge, and C...",359031,"Host and Hostesses, Restaurant, Lounge, and Co..."


In [19]:
# Query ONET 2019 to SOC 2019
combined_query = f"""
SELECT*
FROM
CROSSWALKS.CUSTOM.ONET_2019_SOC_2019_CROSSWALK_FULL
"""
# Execute query and get data
onet_soc2019 = pd.read_sql(combined_query, conn)


  onet_soc2019 = pd.read_sql(combined_query, conn)


In [20]:
onet_soc2019

Unnamed: 0,SOC_2019_2,SOC_2019_2_NAME,SOC_2019_3,SOC_2019_3_NAME,SOC_2019_4,SOC_2019_4_NAME,SOC_2019_5,SOC_2019_5_NAME,ONET_2019,ONET_2019_NAME
0,11-0000,Management Occupations,11-1000,Top Executives,11-1010,Chief Executives,11-1011,Chief Executives,11-1011.00,Chief Executives
1,55-0000,Military Specific Occupations,55-3000,Military Enlisted Tactical Operations and Air/...,55-3010,Military Enlisted Tactical Operations and Air/...,55-3019,Military Enlisted Tactical Operations and Air/...,55-3019.00,Military Enlisted Tactical Operations and Air/...
2,55-0000,Military Specific Occupations,55-3000,Military Enlisted Tactical Operations and Air/...,55-3010,Military Enlisted Tactical Operations and Air/...,55-3018,Special Forces,55-3018.00,Special Forces
3,55-0000,Military Specific Occupations,55-3000,Military Enlisted Tactical Operations and Air/...,55-3010,Military Enlisted Tactical Operations and Air/...,55-3016,Infantry,55-3016.00,Infantry
4,55-0000,Military Specific Occupations,55-3000,Military Enlisted Tactical Operations and Air/...,55-3010,Military Enlisted Tactical Operations and Air/...,55-3015,Command and Control Center Specialists,55-3015.00,Command and Control Center Specialists
...,...,...,...,...,...,...,...,...,...,...
1011,11-0000,Management Occupations,11-2000,"Advertising, Marketing, Promotions, Public Rel...",11-2020,Marketing and Sales Managers,11-2021,Marketing Managers,11-2021.00,Marketing Managers
1012,11-0000,Management Occupations,11-2000,"Advertising, Marketing, Promotions, Public Rel...",11-2010,Advertising and Promotions Managers,11-2011,Advertising and Promotions Managers,11-2011.00,Advertising and Promotions Managers
1013,11-0000,Management Occupations,11-1000,Top Executives,11-1030,Legislators,11-1031,Legislators,11-1031.00,Legislators
1014,11-0000,Management Occupations,11-1000,Top Executives,11-1020,General and Operations Managers,11-1021,General and Operations Managers,11-1021.00,General and Operations Managers


In [21]:
# ============================================================================
# LOAD CROSSWALKS WITH FALLBACK
# ============================================================================

# Usage example (assuming NAICS_CROSSWALK DataFrame exists):
EXCEL_CROSSWALK_PATH = "indnaics_crosswalk_2023.xlsx"
NAICS_MAPPING = initialize_naics_mapping(NAICS_CROSSWALK, EXCEL_CROSSWALK_PATH)
SOC_MAPPING = create_occupation_crosswalk(acs_onet, onet_soc2019)


# ADD THIS: Enhance with 2017 and 2022 crosswalks
print("Enhancing NAICS mapping with 2017 and 2022 crosswalks...")
initial_count = len(NAICS_MAPPING)

# Add 2017 codes
naics_levels = ['NAICS2', 'NAICS3', 'NAICS4', 'NAICS5', 'NAICS6']
title_levels = ['NAICS2_NAME', 'NAICS3_NAME', 'NAICS4_NAME', 'NAICS5_NAME', 'NAICS6_NAME']

for code_col, title_col in zip(naics_levels, title_levels):
    if code_col in NAICS_CROSSWALK_2017.columns and title_col in NAICS_CROSSWALK_2017.columns:
        valid_2017 = NAICS_CROSSWALK_2017[
            NAICS_CROSSWALK_2017[code_col].notna() & 
            NAICS_CROSSWALK_2017[title_col].notna()
        ]
        for idx, row in valid_2017.iterrows():
            code = str(row[code_col]).strip()
            title = str(row[title_col]).strip()
            if code and title and code not in NAICS_MAPPING:
                NAICS_MAPPING[code] = title

# Add 2022 codes
for code_col, title_col in zip(naics_levels, title_levels):
    if code_col in NAICS_CROSSWALK_2022.columns and title_col in NAICS_CROSSWALK_2022.columns:
        valid_2022 = NAICS_CROSSWALK_2022[
            NAICS_CROSSWALK_2022[code_col].notna() & 
            NAICS_CROSSWALK_2022[title_col].notna()
        ]
        for idx, row in valid_2022.iterrows():
            code = str(row[code_col]).strip()
            title = str(row[title_col]).strip()
            if code and title and code not in NAICS_MAPPING:
                NAICS_MAPPING[code] = title

print(f"✓ Enhanced NAICS mapping: {len(NAICS_MAPPING)} codes ({len(NAICS_MAPPING) - initial_count} added)")

Initializing NAICS mapping...
Creating combined NAICS mapping...
Loading Excel crosswalk from indnaics_crosswalk_2023.xlsx...
Excel NAICS crosswalk loaded: 265 codes
Combined NAICS crosswalk created: 1439 total codes
  - From Snowflake: 1151 codes
  - From Excel (additional): 265 codes
  - From 2-digit sectors (additional): 18 codes
  - Consolidation mappings added: 5 codes
  - Consolidation rules: 32/33/3M→31, 45→44, 49→48
NAICS mapping initialized successfully: 1439 codes available
Creating occupation crosswalk...
✓ Created 528 ACS SOC → SOC 2019 mappings
Enhancing NAICS mapping with 2017 and 2022 crosswalks...
✓ Enhanced NAICS mapping: 3625 codes (2186 added)


In [22]:
# ============================================================================
# CONFIGURATION
# ============================================================================

POPULATION_BUCKETS = [
    (0, 25000, '0-25K'),
    (25000, 50000, '25K-50K'),
    (50000, 75000, '50K-75K'),
    (75000, 100000, '75K-100K'),
    (100000, 1000000, '100K-1M'),
    (1000000, float('inf'), '1M+')
]

BUCKET_ORDER = ['0-25K', '25K-50K', '50K-75K', '75K-100K', '100K-1M', '1M+', 'Unknown County']
NAICS_LEVELS = ['naics_2digit', 'naics_3digit', 'naics_4digit', 'naics_5digit', 'naics_6digit']
SOC_LEVELS = ['soc_2digit', 'soc_3digit', 'soc_4digit', 'soc_5digit', 'soc_6digit']

In [23]:
def load_and_prepare_data(csv_path):
    """Load and do basic data preparation"""
    print(f"Loading data from {csv_path}...")
    df = pd.read_csv(csv_path, compression='gzip')
    print(f"Loaded {len(df):,} records")
    
    # Basic data cleaning
    df['INDNAICS'] = df['INDNAICS'].fillna('0').astype(str).str.strip()
    df['OCCSOC'] = df['OCCSOC'].fillna('0').astype(str).str.strip()
    
    # Fix float-like strings
    float_mask = df['INDNAICS'].str.contains(r'\.0$', na=False)
    if float_mask.any():
        df.loc[float_mask, 'INDNAICS'] = df.loc[float_mask, 'INDNAICS'].str.replace('.0', '', regex=False)
    
    return df


In [24]:
df = load_and_prepare_data("usa_00052.csv.gz")


Loading data from usa_00052.csv.gz...
Loaded 15,912,393 records


In [25]:
# Combine STATEFIP and COUNTYICP to form unique county identifiers
df_valid = df[df['COUNTYICP'].notna() & (df['COUNTYICP'] != 0)]

# Create a full county FIPS code
df_valid['county_fips'] = df_valid['STATEFIP'].astype(int).astype(str).str.zfill(2) + \
                          df_valid['COUNTYICP'].astype(int).astype(str).str.zfill(3)

# Calculate population by county_fips
county_pop = (
    df_valid
    .groupby('county_fips')['PERWT']
    .sum()
    .reset_index(name='population')
)


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_valid['county_fips'] = df_valid['STATEFIP'].astype(int).astype(str).str.zfill(2) + \


In [26]:

# Filter counties with population over 1 million
high_pop_counties = county_pop[county_pop['population'] >= 1_000_000]

# Count how many counties meet that threshold
num_high_pop_counties = len(high_pop_counties)

# Display
print(f"Number of counties with population ≥ 1 million: {num_high_pop_counties}")
display(high_pop_counties.sort_values(by='population', ascending=False))

Number of counties with population ≥ 1 million: 45


Unnamed: 0,county_fips,population
25,6370,9846959.0
126,17310,5185799.0
374,482010,4758253.0
7,4130,4491638.0
35,6730,3283122.0
30,6590,3164099.0
257,36470,2643951.0
365,481130,2603698.0
32,6650,2450683.0
264,36810,2328854.0


In [54]:
def apply_naics_consolidation(naics_code):
    """
    Apply NAICS 2-digit consolidation rules to standardize industry groupings.
    
    Consolidation rules:
    - Manufacturing: 31, 32, 33, 3M → 31
    - Retail Trade: 44, 45 → 44  
    - Transportation: 48, 49 → 48
    - 3+ digit codes are unchanged
    """
    
    if not naics_code or pd.isna(naics_code):
        return naics_code
    
    # Convert to string and clean
    code_str = str(naics_code).strip()
    
    # Only apply consolidation to 2-digit codes
    if len(code_str) == 2:
        if code_str in ['32', '33', '3M']:  # Manufacturing consolidation
            return '31'
        elif code_str == '45':  # Retail consolidation  
            return '44'
        elif code_str == '49':  # Transportation consolidation
            return '48'
    
    # Return original code for all other cases (3+ digits, other 2-digits, etc.)
    return code_str

def add_classifications(df):
    """Add NAICS and SOC classification columns + titles to dataframe with NAICS consolidation."""
    print("  Processing NAICS with consolidation...")

    # Flag unclassified NAICS codes
    naics_unclassified_mask = (
        (df['INDNAICS'] == '0') |
        (df['INDNAICS'] == '') |
        (df['INDNAICS'].astype(str).str.startswith('99', na=False))
    )

    # Initialize NAICS levels
    for level in ['naics_2digit', 'naics_3digit', 'naics_4digit', 'naics_5digit', 'naics_6digit']:
        df[level] = 'Unclassified'

    # Apply valid NAICS slicing WITH CONSOLIDATION
    naics_classified_df = df[~naics_unclassified_mask].copy()
    if len(naics_classified_df) > 0:
        full_codes = naics_classified_df['INDNAICS'].astype(str).str.strip()
        code_lengths = full_codes.str.len()

        # Process each level with consolidation for 2-digit only
        for min_len, level in [(2, 'naics_2digit'), (3, 'naics_3digit'),
                               (4, 'naics_4digit'), (5, 'naics_5digit'), (6, 'naics_6digit')]:
            mask = code_lengths >= min_len
            
            if level == 'naics_2digit':
                # Apply consolidation to 2-digit codes
                codes_to_assign = full_codes[mask].str[:min_len].apply(apply_naics_consolidation)
            else:
                # No consolidation for 3+ digit codes
                codes_to_assign = full_codes[mask].str[:min_len]
            
            df.loc[naics_classified_df.index[mask], level] = codes_to_assign

    # Show consolidation results
    consolidation_counts = {}
    if len(naics_classified_df) > 0:
        for old_code in ['32', '33', '3M']:
            count = (naics_classified_df['INDNAICS'].astype(str).str[:2] == old_code).sum()
            if count > 0:
                consolidation_counts[f'{old_code}→31'] = count
        
        for old_code in ['45']:
            count = (naics_classified_df['INDNAICS'].astype(str).str[:2] == old_code).sum()
            if count > 0:
                consolidation_counts[f'{old_code}→44'] = count
        
        for old_code in ['49']:
            count = (naics_classified_df['INDNAICS'].astype(str).str[:2] == old_code).sum()
            if count > 0:
                consolidation_counts[f'{old_code}→48'] = count
        
        if consolidation_counts:
            print("  NAICS consolidation applied:")
            for consolidation, count in consolidation_counts.items():
                print(f"    {consolidation}: {count:,} records")

    # Assign NAICS titles using your NAICS_MAPPING dict
    for level in ['naics_2digit', 'naics_3digit', 'naics_4digit', 'naics_5digit', 'naics_6digit']:
        df[f"{level}_title"] = df[level].map(NAICS_MAPPING).fillna('Unclassified')

    print("  Processing SOC...")

    # Flag unclassified SOC
    soc_unclassified_mask = (
        (df['OCCSOC'] == '0') |
        (df['OCCSOC'] == '') |
        (df['OCCSOC'].astype(str).str.startswith('99', na=False))
    )

    for level in ['soc_2digit', 'soc_3digit', 'soc_4digit', 'soc_5digit', 'soc_6digit']:
        df[level] = 'Unclassified'
        df[f"{level}_title"] = 'Unclassified'

    soc_classified_df = df[~soc_unclassified_mask].copy()

    if len(soc_classified_df) > 0 and SOC_MAPPING:
        print(f"    Processing {len(soc_classified_df):,} SOC records with mapping...")

        level_map = {
            'soc2_code': 'soc_2digit',
            'soc3_code': 'soc_3digit',
            'soc4_code': 'soc_4digit',
            'soc5_code': 'soc_5digit',
            'soc6_code': 'soc_6digit'
        }

        # Create mapping dictionaries
        soc_code_maps = {v: {} for v in level_map.values()}
        soc_title_maps = {v: {} for v in level_map.values()}

        for acs_code, soc_data in SOC_MAPPING.items():
            for soc_key, final_col in level_map.items():
                title_col = final_col + '_title'
                soc_code_maps[final_col][acs_code] = soc_data.get(soc_key, 'Unclassified')
                soc_title_maps[final_col][acs_code] = soc_data.get(soc_key.replace("code", "title"), 'Unclassified')

        occsoc_clean = soc_classified_df['OCCSOC'].astype(str).str.strip()

        for col in level_map.values():
            df.loc[soc_classified_df.index, col] = occsoc_clean.map(soc_code_maps[col]).fillna('Unclassified')
            df.loc[soc_classified_df.index, col + '_title'] = occsoc_clean.map(soc_title_maps[col]).fillna('Unclassified')

    print("  ✓ NAICS classifications created with consolidation")
    print("  ✓ SOC classifications created")
    
    return df

def filter_employment_data(df, geo_type='PUMA'):
    """Filter data for employed records with valid classifications."""
    print(f"\nSTEP 3A: Filtering {geo_type} employment data...")
    
    # Common employment filter
    employed_mask = (
        (df['EMPSTAT'] == 1) & 
        (df['INDNAICS'] != '0') & 
        (df['OCCSOC'] != '0') & 
        (df['PERWT'] > 0) &
        (df['naics_2digit'] != 'Unclassified') &  # Must have valid NAICS
        (df['soc_2digit'] != 'Unclassified')      # Must have valid SOC
    )
    
    # Additional filters based on geography type
    if geo_type == 'PUMA':
        employed_mask &= (df['PUMA'] != 0)
    
    employed = df[employed_mask].copy()
    
    print(f"✓ {geo_type} employment records with valid classifications: {len(employed):,}")
    print(f"  (Filtered out INDNAICS=0 and OCCSOC=0 as unemployed/not applicable)")
    
    return employed


def add_geographic_features(employed, geo_type='PUMA', full_dataset=None):
    """
    Add geographic identifiers and population buckets to employment data.
    
    Args:
        employed: Filtered employment data (for analysis)
        geo_type: 'PUMA' or 'County'  
        full_dataset: Complete ACS data (for population bucketing). If None, uses employed data.
    """
    print(f"\nSTEP 3B: Adding geographic features for {geo_type}...")

    # Set geography-specific parameters
    if geo_type == 'PUMA':
        geo_col = 'PUMA'
        geo_digits = 5
    else:  # County
        geo_col = 'COUNTYICP'
        geo_digits = 4

    # Add geographic identifiers to employment data
    employed['geo_id'] = (
        employed['STATEFIP'].astype(str).str.zfill(2) +
        '_' +
        employed[geo_col].astype(str).str.zfill(geo_digits)
    )

    # Add metro status using MET2013
    employed['metro_status'] = np.where(
        employed['MET2013'] == 0, 'Non-metro/Rural', 'Metropolitan'
    )

    # Choose dataset for population calculation
    if full_dataset is not None:
        print(f"    Using full dataset for population bucketing (total population)...")
        pop_dataset = full_dataset.copy()
        
        # Create geo_id for full dataset
        pop_dataset['geo_id'] = (
            pop_dataset['STATEFIP'].astype(str).str.zfill(2) +
            '_' +
            pop_dataset[geo_col].astype(str).str.zfill(geo_digits)
        )
        
        # Calculate TOTAL population by geographic unit
        if geo_type == 'County':
            identifiable = pop_dataset[pop_dataset['COUNTYICP'] != 0]
            geo_pop = identifiable.groupby('geo_id')['PERWT'].sum() if len(identifiable) > 0 else pd.Series(dtype='float64')
        else:
            geo_pop = pop_dataset.groupby('geo_id')['PERWT'].sum()
        
        print(f"    Total geographic units with population data: {len(geo_pop)}")
        
    else:
        print(f"    Using employment data for population bucketing (employed population only)...")
        # Original logic - calculate from employment data only
        if geo_type == 'County':
            identifiable = employed[employed['COUNTYICP'] != 0]
            geo_pop = identifiable.groupby('geo_id')['PERWT'].sum() if len(identifiable) > 0 else pd.Series(dtype='float64')
        else:
            geo_pop = employed.groupby('geo_id')['PERWT'].sum()

    # Map population to employment records
    employed['area_population'] = employed['geo_id'].map(geo_pop).fillna(0)

    # Assign population buckets
    conditions = [employed['area_population'] < bucket[1] for bucket in POPULATION_BUCKETS[:-1]]
    choices = [bucket[2] for bucket in POPULATION_BUCKETS[:-1]]
    employed['pop_label'] = np.select(conditions, choices, default=POPULATION_BUCKETS[-1][2])

    # Special handling for unknown counties
    if geo_type == 'County':
        unknown_mask = employed['COUNTYICP'] == 0
        employed.loc[unknown_mask, 'pop_label'] = 'Unknown County'
        print(f"    Assigned {unknown_mask.sum():,} employment records to 'Unknown County'")

    # Print summary
    print_distribution_summary(employed, geo_type)

    return employed

def print_distribution_summary(employed, geo_type):
    """Print metro status and population bucket distributions."""
    print(f"\n{geo_type} metro status distribution (employment):")
    metro_dist = employed.groupby('metro_status')['PERWT'].sum()
    total_emp = metro_dist.sum()
    for status, emp in metro_dist.items():
        print(f"  {status}: {emp:,.0f} employed ({emp/total_emp*100:.1f}%)")
    
    print(f"\n{geo_type} population bucket distribution:")
    pop_dist = employed.groupby('pop_label')['PERWT'].sum().sort_index()
    for bucket, emp in pop_dist.items():
        # Also show how many geographic units in each bucket
        units_in_bucket = employed[employed['pop_label'] == bucket]['geo_id'].nunique()
        geo_label = 'counties' if geo_type == 'County' else 'PUMAs'
        print(f"  {bucket}: {emp:,.0f} employed in {units_in_bucket} {geo_label} ({emp/total_emp*100:.1f}%)")
    
    if geo_type == 'County':
        unknown_county = (employed['pop_label'] == 'Unknown County').sum()
        unknown_emp = employed[employed['pop_label'] == 'Unknown County']['PERWT'].sum()
        print(f"\n✓ Unknown County check: {unknown_county:,} employment records, {unknown_emp:,.0f} employment")

def create_combined_dataframe(employed_data, analysis_type):
    """Create combined dataframe for either NAICS or SOC analysis."""
    print(f"  Creating combined {analysis_type} dataframe...")
    
    if analysis_type == 'NAICS':
        levels = ['naics_2digit', 'naics_3digit', 'naics_4digit', 'naics_5digit', 'naics_6digit']
        code_col = 'industry_code'
        title_col = 'industry_title'
        level_col = 'naics_level'
    else:  # SOC
        levels = ['soc_2digit', 'soc_3digit', 'soc_4digit', 'soc_5digit', 'soc_6digit']
        code_col = 'occupation_code'
        title_col = 'occupation_title'
        level_col = 'soc_level'
    
    all_results = []
    
    for level in levels:
        if level not in employed_data.columns:
            print(f"    Warning: {level} not found in data")
            continue
        
        # Filter out unclassified records
        level_data = employed_data[employed_data[level] != 'Unclassified'].copy()
        
        if len(level_data) == 0:
            print(f"    Warning: No classified data for {level}")
            continue
        
        # Calculate employment shares by code, metro status, and population bucket
        grouped = (level_data.groupby([level, 'metro_status', 'pop_label'])['PERWT']
                  .sum()
                  .reset_index())
        
        # Get total employment by code and metro status
        totals = (level_data.groupby([level, 'metro_status'])['PERWT']
                 .sum()
                 .reset_index()
                 .rename(columns={'PERWT': 'total_employment'}))
        
        # Merge and calculate percentages
        merged = grouped.merge(totals, on=[level, 'metro_status'], how='left')
        merged['employment_share'] = (
            (merged['PERWT'] / merged['total_employment'] * 100)
            .round(1)
            .fillna(0)
        )
        
        # Create pivot table
        try:
            pivot = merged.pivot_table(
                index=[level, 'metro_status'],
                columns='pop_label',
                values='employment_share',
                fill_value=0
            ).reset_index()
        except Exception as e:
            print(f"    Error creating pivot for {level}: {e}")
            continue
        
        # Ensure all bucket columns exist
        for bucket in BUCKET_ORDER:
            if bucket not in pivot.columns:
                pivot[bucket] = 0.0
        
        # Add titles
        if analysis_type == 'NAICS' and 'NAICS_MAPPING' in globals() and NAICS_MAPPING:
            def get_title(code):
                code_str = str(code).strip().upper()  # Ensure consistent case for alphanumeric codes
                
                # Try direct lookup first
                if code_str in NAICS_MAPPING:
                    mapping_value = NAICS_MAPPING[code_str]
                    if isinstance(mapping_value, dict):
                        return mapping_value.get('title', f'NAICS {code_str}')
                    elif isinstance(mapping_value, str):
                        return mapping_value
                
                # ENHANCED FALLBACK: Try shorter codes (handles alphanumeric like 611M1)
                if len(code_str) > 2:
                    for fallback_length in [5, 4, 3, 2]:
                        if len(code_str) > fallback_length:
                            fallback_code = code_str[:fallback_length]
                            if fallback_code in NAICS_MAPPING:
                                mapping_value = NAICS_MAPPING[fallback_code]
                                if isinstance(mapping_value, str):
                                    return f"{mapping_value} (detailed)"
                
                # Special handling for alphanumeric codes - try numeric portion
                import re
                numeric_match = re.match(r'^(\d+)', code_str)
                if numeric_match:
                    numeric_part = numeric_match.group(1)
                    if numeric_part in NAICS_MAPPING:
                        mapping_value = NAICS_MAPPING[numeric_part]
                        if isinstance(mapping_value, str):
                            return f"{mapping_value} (specialized)"
                        elif isinstance(mapping_value, dict):
                            title = mapping_value.get('title', f'NAICS {numeric_part}')
                            return f"{title} (specialized)"
                
                return f'NAICS {code_str}'
        elif analysis_type == 'SOC' and 'SOC_MAPPING' in globals() and SOC_MAPPING:
            # Use SOC mapping
            def get_title(code):
                code_str = str(code).strip()
                
                # Look for this code in the SOC mapping
                for acs_code, soc_hierarchy in SOC_MAPPING.items():
                    for soc_level_key in ['soc2_code', 'soc3_code', 'soc4_code', 'soc5_code', 'soc6_code']:
                        if soc_hierarchy.get(soc_level_key) == code_str:
                            title_key = soc_level_key.replace('_code', '_title')
                            title = soc_hierarchy.get(title_key, f'SOC {code_str}')
                            if title and title != f'SOC {code_str}':
                                return title
                
                # Fallback to basic SOC titles
                basic_soc_titles = {
                    '11': 'Management Occupations',
                    '13': 'Business and Financial Operations Occupations',
                    '15': 'Computer and Mathematical Occupations',
                    '17': 'Architecture and Engineering Occupations',
                    '19': 'Life, Physical, and Social Science Occupations',
                    '21': 'Community and Social Service Occupations',
                    '23': 'Legal Occupations',
                    '25': 'Educational Instruction and Library Occupations',
                    '27': 'Arts, Design, Entertainment, Sports, and Media Occupations',
                    '29': 'Healthcare Practitioners and Technical Occupations',
                    '31': 'Healthcare Support Occupations',
                    '33': 'Protective Service Occupations',
                    '35': 'Food Preparation and Serving Related Occupations',
                    '37': 'Building and Grounds Cleaning and Maintenance Occupations',
                    '39': 'Personal Care and Service Occupations',
                    '41': 'Sales and Related Occupations',
                    '43': 'Office and Administrative Support Occupations',
                    '45': 'Farming, Fishing, and Forestry Occupations',
                    '47': 'Construction and Extraction Occupations',
                    '49': 'Installation, Maintenance, and Repair Occupations',
                    '51': 'Production Occupations',
                    '53': 'Transportation and Material Moving Occupations',
                    '55': 'Military Specific Occupations'
                }
                
                code_2digit = code_str[:2] if len(code_str) >= 2 else code_str
                return basic_soc_titles.get(code_2digit, f'SOC {code_str}')
        else:
            # Generic titles
            def get_title(code):
                return f'{analysis_type} {code}'
        
        unique_codes = pivot[level].unique()
        title_mapping = {code: get_title(code) for code in unique_codes}
        pivot[title_col] = pivot[level].map(title_mapping)
        
        # Rename and format columns
        pivot = pivot.rename(columns={
            level: code_col,
            'metro_status': 'rural_urban_status'
        })
        
        # Add level identifier
        pivot[level_col] = level
        
        # Reorder columns
        base_columns = [level_col, code_col, title_col, 'rural_urban_status']
        final_columns = base_columns + BUCKET_ORDER
        existing_columns = [col for col in final_columns if col in pivot.columns]
        pivot = pivot[existing_columns]
        
        # Sort
        pivot['rural_urban_status'] = pd.Categorical(
            pivot['rural_urban_status'],
            categories=['Non-metro/Rural', 'Metropolitan'],
            ordered=True
        )
        pivot = pivot.sort_values([code_col, 'rural_urban_status']).reset_index(drop=True)
        
        all_results.append(pivot)
        print(f"    {level}: {len(pivot)} records processed")
    
    # Combine all levels
    if all_results:
        combined_df = pd.concat(all_results, ignore_index=True)
        print(f"  Combined {analysis_type} dataframe: {len(combined_df)} total records")
        return combined_df
    else:
        print(f"  No {analysis_type} data available")
        return pd.DataFrame()


def print_sample_data(combined_df, analysis_type, geo_type):
    """Print sample data for the analysis."""
    print(f"\nSample {geo_type} {analysis_type} data:")
    
    if analysis_type == 'NAICS':
        sample_cols = ['naics_level', 'industry_code', 'industry_title', 'rural_urban_status', '0-25K', '25K-50K', '50K-75K']
    else:  # SOC
        sample_cols = ['soc_level', 'occupation_code', 'occupation_title', 'rural_urban_status', '0-25K', '25K-50K']
    
    available_cols = [col for col in sample_cols if col in combined_df.columns]
    print(combined_df[available_cols].head())

def verify_percentages(combined_df):
    """Verify that percentage columns sum to approximately 100."""
    bucket_cols = [col for col in combined_df.columns if col in BUCKET_ORDER]
    combined_df['row_sum'] = combined_df[bucket_cols].sum(axis=1)
    print(f"\nRow sums (should be ~100): min={combined_df['row_sum'].min():.1f}, max={combined_df['row_sum'].max():.1f}")
    combined_df.drop('row_sum', axis=1, inplace=True)  # Remove verification column

def print_final_summary(analysis_results):
    """Print final analysis summary."""
    print(f"\n" + "="*60)
    print("ANALYSIS COMPLETE - SUMMARY")
    print("="*60)
    
    for analysis_type, (puma_df, county_df) in analysis_results.items():
        print(f"{analysis_type} Analysis:")
        print(f"  PUMA tab: {len(puma_df):,} rows")
        print(f"  County tab: {len(county_df):,} rows")
    
    # Show sample of final results
    if 'NAICS' in analysis_results and len(analysis_results['NAICS'][0]) > 0:
        print(f"\nSample PUMA NAICS results:")
        print(analysis_results['NAICS'][0].head(3))
    
    print("\n✓ Analysis complete! You can now open the Excel files to view results.")

In [55]:
# ============================================================================
# NAICS COVERAGE ANALYSIS FUNCTIONS
# ============================================================================

def calculate_naics_coverage_vectorized(employed_with_pop, unit_pop, unit_type, total_us_employment, full_county_universe=None):
    """
    Calculate NAICS coverage using vectorized operations for efficiency.
    Updated to support full county universe for accurate county counts.
    """
    print(f"  Starting optimized NAICS coverage calculation for {unit_type}...")
    
    all_buckets = ['0-25K', '25K-50K', '50K-75K', '75K-100K', '100K-1M', '1M+']
    unit_label = 'pumas' if unit_type == 'PUMA' else 'counties'
    
    # Determine if using full universe for county counts
    use_full_universe = (unit_type == 'County' and full_county_universe is not None)
    
    if use_full_universe:
        print(f"    Using full county universe for accurate county counts...")
        # Create county_fips if not exists
        if 'county_fips' not in employed_with_pop.columns:
            employed_with_pop['county_fips'] = (
                employed_with_pop['STATEFIP'].astype(int).astype(str).str.zfill(2) + 
                employed_with_pop['COUNTYFIP'].astype(int).astype(str).str.zfill(3)
            )
        
        # Create metro label mapping
        employed_with_pop['metro_label'] = employed_with_pop['metro_status'].map({
            'Non-metro/Rural': 'Rural',
            'Metropolitan': 'Urban'
        })
        
        # Get metro mapping from ACS data
        metro_mapping = employed_with_pop[['county_fips', 'metro_label']].drop_duplicates()
        
        # FIXED: Use full universe for counts, estimate metro status for missing counties
        universe_with_metro = full_county_universe.merge(metro_mapping, on='county_fips', how='left')
        
        counties_without_metro = universe_with_metro['metro_label'].isna().sum()
        print(f"    Warning: {counties_without_metro} counties have no ACS metro status")
        
        # For missing metro status, estimate based on population size (simple heuristic)
        # Counties over 50K are more likely to be urban
        universe_with_metro.loc[
            universe_with_metro['metro_label'].isna() & 
            universe_with_metro['pop_label'].isin(['75K-100K', '100K-1M', '1M+']), 
            'metro_label'
        ] = 'Urban'
        
        universe_with_metro.loc[
            universe_with_metro['metro_label'].isna(), 
            'metro_label'
        ] = 'Rural'
        
        # Now use ALL counties for counts
        unit_counts_full = universe_with_metro.groupby(['pop_label', 'metro_label']).size().reset_index(name='count')
        unit_counts_pivot = unit_counts_full.pivot(index='pop_label', columns='metro_label', values='count').fillna(0)
        unit_counts_pivot.columns = ['Non-metro/Rural', 'Metropolitan']  # Standardize column names
        unit_totals_full = universe_with_metro.groupby('pop_label').size()
        
        print(f"    Using ALL {len(universe_with_metro)} counties for counts (estimated metro status for missing)")
    else:
        print(f"    Using ACS data only for unit counts...")
        # Standardize metro status values
        print(f"    Standardizing metro status values...")
        
        employed_with_pop = employed_with_pop.copy()
        unit_pop = unit_pop.copy()
        
        # Map employed data to match unit_pop naming if needed
        employed_metro_mapping = {
            'Rural': 'Non-metro/Rural',
            'Urban': 'Metropolitan'
        }
        
        # Apply mapping if values need to be standardized
        if 'Rural' in employed_with_pop['metro_status'].values or 'Urban' in employed_with_pop['metro_status'].values:
            employed_with_pop['metro_status'] = employed_with_pop['metro_status'].map(employed_metro_mapping)
        
        print(f"    Employment metro status values: {employed_with_pop['metro_status'].unique()}")
        print(f"    Unit metro status values: {unit_pop['metro_status'].unique()}")
        
        # Pre-create bucket categorical to ensure all buckets are present
        employed_with_pop['pop_label'] = pd.Categorical(employed_with_pop['pop_label'], categories=all_buckets, ordered=True)
        unit_pop['pop_label'] = pd.Categorical(unit_pop['pop_label'], categories=all_buckets, ordered=True)
        
        # Vectorized unit counts by bucket and metro status
        print(f"    Computing unit counts...")
        unit_counts_pivot = unit_pop.groupby(['pop_label', 'metro_status']).size().unstack(fill_value=0)
        unit_totals_full = unit_pop.groupby('pop_label').size()
    
    # Vectorized employment totals by bucket and metro status (always from ACS data)
    print(f"    Computing employment totals...")
    emp_by_bucket_metro = employed_with_pop.groupby(['pop_label', 'metro_status'])['PERWT'].sum().unstack(fill_value=0)
    emp_by_bucket_total = employed_with_pop.groupby('pop_label')['PERWT'].sum()
    
    # Calculate NAICS coverage by bucket and metro status
    print(f"    Computing NAICS coverage rates...")
    
    naics_levels = ['naics_2digit', 'naics_3digit', 'naics_4digit']
    
    # Pre-calculate ALL NAICS coverage by bucket and metro status
    coverage_data = {}
    
    for level in naics_levels:
        # Validate NAICS level exists
        if level not in employed_with_pop.columns:
            print(f"    Warning: {level} not found in data")
            continue
            
        # Create mask for classified records
        classified_mask = employed_with_pop[level] != 'Unclassified'
        classified_data = employed_with_pop[classified_mask]
        
        # Vectorized coverage calculation by bucket and metro status
        classified_by_bucket_metro = classified_data.groupby(['pop_label', 'metro_status'])['PERWT'].sum().unstack(fill_value=0)
        classified_by_bucket_total = classified_data.groupby('pop_label')['PERWT'].sum()
        
        # Calculate coverage percentages - vectorized division
        coverage_total = (classified_by_bucket_total / emp_by_bucket_total * 100).fillna(0)
        
        # Handle metro status coverage (avoid division by zero)
        coverage_metro = classified_by_bucket_metro.div(emp_by_bucket_metro, fill_value=0) * 100
        coverage_metro = coverage_metro.fillna(0)

        coverage_data[level] = {
            'total': coverage_total,
            'metro': coverage_metro
        }
    
    # Build results DataFrame efficiently
    print(f"    Assembling results...")
    results = []
    
    for bucket in all_buckets:
        # Get unit counts (from full universe if available, otherwise from ACS)
        n_units_total = unit_totals_full.get(bucket, 0)
        
        # Better access to unstacked data for unit counts
        if bucket in unit_counts_pivot.index:
            n_units_rural = unit_counts_pivot.loc[bucket, 'Non-metro/Rural'] if 'Non-metro/Rural' in unit_counts_pivot.columns else 0
            n_units_urban = unit_counts_pivot.loc[bucket, 'Metropolitan'] if 'Metropolitan' in unit_counts_pivot.columns else 0
        else:
            n_units_rural = n_units_urban = 0
        
        # Get employment totals with proper index checking
        total_emp_all = emp_by_bucket_total.get(bucket, 0)
        
        # Better access to employment by metro status
        if bucket in emp_by_bucket_metro.index:
            total_emp_rural = emp_by_bucket_metro.loc[bucket, 'Non-metro/Rural'] if 'Non-metro/Rural' in emp_by_bucket_metro.columns else 0
            total_emp_urban = emp_by_bucket_metro.loc[bucket, 'Metropolitan'] if 'Metropolitan' in emp_by_bucket_metro.columns else 0
        else:
            total_emp_rural = total_emp_urban = 0
        
        # Calculate employment percentages
        total_emp_pct = (total_emp_all / total_us_employment * 100) if total_us_employment > 0 else 0
        rural_emp_pct = (total_emp_rural / total_us_employment * 100) if total_us_employment > 0 else 0
        urban_emp_pct = (total_emp_urban / total_us_employment * 100) if total_us_employment > 0 else 0
        
        # Build row
        row = {
            'pop_label': bucket,
            f'total_{unit_label}': int(n_units_total),
            f'rural_{unit_label}': int(n_units_rural),
            f'urban_{unit_label}': int(n_units_urban),
            'total_employment_pct': round(total_emp_pct, 2),
            'rural_employment_pct': round(rural_emp_pct, 2),
            'urban_employment_pct': round(urban_emp_pct, 2),
            'total_employment': int(total_emp_all),
            'rural_employment': int(total_emp_rural),
            'urban_employment': int(total_emp_urban)
        }
        
        # Add NAICS coverage data for ALL levels
        for level in naics_levels:
            if level not in coverage_data:
                # If level doesn't exist, add zeros
                row[f'total_{level}_coverage_pct'] = 0.0
                row[f'rural_{level}_coverage_pct'] = 0.0
                row[f'urban_{level}_coverage_pct'] = 0.0
                continue
                
            coverage_total = coverage_data[level]['total'].get(bucket, 0)
            
            if bucket in coverage_data[level]['metro'].index:
                coverage_rural = coverage_data[level]['metro'].loc[bucket, 'Non-metro/Rural'] if 'Non-metro/Rural' in coverage_data[level]['metro'].columns else 0
                coverage_urban = coverage_data[level]['metro'].loc[bucket, 'Metropolitan'] if 'Metropolitan' in coverage_data[level]['metro'].columns else 0
            else:
                coverage_rural = coverage_urban = 0
            
            row[f'total_{level}_coverage_pct'] = round(float(coverage_total), 2)
            row[f'rural_{level}_coverage_pct'] = round(float(coverage_rural), 2)
            row[f'urban_{level}_coverage_pct'] = round(float(coverage_urban), 2)
        
        results.append(row)
    
    coverage_df = pd.DataFrame(results)
    coverage_df['pop_label'] = pd.Categorical(coverage_df['pop_label'], categories=all_buckets, ordered=True)
    
    universe_note = " with full universe" if use_full_universe else ""
    print(f"    Optimized NAICS coverage calculation{universe_note} complete for {unit_type}")
    return coverage_df.sort_values('pop_label').reset_index(drop=True)



def create_unit_population_summary(employed_data, geo_type='PUMA'):
    """
    Create unit population summary for coverage analysis.
    This creates the unit_pop dataframe needed for coverage calculations.
    """
    print(f"Creating {geo_type} unit population summary...")
    
    # Group by geographic unit to get one record per unit
    if geo_type == 'PUMA':  # ← Fixed: removed "ACA"
        unit_summary = employed_data.groupby(['geo_id', 'metro_status', 'pop_label']).agg({
            'PERWT': 'sum'  # Total employment in this unit
        }).reset_index()
    else:  # County
        unit_summary = employed_data.groupby(['geo_id', 'metro_status', 'pop_label']).agg({
            'PERWT': 'sum'  # Total employment in this unit
        }).reset_index()
    
    # Each row represents one geographic unit
    unit_pop = unit_summary[['geo_id', 'metro_status', 'pop_label']].drop_duplicates()
    
    print(f"✓ {geo_type} unit summary created: {len(unit_pop)} units")
    
    return unit_pop

def create_coverage_analysis(puma_employed, county_employed, full_county_universe=None):
    """
    Create NAICS coverage analysis for both PUMA and County data.
    Updated to use full county universe as denominator for accurate coverage percentages.
    
    Args:
        puma_employed: PUMA employment data
        county_employed: County employment data 
        full_county_universe: Complete county list with pop_labels (optional)
    
    Returns:
        puma_coverage, county_coverage dataframes
    """
    print("\nSTEP 4: Creating NAICS Coverage Analysis...")
    print("="*50)
    
    # Calculate total US employment for percentage calculations
    total_us_employment_puma = puma_employed['PERWT'].sum()
    total_us_employment_county = county_employed['PERWT'].sum()
    
    print(f"Total US Employment (PUMA): {total_us_employment_puma:,.0f}")
    print(f"Total US Employment (County): {total_us_employment_county:,.0f}")
    
    # ----------------------------------------
    # PUMA NAICS Coverage (no change - no full universe available)
    # ----------------------------------------
    print(f"\n--- PUMA NAICS Coverage ---")
    puma_unit_pop = create_unit_population_summary(puma_employed, 'PUMA')
    puma_coverage = calculate_naics_coverage_vectorized(
        puma_employed, 
        puma_unit_pop, 
        'PUMA', 
        total_us_employment_puma
    )
    
    # ----------------------------------------
    # County NAICS Coverage with Full Universe Support
    # ----------------------------------------
    print(f"\n--- County NAICS Coverage ---")
    
    # Filter to counties with valid COUNTYFIP (using COUNTYFIP now)
    unknown_mask = county_employed['COUNTYFIP'] == 0
    known_county_df = county_employed[~unknown_mask].copy()
    unknown_county_df = county_employed[unknown_mask].copy()
    
    # Create county_fips using COUNTYFIP if not exists
    if 'county_fips' not in known_county_df.columns:
        known_county_df['county_fips'] = (
            known_county_df['STATEFIP'].astype(int).astype(str).str.zfill(2) + 
            known_county_df['COUNTYFIP'].astype(int).astype(str).str.zfill(3)
        )
    
    if full_county_universe is not None:
        print("Using full county universe for accurate coverage percentages...")
        county_coverage = calculate_naics_coverage_with_full_universe(
            known_county_df,
            full_county_universe, 
            'County',
            total_us_employment_county
        )
    else:
        print("Using ACS data only for county coverage...")
        county_unit_pop = create_unit_population_summary(known_county_df, 'County')
        county_coverage = calculate_naics_coverage_vectorized(
            known_county_df,
            county_unit_pop,
            'County',
            total_us_employment_county
        )
    
    # Add "Unknown County" row if relevant
    if len(unknown_county_df) > 0:
        unknown_weight = unknown_county_df['PERWT'].sum()
        unknown_row = {
            'pop_label': 'Population Unknown',
            'total_counties': 0,
            'rural_counties': 0,
            'urban_counties': 0,
            'total_employment': unknown_weight,
            'rural_employment': 0,
            'urban_employment': 0,
            'total_employment_pct': round(unknown_weight / total_us_employment_county * 100, 2),
            'rural_employment_pct': 0.0,
            'urban_employment_pct': 0.0,
            'total_naics_2digit_coverage_pct': 0.0,
            'rural_naics_2digit_coverage_pct': 0.0,
            'urban_naics_2digit_coverage_pct': 0.0,
            'total_naics_3digit_coverage_pct': 0.0,
            'rural_naics_3digit_coverage_pct': 0.0,
            'urban_naics_3digit_coverage_pct': 0.0,
            'total_naics_4digit_coverage_pct': 0.0,
            'rural_naics_4digit_coverage_pct': 0.0,
            'urban_naics_4digit_coverage_pct': 0.0,
        }
        county_coverage = pd.concat([county_coverage, pd.DataFrame([unknown_row])], ignore_index=True)
        print(f"✓ Added Unknown County row: {unknown_weight:,.0f} employment")
    
    # ----------------------------------------
    # Display Summaries
    # ----------------------------------------
    print(f"\n✓ PUMA Coverage Analysis Complete: {len(puma_coverage)} population buckets")
    if len(puma_coverage) > 0:
        sample_cols = ['pop_label', 'total_pumas', 'total_employment_pct', 'total_naics_2digit_coverage_pct']
        available_cols = [col for col in sample_cols if col in puma_coverage.columns]
        print("Sample PUMA coverage data:")
        print(puma_coverage[available_cols].head(3))
    
    print(f"\n✓ County Coverage Analysis Complete: {len(county_coverage)} population buckets")
    if len(county_coverage) > 0:
        sample_cols = ['pop_label', 'total_counties', 'total_employment_pct', 'total_naics_2digit_coverage_pct']
        available_cols = [col for col in sample_cols if col in county_coverage.columns]
        print("Sample County coverage data:")
        print(county_coverage[available_cols].head(3))
    
    return puma_coverage, county_coverage

def calculate_naics_coverage_with_full_universe(employed_data, full_county_universe, unit_type, total_us_employment):
    """
    Calculate NAICS coverage using full county universe as denominator.
    
    Args:
        employed_data: ACS employment data (filtered)
        full_county_universe: Complete county list with pop_labels
        unit_type: Should be 'County'
        total_us_employment: Total employment for percentage calculations
    """
    print(f"  Starting NAICS coverage calculation with full universe for {unit_type}...")
    
    all_buckets = ['0-25K', '25K-50K', '50K-75K', '75K-100K', '100K-1M', '1M+']
    naics_levels = ['naics_2digit', 'naics_3digit', 'naics_4digit']
    
    # Create metro label mapping
    employed_data['metro_label'] = employed_data['metro_status'].map({
        'Non-metro/Rural': 'Rural',
        'Metropolitan': 'Urban'
    })
    
    # Get metro mapping and merge with full universe
    metro_mapping = employed_data[['county_fips', 'metro_label']].drop_duplicates()
    universe_with_metro = full_county_universe.merge(metro_mapping, on='county_fips', how='left')
    
    counties_without_metro = universe_with_metro['metro_label'].isna().sum()
    if counties_without_metro > 0:
        print(f"    Warning: {counties_without_metro} counties have no metro status (excluded)")
    
    universe_with_metro = universe_with_metro.dropna(subset=['metro_label'])
    print(f"    Counties with metro status: {len(universe_with_metro)} / {len(full_county_universe)}")
    
    # Calculate total county counts by bucket+metro from FULL UNIVERSE
    total_county_counts = universe_with_metro.groupby(['pop_label', 'metro_label']).size().reset_index(name='total_counties')
    total_county_counts_dict = {}
    for _, row in total_county_counts.iterrows():
        key = (row['pop_label'], row['metro_label'])
        total_county_counts_dict[key] = row['total_counties']
    
    # Also calculate totals by bucket only
    total_county_by_bucket = universe_with_metro.groupby('pop_label').size()
    
    # Calculate employment totals by bucket and metro from ACS data
    emp_by_bucket_metro = employed_data.groupby(['pop_label', 'metro_status'])['PERWT'].sum().reset_index()
    emp_by_bucket_total = employed_data.groupby('pop_label')['PERWT'].sum()
    
    # Build results
    results = []
    
    for bucket in all_buckets:
        # Get county counts from FULL UNIVERSE
        n_counties_total = total_county_by_bucket.get(bucket, 0)
        
        # Get county counts by metro status from full universe
        n_counties_rural = total_county_counts_dict.get((bucket, 'Rural'), 0)
        n_counties_urban = total_county_counts_dict.get((bucket, 'Urban'), 0)
        
        # Get employment totals from ACS data
        total_emp_all = emp_by_bucket_total.get(bucket, 0)
        
        # Get employment by metro status
        bucket_emp_data = emp_by_bucket_metro[emp_by_bucket_metro['pop_label'] == bucket]
        total_emp_rural = bucket_emp_data[bucket_emp_data['metro_status'] == 'Non-metro/Rural']['PERWT'].sum()
        total_emp_urban = bucket_emp_data[bucket_emp_data['metro_status'] == 'Metropolitan']['PERWT'].sum()
        
        # Calculate employment percentages
        total_emp_pct = (total_emp_all / total_us_employment * 100) if total_us_employment > 0 else 0
        rural_emp_pct = (total_emp_rural / total_us_employment * 100) if total_us_employment > 0 else 0
        urban_emp_pct = (total_emp_urban / total_us_employment * 100) if total_us_employment > 0 else 0
        
        # Build base row
        row = {
            'pop_label': bucket,
            'total_counties': int(n_counties_total),
            'rural_counties': int(n_counties_rural),
            'urban_counties': int(n_counties_urban),
            'total_employment_pct': round(total_emp_pct, 2),
            'rural_employment_pct': round(rural_emp_pct, 2),
            'urban_employment_pct': round(urban_emp_pct, 2),
            'total_employment': int(total_emp_all),
            'rural_employment': int(total_emp_rural),
            'urban_employment': int(total_emp_urban)
        }
        
        # Calculate NAICS coverage for each level
        for level in naics_levels:
            if level not in employed_data.columns:
                row[f'total_{level}_coverage_pct'] = 0.0
                row[f'rural_{level}_coverage_pct'] = 0.0
                row[f'urban_{level}_coverage_pct'] = 0.0
                continue
            
            # Get classified employment data for this level
            classified_data = employed_data[employed_data[level] != 'Unclassified']
            
            if len(classified_data) == 0:
                row[f'total_{level}_coverage_pct'] = 0.0
                row[f'rural_{level}_coverage_pct'] = 0.0
                row[f'urban_{level}_coverage_pct'] = 0.0
                continue
            
            # Calculate coverage by bucket and metro
            classified_by_bucket = classified_data.groupby('pop_label')['PERWT'].sum()
            classified_by_bucket_metro = classified_data.groupby(['pop_label', 'metro_status'])['PERWT'].sum()
            
            # Total coverage for this bucket
            classified_total = classified_by_bucket.get(bucket, 0)
            coverage_total = (classified_total / total_emp_all * 100) if total_emp_all > 0 else 0
            
            # Coverage by metro status
            classified_rural = classified_by_bucket_metro.get((bucket, 'Non-metro/Rural'), 0)
            classified_urban = classified_by_bucket_metro.get((bucket, 'Metropolitan'), 0)
            
            coverage_rural = (classified_rural / total_emp_rural * 100) if total_emp_rural > 0 else 0
            coverage_urban = (classified_urban / total_emp_urban * 100) if total_emp_urban > 0 else 0
            
            row[f'total_{level}_coverage_pct'] = round(coverage_total, 2)
            row[f'rural_{level}_coverage_pct'] = round(coverage_rural, 2) 
            row[f'urban_{level}_coverage_pct'] = round(coverage_urban, 2)
        
        results.append(row)
    
    coverage_df = pd.DataFrame(results)
    coverage_df['pop_label'] = pd.Categorical(coverage_df['pop_label'], categories=all_buckets, ordered=True)
    
    print(f"    NAICS coverage calculation with full universe complete for {unit_type}")
    return coverage_df.sort_values('pop_label').reset_index(drop=True)

In [56]:
# # ============================================================================
# # FIXED NAICS COVERAGE ANALYSIS FUNCTIONS
# # ============================================================================

# def calculate_naics_coverage_vectorized(employed_with_pop, unit_pop, unit_type, total_us_employment, full_county_universe=None):
#     """
#     Calculate NAICS coverage using vectorized operations for efficiency.
#     FIXED: Always uses pure ACS denominators for consistent coverage calculations.
#     """
#     print(f"  Starting optimized NAICS coverage calculation for {unit_type}...")
#     print(f"  Using pure ACS denominators for consistent coverage...")
    
#     all_buckets = ['0-25K', '25K-50K', '50K-75K', '75K-100K', '100K-1M', '1M+', 'Unknown County']
#     unit_label = 'pumas' if unit_type == 'PUMA' else 'counties'
    
#     # FIXED: Always use pure ACS denominators - count unique counties by pop+metro
#     if unit_type == 'County':
#         print(f"    Using pure ACS denominators - counting ALL counties with employment data...")
        
#         # Create metro label mapping for consistency
#         employed_with_pop['metro_label'] = employed_with_pop['metro_status'].map({
#             'Non-metro/Rural': 'Rural', 
#             'Metropolitan': 'Urban'
#         })
        
#         # Count unique counties by pop+metro combination from ALL ACS data
#         unit_counts_raw = employed_with_pop.groupby(['pop_label', 'metro_label'])['county_fips'].nunique().reset_index(name='count')
#         unit_counts_pivot = unit_counts_raw.pivot(index='pop_label', columns='metro_label', values='count').fillna(0)
#         unit_counts_pivot.columns = ['Non-metro/Rural', 'Metropolitan']  # Standardize column names
#         unit_totals_full = employed_with_pop.groupby('pop_label')['county_fips'].nunique()
        
#         print(f"    Total ACS counties: {employed_with_pop['county_fips'].nunique()}")
#         print(f"    ACS county breakdown:")
#         for pop_label in all_buckets:
#             if pop_label in unit_totals_full.index:
#                 rural_count = unit_counts_pivot.loc[pop_label, 'Non-metro/Rural'] if pop_label in unit_counts_pivot.index and 'Non-metro/Rural' in unit_counts_pivot.columns else 0
#                 urban_count = unit_counts_pivot.loc[pop_label, 'Metropolitan'] if pop_label in unit_counts_pivot.index and 'Metropolitan' in unit_counts_pivot.columns else 0
#                 total_count = unit_totals_full[pop_label]
#                 print(f"      {pop_label}: {total_count} total ({rural_count} Rural + {urban_count} Urban)")
        
#     else:
#         # PUMA logic (unchanged)
#         print(f"    Using ACS data only for unit counts...")
        
#         # Standardize metro status values
#         employed_with_pop = employed_with_pop.copy()
#         unit_pop = unit_pop.copy()
        
#         # Map employed data to match unit_pop naming if needed
#         employed_metro_mapping = {
#             'Rural': 'Non-metro/Rural',
#             'Urban': 'Metropolitan'
#         }
        
#         # Apply mapping if values need to be standardized
#         if 'Rural' in employed_with_pop['metro_status'].values or 'Urban' in employed_with_pop['metro_status'].values:
#             employed_with_pop['metro_status'] = employed_with_pop['metro_status'].map(employed_metro_mapping)
        
#         # Pre-create bucket categorical to ensure all buckets are present
#         employed_with_pop['pop_label'] = pd.Categorical(employed_with_pop['pop_label'], categories=all_buckets, ordered=True)
#         unit_pop['pop_label'] = pd.Categorical(unit_pop['pop_label'], categories=all_buckets, ordered=True)
        
#         # Vectorized unit counts by bucket and metro status (pure ACS)
#         print(f"    Computing unit counts...")
#         unit_counts_pivot = unit_pop.groupby(['pop_label', 'metro_status']).size().unstack(fill_value=0)
#         unit_totals_full = unit_pop.groupby('pop_label').size()
    
#     # Vectorized employment totals by bucket and metro status
#     print(f"    Computing employment totals...")
#     emp_by_bucket_metro = employed_with_pop.groupby(['pop_label', 'metro_status'])['PERWT'].sum().unstack(fill_value=0)
#     emp_by_bucket_total = employed_with_pop.groupby('pop_label')['PERWT'].sum()
    
#     # Calculate NAICS coverage by bucket and metro status
#     print(f"    Computing NAICS coverage rates...")
    
#     naics_levels = ['naics_2digit', 'naics_3digit', 'naics_4digit']
    
#     # Pre-calculate ALL NAICS coverage by bucket and metro status
#     coverage_data = {}
    
#     for level in naics_levels:
#         # Validate NAICS level exists
#         if level not in employed_with_pop.columns:
#             print(f"    Warning: {level} not found in data")
#             continue
            
#         # Create mask for classified records
#         classified_mask = employed_with_pop[level] != 'Unclassified'
#         classified_data = employed_with_pop[classified_mask]
        
#         # Vectorized coverage calculation by bucket and metro status
#         classified_by_bucket_metro = classified_data.groupby(['pop_label', 'metro_status'])['PERWT'].sum().unstack(fill_value=0)
#         classified_by_bucket_total = classified_data.groupby('pop_label')['PERWT'].sum()
        
#         # Calculate coverage percentages - vectorized division
#         coverage_total = (classified_by_bucket_total / emp_by_bucket_total * 100).fillna(0)
        
#         # Handle metro status coverage (avoid division by zero)
#         coverage_metro = classified_by_bucket_metro.div(emp_by_bucket_metro, fill_value=0) * 100
#         coverage_metro = coverage_metro.fillna(0)

#         coverage_data[level] = {
#             'total': coverage_total,
#             'metro': coverage_metro
#         }
    
#     # Build results DataFrame efficiently
#     print(f"    Assembling results...")
#     results = []
    
#     for bucket in all_buckets:
#         # Get unit counts (pure ACS)
#         n_units_total = unit_totals_full.get(bucket, 0)
        
#         # Better access to unstacked data for unit counts
#         if bucket in unit_counts_pivot.index:
#             n_units_rural = unit_counts_pivot.loc[bucket, 'Non-metro/Rural'] if 'Non-metro/Rural' in unit_counts_pivot.columns else 0
#             n_units_urban = unit_counts_pivot.loc[bucket, 'Metropolitan'] if 'Metropolitan' in unit_counts_pivot.columns else 0
#         else:
#             n_units_rural = n_units_urban = 0
        
#         # Get employment totals with proper index checking
#         total_emp_all = emp_by_bucket_total.get(bucket, 0)
        
#         # Better access to employment by metro status
#         if bucket in emp_by_bucket_metro.index:
#             total_emp_rural = emp_by_bucket_metro.loc[bucket, 'Non-metro/Rural'] if 'Non-metro/Rural' in emp_by_bucket_metro.columns else 0
#             total_emp_urban = emp_by_bucket_metro.loc[bucket, 'Metropolitan'] if 'Metropolitan' in emp_by_bucket_metro.columns else 0
#         else:
#             total_emp_rural = total_emp_urban = 0
        
#         # Calculate employment percentages
#         total_emp_pct = (total_emp_all / total_us_employment * 100) if total_us_employment > 0 else 0
#         rural_emp_pct = (total_emp_rural / total_us_employment * 100) if total_us_employment > 0 else 0
#         urban_emp_pct = (total_emp_urban / total_us_employment * 100) if total_us_employment > 0 else 0
        
#         # Build row
#         row = {
#             'pop_label': bucket,
#             f'total_{unit_label}': int(n_units_total),
#             f'rural_{unit_label}': int(n_units_rural),
#             f'urban_{unit_label}': int(n_units_urban),
#             'total_employment_pct': round(total_emp_pct, 2),
#             'rural_employment_pct': round(rural_emp_pct, 2),
#             'urban_employment_pct': round(urban_emp_pct, 2),
#             'total_employment': int(total_emp_all),
#             'rural_employment': int(total_emp_rural),
#             'urban_employment': int(total_emp_urban)
#         }
        
#         # Add NAICS coverage data for ALL levels
#         for level in naics_levels:
#             if level not in coverage_data:
#                 # If level doesn't exist, add zeros
#                 row[f'total_{level}_coverage_pct'] = 0.0
#                 row[f'rural_{level}_coverage_pct'] = 0.0
#                 row[f'urban_{level}_coverage_pct'] = 0.0
#                 continue
                
#             coverage_total = coverage_data[level]['total'].get(bucket, 0)
            
#             if bucket in coverage_data[level]['metro'].index:
#                 coverage_rural = coverage_data[level]['metro'].loc[bucket, 'Non-metro/Rural'] if 'Non-metro/Rural' in coverage_data[level]['metro'].columns else 0
#                 coverage_urban = coverage_data[level]['metro'].loc[bucket, 'Metropolitan'] if 'Metropolitan' in coverage_data[level]['metro'].columns else 0
#             else:
#                 coverage_rural = coverage_urban = 0
            
#             row[f'total_{level}_coverage_pct'] = round(float(coverage_total), 2)
#             row[f'rural_{level}_coverage_pct'] = round(float(coverage_rural), 2)
#             row[f'urban_{level}_coverage_pct'] = round(float(coverage_urban), 2)
        
#         results.append(row)
    
#     coverage_df = pd.DataFrame(results)
#     coverage_df['pop_label'] = pd.Categorical(coverage_df['pop_label'], categories=all_buckets, ordered=True)
    
#     print(f"    Optimized NAICS coverage calculation complete for {unit_type}")
#     return coverage_df.sort_values('pop_label').reset_index(drop=True)


# def create_unit_population_summary(employed_data, geo_type='PUMA'):
#     """
#     Create unit population summary for coverage analysis.
#     This creates the unit_pop dataframe needed for coverage calculations.
#     """
#     print(f"Creating {geo_type} unit population summary...")
    
#     # Group by geographic unit to get one record per unit
#     if geo_type == 'PUMA':
#         unit_summary = employed_data.groupby(['geo_id', 'metro_status', 'pop_label']).agg({
#             'PERWT': 'sum'  # Total employment in this unit
#         }).reset_index()
#     else:  # County
#         unit_summary = employed_data.groupby(['geo_id', 'metro_status', 'pop_label']).agg({
#             'PERWT': 'sum'  # Total employment in this unit
#         }).reset_index()
    
#     # Each row represents one geographic unit
#     unit_pop = unit_summary[['geo_id', 'metro_status', 'pop_label']].drop_duplicates()
    
#     print(f"✓ {geo_type} unit summary created: {len(unit_pop)} units")
    
#     return unit_pop


# def create_coverage_analysis(puma_employed, county_employed, full_county_universe=None):
#     """
#     FIXED: Create NAICS coverage analysis using pure ACS denominators.
#     Always uses county_employed data for county counts, ensuring consistency.
    
#     Args:
#         puma_employed: PUMA employment data
#         county_employed: County employment data 
#         full_county_universe: Complete county list (ignored - using pure ACS)
    
#     Returns:
#         puma_coverage, county_coverage dataframes
#     """
#     print("\nSTEP 4: Creating NAICS Coverage Analysis...")
#     print("FIXED: Using pure ACS denominators for consistent coverage...")
#     print("="*50)
    
#     # Calculate total US employment for percentage calculations
#     total_us_employment_puma = puma_employed['PERWT'].sum()
#     total_us_employment_county = county_employed['PERWT'].sum()
    
#     print(f"Total US Employment (PUMA): {total_us_employment_puma:,.0f}")
#     print(f"Total US Employment (County): {total_us_employment_county:,.0f}")
    
#     # ----------------------------------------
#     # PUMA NAICS Coverage
#     # ----------------------------------------
#     print(f"\n--- PUMA NAICS Coverage ---")
#     puma_unit_pop = create_unit_population_summary(puma_employed, 'PUMA')
#     puma_coverage = calculate_naics_coverage_vectorized(
#         puma_employed, 
#         puma_unit_pop, 
#         'PUMA', 
#         total_us_employment_puma
#     )
    
#     # ----------------------------------------
#     # County NAICS Coverage - FIXED to use pure ACS denominators
#     # ----------------------------------------
#     print(f"\n--- County NAICS Coverage ---")
    
#     # FIXED: Use the function that now uses pure ACS denominators
#     county_unit_pop = create_unit_population_summary(county_employed, 'County')
#     county_coverage = calculate_naics_coverage_vectorized(
#         county_employed,
#         county_unit_pop,
#         'County',
#         total_us_employment_county,
#         full_county_universe=None  # Explicitly ignore this parameter
#     )
    
#     # ----------------------------------------
#     # Display Summaries
#     # ----------------------------------------
#     print(f"\n✓ PUMA Coverage Analysis Complete: {len(puma_coverage)} population buckets")
#     if len(puma_coverage) > 0:
#         sample_cols = ['pop_label', 'total_pumas', 'total_employment_pct', 'total_naics_2digit_coverage_pct']
#         available_cols = [col for col in sample_cols if col in puma_coverage.columns]
#         print("Sample PUMA coverage data:")
#         print(puma_coverage[available_cols].head(3))
    
#     print(f"\n✓ County Coverage Analysis Complete: {len(county_coverage)} population buckets")
#     print(f"Using {county_employed['county_fips'].nunique()} ACS counties as denominators")
#     if len(county_coverage) > 0:
#         sample_cols = ['pop_label', 'total_counties', 'total_employment_pct', 'total_naics_2digit_coverage_pct']
#         available_cols = [col for col in sample_cols if col in county_coverage.columns]
#         print("Sample County coverage data:")
#         print(county_coverage[available_cols].head(3))
    
#     return puma_coverage, county_coverage


# # ============================================================================
# # FIXED SOC COVERAGE ANALYSIS FUNCTION
# # ============================================================================

# def calculate_soc_coverage_vectorized(employed_with_pop, unit_pop, unit_type, total_us_employment, full_county_universe=None):
#     """
#     FIXED: Calculate SOC coverage using pure ACS denominators for consistency.
#     """
#     print(f"  Starting optimized SOC coverage calculation for {unit_type}...")
#     print(f"  Using pure ACS denominators for consistent coverage...")
    
#     all_buckets = ['0-25K', '25K-50K', '50K-75K', '75K-100K', '100K-1M', '1M+', 'Unknown County']
#     unit_label = 'pumas' if unit_type == 'PUMA' else 'counties'
    
#     # FIXED: Always use pure ACS denominators - count unique counties by pop+metro
#     if unit_type == 'County':
#         print(f"    Using pure ACS denominators - counting ALL counties with employment data...")
        
#         # Create metro label mapping for consistency
#         employed_with_pop['metro_label'] = employed_with_pop['metro_status'].map({
#             'Non-metro/Rural': 'Rural', 
#             'Metropolitan': 'Urban'
#         })
        
#         # Count unique counties by pop+metro combination from ALL ACS data
#         unit_counts_raw = employed_with_pop.groupby(['pop_label', 'metro_label'])['county_fips'].nunique().reset_index(name='count')
#         unit_counts_pivot = unit_counts_raw.pivot(index='pop_label', columns='metro_label', values='count').fillna(0)
#         unit_counts_pivot.columns = ['Non-metro/Rural', 'Metropolitan']  # Standardize column names
#         unit_totals_full = employed_with_pop.groupby('pop_label')['county_fips'].nunique()
        
#         print(f"    Total ACS counties: {employed_with_pop['county_fips'].nunique()}")
#         print(f"    ACS county breakdown:")
#         for pop_label in all_buckets:
#             if pop_label in unit_totals_full.index:
#                 rural_count = unit_counts_pivot.loc[pop_label, 'Non-metro/Rural'] if pop_label in unit_counts_pivot.index and 'Non-metro/Rural' in unit_counts_pivot.columns else 0
#                 urban_count = unit_counts_pivot.loc[pop_label, 'Metropolitan'] if pop_label in unit_counts_pivot.index and 'Metropolitan' in unit_counts_pivot.columns else 0
#                 total_count = unit_totals_full[pop_label]
#                 print(f"      {pop_label}: {total_count} total ({rural_count} Rural + {urban_count} Urban)")
        
#     else:
#         # PUMA logic (unchanged)
#         print(f"    Using ACS data only for unit counts...")
        
#         # Standardize metro status values
#         employed_with_pop = employed_with_pop.copy()
#         unit_pop = unit_pop.copy()
        
#         # Map employed data to match unit_pop naming if needed
#         employed_metro_mapping = {
#             'Rural': 'Non-metro/Rural',
#             'Urban': 'Metropolitan'
#         }
        
#         # Apply mapping if values need to be standardized
#         if 'Rural' in employed_with_pop['metro_status'].values or 'Urban' in employed_with_pop['metro_status'].values:
#             employed_with_pop['metro_status'] = employed_with_pop['metro_status'].map(employed_metro_mapping)
        
#         # Pre-create bucket categorical to ensure all buckets are present
#         employed_with_pop['pop_label'] = pd.Categorical(employed_with_pop['pop_label'], categories=all_buckets, ordered=True)
#         unit_pop['pop_label'] = pd.Categorical(unit_pop['pop_label'], categories=all_buckets, ordered=True)
        
#         # Vectorized unit counts by bucket and metro status (pure ACS)
#         print(f"    Computing unit counts...")
#         unit_counts_pivot = unit_pop.groupby(['pop_label', 'metro_status']).size().unstack(fill_value=0)
#         unit_totals_full = unit_pop.groupby('pop_label').size()
    
#     # Vectorized employment totals by bucket and metro status
#     print(f"    Computing employment totals...")
#     emp_by_bucket_metro = employed_with_pop.groupby(['pop_label', 'metro_status'])['PERWT'].sum().unstack(fill_value=0)
#     emp_by_bucket_total = employed_with_pop.groupby('pop_label')['PERWT'].sum()
    
#     # Calculate SOC coverage by bucket and metro status
#     print(f"    Computing SOC coverage rates...")
    
#     soc_levels = ['soc_2digit', 'soc_3digit', 'soc_4digit']
    
#     # Pre-calculate ALL SOC coverage by bucket and metro status
#     coverage_data = {}
    
#     for level in soc_levels:
#         # Validate SOC level exists
#         if level not in employed_with_pop.columns:
#             print(f"    Warning: {level} not found in data")
#             continue
            
#         # Create mask for classified records
#         classified_mask = employed_with_pop[level] != 'Unclassified'
#         classified_data = employed_with_pop[classified_mask]
        
#         # Vectorized coverage calculation by bucket and metro status
#         classified_by_bucket_metro = classified_data.groupby(['pop_label', 'metro_status'])['PERWT'].sum().unstack(fill_value=0)
#         classified_by_bucket_total = classified_data.groupby('pop_label')['PERWT'].sum()
        
#         # Calculate coverage percentages - vectorized division
#         coverage_total = (classified_by_bucket_total / emp_by_bucket_total * 100).fillna(0)
        
#         # Handle metro status coverage (avoid division by zero)
#         coverage_metro = classified_by_bucket_metro.div(emp_by_bucket_metro, fill_value=0) * 100
#         coverage_metro = coverage_metro.fillna(0)

#         coverage_data[level] = {
#             'total': coverage_total,
#             'metro': coverage_metro
#         }
    
#     # Build results DataFrame efficiently
#     print(f"    Assembling results...")
#     results = []
    
#     for bucket in all_buckets:
#         # Get unit counts (pure ACS)
#         n_units_total = unit_totals_full.get(bucket, 0)
        
#         # Better access to unstacked data for unit counts
#         if bucket in unit_counts_pivot.index:
#             n_units_rural = unit_counts_pivot.loc[bucket, 'Non-metro/Rural'] if 'Non-metro/Rural' in unit_counts_pivot.columns else 0
#             n_units_urban = unit_counts_pivot.loc[bucket, 'Metropolitan'] if 'Metropolitan' in unit_counts_pivot.columns else 0
#         else:
#             n_units_rural = n_units_urban = 0
        
#         # Get employment totals with proper index checking
#         total_emp_all = emp_by_bucket_total.get(bucket, 0)
        
#         # Better access to employment by metro status
#         if bucket in emp_by_bucket_metro.index:
#             total_emp_rural = emp_by_bucket_metro.loc[bucket, 'Non-metro/Rural'] if 'Non-metro/Rural' in emp_by_bucket_metro.columns else 0
#             total_emp_urban = emp_by_bucket_metro.loc[bucket, 'Metropolitan'] if 'Metropolitan' in emp_by_bucket_metro.columns else 0
#         else:
#             total_emp_rural = total_emp_urban = 0
        
#         # Calculate employment percentages
#         total_emp_pct = (total_emp_all / total_us_employment * 100) if total_us_employment > 0 else 0
#         rural_emp_pct = (total_emp_rural / total_us_employment * 100) if total_us_employment > 0 else 0
#         urban_emp_pct = (total_emp_urban / total_us_employment * 100) if total_us_employment > 0 else 0
        
#         # Build row
#         row = {
#             'pop_label': bucket,
#             f'total_{unit_label}': int(n_units_total),
#             f'rural_{unit_label}': int(n_units_rural),
#             f'urban_{unit_label}': int(n_units_urban),
#             'total_employment_pct': round(total_emp_pct, 2),
#             'rural_employment_pct': round(rural_emp_pct, 2),
#             'urban_employment_pct': round(urban_emp_pct, 2),
#             'total_employment': int(total_emp_all),
#             'rural_employment': int(total_emp_rural),
#             'urban_employment': int(total_emp_urban)
#         }
        
#         # Add SOC coverage data for ALL levels
#         for level in soc_levels:
#             if level not in coverage_data:
#                 # If level doesn't exist, add zeros
#                 row[f'total_{level}_coverage_pct'] = 0.0
#                 row[f'rural_{level}_coverage_pct'] = 0.0
#                 row[f'urban_{level}_coverage_pct'] = 0.0
#                 continue
                
#             coverage_total = coverage_data[level]['total'].get(bucket, 0)
            
#             if bucket in coverage_data[level]['metro'].index:
#                 coverage_rural = coverage_data[level]['metro'].loc[bucket, 'Non-metro/Rural'] if 'Non-metro/Rural' in coverage_data[level]['metro'].columns else 0
#                 coverage_urban = coverage_data[level]['metro'].loc[bucket, 'Metropolitan'] if 'Metropolitan' in coverage_data[level]['metro'].columns else 0
#             else:
#                 coverage_rural = coverage_urban = 0
            
#             row[f'total_{level}_coverage_pct'] = round(float(coverage_total), 2)
#             row[f'rural_{level}_coverage_pct'] = round(float(coverage_rural), 2)
#             row[f'urban_{level}_coverage_pct'] = round(float(coverage_urban), 2)
        
#         results.append(row)
    
#     coverage_df = pd.DataFrame(results)
#     coverage_df['pop_label'] = pd.Categorical(coverage_df['pop_label'], categories=all_buckets, ordered=True)
    
#     print(f"    Optimized SOC coverage calculation complete for {unit_type}")
#     return coverage_df.sort_values('pop_label').reset_index(drop=True)

In [57]:
def calculate_soc_coverage_vectorized(employed_with_pop, unit_pop, unit_type, total_us_employment, full_county_universe=None):
    """
    Calculate SOC coverage using vectorized operations for efficiency.
    Updated to support full county universe for accurate county counts.
    """
    print(f"  Starting optimized SOC coverage calculation for {unit_type}...")
    
    all_buckets = ['0-25K', '25K-50K', '50K-75K', '75K-100K', '100K-1M', '1M+']
    unit_label = 'pumas' if unit_type == 'PUMA' else 'counties'
    soc_levels = ['soc_2digit', 'soc_3digit', 'soc_4digit', 'soc_5digit', 'soc_6digit']
    
    # Determine if using full universe for county counts
    use_full_universe = (unit_type == 'County' and full_county_universe is not None)
    
    if use_full_universe:
        print(f"    Using full county universe for accurate county counts...")
        # Create county_fips if not exists
        if 'county_fips' not in employed_with_pop.columns:
            employed_with_pop['county_fips'] = (
                employed_with_pop['STATEFIP'].astype(int).astype(str).str.zfill(2) + 
                employed_with_pop['COUNTYFIP'].astype(int).astype(str).str.zfill(3)
            )
        
        # Create metro label mapping
        employed_with_pop['metro_label'] = employed_with_pop['metro_status'].map({
            'Non-metro/Rural': 'Rural',
            'Metropolitan': 'Urban'
        })
        
        # Get metro mapping from ACS data
        metro_mapping = employed_with_pop[['county_fips', 'metro_label']].drop_duplicates()
        
        # FIXED: Use full universe for counts, estimate metro status for missing counties
        universe_with_metro = full_county_universe.merge(metro_mapping, on='county_fips', how='left')
        
        counties_without_metro = universe_with_metro['metro_label'].isna().sum()
        print(f"    Warning: {counties_without_metro} counties have no ACS metro status")
        
        # For missing metro status, estimate based on population size (simple heuristic)
        # Counties over 50K are more likely to be urban
        universe_with_metro.loc[
            universe_with_metro['metro_label'].isna() & 
            universe_with_metro['pop_label'].isin(['75K-100K', '100K-1M', '1M+']), 
            'metro_label'
        ] = 'Urban'
        
        universe_with_metro.loc[
            universe_with_metro['metro_label'].isna(), 
            'metro_label'
        ] = 'Rural'
        
        # Now use ALL counties for counts
        unit_counts_full = universe_with_metro.groupby(['pop_label', 'metro_label']).size().reset_index(name='count')
        unit_counts_pivot = unit_counts_full.pivot(index='pop_label', columns='metro_label', values='count').fillna(0)
        unit_counts_pivot.columns = ['Non-metro/Rural', 'Metropolitan']  # Standardize column names
        unit_totals_full = universe_with_metro.groupby('pop_label').size()
        
        print(f"    Using ALL {len(universe_with_metro)} counties for counts (estimated metro status for missing)")
    else:
        print(f"    Using ACS data only for unit counts...")
        # Standardize metro status values
        print(f"    Standardizing metro status values...")
        
        employed_with_pop = employed_with_pop.copy()
        unit_pop = unit_pop.copy()
        
        # Map employed data to match unit_pop naming if needed
        employed_metro_mapping = {
            'Rural': 'Non-metro/Rural',
            'Urban': 'Metropolitan'
        }
        
        # Apply mapping if values need to be standardized
        if 'Rural' in employed_with_pop['metro_status'].values or 'Urban' in employed_with_pop['metro_status'].values:
            employed_with_pop['metro_status'] = employed_with_pop['metro_status'].map(employed_metro_mapping)
        
        print(f"    Employment metro status values: {employed_with_pop['metro_status'].unique()}")
        print(f"    Unit metro status values: {unit_pop['metro_status'].unique()}")
        
        # Pre-create bucket categorical to ensure all buckets are present
        employed_with_pop['pop_label'] = pd.Categorical(employed_with_pop['pop_label'], categories=all_buckets, ordered=True)
        unit_pop['pop_label'] = pd.Categorical(unit_pop['pop_label'], categories=all_buckets, ordered=True)
        
        # Vectorized unit counts by bucket and metro status
        print(f"    Computing unit counts...")
        unit_counts_pivot = unit_pop.groupby(['pop_label', 'metro_status']).size().unstack(fill_value=0)
        unit_totals_full = unit_pop.groupby('pop_label').size()
    
    # Vectorized employment totals by bucket and metro status (always from ACS data)
    print(f"    Computing employment totals...")
    emp_by_bucket_metro = employed_with_pop.groupby(['pop_label', 'metro_status'])['PERWT'].sum().unstack(fill_value=0)
    emp_by_bucket_total = employed_with_pop.groupby('pop_label')['PERWT'].sum()
    
    # Calculate SOC coverage by bucket and metro status
    print(f"    Computing SOC coverage rates...")
    
    # Pre-calculate ALL SOC coverage by bucket and metro status
    coverage_data = {}
    
    for level in soc_levels:
        # Validate SOC level exists
        if level not in employed_with_pop.columns:
            print(f"    Warning: {level} not found in data")
            continue
            
        # Create mask for classified records
        classified_mask = employed_with_pop[level] != 'Unclassified'
        classified_data = employed_with_pop[classified_mask]
        
        # Vectorized coverage calculation by bucket and metro status
        classified_by_bucket_metro = classified_data.groupby(['pop_label', 'metro_status'])['PERWT'].sum().unstack(fill_value=0)
        classified_by_bucket_total = classified_data.groupby('pop_label')['PERWT'].sum()
        
        # Calculate coverage percentages - vectorized division
        coverage_total = (classified_by_bucket_total / emp_by_bucket_total * 100).fillna(0)
        
        # Handle metro status coverage (avoid division by zero)
        coverage_metro = classified_by_bucket_metro.div(emp_by_bucket_metro, fill_value=0) * 100
        coverage_metro = coverage_metro.fillna(0)

        coverage_data[level] = {
            'total': coverage_total,
            'metro': coverage_metro
        }
    
    # Build results DataFrame efficiently
    print(f"    Assembling results...")
    results = []
    
    for bucket in all_buckets:
        # Get unit counts (from full universe if available, otherwise from ACS)
        n_units_total = unit_totals_full.get(bucket, 0)
        
        # Better access to unstacked data for unit counts
        if bucket in unit_counts_pivot.index:
            n_units_rural = unit_counts_pivot.loc[bucket, 'Non-metro/Rural'] if 'Non-metro/Rural' in unit_counts_pivot.columns else 0
            n_units_urban = unit_counts_pivot.loc[bucket, 'Metropolitan'] if 'Metropolitan' in unit_counts_pivot.columns else 0
        else:
            n_units_rural = n_units_urban = 0
        
        # Get employment totals with proper index checking
        total_emp_all = emp_by_bucket_total.get(bucket, 0)
        
        # Better access to employment by metro status
        if bucket in emp_by_bucket_metro.index:
            total_emp_rural = emp_by_bucket_metro.loc[bucket, 'Non-metro/Rural'] if 'Non-metro/Rural' in emp_by_bucket_metro.columns else 0
            total_emp_urban = emp_by_bucket_metro.loc[bucket, 'Metropolitan'] if 'Metropolitan' in emp_by_bucket_metro.columns else 0
        else:
            total_emp_rural = total_emp_urban = 0
        
        # Calculate employment percentages
        total_emp_pct = (total_emp_all / total_us_employment * 100) if total_us_employment > 0 else 0
        rural_emp_pct = (total_emp_rural / total_us_employment * 100) if total_us_employment > 0 else 0
        urban_emp_pct = (total_emp_urban / total_us_employment * 100) if total_us_employment > 0 else 0
        
        # Build row
        row = {
            'pop_label': bucket,
            f'total_{unit_label}': int(n_units_total),
            f'rural_{unit_label}': int(n_units_rural),
            f'urban_{unit_label}': int(n_units_urban),
            'total_employment_pct': round(total_emp_pct, 2),
            'rural_employment_pct': round(rural_emp_pct, 2),
            'urban_employment_pct': round(urban_emp_pct, 2),
            'total_employment': int(total_emp_all),
            'rural_employment': int(total_emp_rural),
            'urban_employment': int(total_emp_urban)
        }
        
        # Add SOC coverage data for ALL levels
        for level in soc_levels:
            if level not in coverage_data:
                # If level doesn't exist, add zeros
                row[f'total_{level}_coverage_pct'] = 0.0
                row[f'rural_{level}_coverage_pct'] = 0.0
                row[f'urban_{level}_coverage_pct'] = 0.0
                continue
                
            coverage_total = coverage_data[level]['total'].get(bucket, 0)
            
            if bucket in coverage_data[level]['metro'].index:
                coverage_rural = coverage_data[level]['metro'].loc[bucket, 'Non-metro/Rural'] if 'Non-metro/Rural' in coverage_data[level]['metro'].columns else 0
                coverage_urban = coverage_data[level]['metro'].loc[bucket, 'Metropolitan'] if 'Metropolitan' in coverage_data[level]['metro'].columns else 0
            else:
                coverage_rural = coverage_urban = 0
            
            row[f'total_{level}_coverage_pct'] = round(float(coverage_total), 2)
            row[f'rural_{level}_coverage_pct'] = round(float(coverage_rural), 2)
            row[f'urban_{level}_coverage_pct'] = round(float(coverage_urban), 2)
        
        results.append(row)
    
    coverage_df = pd.DataFrame(results)
    coverage_df['pop_label'] = pd.Categorical(coverage_df['pop_label'], categories=all_buckets, ordered=True)
    
    universe_note = " with full universe" if use_full_universe else ""
    print(f"    Optimized SOC coverage calculation{universe_note} complete for {unit_type}")
    return coverage_df.sort_values('pop_label').reset_index(drop=True)

In [58]:
# ============================================================================
# EMPLOYMENT SHARE ANALYSIS FUNCTIONS (PART 3)
# ============================================================================

def calculate_employment_shares_pivot(df_employed, naics_level='naics_2digit', geo_type='PUMA'):
    """Calculate employment shares showing what % of each area's employment is in each industry"""
    
    print(f"  Calculating employment shares for {naics_level} ({geo_type})...")
    
    # Validate input
    if naics_level not in df_employed.columns:
        print(f"    Warning: {naics_level} not found in data")
        return pd.DataFrame()
    
    if len(df_employed) == 0:
        print(f"    Warning: No data for {naics_level}")
        return pd.DataFrame()
    
    # Filter out unclassified records
    df_analysis = df_employed[df_employed[naics_level] != 'Unclassified'].copy()
    
    if len(df_analysis) == 0:
        print(f"    Warning: No classified data for {naics_level}")
        return pd.DataFrame()
    
    print(f"      Processing {len(df_analysis):,} records...")
    
    # Calculate employment by industry, metro status, and population bucket
    grouped = (df_analysis.groupby([naics_level, 'metro_status', 'pop_label'])['PERWT']
              .sum()
              .reset_index())
    
    # Calculate total employment by geography (metro_status + pop_label combination)
    geography_totals = (df_analysis.groupby(['metro_status', 'pop_label'])['PERWT']
                       .sum()
                       .reset_index()
                       .rename(columns={'PERWT': 'total_geography_employment'}))
    
    # Merge with totals
    merged = grouped.merge(geography_totals, on=['metro_status', 'pop_label'], how='left')
    
    # Calculate percentage shares (each geography sums to 100%)
    merged['employment_share'] = (
        (merged['PERWT'] / merged['total_geography_employment'] * 100)
        .round(2)
        .fillna(0)
    )
    
    # Create pivot table - industries as rows, population buckets as columns
    try:
        pivot = merged.pivot_table(
            index=[naics_level, 'metro_status'],
            columns='pop_label',
            values='employment_share',
            fill_value=0
        ).reset_index()
    except Exception as e:
        print(f"      Error creating pivot: {e}")
        return pd.DataFrame()
    
    # Ensure all bucket columns exist
    all_buckets = ['0-25K', '25K-50K', '50K-75K', '75K-100K', '100K-1M', '1M+', 'Unknown County']
    for bucket in all_buckets:
        if bucket not in pivot.columns:
            pivot[bucket] = 0.0
    
    # Handle Unknown County for County analysis
    if geo_type == 'County' and 'Unknown County' not in pivot.columns:
        unknown_county_data = df_analysis[df_analysis['pop_label'] == 'Unknown County'] if 'Unknown County' in df_analysis['pop_label'].values else pd.DataFrame()
        if len(unknown_county_data) > 0:
            unknown_grouped = (unknown_county_data.groupby([naics_level, 'metro_status'])['PERWT']
                             .sum()
                             .reset_index())
            unknown_totals = (unknown_county_data.groupby(['metro_status', 'pop_label'])['PERWT']
                            .sum()
                            .reset_index()
                            .rename(columns={'PERWT': 'total_unknown_employment'}))
            unknown_merged = unknown_grouped.merge(unknown_totals, on=['metro_status'], how='left')
            unknown_merged['unknown_share'] = (
                (unknown_merged['PERWT'] / unknown_merged['total_unknown_employment'] * 100)
                .round(2)
                .fillna(0)
            )
            # Add unknown county data to pivot
            pivot = pivot.merge(
                unknown_merged[[naics_level, 'metro_status', 'unknown_share']],
                on=[naics_level, 'metro_status'],
                how='left'
            )
            pivot['Unknown County'] = pivot['unknown_share'].fillna(0)
            pivot = pivot.drop('unknown_share', axis=1)
        else:
            pivot['Unknown County'] = 0.0
    elif geo_type == 'PUMA':
        pivot['Unknown County'] = 0.0
    
    # Order columns correctly
    base_columns = [naics_level, 'metro_status']
    bucket_columns = ['0-25K', '25K-50K', '50K-75K', '75K-100K', '100K-1M', '1M+', 'Unknown County']
    column_order = base_columns + bucket_columns
    existing_columns = [col for col in column_order if col in pivot.columns]
    pivot = pivot[existing_columns]
    
    # Add industry titles using NAICS mapping if available
    print(f"      Adding industry titles...")
    if 'NAICS_MAPPING' in globals() and NAICS_MAPPING:
        def get_industry_title(code):
            code_str = str(code).strip()
            
            # Try direct lookup first
            if code_str in NAICS_MAPPING:
                mapping_value = NAICS_MAPPING[code_str]
                if isinstance(mapping_value, dict):
                    return mapping_value.get('title', f'NAICS {code_str}')
                elif isinstance(mapping_value, str):
                    return mapping_value
            
            # ENHANCED FALLBACK: Try shorter codes
            if len(code_str) > 2:
                for fallback_length in [5, 4, 3, 2]:
                    if len(code_str) > fallback_length:
                        fallback_code = code_str[:fallback_length]
                        if fallback_code in NAICS_MAPPING:
                            mapping_value = NAICS_MAPPING[fallback_code]
                            if isinstance(mapping_value, str):
                                return f"{mapping_value} (detailed)"
            
            return f'NAICS {code_str}'
    else:
        # Fallback to generic titles
        def get_industry_title(code):
            return f'NAICS {code}'
    
    unique_codes = pivot[naics_level].unique()
    title_mapping = {code: get_industry_title(code) for code in unique_codes}
    pivot['industry_title'] = pivot[naics_level].map(title_mapping)
    
    # Final formatting
    pivot.columns.name = None
    pivot = pivot.rename(columns={
        naics_level: 'industry_code',
        'metro_status': 'rural_urban_status'
    })
    
    # Add level and geo identifiers
    pivot['naics_level'] = naics_level
    pivot['geo_level'] = geo_type
    
    # Reorder final columns
    final_columns = (['naics_level', 'industry_code', 'industry_title', 'rural_urban_status'] + 
                    bucket_columns + ['geo_level'])
    existing_final_columns = [col for col in final_columns if col in pivot.columns]
    pivot = pivot[existing_final_columns]
    
    # Sort for consistent output
    pivot['rural_urban_status'] = pd.Categorical(
        pivot['rural_urban_status'],
        categories=['Non-metro/Rural', 'Metropolitan'],
        ordered=True
    )
    pivot = pivot.sort_values(['industry_code', 'rural_urban_status']).reset_index(drop=True)
    
    print(f"      Employment shares complete: {len(pivot)} industry-metro combinations")
    return pivot

def create_employment_share_analysis(puma_employed, county_employed):
    """Create employment share analysis for both PUMA and County data across all NAICS levels"""
    
    print("\nSTEP 6: Creating Employment Share Analysis...")
    print("="*50)
    
    naics_levels = ['naics_2digit', 'naics_3digit', 'naics_4digit']
    all_results = []
    
    # PUMA Employment Shares
    print(f"\n--- PUMA Employment Shares ---")
    for level in naics_levels:
        print(f"  Processing {level}...")
        puma_shares = calculate_employment_shares_pivot(puma_employed, level, 'PUMA')
        if len(puma_shares) > 0:
            all_results.append(puma_shares)
    
    # County Employment Shares
    print(f"\n--- County Employment Shares ---")
    for level in naics_levels:
        print(f"  Processing {level}...")
        county_shares = calculate_employment_shares_pivot(county_employed, level, 'County')
        if len(county_shares) > 0:
            all_results.append(county_shares)
    
    # Combine all results
    if all_results:
        combined_shares = pd.concat(all_results, ignore_index=True)
        print(f"\n✓ Employment Share Analysis Complete: {len(combined_shares)} total records")
        
        # Split back into PUMA and County for return
        puma_employment_shares = combined_shares[combined_shares['geo_level'] == 'PUMA'].copy()
        county_employment_shares = combined_shares[combined_shares['geo_level'] == 'County'].copy()
        
        print(f"  PUMA Employment Shares: {len(puma_employment_shares)} records")
        print(f"  County Employment Shares: {len(county_employment_shares)} records")
        
        # Show sample
        if len(combined_shares) > 0:
            print("\nSample Employment Share data:")
            sample_cols = ['naics_level', 'industry_code', 'industry_title', 'rural_urban_status', '0-25K', '25K-50K', 'geo_level']
            available_cols = [col for col in sample_cols if col in combined_shares.columns]
            print(combined_shares[available_cols].head(3))
    else:
        combined_shares = pd.DataFrame()
        puma_employment_shares = pd.DataFrame()
        county_employment_shares = pd.DataFrame()
        print(f"\n⚠ No Employment Share data generated")
    
    return puma_employment_shares, county_employment_shares
  

In [59]:
# ============================================================================
# SOC EMPLOYMENT SHARE ANALYSIS FUNCTIONS
# ============================================================================

def calculate_soc_employment_shares_pivot(df_employed, soc_level='soc_2digit', geo_type='PUMA'):
    """Calculate SOC employment shares showing what % of each area's employment is in each occupation"""
    
    print(f"  Calculating SOC employment shares for {soc_level} ({geo_type})...")
    
    # Validate input
    if soc_level not in df_employed.columns:
        print(f"    Warning: {soc_level} not found in data")
        return pd.DataFrame()
    
    if len(df_employed) == 0:
        print(f"    Warning: No data for {soc_level}")
        return pd.DataFrame()
    
    # Filter out unclassified records
    df_analysis = df_employed[df_employed[soc_level] != 'Unclassified'].copy()
    
    if len(df_analysis) == 0:
        print(f"    Warning: No classified data for {soc_level}")
        return pd.DataFrame()
    
    print(f"      Processing {len(df_analysis):,} records...")
    
    # Calculate employment by occupation, metro status, and population bucket
    grouped = (df_analysis.groupby([soc_level, 'metro_status', 'pop_label'])['PERWT']
              .sum()
              .reset_index())
    
    # Calculate total employment by geography (metro_status + pop_label combination)
    geography_totals = (df_analysis.groupby(['metro_status', 'pop_label'])['PERWT']
                       .sum()
                       .reset_index()
                       .rename(columns={'PERWT': 'total_geography_employment'}))
    
    # Merge with totals
    merged = grouped.merge(geography_totals, on=['metro_status', 'pop_label'], how='left')
    
    # Calculate percentage shares (each geography sums to 100%)
    merged['employment_share'] = (
        (merged['PERWT'] / merged['total_geography_employment'] * 100)
        .round(2)
        .fillna(0)
    )
    
    # Create pivot table - occupations as rows, population buckets as columns
    try:
        pivot = merged.pivot_table(
            index=[soc_level, 'metro_status'],
            columns='pop_label',
            values='employment_share',
            fill_value=0
        ).reset_index()
    except Exception as e:
        print(f"      Error creating pivot: {e}")
        return pd.DataFrame()
    
    # Ensure all bucket columns exist
    all_buckets = ['0-25K', '25K-50K', '50K-75K', '75K-100K', '100K-1M', '1M+', 'Unknown County']
    for bucket in all_buckets:
        if bucket not in pivot.columns:
            pivot[bucket] = 0.0
    
    # Handle Unknown County for County analysis
    if geo_type == 'County' and 'Unknown County' not in pivot.columns:
        unknown_county_data = df_analysis[df_analysis['pop_label'] == 'Unknown County'] if 'Unknown County' in df_analysis['pop_label'].values else pd.DataFrame()
        if len(unknown_county_data) > 0:
            unknown_grouped = (unknown_county_data.groupby([soc_level, 'metro_status'])['PERWT']
                             .sum()
                             .reset_index())
            unknown_totals = (unknown_county_data.groupby(['metro_status', 'pop_label'])['PERWT']
                            .sum()
                            .reset_index()
                            .rename(columns={'PERWT': 'total_unknown_employment'}))
            unknown_merged = unknown_grouped.merge(unknown_totals, on=['metro_status'], how='left')
            unknown_merged['unknown_share'] = (
                (unknown_merged['PERWT'] / unknown_merged['total_unknown_employment'] * 100)
                .round(2)
                .fillna(0)
            )
            # Add unknown county data to pivot
            pivot = pivot.merge(
                unknown_merged[[soc_level, 'metro_status', 'unknown_share']],
                on=[soc_level, 'metro_status'],
                how='left'
            )
            pivot['Unknown County'] = pivot['unknown_share'].fillna(0)
            pivot = pivot.drop('unknown_share', axis=1)
        else:
            pivot['Unknown County'] = 0.0
    elif geo_type == 'PUMA':
        pivot['Unknown County'] = 0.0
    
    # Order columns correctly
    base_columns = [soc_level, 'metro_status']
    bucket_columns = ['0-25K', '25K-50K', '50K-75K', '75K-100K', '100K-1M', '1M+', 'Unknown County']
    column_order = base_columns + bucket_columns
    existing_columns = [col for col in column_order if col in pivot.columns]
    pivot = pivot[existing_columns]
    
    # Add occupation titles using SOC mapping if available
    print(f"      Adding occupation titles...")
    if 'SOC_MAPPING' in globals() and SOC_MAPPING:
        # Use the loaded SOC mapping
        def get_occupation_title(code):
            code_str = str(code).strip()
            # Look for this code in the SOC mapping
            for acs_code, soc_hierarchy in SOC_MAPPING.items():
                # Check different SOC levels
                for soc_level_check in ['soc2_code', 'soc3_code', 'soc4_code', 'soc5_code', 'soc6_code']:
                    if soc_hierarchy.get(soc_level_check) == code_str:
                        title_key = soc_level_check.replace('_code', '_title')
                        return soc_hierarchy.get(title_key, f'SOC {code_str}')
            
            # Fallback to basic SOC titles
            basic_soc_titles = {
                '11': 'Management Occupations',
                '13': 'Business and Financial Operations',
                '15': 'Computer and Mathematical Occupations',
                '17': 'Architecture and Engineering Occupations',
                '19': 'Life, Physical, and Social Science Occupations',
                '21': 'Community and Social Service Occupations',
                '23': 'Legal Occupations',
                '25': 'Educational Instruction and Library Occupations',
                '27': 'Arts, Design, Entertainment, Sports, and Media Occupations',
                '29': 'Healthcare Practitioners and Technical Occupations',
                '31': 'Healthcare Support Occupations',
                '33': 'Protective Service Occupations',
                '35': 'Food Preparation and Serving Related Occupations',
                '37': 'Building and Grounds Cleaning and Maintenance Occupations',
                '39': 'Personal Care and Service Occupations',
                '41': 'Sales and Related Occupations',
                '43': 'Office and Administrative Support Occupations',
                '45': 'Farming, Fishing, and Forestry Occupations',
                '47': 'Construction and Extraction Occupations',
                '49': 'Installation, Maintenance, and Repair Occupations',
                '51': 'Production Occupations',
                '53': 'Transportation and Material Moving Occupations',
                '55': 'Military Specific Occupations'
            }
            
            # Use 2-digit code for basic titles
            code_2digit = code_str[:2] if len(code_str) >= 2 else code_str
            return basic_soc_titles.get(code_2digit, f'SOC {code_str}')
    else:
        # Fallback to generic titles
        def get_occupation_title(code):
            return f'SOC {code}'
    
    unique_codes = pivot[soc_level].unique()
    title_mapping = {code: get_occupation_title(code) for code in unique_codes}
    pivot['occupation_title'] = pivot[soc_level].map(title_mapping)
    
    # Final formatting
    pivot.columns.name = None
    pivot = pivot.rename(columns={
        soc_level: 'occupation_code',
        'metro_status': 'rural_urban_status'
    })
    
    # Add level and geo identifiers
    pivot['soc_level'] = soc_level
    pivot['geo_level'] = geo_type
    
    # Reorder final columns
    final_columns = (['soc_level', 'occupation_code', 'occupation_title', 'rural_urban_status'] + 
                    bucket_columns + ['geo_level'])
    existing_final_columns = [col for col in final_columns if col in pivot.columns]
    pivot = pivot[existing_final_columns]
    
    # Sort for consistent output
    pivot['rural_urban_status'] = pd.Categorical(
        pivot['rural_urban_status'],
        categories=['Non-metro/Rural', 'Metropolitan'],
        ordered=True
    )
    pivot = pivot.sort_values(['occupation_code', 'rural_urban_status']).reset_index(drop=True)
    
    print(f"      SOC employment shares complete: {len(pivot)} occupation-metro combinations")
    return pivot
    
def create_soc_employment_share_analysis(puma_employed, county_employed):
    """Create SOC employment share analysis for both PUMA and County data across all SOC levels"""
    
    print("\nCreating SOC Employment Share Analysis...")
    print("="*50)
    
    soc_levels = ['soc_2digit', 'soc_3digit', 'soc_4digit']
    all_results = []
    
    # PUMA SOC Employment Shares
    print(f"\n--- PUMA SOC Employment Shares ---")
    for level in soc_levels:
        print(f"  Processing {level}...")
        puma_shares = calculate_soc_employment_shares_pivot(puma_employed, level, 'PUMA')
        if len(puma_shares) > 0:
            all_results.append(puma_shares)
    
    # County SOC Employment Shares
    print(f"\n--- County SOC Employment Shares ---")
    for level in soc_levels:
        print(f"  Processing {level}...")
        county_shares = calculate_soc_employment_shares_pivot(county_employed, level, 'County')
        if len(county_shares) > 0:
            all_results.append(county_shares)
    
    # Combine all results
    if all_results:
        combined_shares = pd.concat(all_results, ignore_index=True)
        print(f"\n✓ SOC Employment Share Analysis Complete: {len(combined_shares)} total records")
        
        # Split back into PUMA and County for return
        puma_soc_employment_shares = combined_shares[combined_shares['geo_level'] == 'PUMA'].copy()
        county_soc_employment_shares = combined_shares[combined_shares['geo_level'] == 'County'].copy()
        
        print(f"  PUMA SOC Employment Shares: {len(puma_soc_employment_shares)} records")
        print(f"  County SOC Employment Shares: {len(county_soc_employment_shares)} records")
        
        # Show sample
        if len(combined_shares) > 0:
            print("\nSample SOC Employment Share data:")
            sample_cols = ['soc_level', 'occupation_code', 'occupation_title', 'rural_urban_status', '0-25K', '25K-50K', 'geo_level']
            available_cols = [col for col in sample_cols if col in combined_shares.columns]
            print(combined_shares[available_cols].head(3))
    else:
        combined_shares = pd.DataFrame()
        puma_soc_employment_shares = pd.DataFrame()
        county_soc_employment_shares = pd.DataFrame()
        print(f"\n⚠ No SOC Employment Share data generated")
    
    return puma_soc_employment_shares, county_soc_employment_shares

In [60]:
def create_missing_counties_analysis(county_employed, full_county_universe):
    """
    Create detailed analysis of missing counties between Snowflake and ACS datasets
    """
    print("\nCreating missing counties analysis...")
    
    # Get county sets
    counties_with_acs = set(county_employed['county_fips'].unique())
    counties_in_snowflake = set(full_county_universe['county_fips'].unique())
    
    # Find missing counties (in Snowflake but not in ACS)
    missing_from_acs = counties_in_snowflake - counties_with_acs
    
    # Find extra counties (in ACS but not in Snowflake) 
    missing_from_snowflake = counties_with_acs - counties_in_snowflake
    
    # Create missing counties dataframe
    missing_counties_data = []
    
    for fips in sorted(missing_from_acs):
        county_info = full_county_universe[full_county_universe['county_fips'] == fips].iloc[0]
        missing_counties_data.append({
            'county_fips': fips,
            'county_name': county_info['county_name'],
            'pop_label': county_info['pop_label'],
            'status': 'Missing from ACS',
            'reason': 'No employment data available'
        })
    
    for fips in sorted(missing_from_snowflake):
        # Get county info from ACS data
        county_info = county_employed[county_employed['county_fips'] == fips].iloc[0]
        missing_counties_data.append({
            'county_fips': fips,
            'county_name': f"Unknown county {fips}",  # We don't have names in ACS
            'pop_label': county_info['pop_label'],
            'status': 'Missing from Snowflake',
            'reason': 'Not in population dataset'
        })
    
    missing_counties_df = pd.DataFrame(missing_counties_data)
    
    # Create coverage summary by population bucket
    coverage_summary = []
    
    for bucket in ['0-25K', '25K-50K', '50K-75K', '75K-100K', '100K-1M', '1M+']:
        # Counties in this bucket from Snowflake
        snowflake_bucket = full_county_universe[full_county_universe['pop_label'] == bucket]
        total_counties = len(snowflake_bucket)
        
        # How many have ACS data
        bucket_fips = set(snowflake_bucket['county_fips'])
        covered_counties = len(bucket_fips.intersection(counties_with_acs))
        missing_counties = len(bucket_fips - counties_with_acs)
        
        # Calculate employment records for covered counties
        if covered_counties > 0:
            covered_fips = bucket_fips.intersection(counties_with_acs)
            employment_records = len(county_employed[county_employed['county_fips'].isin(covered_fips)])
        else:
            employment_records = 0
        
        coverage_summary.append({
            'population_bucket': bucket,
            'total_counties_snowflake': total_counties,
            'counties_with_acs_data': covered_counties,
            'counties_missing_from_acs': missing_counties,
            'coverage_percentage': round(covered_counties / total_counties * 100, 0) if total_counties > 0 else 0,
            'employment_records': employment_records
        })
    
    coverage_summary_df = pd.DataFrame(coverage_summary)
    
    print(f"✓ Missing counties analysis complete:")
    print(f"  Counties missing from ACS: {len(missing_from_acs)}")
    print(f"  Counties missing from Snowflake: {len(missing_from_snowflake)}")
    print(f"  Counties in both datasets: {len(counties_with_acs.intersection(counties_in_snowflake))}")
    
    return missing_counties_df, coverage_summary_df

In [61]:
# ============================================================================
# FINAL SEQUENTIAL ANALYSIS STEPS - CLEANED
# ============================================================================

def step_1_load_data():
    print("STEP 1: Loading data...")
    df = load_and_prepare_data("usa_00052.csv.gz")
    print(f"✓ Data loaded: {len(df):,} records, {df.shape[1]} columns")
    print(f"✓ Memory usage: {df.memory_usage(deep=True).sum() / 1024**2:.1f} MB")

    key_cols = ['EMPSTAT', 'INDNAICS', 'OCCSOC', 'PUMA', 'COUNTYICP', 'MET2013', 'PERWT']
    for col in key_cols:
        print(f"  {'✓' if col in df.columns else '✗'} {col}")

    print("\nSample of raw data:")
    print(df[['EMPSTAT', 'INDNAICS', 'OCCSOC', 'PERWT']].head(3))
    return df

def step_2_add_classifications(df):
    print("\nSTEP 2: Adding NAICS and SOC classifications...")
    df = add_classifications(df)
    print("✓ Classifications added")
    return df

def step_3_prepare_employment_data(df, full_county_universe=None):
    # Filter employed individuals with valid NAICS + SOC
    puma_employed = filter_employment_data(df, 'PUMA')
    county_employed = filter_employment_data(df, 'County')

    # Add geo features, population buckets, metro status
    puma_employed = add_geographic_features(puma_employed, 'PUMA', full_dataset=df)
    county_employed = add_geographic_features(county_employed, 'County', full_dataset=df)

    # Apply Snowflake population labels to county ACS data
    if full_county_universe is not None:
        print("Applying Snowflake population labels to ACS county data...")
        
        # Ensure county_fips exists
        if 'county_fips' not in county_employed.columns:
            county_employed['county_fips'] = (
                county_employed['STATEFIP'].astype(int).astype(str).str.zfill(2) + 
                county_employed['COUNTYFIP'].astype(int).astype(str).str.zfill(3)
            )
        
        # Replace ACS pop_label with Snowflake pop_label
        pop_mapping = full_county_universe.set_index('county_fips')['pop_label'].to_dict()
        county_employed['pop_label_original'] = county_employed['pop_label']  # Keep original for reference
        county_employed['pop_label'] = county_employed['county_fips'].map(pop_mapping)
        
        # Handle any unmapped counties (keep original if no Snowflake data)
        county_employed['pop_label'] = county_employed['pop_label'].fillna(county_employed['pop_label_original'])
        
        print(f"✓ Population labels updated for {len(county_employed)} county records")
        
        # Show the impact
        remapped_count = (county_employed['pop_label'] != county_employed['pop_label_original']).sum()
        print(f"✓ {remapped_count} records had population labels updated")

    print("✓ Employment data prepared with population + metro features.")
    return puma_employed, county_employed


def step_4_create_naics_coverage_analysis(puma_employed, county_employed, full_county_universe=None):
    return create_coverage_analysis(puma_employed, county_employed, full_county_universe)

def step_5_create_soc_coverage_analysis(puma_employed, county_employed, full_county_universe=None):
    total_us_employment_puma = puma_employed['PERWT'].sum()
    total_us_employment_county = county_employed['PERWT'].sum()

    puma_unit_pop = create_unit_population_summary(puma_employed, 'PUMA')
    county_unit_pop = create_unit_population_summary(county_employed, 'County')

    puma_soc_coverage = calculate_soc_coverage_vectorized(
        puma_employed, puma_unit_pop, 'PUMA', total_us_employment_puma
    )
    county_soc_coverage = calculate_soc_coverage_vectorized(
        county_employed, county_unit_pop, 'County', total_us_employment_county,
        full_county_universe=full_county_universe  # ← Add this
    )
    return puma_soc_coverage, county_soc_coverage

def step_6_create_employment_share_analysis(puma_employed, county_employed):
    return create_employment_share_analysis(puma_employed, county_employed)

def step_6b_create_soc_employment_share_analysis(puma_employed, county_employed):
    return create_soc_employment_share_analysis(puma_employed, county_employed)

def step_8_export_to_excel_extended(
    puma_coverage, county_coverage,
    puma_soc_coverage, county_soc_coverage,
    puma_employment_shares, county_employment_shares,
    soc_employment_shares_puma, soc_employment_shares_county,
    naics_presence_puma, naics_presence_county,
    soc_presence_puma, soc_presence_county,
    missing_counties_df, coverage_summary_df,  # Add these parameters
    naics_output_path="naics_employment_analysis.xlsx",
    soc_output_path="soc_employment_analysis.xlsx",
    missing_counties_path="missing_counties_analysis.xlsx"  # Add this parameter
):
    print("\nSTEP 8: Exporting NAICS results to Excel...")
    with pd.ExcelWriter(naics_output_path, engine='xlsxwriter') as writer:
        puma_coverage.to_excel(writer, sheet_name="NAICS PUMA Coverage", index=False)
        county_coverage.to_excel(writer, sheet_name="NAICS County Coverage", index=False)
        puma_employment_shares.to_excel(writer, sheet_name="NAICS PUMA Shares", index=False)
        county_employment_shares.to_excel(writer, sheet_name="NAICS County Shares", index=False)
        naics_presence_puma.to_excel(writer, sheet_name="NAICS PUMA Presence", index=False)
        naics_presence_county.to_excel(writer, sheet_name="NAICS County Presence", index=False)
    print(f"✓ NAICS Excel exported: {naics_output_path}")

    print("\nSTEP 8: Exporting SOC results to Excel...")
    with pd.ExcelWriter(soc_output_path, engine='xlsxwriter') as writer:
        puma_soc_coverage.to_excel(writer, sheet_name="SOC PUMA Coverage", index=False)
        county_soc_coverage.to_excel(writer, sheet_name="SOC County Coverage", index=False)
        soc_employment_shares_puma.to_excel(writer, sheet_name="SOC PUMA Shares", index=False)
        soc_employment_shares_county.to_excel(writer, sheet_name="SOC County Shares", index=False)
        soc_presence_puma.to_excel(writer, sheet_name="SOC PUMA Presence", index=False)
        soc_presence_county.to_excel(writer, sheet_name="SOC County Presence", index=False)
    print(f"✓ SOC Excel exported: {soc_output_path}")
    
    print(f"\nSTEP 8: Exporting missing counties analysis to Excel...")
    with pd.ExcelWriter(missing_counties_path, engine='xlsxwriter') as writer:
        # Missing counties details
        missing_counties_df.to_excel(writer, sheet_name="Missing Counties Detail", index=False)
        
        # Coverage summary by population bucket
        coverage_summary_df.to_excel(writer, sheet_name="Coverage by Pop Bucket", index=False)
        
        # Get workbook and add formatting
        workbook = writer.book
        
        # Format coverage summary sheet
        worksheet = writer.sheets['Coverage by Pop Bucket']
        
        # Add percentage format (divide by 100 since we're storing as whole numbers)
        percent_format = workbook.add_format({'num_format': '0"%"'})
        worksheet.set_column('E:E', 15, percent_format)  # coverage_percentage column
        
        # Add number format with commas
        number_format = workbook.add_format({'num_format': '#,##0'})
        worksheet.set_column('B:D', 15, number_format)  # count columns
        worksheet.set_column('F:F', 15, number_format)  # employment_records column
    
    print(f"✓ Missing counties analysis exported: {missing_counties_path}")



def step_9_final_summary(analysis_results):
    print_final_summary(analysis_results)

def step_10_geographic_analysis(puma_employed, county_employed, full_county_universe=None):
    """STEP 10: Geographic presence analysis"""
    print("\nSTEP 10: Geographic Presence Analysis")
    print("=" * 60)
    
    # PUMA analysis (no full universe available)
    naics_presence_puma = create_naics_representation_matrix(puma_employed, unit='PUMA')
    soc_presence_puma = create_soc_representation_matrix(puma_employed, unit='PUMA')
    
    # County analysis with optional full universe
    naics_presence_county = create_naics_representation_matrix(
        county_employed, unit='County', full_county_universe=full_county_universe
    )
    soc_presence_county = create_soc_representation_matrix(
        county_employed, unit='County', full_county_universe=full_county_universe
    )
    
    return naics_presence_puma, naics_presence_county, soc_presence_puma, soc_presence_county




In [62]:
# ============================================================================
# POST-PIPELINE VERIFICATION CHECKS (Remove when satisfied)
# ============================================================================

def verify_pipeline_fixes(county_employed, full_county_universe):
    """
    Comprehensive verification that all fixes are working correctly
    """
    print("\n" + "=" * 80)
    print("POST-PIPELINE VERIFICATION CHECKS")
    print("=" * 80)
    
    # 1. Verify Denton County population classification
    print("\n1. DENTON COUNTY POPULATION CLASSIFICATION:")
    print("-" * 50)
    
    denton_county = county_employed[county_employed['county_fips'] == '48121']
    if len(denton_county) > 0:
        current_pop_label = denton_county['pop_label'].iloc[0]
        if 'pop_label_original' in county_employed.columns:
            original_pop_label = denton_county['pop_label_original'].iloc[0]
            print(f"   Records found: {len(denton_county):,}")
            print(f"   Original ACS classification: {original_pop_label}")
            print(f"   Current classification: {current_pop_label}")
            
            if current_pop_label == '1M+' and original_pop_label != '1M+':
                print("   ✓ SUCCESS: Denton County reclassified from ACS to Snowflake population")
            elif current_pop_label == '1M+':
                print("   ✓ SUCCESS: Denton County correctly classified as 1M+")
            else:
                print(f"   ✗ ISSUE: Denton County not classified as 1M+ (currently: {current_pop_label})")
        else:
            print(f"   Records found: {len(denton_county):,}")
            print(f"   Current classification: {current_pop_label}")
            if current_pop_label == '1M+':
                print("   ✓ SUCCESS: Denton County classified as 1M+")
            else:
                print(f"   ✗ ISSUE: Denton County not classified as 1M+ (currently: {current_pop_label})")
    else:
        print("   ✗ ISSUE: Denton County (48121) not found in data")
    
    # 2. Overall 1M+ county coverage check
    print("\n2. 1M+ COUNTY COVERAGE ANALYSIS:")
    print("-" * 50)
    
    # Counties that should be 1M+ (from Snowflake)
    counties_1m_snowflake = set(full_county_universe[full_county_universe['pop_label'] == '1M+']['county_fips'])
    
    # Counties with ACS data
    counties_with_acs = set(county_employed['county_fips'].unique())
    
    # Counties classified as 1M+ in current ACS data
    counties_1m_acs = set(county_employed[county_employed['pop_label'] == '1M+']['county_fips'].unique())
    
    total_1m_snowflake = len(counties_1m_snowflake)
    covered_1m = len(counties_1m_snowflake.intersection(counties_with_acs))
    classified_1m = len(counties_1m_snowflake.intersection(counties_1m_acs))
    
    print(f"   Total 1M+ counties (Snowflake): {total_1m_snowflake}")
    print(f"   1M+ counties with ACS data: {covered_1m}")
    print(f"   1M+ counties classified as 1M+ in ACS: {classified_1m}")
    print(f"   Coverage rate: {covered_1m/total_1m_snowflake*100:.0f}%")
    print(f"   Classification accuracy: {classified_1m/covered_1m*100:.0f}%" if covered_1m > 0 else "   Classification accuracy: N/A")
    
    # Show missing counties
    missing_counties = counties_1m_snowflake - counties_with_acs
    if missing_counties:
        print(f"\n   Missing 1M+ counties ({len(missing_counties)}):")
        for fips in sorted(missing_counties):
            county_name = full_county_universe[full_county_universe['county_fips'] == fips]['county_name'].iloc[0]
            print(f"     {fips}: {county_name}")
    
    # Show misclassified counties
    misclassified = counties_with_acs.intersection(counties_1m_snowflake) - counties_1m_acs
    if misclassified:
        print(f"\n   1M+ counties not classified as 1M+ ({len(misclassified)}):")
        for fips in sorted(misclassified):
            county_name = full_county_universe[full_county_universe['county_fips'] == fips]['county_name'].iloc[0]
            current_label = county_employed[county_employed['county_fips'] == fips]['pop_label'].iloc[0]
            print(f"     {fips}: {county_name} (classified as: {current_label})")
    
    # 3. Population bucket distribution comparison
    print("\n3. POPULATION BUCKET DISTRIBUTION:")
    print("-" * 50)
    
    acs_dist = county_employed.groupby('pop_label').size().sort_index()
    acs_coverage = county_employed.groupby('pop_label')['county_fips'].nunique().sort_index()
    
    print(f"   {'Bucket':<12} {'ACS Records':<12} {'Unique Counties':<15}")
    print(f"   {'-'*12} {'-'*12} {'-'*15}")
    for bucket in ['0-25K', '25K-50K', '50K-75K', '75K-100K', '100K-1M', '1M+']:
        records = acs_dist.get(bucket, 0)
        counties = acs_coverage.get(bucket, 0)
        print(f"   {bucket:<12} {records:<12,} {counties:<15}")
    
    print("\n" + "=" * 80)
    print("VERIFICATION COMPLETE")
    print("=" * 80)

In [63]:
# ============================================================================
# EXECUTION PIPELINE
# ============================================================================

# STEP 1: Load raw data
df = step_1_load_data()

# STEP 2: Add NAICS and SOC classifications
df = step_2_add_classifications(df)

# STEP 2.5: Load full county universe from Snowflake
full_county_universe = create_full_county_universe(county_population_df)

# STEP 3: Prepare employment data (includes population mapping)
puma_employed, county_employed = step_3_prepare_employment_data(df, full_county_universe)

# # STEP 3B: Add geographic features (including metro + pop labels)
# puma_employed = add_geographic_features(puma_employed, geo_type='PUMA', full_dataset=df)
# county_employed = add_geographic_features(county_employed, geo_type='County', full_dataset=df)

# STEP 4: NAICS coverage analysis
puma_coverage, county_coverage = step_4_create_naics_coverage_analysis(
    puma_employed, county_employed, full_county_universe=full_county_universe
)

# STEP 5: SOC coverage analysis
puma_soc_coverage, county_soc_coverage = step_5_create_soc_coverage_analysis(
    puma_employed, county_employed, full_county_universe=full_county_universe
)

# STEP 6: NAICS employment shares
puma_employment_shares, county_employment_shares = step_6_create_employment_share_analysis(puma_employed, county_employed)

# STEP 6B: SOC employment shares
puma_soc_employment_shares, county_soc_employment_shares = step_6b_create_soc_employment_share_analysis(puma_employed, county_employed)

# Step 10: Add combined label for geographic representation analysis
naics_rep_puma, naics_rep_county, soc_rep_puma, soc_rep_county = step_10_geographic_analysis(
    puma_employed, county_employed, full_county_universe=full_county_universe
)

# STEP 7.5: Create missing counties analysis
missing_counties_df, coverage_summary_df = create_missing_counties_analysis(county_employed, full_county_universe)

# STEP 8: Export to Excel (including missing counties analysis)
step_8_export_to_excel_extended(
    puma_coverage, county_coverage,
    puma_soc_coverage, county_soc_coverage,
    puma_employment_shares, county_employment_shares,
    puma_soc_employment_shares, county_soc_employment_shares,
    naics_rep_puma, naics_rep_county,
    soc_rep_puma, soc_rep_county,
    missing_counties_df, coverage_summary_df,  # Add these
    naics_output_path="acs_ipums_naics_file.xlsx",
    soc_output_path="acs_ipums_soc_file.xlsx",
    missing_counties_path="missing_counties_analysis.xlsx"  # Add this
)

# STEP 9: Final summary (console only)
analysis_results = {
    'NAICS_Coverage': (puma_coverage, county_coverage),
    'SOC_Coverage': (puma_soc_coverage, county_soc_coverage),
    'NAICS_Employment_Shares': (puma_employment_shares, county_employment_shares),
    'SOC_Employment_Shares': (puma_soc_employment_shares, county_soc_employment_shares)
}
step_9_final_summary(analysis_results)

# ============================================================================
# VERIFICATION CHECKS (Remove when satisfied with results)
# ============================================================================
verify_pipeline_fixes(county_employed, full_county_universe)


STEP 1: Loading data...
Loading data from usa_00052.csv.gz...
Loaded 15,912,393 records
✓ Data loaded: 15,912,393 records, 24 columns
✓ Memory usage: 4503.7 MB
  ✓ EMPSTAT
  ✓ INDNAICS
  ✓ OCCSOC
  ✓ PUMA
  ✓ COUNTYICP
  ✓ MET2013
  ✓ PERWT

Sample of raw data:
   EMPSTAT INDNAICS  OCCSOC  PERWT
0        3        0       0    2.0
1        3        0       0   14.0
2        1     8131  434051    4.0

STEP 2: Adding NAICS and SOC classifications...
  Processing NAICS with consolidation...
  NAICS consolidation applied:
    32→31: 229,248 records
    33→31: 517,774 records
    3M→31: 28,117 records
    45→44: 523,956 records
    49→48: 162,164 records
  Processing SOC...
    Processing 9,396,234 SOC records with mapping...
  ✓ NAICS classifications created with consolidation
  ✓ SOC classifications created
✓ Classifications added
Creating full county universe from Snowflake data...
✓ Full county universe created: 3144 counties
Population bucket distribution (all US counties):
  0-25K: 152

  unit_counts_pivot = unit_pop.groupby(['pop_label', 'metro_status']).size().unstack(fill_value=0)
  unit_totals_full = unit_pop.groupby('pop_label').size()
  emp_by_bucket_metro = employed_with_pop.groupby(['pop_label', 'metro_status'])['PERWT'].sum().unstack(fill_value=0)
  emp_by_bucket_total = employed_with_pop.groupby('pop_label')['PERWT'].sum()


    Computing NAICS coverage rates...


  classified_by_bucket_metro = classified_data.groupby(['pop_label', 'metro_status'])['PERWT'].sum().unstack(fill_value=0)
  classified_by_bucket_total = classified_data.groupby('pop_label')['PERWT'].sum()
  classified_by_bucket_metro = classified_data.groupby(['pop_label', 'metro_status'])['PERWT'].sum().unstack(fill_value=0)
  classified_by_bucket_total = classified_data.groupby('pop_label')['PERWT'].sum()
  classified_by_bucket_metro = classified_data.groupby(['pop_label', 'metro_status'])['PERWT'].sum().unstack(fill_value=0)
  classified_by_bucket_total = classified_data.groupby('pop_label')['PERWT'].sum()


    Assembling results...
    Optimized NAICS coverage calculation complete for PUMA

--- County NAICS Coverage ---
Using full county universe for accurate coverage percentages...
  Starting NAICS coverage calculation with full universe for County...
    Counties with metro status: 445 / 3144
    NAICS coverage calculation with full universe complete for County
✓ Added Unknown County row: 49,037,516 employment

✓ PUMA Coverage Analysis Complete: 6 population buckets
Sample PUMA coverage data:
  pop_label  total_pumas  total_employment_pct  \
0     0-25K            0                   0.0   
1   25K-50K            0                   0.0   
2   50K-75K            0                   0.0   

   total_naics_2digit_coverage_pct  
0                              0.0  
1                              0.0  
2                              0.0  

✓ County Coverage Analysis Complete: 7 population buckets
Sample County coverage data:
  pop_label  total_counties  total_employment_pct  \
0     0-25K 

  unit_counts_pivot = unit_pop.groupby(['pop_label', 'metro_status']).size().unstack(fill_value=0)
  unit_totals_full = unit_pop.groupby('pop_label').size()
  emp_by_bucket_metro = employed_with_pop.groupby(['pop_label', 'metro_status'])['PERWT'].sum().unstack(fill_value=0)
  emp_by_bucket_total = employed_with_pop.groupby('pop_label')['PERWT'].sum()


    Computing SOC coverage rates...


  classified_by_bucket_metro = classified_data.groupby(['pop_label', 'metro_status'])['PERWT'].sum().unstack(fill_value=0)
  classified_by_bucket_total = classified_data.groupby('pop_label')['PERWT'].sum()
  classified_by_bucket_metro = classified_data.groupby(['pop_label', 'metro_status'])['PERWT'].sum().unstack(fill_value=0)
  classified_by_bucket_total = classified_data.groupby('pop_label')['PERWT'].sum()
  classified_by_bucket_metro = classified_data.groupby(['pop_label', 'metro_status'])['PERWT'].sum().unstack(fill_value=0)
  classified_by_bucket_total = classified_data.groupby('pop_label')['PERWT'].sum()
  classified_by_bucket_metro = classified_data.groupby(['pop_label', 'metro_status'])['PERWT'].sum().unstack(fill_value=0)
  classified_by_bucket_total = classified_data.groupby('pop_label')['PERWT'].sum()
  classified_by_bucket_metro = classified_data.groupby(['pop_label', 'metro_status'])['PERWT'].sum().unstack(fill_value=0)
  classified_by_bucket_total = classified_data.groupb

    Assembling results...
    Optimized SOC coverage calculation complete for PUMA
  Starting optimized SOC coverage calculation for County...
    Using full county universe for accurate county counts...
    Using ALL 3145 counties for counts (estimated metro status for missing)
    Computing employment totals...
    Computing SOC coverage rates...
    Assembling results...
    Optimized SOC coverage calculation with full universe complete for County

STEP 6: Creating Employment Share Analysis...

--- PUMA Employment Shares ---
  Processing naics_2digit...
  Calculating employment shares for naics_2digit (PUMA)...
      Processing 7,366,658 records...
      Adding industry titles...
      Employment shares complete: 42 industry-metro combinations
  Processing naics_3digit...
  Calculating employment shares for naics_3digit (PUMA)...
      Processing 6,883,560 records...
      Adding industry titles...
      Employment shares complete: 184 industry-metro combinations
  Processing naics_

In [64]:
# Check how many unique county_fips have value '00000' or similar
unknown_counties = county_employed[county_employed['COUNTYFIP'] == 0]
print(f"Records with COUNTYFIP = 0: {len(unknown_counties)}")
print(f"Unique county_fips with COUNTYFIP = 0: {unknown_counties['county_fips'].nunique()}")

# Check what's in pop_label = 'Unknown County'
unknown_pop = county_employed[county_employed['pop_label'] == 'Unknown County']
print(f"Records with pop_label = 'Unknown County': {len(unknown_pop)}")
print(f"Unique county_fips in Unknown County: {unknown_pop['county_fips'].nunique()}")
print(f"Sample county_fips values: {unknown_pop['county_fips'].unique()[:10]}")

Records with COUNTYFIP = 0: 2591104
Unique county_fips with COUNTYFIP = 0: 49
Records with pop_label = 'Unknown County': 2591104
Unique county_fips in Unknown County: 49
Sample county_fips values: ['01000' '02000' '04000' '05000' '06000' '08000' '09000' '12000' '13000'
 '15000']


In [65]:
# Debug coverage denominators to understand what's being used

def debug_coverage_denominators(county_employed, full_county_universe):
    """
    Debug what denominators are actually being used in coverage calculations
    """
    print("=" * 80)
    print("DEBUGGING COVERAGE DENOMINATORS")
    print("=" * 80)
    
    # 1. Basic counts
    print("\n1. BASIC DATASET COUNTS:")
    print(f"   Total counties in Snowflake: {len(full_county_universe):,}")
    print(f"   Counties with ACS employment data: {len(county_employed['county_fips'].unique()):,}")
    
    # 2. Population bucket analysis
    print("\n2. POPULATION BUCKET BREAKDOWN:")
    
    # Snowflake population distribution
    snowflake_pop_dist = full_county_universe['pop_label'].value_counts().sort_index()
    print(f"   Snowflake population distribution:")
    for bucket, count in snowflake_pop_dist.items():
        print(f"     {bucket}: {count:,} counties")
    
    # ACS population distribution (after Snowflake mapping)
    acs_pop_dist = county_employed.groupby('pop_label')['county_fips'].nunique().sort_index()
    print(f"\n   ACS population distribution (after Snowflake mapping):")
    for bucket, count in acs_pop_dist.items():
        print(f"     {bucket}: {count:,} counties")
    
    # 3. Metro status analysis
    print("\n3. METRO STATUS BREAKDOWN:")
    
    # Create metro mapping like representation matrices do
    county_employed['metro_label'] = county_employed['metro_status'].map({
        'Non-metro/Rural': 'Rural',
        'Metropolitan': 'Urban'
    })
    
    metro_mapping = county_employed[['county_fips', 'metro_label']].drop_duplicates()
    print(f"   Counties with metro status mapping: {len(metro_mapping):,}")
    
    # Merge Snowflake with metro mapping (like representation matrices)
    universe_with_metro = full_county_universe.merge(metro_mapping, on='county_fips', how='left')
    counties_with_metro = universe_with_metro.dropna(subset=['metro_label'])
    
    print(f"   Snowflake counties that can be assigned metro status: {len(counties_with_metro):,}")
    print(f"   Snowflake counties without metro status: {len(universe_with_metro) - len(counties_with_metro):,}")
    
    # 4. Pop + Metro combinations
    print("\n4. POPULATION + METRO COMBINATIONS:")
    
    # What representation matrices actually use as denominators
    total_counts = counties_with_metro.groupby(['pop_label', 'metro_label']).size().reset_index(name='total_units')
    print(f"   Denominator combinations (what rep matrices use):")
    for _, row in total_counts.iterrows():
        print(f"     {row['pop_label']} {row['metro_label']}: {row['total_units']:,} counties")
    
    # What pure ACS would give us
    acs_counts = county_employed.groupby(['pop_label', 'metro_label'])['county_fips'].nunique().reset_index(name='acs_units')
    print(f"\n   Pure ACS combinations:")
    for _, row in acs_counts.iterrows():
        print(f"     {row['pop_label']} {row['metro_label']}: {row['acs_units']:,} counties")
    
    # 5. Coverage comparison
    print("\n5. COVERAGE COMPARISON:")
    
    # Compare the denominators
    comparison = total_counts.merge(acs_counts, on=['pop_label', 'metro_label'], how='outer', suffixes=('_snowflake', '_acs')).fillna(0)
    
    print(f"   {'Bucket':<12} {'Metro':<8} {'Snowflake':<10} {'ACS':<8} {'Match?':<8}")
    print(f"   {'-'*12} {'-'*8} {'-'*10} {'-'*8} {'-'*8}")
    
    for _, row in comparison.iterrows():
        bucket = row['pop_label']
        metro = row['metro_label']
        sf_count = int(row['total_units'])
        acs_count = int(row['acs_units'])
        match = "✓" if sf_count == acs_count else "✗"
        
        print(f"   {bucket:<12} {metro:<8} {sf_count:<10} {acs_count:<8} {match:<8}")
    
    # 6. 1M+ specific analysis
    print("\n6. 1M+ COUNTIES SPECIFIC ANALYSIS:")
    
    # 1M+ counties from Snowflake
    counties_1m_snowflake = set(full_county_universe[full_county_universe['pop_label'] == '1M+']['county_fips'])
    
    # 1M+ counties with ACS data
    counties_1m_acs = set(county_employed[county_employed['pop_label'] == '1M+']['county_fips'])
    
    # 1M+ counties with metro status
    counties_1m_with_metro = set(counties_with_metro[counties_with_metro['pop_label'] == '1M+']['county_fips'])
    
    print(f"   Total 1M+ counties (Snowflake): {len(counties_1m_snowflake):,}")
    print(f"   1M+ counties with ACS data: {len(counties_1m_acs):,}")
    print(f"   1M+ counties with metro status: {len(counties_1m_with_metro):,}")
    
    # Rural/Urban split for 1M+
    counties_1m_rural = len(counties_with_metro[(counties_with_metro['pop_label'] == '1M+') & (counties_with_metro['metro_label'] == 'Rural')])
    counties_1m_urban = len(counties_with_metro[(counties_with_metro['pop_label'] == '1M+') & (counties_with_metro['metro_label'] == 'Urban')])
    
    print(f"   1M+ Rural counties (denominator): {counties_1m_rural:,}")
    print(f"   1M+ Urban counties (denominator): {counties_1m_urban:,}")
    
    print("\n7. CONCLUSION:")
    if len(counties_with_metro) == len(metro_mapping):
        print("   ✓ Your coverage denominators = ACS denominators")
        print("   ✓ Only counties with employment data are included")
        print("   ✓ 100% coverage makes sense - you have data for all counties you can analyze")
    else:
        print("   ✗ Your coverage denominators ≠ ACS denominators") 
        print("   ✗ Some counties without employment data are included")
        print("   ✗ Coverage percentages may be misleading")
    
    print("=" * 80)

# Run the debug
debug_coverage_denominators(county_employed, full_county_universe)

DEBUGGING COVERAGE DENOMINATORS

1. BASIC DATASET COUNTS:
   Total counties in Snowflake: 3,144
   Counties with ACS employment data: 493

2. POPULATION BUCKET BREAKDOWN:
   Snowflake population distribution:
     0-25K: 1,526 counties
     100K-1M: 570 counties
     1M+: 48 counties
     25K-50K: 616 counties
     50K-75K: 252 counties
     75K-100K: 132 counties

   ACS population distribution (after Snowflake mapping):
     100K-1M: 399 counties
     1M+: 45 counties
     Unknown County: 49 counties

3. METRO STATUS BREAKDOWN:
   Counties with metro status mapping: 535
   Snowflake counties that can be assigned metro status: 445
   Snowflake counties without metro status: 2,700

4. POPULATION + METRO COMBINATIONS:
   Denominator combinations (what rep matrices use):
     100K-1M Rural: 47 counties
     100K-1M Urban: 353 counties
     1M+ Urban: 45 counties

   Pure ACS combinations:
     100K-1M Rural: 47 counties
     100K-1M Urban: 353 counties
     1M+ Urban: 45 counties
     Un

In [66]:
puma_coverage

Unnamed: 0,pop_label,total_pumas,rural_pumas,urban_pumas,total_employment_pct,rural_employment_pct,urban_employment_pct,total_employment,rural_employment,urban_employment,total_naics_2digit_coverage_pct,rural_naics_2digit_coverage_pct,urban_naics_2digit_coverage_pct,total_naics_3digit_coverage_pct,rural_naics_3digit_coverage_pct,urban_naics_3digit_coverage_pct,total_naics_4digit_coverage_pct,rural_naics_4digit_coverage_pct,urban_naics_4digit_coverage_pct
0,0-25K,0,0,0,0.0,0.0,0.0,0,0,0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,25K-50K,0,0,0,0.0,0.0,0.0,0,0,0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,50K-75K,0,0,0,0.0,0.0,0.0,0,0,0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3,75K-100K,34,3,31,1.07,0.07,0.99,1711081,117616,1593465,100.0,100.0,100.0,94.16,91.76,94.34,85.59,75.74,86.32
4,100K-1M,2428,506,1922,98.93,19.31,79.62,158872334,31016329,127856005,100.0,100.0,100.0,92.96,92.46,93.08,84.1,82.19,84.56
5,1M+,0,0,0,0.0,0.0,0.0,0,0,0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [67]:
county_coverage

Unnamed: 0,pop_label,total_counties,rural_counties,urban_counties,total_employment_pct,rural_employment_pct,urban_employment_pct,total_employment,rural_employment,urban_employment,total_naics_2digit_coverage_pct,rural_naics_2digit_coverage_pct,urban_naics_2digit_coverage_pct,total_naics_3digit_coverage_pct,rural_naics_3digit_coverage_pct,urban_naics_3digit_coverage_pct,total_naics_4digit_coverage_pct,rural_naics_4digit_coverage_pct,urban_naics_4digit_coverage_pct
0,0-25K,0,0,0,0.0,0.0,0.0,0.0,0,0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,25K-50K,0,0,0,0.0,0.0,0.0,0.0,0,0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,50K-75K,0,0,0,0.0,0.0,0.0,0.0,0,0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3,75K-100K,0,0,0,0.0,0.0,0.0,0.0,0,0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,100K-1M,400,47,353,40.72,2.53,38.19,65391071.0,4069053,61322018,100.0,100.0,100.0,93.16,93.56,93.13,84.73,85.76,84.66
5,1M+,45,0,45,28.74,0.0,28.74,46154828.0,0,46154828,100.0,0.0,100.0,93.33,0.0,93.33,84.86,0.0,84.86
6,Population Unknown,0,0,0,30.54,0.0,0.0,49037516.0,0,0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [68]:
puma_employment_shares

Unnamed: 0,naics_level,industry_code,industry_title,rural_urban_status,0-25K,25K-50K,50K-75K,75K-100K,100K-1M,1M+,Unknown County,geo_level
0,naics_2digit,11,"Agriculture, Forestry, Fishing and Hunting",Non-metro/Rural,0.0,0.0,0.0,8.23,2.89,0.0,0.0,PUMA
1,naics_2digit,11,"Agriculture, Forestry, Fishing and Hunting",Metropolitan,0.0,0.0,0.0,0.34,0.78,0.0,0.0,PUMA
2,naics_2digit,21,"Mining, Quarrying, and Oil and Gas Extraction",Non-metro/Rural,0.0,0.0,0.0,1.77,0.93,0.0,0.0,PUMA
3,naics_2digit,21,"Mining, Quarrying, and Oil and Gas Extraction",Metropolitan,0.0,0.0,0.0,0.05,0.29,0.0,0.0,PUMA
4,naics_2digit,22,Utilities,Non-metro/Rural,0.0,0.0,0.0,1.60,1.13,0.0,0.0,PUMA
...,...,...,...,...,...,...,...,...,...,...,...,...
605,naics_4digit,92M1,"Administration of environmental quality, and h...",Metropolitan,0.0,0.0,0.0,0.17,0.18,0.0,0.0,PUMA
606,naics_4digit,92M2,Administration of economic programs and space ...,Non-metro/Rural,0.0,0.0,0.0,1.40,0.46,0.0,0.0,PUMA
607,naics_4digit,92M2,Administration of economic programs and space ...,Metropolitan,0.0,0.0,0.0,0.40,0.42,0.0,0.0,PUMA
608,naics_4digit,92MP,"Justice, public order, and safety activities",Non-metro/Rural,0.0,0.0,0.0,3.63,2.65,0.0,0.0,PUMA


In [69]:
# Check if shares sum to 100% for each area type
verification = county_employment_shares.groupby(['rural_urban_status', 'naics_level'])[
    ['0-25K', '25K-50K', '50K-75K', '75K-100K', '100K-1M', '1M+', 'Unknown County']
].sum()

print("Sums by area type (should be ~100%):")
print(verification)

Sums by area type (should be ~100%):
                                 0-25K  25K-50K  50K-75K  75K-100K  100K-1M  \
rural_urban_status naics_level                                                
Non-metro/Rural    naics_2digit    0.0      0.0      0.0       0.0    99.99   
                   naics_3digit    0.0      0.0      0.0       0.0    99.96   
                   naics_4digit    0.0      0.0      0.0       0.0    99.98   
Metropolitan       naics_2digit    0.0      0.0      0.0       0.0   100.02   
                   naics_3digit    0.0      0.0      0.0       0.0    99.99   
                   naics_4digit    0.0      0.0      0.0       0.0   100.03   

                                    1M+  Unknown County  
rural_urban_status naics_level                           
Non-metro/Rural    naics_2digit    0.00          100.01  
                   naics_3digit    0.00          100.02  
                   naics_4digit    0.00           99.97  
Metropolitan       naics_2digit  100.01 

  verification = county_employment_shares.groupby(['rural_urban_status', 'naics_level'])[


In [70]:
county_employment_shares

Unnamed: 0,naics_level,industry_code,industry_title,rural_urban_status,0-25K,25K-50K,50K-75K,75K-100K,100K-1M,1M+,Unknown County,geo_level
610,naics_2digit,11,"Agriculture, Forestry, Fishing and Hunting",Non-metro/Rural,0.0,0.0,0.0,0.0,1.25,0.00,3.16,County
611,naics_2digit,11,"Agriculture, Forestry, Fishing and Hunting",Metropolitan,0.0,0.0,0.0,0.0,0.90,0.42,1.19,County
612,naics_2digit,21,"Mining, Quarrying, and Oil and Gas Extraction",Non-metro/Rural,0.0,0.0,0.0,0.0,0.20,0.00,1.04,County
613,naics_2digit,21,"Mining, Quarrying, and Oil and Gas Extraction",Metropolitan,0.0,0.0,0.0,0.0,0.30,0.22,0.37,County
614,naics_2digit,22,Utilities,Non-metro/Rural,0.0,0.0,0.0,0.0,0.76,0.00,1.19,County
...,...,...,...,...,...,...,...,...,...,...,...,...
1215,naics_4digit,92M1,"Administration of environmental quality, and h...",Metropolitan,0.0,0.0,0.0,0.0,0.19,0.14,0.23,County
1216,naics_4digit,92M2,Administration of economic programs and space ...,Non-metro/Rural,0.0,0.0,0.0,0.0,0.45,0.00,0.47,County
1217,naics_4digit,92M2,Administration of economic programs and space ...,Metropolitan,0.0,0.0,0.0,0.0,0.45,0.34,0.49,County
1218,naics_4digit,92MP,"Justice, public order, and safety activities",Non-metro/Rural,0.0,0.0,0.0,0.0,1.83,0.00,2.78,County


In [71]:
# Check if shares sum to 100% for each area type
verification = county_employment_shares.groupby(['rural_urban_status', 'naics_level'])[
    ['0-25K', '25K-50K', '50K-75K', '75K-100K', '100K-1M', '1M+', 'Unknown County']
].sum()

print("Sums by area type (should be ~100%):")
print(verification)

Sums by area type (should be ~100%):
                                 0-25K  25K-50K  50K-75K  75K-100K  100K-1M  \
rural_urban_status naics_level                                                
Non-metro/Rural    naics_2digit    0.0      0.0      0.0       0.0    99.99   
                   naics_3digit    0.0      0.0      0.0       0.0    99.96   
                   naics_4digit    0.0      0.0      0.0       0.0    99.98   
Metropolitan       naics_2digit    0.0      0.0      0.0       0.0   100.02   
                   naics_3digit    0.0      0.0      0.0       0.0    99.99   
                   naics_4digit    0.0      0.0      0.0       0.0   100.03   

                                    1M+  Unknown County  
rural_urban_status naics_level                           
Non-metro/Rural    naics_2digit    0.00          100.01  
                   naics_3digit    0.00          100.02  
                   naics_4digit    0.00           99.97  
Metropolitan       naics_2digit  100.01 

  verification = county_employment_shares.groupby(['rural_urban_status', 'naics_level'])[


In [72]:
naics_rep_puma

Unnamed: 0,industry_code,industry_title,0-25K Rural,0-25K Urban,25K-50K Rural,25K-50K Urban,50K-75K Rural,50K-75K Urban,75K-100K Rural,75K-100K Urban,100K-1M Rural,100K-1M Urban,1M+ Rural,1M+ Urban
0,11,"Agriculture, Forestry, Fishing and Hunting",0.0,0.0,0.0,0.0,0.0,0.0,100.0,100.0,100.0,99.4,0.0,0.0
1,111,Crop production,0.0,0.0,0.0,0.0,0.0,0.0,100.0,93.5,99.8,96.4,0.0,0.0
2,112,Animal production and aquaculture,0.0,0.0,0.0,0.0,0.0,0.0,100.0,74.2,99.4,80.2,0.0,0.0
3,113,Forestry and Logging,0.0,0.0,0.0,0.0,0.0,0.0,100.0,32.3,86.6,46.1,0.0,0.0
4,1133,Logging,0.0,0.0,0.0,0.0,0.0,0.0,66.7,9.7,75.5,25.7,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
389,9281P,National security and international affairs,0.0,0.0,0.0,0.0,0.0,0.0,100.0,100.0,98.6,98.6,0.0,0.0
390,92M,Unclassified,0.0,0.0,0.0,0.0,0.0,0.0,100.0,100.0,100.0,100.0,0.0,0.0
391,92M1,"Administration of environmental quality, and h...",0.0,0.0,0.0,0.0,0.0,0.0,100.0,87.1,97.6,91.9,0.0,0.0
392,92M2,Administration of economic programs and space ...,0.0,0.0,0.0,0.0,0.0,0.0,100.0,96.8,99.8,99.0,0.0,0.0


In [73]:
naics_rep_county

Unnamed: 0,industry_code,industry_title,0-25K Rural,0-25K Urban,25K-50K Rural,25K-50K Urban,50K-75K Rural,50K-75K Urban,75K-100K Rural,75K-100K Urban,100K-1M Rural,100K-1M Urban,1M+ Rural,1M+ Urban
0,11,"Agriculture, Forestry, Fishing and Hunting",0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,100.0,100.0,0.0,100.0
1,111,Crop production,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,100.0,100.0,0.0,100.0
2,112,Animal production and aquaculture,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,100.0,97.7,0.0,100.0
3,113,Forestry and Logging,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,83.0,76.2,0.0,93.3
4,1133,Logging,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,63.8,53.5,0.0,75.6
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
389,9281P,National security and international affairs,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,100.0,99.4,0.0,100.0
390,92M,Unclassified,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,100.0,100.0,0.0,100.0
391,92M1,"Administration of environmental quality, and h...",0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,95.7,98.0,0.0,100.0
392,92M2,Administration of economic programs and space ...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,100.0,100.0,0.0,100.0


In [74]:
puma_soc_coverage

Unnamed: 0,pop_label,total_pumas,rural_pumas,urban_pumas,total_employment_pct,rural_employment_pct,urban_employment_pct,total_employment,rural_employment,urban_employment,...,urban_soc_3digit_coverage_pct,total_soc_4digit_coverage_pct,rural_soc_4digit_coverage_pct,urban_soc_4digit_coverage_pct,total_soc_5digit_coverage_pct,rural_soc_5digit_coverage_pct,urban_soc_5digit_coverage_pct,total_soc_6digit_coverage_pct,rural_soc_6digit_coverage_pct,urban_soc_6digit_coverage_pct
0,0-25K,0,0,0,0.0,0.0,0.0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,25K-50K,0,0,0,0.0,0.0,0.0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,50K-75K,0,0,0,0.0,0.0,0.0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3,75K-100K,34,3,31,1.07,0.07,0.99,1711081,117616,1593465,...,100.0,100.0,100.0,100.0,100.0,100.0,100.0,0.0,0.0,0.0
4,100K-1M,2428,506,1922,98.93,19.31,79.62,158872334,31016329,127856005,...,100.0,100.0,100.0,100.0,100.0,100.0,100.0,0.0,0.0,0.0
5,1M+,0,0,0,0.0,0.0,0.0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [75]:
county_soc_coverage

Unnamed: 0,pop_label,total_counties,rural_counties,urban_counties,total_employment_pct,rural_employment_pct,urban_employment_pct,total_employment,rural_employment,urban_employment,...,urban_soc_3digit_coverage_pct,total_soc_4digit_coverage_pct,rural_soc_4digit_coverage_pct,urban_soc_4digit_coverage_pct,total_soc_5digit_coverage_pct,rural_soc_5digit_coverage_pct,urban_soc_5digit_coverage_pct,total_soc_6digit_coverage_pct,rural_soc_6digit_coverage_pct,urban_soc_6digit_coverage_pct
0,0-25K,1526,1526,0,0.0,0.0,0.0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,25K-50K,616,616,0,0.0,0.0,0.0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,50K-75K,252,252,0,0.0,0.0,0.0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3,75K-100K,132,0,132,0.0,0.0,0.0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,100K-1M,571,47,524,40.72,2.53,38.19,65391071,4069053,61322018,...,100.0,100.0,100.0,100.0,100.0,100.0,100.0,0.0,0.0,0.0
5,1M+,48,0,48,28.74,0.0,28.74,46154828,0,46154828,...,100.0,100.0,0.0,100.0,100.0,0.0,100.0,0.0,0.0,0.0


In [76]:
puma_soc_employment_shares

Unnamed: 0,soc_level,occupation_code,occupation_title,rural_urban_status,0-25K,25K-50K,50K-75K,75K-100K,100K-1M,1M+,Unknown County,geo_level
0,soc_2digit,11,Management Occupations,Non-metro/Rural,0.0,0.0,0.0,13.18,10.09,0.0,0.0,PUMA
1,soc_2digit,11,Management Occupations,Metropolitan,0.0,0.0,0.0,12.43,11.60,0.0,0.0,PUMA
2,soc_2digit,13,Business and Financial Operations Occupations,Non-metro/Rural,0.0,0.0,0.0,2.96,3.90,0.0,0.0,PUMA
3,soc_2digit,13,Business and Financial Operations Occupations,Metropolitan,0.0,0.0,0.0,7.14,6.39,0.0,0.0,PUMA
4,soc_2digit,15,Computer and Mathematical Occupations,Non-metro/Rural,0.0,0.0,0.0,1.53,1.89,0.0,0.0,PUMA
...,...,...,...,...,...,...,...,...,...,...,...,...
461,soc_4digit,55-100,Military Officer Special and Tactical Operatio...,Metropolitan,0.0,0.0,0.0,0.05,0.04,0.0,0.0,PUMA
462,soc_4digit,55-200,First-Line Enlisted Military Supervisors,Non-metro/Rural,0.0,0.0,0.0,0.02,0.04,0.0,0.0,PUMA
463,soc_4digit,55-200,First-Line Enlisted Military Supervisors,Metropolitan,0.0,0.0,0.0,0.03,0.03,0.0,0.0,PUMA
464,soc_4digit,55-300,Military Enlisted Tactical Operations and Air/...,Non-metro/Rural,0.0,0.0,0.0,0.01,0.17,0.0,0.0,PUMA


In [77]:
county_soc_employment_shares

Unnamed: 0,soc_level,occupation_code,occupation_title,rural_urban_status,0-25K,25K-50K,50K-75K,75K-100K,100K-1M,1M+,Unknown County,geo_level
466,soc_2digit,11,Management Occupations,Non-metro/Rural,0.0,0.0,0.0,0.0,10.37,0.00,10.06,County
467,soc_2digit,11,Management Occupations,Metropolitan,0.0,0.0,0.0,0.0,11.58,11.84,11.22,County
468,soc_2digit,13,Business and Financial Operations Occupations,Non-metro/Rural,0.0,0.0,0.0,0.0,5.29,0.00,3.69,County
469,soc_2digit,13,Business and Financial Operations Occupations,Metropolitan,0.0,0.0,0.0,0.0,6.25,6.92,5.71,County
470,soc_2digit,15,Computer and Mathematical Occupations,Non-metro/Rural,0.0,0.0,0.0,0.0,3.20,0.00,1.69,County
...,...,...,...,...,...,...,...,...,...,...,...,...
927,soc_4digit,55-100,Military Officer Special and Tactical Operatio...,Metropolitan,0.0,0.0,0.0,0.0,0.05,0.02,0.07,County
928,soc_4digit,55-200,First-Line Enlisted Military Supervisors,Non-metro/Rural,0.0,0.0,0.0,0.0,0.07,0.00,0.03,County
929,soc_4digit,55-200,First-Line Enlisted Military Supervisors,Metropolitan,0.0,0.0,0.0,0.0,0.04,0.02,0.05,County
930,soc_4digit,55-300,Military Enlisted Tactical Operations and Air/...,Non-metro/Rural,0.0,0.0,0.0,0.0,0.15,0.00,0.17,County


In [78]:
soc_rep_puma

Unnamed: 0,occupation_code,occupation_title,0-25K Rural,0-25K Urban,25K-50K Rural,25K-50K Urban,50K-75K Rural,50K-75K Urban,75K-100K Rural,75K-100K Urban,100K-1M Rural,100K-1M Urban,1M+ Rural,1M+ Urban
0,11,Management Occupations,0.0,0.0,0.0,0.0,0.0,0.0,100.0,100.0,100.0,100.0,0.0,0.0
1,11-1,Top Executives,0.0,0.0,0.0,0.0,0.0,0.0,100.0,100.0,100.0,100.0,0.0,0.0
2,11-100,General and Operations Managers,0.0,0.0,0.0,0.0,0.0,0.0,100.0,100.0,100.0,100.0,0.0,0.0
3,11-1011,Chief Executives,0.0,0.0,0.0,0.0,0.0,0.0,100.0,100.0,100.0,99.8,0.0,0.0
4,11-1021,General and Operations Managers,0.0,0.0,0.0,0.0,0.0,0.0,100.0,100.0,100.0,100.0,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
752,55-200,First-Line Enlisted Military Supervisors,0.0,0.0,0.0,0.0,0.0,0.0,33.3,19.4,30.2,28.8,0.0,0.0
753,55-2011,First-Line Supervisors of Air Crew Members,0.0,0.0,0.0,0.0,0.0,0.0,33.3,19.4,30.2,28.8,0.0,0.0
754,55-3,Military Enlisted Tactical Operations and Air/...,0.0,0.0,0.0,0.0,0.0,0.0,33.3,12.9,43.7,40.3,0.0,0.0
755,55-300,Military Enlisted Tactical Operations and Air/...,0.0,0.0,0.0,0.0,0.0,0.0,33.3,12.9,43.7,40.3,0.0,0.0


In [79]:
soc_rep_county

Unnamed: 0,occupation_code,occupation_title,0-25K Rural,0-25K Urban,25K-50K Rural,25K-50K Urban,50K-75K Rural,50K-75K Urban,75K-100K Rural,75K-100K Urban,100K-1M Rural,100K-1M Urban,1M+ Rural,1M+ Urban
0,11,Management Occupations,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,100.0,100.0,0.0,100.0
1,11-1,Top Executives,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,100.0,100.0,0.0,100.0
2,11-100,General and Operations Managers,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,100.0,100.0,0.0,100.0
3,11-1011,Chief Executives,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,100.0,100.0,0.0,100.0
4,11-1021,General and Operations Managers,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,100.0,100.0,0.0,100.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
752,55-200,First-Line Enlisted Military Supervisors,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,40.4,53.5,0.0,77.8
753,55-2011,First-Line Supervisors of Air Crew Members,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,40.4,53.5,0.0,77.8
754,55-3,Military Enlisted Tactical Operations and Air/...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,46.8,63.5,0.0,93.3
755,55-300,Military Enlisted Tactical Operations and Air/...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,46.8,63.5,0.0,93.3


In [80]:
missing_counties_df

Unnamed: 0,county_fips,county_name,pop_label,status,reason
0,01001,"Autauga County, Alabama",50K-75K,Missing from ACS,No employment data available
1,01005,"Barbour County, Alabama",0-25K,Missing from ACS,No employment data available
2,01007,"Bibb County, Alabama",0-25K,Missing from ACS,No employment data available
3,01009,"Blount County, Alabama",50K-75K,Missing from ACS,No employment data available
4,01011,"Bullock County, Alabama",0-25K,Missing from ACS,No employment data available
...,...,...,...,...,...
2744,51000,Unknown county 51000,Unknown County,Missing from Snowflake,Not in population dataset
2745,53000,Unknown county 53000,Unknown County,Missing from Snowflake,Not in population dataset
2746,54000,Unknown county 54000,Unknown County,Missing from Snowflake,Not in population dataset
2747,55000,Unknown county 55000,Unknown County,Missing from Snowflake,Not in population dataset


In [81]:
coverage_summary_df

Unnamed: 0,population_bucket,total_counties_snowflake,counties_with_acs_data,counties_missing_from_acs,coverage_percentage,employment_records
0,0-25K,1526,0,1526,0.0,0
1,25K-50K,616,0,616,0.0,0
2,50K-75K,252,0,252,0.0,0
3,75K-100K,132,0,132,0.0,0
4,100K-1M,570,399,171,70.0,2822034
5,1M+,48,45,3,94.0,1953520
