In [0]:
%run /Workspace/Repos/yubin.park@mimilabs.ai/mimi-common-utils/ingestion_utils

In [0]:
filepath_geos = "/Volumes/mimi_ws_1/census/src/acs/downloads/Geos20235YR.txt"
geo_key_columns = [
    'GEO_ID',      # Primary key for joining
    'NAME',        # Human-readable name (e.g., "Census Tract 9501, Autauga County, Alabama")
    'STUSAB',      # State abbreviation (e.g., "AL", "CA")
    'SUMLEVEL',    # Geographic level (140=tract, 150=block group, 050=county)
    'STATE',       # State FIPS code
    'COUNTY',      # County FIPS code
    'TRACT',       # Census tract (if applicable)
    'BLKGRP',      # Block group (if applicable)
    'PLACE',       # Place/city FIPS code (if applicable)
    'ZCTA5'        # ZIP code tabulation area (if applicable)
]
# Read every field as a string. With inferSchema=True Spark would parse codes such
# as STATE "01", SUMLEVEL "050", COUNTY "001" or ZCTA5 "01001" as integers and
# silently drop their leading zeros, corrupting the geography identifiers.
df = spark.read.csv(filepath_geos, header=True, sep='|', inferSchema=False)
# Keep only the key columns and lower-case their names in a single projection
# (toDF renames positionally, so it follows the order of geo_key_columns).
df = df.select(*geo_key_columns).toDF(*[name.lower() for name in geo_key_columns])

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import DataFrame
from typing import List, Dict, Optional
from pyspark.sql.types import DoubleType

# Directory holding the per-table ACS 2023 5-year summary-file extracts; the loader cell globs "*.dat" here.
volumepath = "/Volumes/mimi_ws_1/census/src/acs/2023_5yr_data"

In [0]:
def safe_divide(numerator_col, denominator_col, default_value=None):
    """Divide one column by another, guarding against a null or non-positive divisor.

    Args:
        numerator_col: column name (str) or Column expression for the dividend.
        denominator_col: column name (str) or Column expression for the divisor.
        default_value: value emitted when the divisor is null or <= 0.

    Returns:
        A Column expression equal to numerator / denominator when the
        denominator is non-null and positive, otherwise ``default_value``.
    """
    num = F.col(numerator_col) if isinstance(numerator_col, str) else numerator_col
    den = F.col(denominator_col) if isinstance(denominator_col, str) else denominator_col

    has_positive_denominator = (den.isNotNull()) & (den > 0)
    quotient = num / den
    return F.when(has_positive_denominator, quotient).otherwise(default_value)

def safe_column_sum(columns: List[str]):
    """Add up the named columns, counting null cells as 0.

    Args:
        columns: names of the columns to add. An empty (or otherwise falsy)
            value yields a literal 0.

    Returns:
        A Column expression ``0 + coalesce(c1, 0) + coalesce(c2, 0) + ...``.
    """
    total = F.lit(0)
    for name in columns or []:
        # coalesce() stops a single null cell from nulling the whole sum.
        total = total + F.coalesce(F.col(name), F.lit(0))
    return total

def validate_rate(column_expr):
    """Clamp a rate expression into the closed interval [0, 1].

    Values below 0 become 0 and values above 1 become 1; anything else
    (including null) is passed through unchanged.
    """
    too_low = column_expr < 0
    too_high = column_expr > 1
    clamped = F.when(too_low, F.lit(0)).when(too_high, F.lit(1))
    return clamped.otherwise(column_expr)

# =============================================================================
# ACS VARIABLE DEFINITIONS - column names use the summary-file format (B01001_E001), not the Census API format (B01001_001E)
# =============================================================================

# Derived variables per ACS table. Keys are upper-case table ids (matched against
# the table id parsed from each .dat file name); each value maps an output column
# name to a Column expression over that table's estimate cells (e.g. B19013_E001).
acs_variables = {
    # =============================================================================
    # INCOME & ECONOMIC STATUS
    # =============================================================================
    "B19013": {
        "median_household_income": F.col("B19013_E001")
    },
    
    "B19301": {
        "per_capita_income": F.col("B19301_E001")
    },
    
    "B17001": {
        "poverty_rate_all_ages": validate_rate(
            safe_divide("B17001_E002", "B17001_E001")
        ),
        # Children (under 18) below poverty / all children whose poverty status is
        # determined. Below poverty: male E004-E009, female E018-E023.
        # At or above poverty: male E033-E038, female E047-E052.
        "child_poverty_rate": validate_rate(
            safe_divide(
                safe_column_sum([
                    "B17001_E004", "B17001_E005", "B17001_E006",
                    "B17001_E007", "B17001_E008", "B17001_E009",
                    "B17001_E018", "B17001_E019", "B17001_E020",
                    "B17001_E021", "B17001_E022", "B17001_E023"
                ]),
                safe_column_sum([
                    "B17001_E004", "B17001_E005", "B17001_E006",
                    "B17001_E007", "B17001_E008", "B17001_E009",
                    "B17001_E018", "B17001_E019", "B17001_E020",
                    "B17001_E021", "B17001_E022", "B17001_E023",
                    "B17001_E033", "B17001_E034", "B17001_E035",
                    "B17001_E036", "B17001_E037", "B17001_E038",
                    "B17001_E047", "B17001_E048", "B17001_E049",
                    "B17001_E050", "B17001_E051", "B17001_E052"
                ])
            )
        )
    },
    
    "B19083": {
        "gini_index_income_inequality": F.col("B19083_E001")
    },
    
    "B19057": {
        "public_assistance_rate": validate_rate(
            safe_divide("B19057_E002", "B19057_E001")
        )
    },
    
    "B22003": {
        "snap_recipients_rate": validate_rate(
            safe_divide("B22003_E002", "B22003_E001")
        )
    },
    
    "B23025": {
        # Unemployment/employment are relative to the CIVILIAN labor force (E003),
        # the standard definition; E004 (employed) and E005 (unemployed) are both
        # civilian counts, so the two rates sum to 1.
        "unemployment_rate": validate_rate(
            safe_divide("B23025_E005", "B23025_E003")
        ),
        "employment_rate": validate_rate(
            safe_divide("B23025_E004", "B23025_E003")
        ),
        # Participation uses the total labor force (E002, incl. armed forces) over population 16+ (E001).
        "labor_force_participation_rate": validate_rate(
            safe_divide("B23025_E002", "B23025_E001")
        )
    },
    
    # =============================================================================
    # EDUCATION
    # =============================================================================
    "B15003": {
        "less_than_high_school_rate": validate_rate(
            safe_divide(
                safe_column_sum([
                    "B15003_E002", "B15003_E003", "B15003_E004", "B15003_E005",
                    "B15003_E006", "B15003_E007", "B15003_E008", "B15003_E009",
                    "B15003_E010", "B15003_E011", "B15003_E012", "B15003_E013",
                    "B15003_E014", "B15003_E015", "B15003_E016"
                ]),
                F.col("B15003_E001")
            )
        ),
        "high_school_graduate_rate": validate_rate(
            safe_divide(
                safe_column_sum(["B15003_E017", "B15003_E018"]),
                F.col("B15003_E001")
            )
        ),
        "some_college_or_associates_rate": validate_rate(
            safe_divide(
                safe_column_sum(["B15003_E019", "B15003_E020", "B15003_E021"]),
                F.col("B15003_E001")
            )
        ),
        "bachelors_degree_or_higher_rate": validate_rate(
            safe_divide(
                safe_column_sum([
                    "B15003_E022", "B15003_E023", "B15003_E024", "B15003_E025"
                ]),
                F.col("B15003_E001")
            )
        ),
        "graduate_degree_rate": validate_rate(
            safe_divide(
                safe_column_sum(["B15003_E023", "B15003_E024", "B15003_E025"]),
                F.col("B15003_E001")
            )
        )
    },
    
    # =============================================================================
    # HOUSING
    # =============================================================================
    "B25077": {
        "median_home_value": F.col("B25077_E001")
    },
    
    "B25064": {
        "median_gross_rent": F.col("B25064_E001")
    },
    
    "B25003": {
        "owner_occupied_rate": validate_rate(
            safe_divide("B25003_E002", "B25003_E001")
        ),
        "renter_occupied_rate": validate_rate(
            safe_divide("B25003_E003", "B25003_E001")
        )
    },
    
    "B25002": {
        "vacant_housing_rate": validate_rate(
            safe_divide("B25002_E003", "B25002_E001")
        )
    },
    
    "B25070": {
        # Gross rent as a percentage of household income. E011 ("not computed")
        # is excluded from the denominator, as in Census-published GRAPI shares.
        "rent_burden_over_30pct_rate": validate_rate(
            safe_divide(
                safe_column_sum([
                    "B25070_E007", "B25070_E008", "B25070_E009", "B25070_E010"
                ]),
                F.col("B25070_E001") - F.coalesce(F.col("B25070_E011"), F.lit(0))
            )
        ),
        "severe_rent_burden_over_50pct_rate": validate_rate(
            safe_divide(
                F.col("B25070_E010"),
                F.col("B25070_E001") - F.coalesce(F.col("B25070_E011"), F.lit(0))
            )
        )
    },
    
    "B25014": {
        "overcrowded_housing_rate": validate_rate(
            safe_divide(
                safe_column_sum([
                    "B25014_E005", "B25014_E006", "B25014_E007",
                    "B25014_E011", "B25014_E012", "B25014_E013"
                ]),
                F.col("B25014_E001")
            )
        ),
        "severely_overcrowded_rate": validate_rate(
            safe_divide(
                safe_column_sum([
                    "B25014_E006", "B25014_E007",
                    "B25014_E012", "B25014_E013"
                ]),
                F.col("B25014_E001")
            )
        )
    },
    
    # =============================================================================
    # DEMOGRAPHICS & AGE
    # =============================================================================
    "B01003": {
        "total_population": F.col("B01003_E001")
    },
    
    "B01001": {
        "age_under_18_rate": validate_rate(
            safe_divide(
                safe_column_sum([
                    "B01001_E003", "B01001_E004", "B01001_E005", "B01001_E006",
                    "B01001_E027", "B01001_E028", "B01001_E029", "B01001_E030"
                ]),
                F.col("B01001_E001")
            )
        ),
        "age_65_and_over_rate": validate_rate(
            safe_divide(
                safe_column_sum([
                    "B01001_E020", "B01001_E021", "B01001_E022",
                    "B01001_E023", "B01001_E024", "B01001_E025",
                    "B01001_E044", "B01001_E045", "B01001_E046",
                    "B01001_E047", "B01001_E048", "B01001_E049"
                ]),
                F.col("B01001_E001")
            )
        ),
        "working_age_18_64_rate": validate_rate(
            safe_divide(
                safe_column_sum([
                    "B01001_E007", "B01001_E008", "B01001_E009", "B01001_E010",
                    "B01001_E011", "B01001_E012", "B01001_E013", "B01001_E014",
                    "B01001_E015", "B01001_E016", "B01001_E017", "B01001_E018", "B01001_E019",
                    "B01001_E031", "B01001_E032", "B01001_E033", "B01001_E034",
                    "B01001_E035", "B01001_E036", "B01001_E037", "B01001_E038",
                    "B01001_E039", "B01001_E040", "B01001_E041", "B01001_E042", "B01001_E043"
                ]),
                F.col("B01001_E001")
            )
        )
    },
    
    "B01002": {
        "median_age": F.col("B01002_E001"),
        "median_age_male": F.col("B01002_E002"),
        "median_age_female": F.col("B01002_E003")
    },
    
    # =============================================================================
    # RACE & ETHNICITY
    # =============================================================================
    "B02001": {
        "white_alone_rate": validate_rate(
            safe_divide("B02001_E002", "B02001_E001")
        ),
        "black_african_american_rate": validate_rate(
            safe_divide("B02001_E003", "B02001_E001")
        ),
        "american_indian_alaska_native_rate": validate_rate(
            safe_divide("B02001_E004", "B02001_E001")
        ),
        "asian_rate": validate_rate(
            safe_divide("B02001_E005", "B02001_E001")
        ),
        "native_hawaiian_pacific_islander_rate": validate_rate(
            safe_divide("B02001_E006", "B02001_E001")
        ),
        "other_race_rate": validate_rate(
            safe_divide("B02001_E007", "B02001_E001")
        ),
        "two_or_more_races_rate": validate_rate(
            safe_divide("B02001_E008", "B02001_E001")
        )
    },
    
    "B03003": {
        "hispanic_latino_rate": validate_rate(
            safe_divide("B03003_E003", "B03003_E001")
        )
    },
    
    # =============================================================================
    # IMMIGRATION & LANGUAGE
    # =============================================================================
    "B05002": {
        "foreign_born_rate": validate_rate(
            safe_divide("B05002_E013", "B05002_E001")
        ),
        "naturalized_citizen_rate": validate_rate(
            safe_divide("B05002_E014", "B05002_E001")
        ),
        "not_us_citizen_rate": validate_rate(
            safe_divide("B05002_E021", "B05002_E001")
        )
    },
    
    "B16001": {
        "speaks_language_other_than_english_rate": validate_rate(
            safe_divide(
                F.col("B16001_E001") - F.col("B16001_E002"),
                F.col("B16001_E001")
            )
        )
    },
    
    "B16004": {
        # NOTE(review): counts only speakers of English "not well" / "not at all".
        # The usual Census LEP definition ("less than 'very well'") also includes
        # "well" (E006, E011, E016, E021, E028, E033, E038, E043, E050, E055,
        # E060, E065) - confirm which definition is intended.
        "limited_english_proficiency_rate": validate_rate(
            safe_divide(
                safe_column_sum([
                    "B16004_E007", "B16004_E008", "B16004_E012", "B16004_E013",
                    "B16004_E017", "B16004_E018", "B16004_E022", "B16004_E023",
                    "B16004_E029", "B16004_E030", "B16004_E034", "B16004_E035",
                    "B16004_E039", "B16004_E040", "B16004_E044", "B16004_E045",
                    "B16004_E051", "B16004_E052", "B16004_E056", "B16004_E057",
                    "B16004_E061", "B16004_E062", "B16004_E066", "B16004_E067"
                ]),
                F.col("B16004_E001")
            )
        )
    },
    
    # =============================================================================
    # FAMILY STRUCTURE
    # =============================================================================
    "B11001": {
        "family_households_rate": validate_rate(
            safe_divide("B11001_E002", "B11001_E001")
        ),
        "married_couple_families_rate": validate_rate(
            safe_divide("B11001_E003", "B11001_E001")
        ),
        "single_person_households_rate": validate_rate(
            safe_divide("B11001_E008", "B11001_E001")
        )
    },
    
    "B11005": {
        # Households with children under 18: E005 is "other family" (male + female
        # householder combined); E006 is male householder, E007 female householder.
        "female_headed_households_with_children_rate": validate_rate(
            safe_divide("B11005_E007", "B11005_E001")
        ),
        "male_headed_households_with_children_rate": validate_rate(
            safe_divide("B11005_E006", "B11005_E001")
        )
    },
    
    # =============================================================================
    # EMPLOYMENT & OCCUPATION
    # =============================================================================
    "C24010": {
        "management_business_science_arts_rate": validate_rate(
            safe_divide(
                safe_column_sum(["C24010_E003", "C24010_E039"]),
                F.col("C24010_E001")
            )
        )
    },
    
    "C24030": {
        # Industry cells: male block E003-E028, female block E030-E055 (offset 27).
        "manufacturing_employment_rate": validate_rate(
            safe_divide(
                safe_column_sum(["C24030_E007", "C24030_E034"]),
                F.col("C24030_E001")
            )
        ),
        "retail_trade_rate": validate_rate(
            safe_divide(
                safe_column_sum(["C24030_E009", "C24030_E036"]),
                F.col("C24030_E001")
            )
        ),
        # E023/E050 = health care and social assistance (E018/E045 are
        # professional, scientific, and technical services).
        "healthcare_social_assistance_rate": validate_rate(
            safe_divide(
                safe_column_sum(["C24030_E023", "C24030_E050"]),
                F.col("C24030_E001")
            )
        )
    },
    
    # =============================================================================
    # TRANSPORTATION
    # =============================================================================
    "B08303": {
        "commute_over_30_minutes_rate": validate_rate(
            safe_divide(
                safe_column_sum([
                    "B08303_E008", "B08303_E009", "B08303_E010",
                    "B08303_E011", "B08303_E012", "B08303_E013"
                ]),
                F.col("B08303_E001")
            )
        ),
        "commute_over_60_minutes_rate": validate_rate(
            safe_divide(
                safe_column_sum(["B08303_E012", "B08303_E013"]),
                F.col("B08303_E001")
            )
        )
    },
    
    "B08301": {
        "drove_alone_to_work_rate": validate_rate(
            safe_divide("B08301_E003", "B08301_E001")
        ),
        "carpooled_to_work_rate": validate_rate(
            safe_divide("B08301_E004", "B08301_E001")
        ),
        "public_transit_use_rate": validate_rate(
            safe_divide("B08301_E010", "B08301_E001")
        ),
        "walked_to_work_rate": validate_rate(
            safe_divide("B08301_E019", "B08301_E001")
        ),
        "worked_from_home_rate": validate_rate(
            safe_divide("B08301_E021", "B08301_E001")
        )
    },
    
    "B25044": {
        "no_vehicle_households_rate": validate_rate(
            safe_divide(
                safe_column_sum(["B25044_E003", "B25044_E010"]),
                F.col("B25044_E001")
            )
        )
    },
    
    # =============================================================================
    # HEALTH & DISABILITY
    # =============================================================================
    "B18101": {
        "disability_rate": validate_rate(
            safe_divide(
                safe_column_sum([
                    "B18101_E004", "B18101_E007", "B18101_E010",
                    "B18101_E013", "B18101_E016", "B18101_E019",
                    "B18101_E023", "B18101_E026", "B18101_E029",
                    "B18101_E032", "B18101_E035", "B18101_E038"
                ]),
                F.col("B18101_E001")
            )
        )
    },
    
    "B27001": {
        "no_health_insurance_rate": validate_rate(
            safe_divide(
                safe_column_sum([
                    "B27001_E005", "B27001_E008", "B27001_E011", "B27001_E014",
                    "B27001_E017", "B27001_E020", "B27001_E023", "B27001_E026", "B27001_E029",
                    "B27001_E033", "B27001_E036", "B27001_E039", "B27001_E042",
                    "B27001_E045", "B27001_E048", "B27001_E051", "B27001_E054", "B27001_E057"
                ]),
                F.col("B27001_E001")
            )
        )
    },
    
    # =============================================================================
    # TECHNOLOGY ACCESS
    # =============================================================================
    "B28002": {
        "broadband_internet_rate": validate_rate(
            safe_divide("B28002_E004", "B28002_E001")
        ),
        "no_internet_rate": validate_rate(
            safe_divide("B28002_E013", "B28002_E001")
        )
    },
    
    "B28001": {
        "has_computer_rate": validate_rate(
            safe_divide(
                F.col("B28001_E001") - F.col("B28001_E011"),
                F.col("B28001_E001")
            )
        ),
        "no_computer_rate": validate_rate(
            safe_divide("B28001_E011", "B28001_E001")
        )
    },
    
    # =============================================================================
    # ADDITIONAL HOUSING CHARACTERISTICS
    # =============================================================================
    "B25024": {
        "single_family_detached_rate": validate_rate(
            safe_divide("B25024_E002", "B25024_E001")
        ),
        "mobile_homes_rate": validate_rate(
            safe_divide("B25024_E010", "B25024_E001")
        ),
        # 5+ units = 5-9 (E006) + 10-19 (E007) + 20-49 (E008) + 50+ (E009).
        "apartment_5plus_units_rate": validate_rate(
            safe_divide(
                safe_column_sum(["B25024_E006", "B25024_E007", "B25024_E008", "B25024_E009"]),
                F.col("B25024_E001")
            )
        )
    },
    
    "B25034": {
        # Before 1980 = 1970-79 (E007) through 1939 or earlier (E011).
        "built_before_1980_rate": validate_rate(
            safe_divide(
                safe_column_sum([
                    "B25034_E007", "B25034_E008", "B25034_E009", "B25034_E010", "B25034_E011"
                ]),
                F.col("B25034_E001")
            )
        ),
        "built_2000_or_later_rate": validate_rate(
            safe_divide(
                safe_column_sum(["B25034_E002", "B25034_E003", "B25034_E004"]),
                F.col("B25034_E001")
            )
        )
    },
    
    # =============================================================================
    # ADDITIONAL DEMOGRAPHICS
    # =============================================================================
    "B21001": {
        "veterans_rate": validate_rate(
            safe_divide("B21001_E002", "B21001_E001")
        )
    },
    
    "B10051": {
        "grandparents_responsible_for_grandchildren_rate": validate_rate(
            safe_divide("B10051_E002", "B10051_E001")
        )
    },
    
    "B07003": {
        "moved_in_past_year_rate": validate_rate(
            safe_divide(
                F.col("B07003_E001") - F.col("B07003_E004"),
                F.col("B07003_E001")
            )
        ),
        "moved_from_different_state_rate": validate_rate(
            safe_divide("B07003_E013", "B07003_E001")
        )
    }
}

In [0]:
from pathlib import Path

# Substrings marking identifier columns that must stay strings (never cast).
_ACS_ID_MARKERS = ['GEO', 'NAME', 'STATE', 'COUNTY', 'TRACT', 'PLACE', 'LOGRECNO', 'SUMLEVEL']
# Suffixes that always mark an estimate (E) / margin-of-error (M) column.
_ACS_VALUE_SUFFIXES = ['_E001', '_E002', '_E003', '_M001', '_M002']


def _is_acs_value_column(name: str) -> bool:
    """Return True for estimate/MOE columns (e.g. B01001_E004, B01001_M004) that should be numeric."""
    upper = name.upper()
    if any(marker in upper for marker in _ACS_ID_MARKERS):
        return False
    return (
        any(suffix in upper for suffix in _ACS_VALUE_SUFFIXES)
        or (len(name) > 6 and name[-4] in ['E', 'M'] and name[-3:].isdigit())
    )


def _acs_numeric(name: str):
    """Cast an ACS value column to double, turning negative sentinel codes into null.

    The ACS summary file stores annotations (e.g. -666666666 when an estimate
    cannot be computed) as large negative numbers. None of the estimates used
    here can legitimately be negative, so left as-is those codes would show up
    as medians of -666666666 or distort the derived rates.
    """
    value = F.col(name).cast(DoubleType())
    # when() without otherwise() yields null for negatives (and keeps nulls null).
    return F.when(value >= 0, value).alias(name)


def _read_acs_table(filepath):
    """Read one pipe-delimited ACS table file, casting its estimate/MOE columns to double."""
    # Read everything as strings first (most reliable for ACS data).
    raw = spark.read.csv(
        str(filepath),
        sep="|",
        header=True,
        inferSchema=False,
        nullValue="",
        nanValue="null",
    )
    # One select instead of a withColumn per column keeps the logical plan small.
    return raw.select(*[
        _acs_numeric(name) if _is_acs_value_column(name) else F.col(name)
        for name in raw.columns
    ])


df_coll = {}
# Sorted so table (and therefore output column) order is deterministic across runs.
for filepath in sorted(Path(volumepath).glob("*.dat")):
    # The table id is the last '-'-separated part of the file stem,
    # presumably e.g. "acsdt5y2023-b19013" -> "B19013".
    table_key = filepath.stem.split('-')[-1].upper()
    if table_key not in acs_variables:
        continue

    df_tab = _read_acs_table(filepath)
    # Keep the join key plus the derived variables defined for this table.
    df_coll[table_key] = df_tab.select(
        F.col('GEO_ID').alias('geo_id'),
        *[expr.alias(name) for name, expr in acs_variables[table_key].items()],
    )

In [0]:
# Attach each table's derived variables to the geography frame. This is a LEFT
# join: every geography from the Geos file is kept, and variables are null where
# a table has no row for that geo_id.
for table_key in df_coll:
    df_tab = df_coll[table_key]
    df = df.join(df_tab, on='geo_id', how='left')
    print(f"Added {table_key}: {len(df_tab.columns)-1} variables")

In [0]:
from dateutil.parser import parse

In [0]:
# Explicit import instead of relying on a `datetime` name leaked by the %run cell.
from datetime import date

# Provenance columns: source file date (end of the 2019-2023 period), source
# archive name, and the date this load ran.
df_out = (
    df
    .withColumn('mimi_src_file_date', F.lit(date(2023, 12, 31)))
    .withColumn('mimi_src_file_name', F.lit('2023_5YRData.zip'))
    .withColumn('mimi_dlt_load_date', F.lit(date.today()))
)

# overwriteSchema lets the overwrite replace the table schema when the column set
# or column types change between runs (e.g. variables added to acs_variables).
# NOTE(review): assumes the target is a Delta table (the Databricks default).
(
    df_out.write
    .mode('overwrite')
    .option('overwriteSchema', 'true')
    .saveAsTable('mimi_ws_1.census.acs2023_5yr_sf')
)

In [0]:
%sql
-- Catalog description for the ACS 2023 5-year table: source link, 2019-2023 coverage, snapshot interval, geoid resolution.
COMMENT ON TABLE mimi_ws_1.census.acs2023_5yr_sf IS '# [American Community Survey (ACS) 5 Year Estimates Summary File](https://www2.census.gov/programs-surveys/acs/summary_file/2023/) - 2019-2023 | interval: snapshot, resolution: geoid'