## Required Libraries

In [0]:
import os

import numpy as np
import pandas as pd
import requests
from pyspark.sql.functions import col, count, rank, trim, regexp_replace, dense_rank
from pyspark.sql.window import Window


## Fetch data for all State FIPS

In [0]:
# Fetch ACS 5-year table B03002 (Hispanic or Latino origin by race) for every
# county of every state in STATE_FIPS_LIST and every year in YEARS. Each
# successful response becomes one pandas DataFrame appended to all_county_data.

# Census API key, read from the environment so the secret never lives in the
# notebook source or its saved outputs. When CENSUS_API_KEY is unset the
# requests are sent without a key.
# NOTE(review): keyless requests are subject to a daily query limit per the
# Census API user guide -- confirm the limit covers a full run.
# Any key previously committed to this notebook should be treated as leaked
# and rotated.
API_KEY = os.environ.get("CENSUS_API_KEY")

# Seconds to wait on a single API call before giving up, so one stalled
# connection cannot hang the whole notebook run.
REQUEST_TIMEOUT_SECONDS = 30

# List of all state FIPS codes
STATE_FIPS_LIST = [
    "01", "02", "04", "05", "06", "08", "09", "10", "11", "12", "13", "15", "16", "17", "18", "19", "20", "21", "22", "23", "24", "25", "26", "27", "28", "29", "30", "31", "32", "33", "34", "35", "36", "37", "38", "39", "40", "41", "42", "44", "45", "46", "47", "48", "49", "50", "51", "53", "54", "55", "56"]

# Years for which to fetch data
YEARS = list(range(2015, 2024))

# Mapping of column codes to their actual names (loop-invariant, so defined once)
column_mapping = {
    "B03002_001E": "total_population",
    "B03002_003E": "White",
    "B03002_004E": "Black",
    "B03002_006E": "Asian",
    "B03002_007E": "Native American",
    "B03002_012E": "Hispanic",
    "B03002_008E": "Other",
    "B03002_014E": "Black: Hispanic",
    "B03002_017E": "Native American: Hispanic",
    "B03002_013E": "White: Hispanic",
    "B03002_011E": "Two or More Races"
}

# List to store data for each year
all_county_data = []

for year in YEARS:
    # Base URL for the Census API
    BASE_URL = f"https://api.census.gov/data/{year}/acs/acs5"

    for state_fips in STATE_FIPS_LIST:
        # Parameters for the API request
        params = {
            "get": "NAME,B03002_001E,B03002_003E,B03002_004E,B03002_006E,B03002_007E,B03002_012E,B03002_008E,B03002_014E,B03002_017E,B03002_013E,B03002_011E",
            "for": "county:*",  # Fetch data for all counties
            "in": f"state:{state_fips}",  # State filter
        }
        if API_KEY:
            params["key"] = API_KEY

        # Make the API request
        try:
            response = requests.get(BASE_URL, params=params, timeout=REQUEST_TIMEOUT_SECONDS)
        except requests.RequestException as exc:
            # Report only the exception type: the full message can embed the
            # request URL, and the URL carries the API key as a query parameter.
            print(f"Error fetching data for {year} - State FIPS {state_fips}: {type(exc).__name__}")
            continue

        if response.status_code != 200:
            print(f"Error fetching data for {year} - State FIPS {state_fips}: {response.status_code}, {response.text}")
            continue

        # Parse the response JSON: first row holds the headers, the rest are data rows
        data = response.json()
        headers = data[0]
        values = data[1:]

        # Convert to DataFrame
        year_df = pd.DataFrame(values, columns=headers)

        # NAME looks like "<county>, <state>"; split on the LAST comma so a
        # comma inside a county name cannot produce extra columns.
        name_parts = year_df["NAME"].str.rsplit(",", n=1, expand=True)

        # Rename the variable codes, drop NAME and the FIPS geography columns
        # returned by the API, then add year plus the county/state names
        # parsed from NAME.
        year_df = (
            year_df
            .rename(columns=column_mapping)
            .drop(columns=["NAME", "state", "county"])
            .assign(
                year=year,
                county=name_parts[0],
                state=name_parts[1].str.strip(),
            )
        )

        # Append to the list
        all_county_data.append(year_df)


## Harmonization and %Race Calculation

In [0]:

# Combine all years and states into a single DataFrame
county_race_df = pd.concat(all_county_data, ignore_index=True)
county_race_df = county_race_df.melt(id_vars=["county", "state", "year", "total_population"], var_name="race", value_name="population")

# Mapping dictionary
race_mapping = {
    "White": "W",
    "Black": "B",
    "Asian": "A",
    "Native American": "N",
    "Hispanic": "H",
    "Native American: Hispanic": "N;H",
    "White: Hispanic": "W;H",
    "Black: Hispanic": "B;H",
    "Two or More Races": "ToM",
    "Other": "O"
}

# Dictionary of state name to abbreviation mapping
state_abbr_dict = {
    'Alabama': 'AL',
    'Alaska': 'AK',
    'Arizona': 'AZ',
    'Arkansas': 'AR',
    'California': 'CA',
    'Colorado': 'CO',
    'Connecticut': 'CT',
    'Delaware': 'DE',
    'Florida': 'FL',
    'Georgia': 'GA',
    'Hawaii': 'HI',
    'Idaho': 'ID',
    'Illinois': 'IL',
    'Indiana': 'IN',
    'Iowa': 'IA',
    'Kansas': 'KS',
    'Kentucky': 'KY',
    'Louisiana': 'LA',
    'Maine': 'ME',
    'Maryland': 'MD',
    'Massachusetts': 'MA',
    'Michigan': 'MI',
    'Minnesota': 'MN',
    'Mississippi': 'MS',
    'Missouri': 'MO',
    'Montana': 'MT',
    'Nebraska': 'NE',
    'Nevada': 'NV',
    'New Hampshire': 'NH',
    'New Jersey': 'NJ',
    'New Mexico': 'NM',
    'New York': 'NY',
    'North Carolina': 'NC',
    'North Dakota': 'ND',
    'Ohio': 'OH',
    'Oklahoma': 'OK',
    'Oregon': 'OR',
    'Pennsylvania': 'PA',
    'Rhode Island': 'RI',
    'South Carolina': 'SC',
    'South Dakota': 'SD',
    'Tennessee': 'TN',
    'Texas': 'TX',
    'Utah': 'UT',
    'Vermont': 'VT',
    'Virginia': 'VA',
    'Washington': 'WA',
    'West Virginia': 'WV',
    'Wisconsin': 'WI',
    'Wyoming': 'WY'
}


# Apply mapping
county_race_df['race'] = county_race_df['race'].map(race_mapping)
county_race_df['state'] = county_race_df['state'].map(state_abbr_dict)

# cast datatypes and get percentage population race
county_race_df["population"]  = county_race_df["population"].astype("int")
county_race_df["total_population"]  = county_race_df["total_population"].astype("int")

county_race_df = county_race_df.groupby(['county', 'year', 'race'], as_index=False).agg({
    'population': 'sum',
    'total_population': 'sum',

})

# cast datatypes and get percentage population race
county_race_df["%population_race"] = round((county_race_df["population"] / county_race_df["total_population"]) * 100, 2)


## Assign Ranks According to Desc Order of %Race Population

In [0]:
spark_county_race_df = spark.createDataFrame(county_race_df)
spark_county_race_df = spark_county_race_df.withColumn("county", trim(regexp_replace(col("county"), "County", "")))
window_race = Window.partitionBy("county", "year").orderBy(col("%population_race").desc())
spark_county_race_df = spark_county_race_df.withColumn("race_population_rank", dense_rank().over(window_race).alias("race_population_rank"))


## Write Pre-Processed Data to Delta Table

In [0]:
# Save the ranked result as a Delta table, overwriting both the existing data
# and the existing schema.
(
    spark_county_race_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("default.percentage_population_race_per_county_by_year")
)

## Example Testing

In [0]:
# Spot-check: show the ranked rows for the county named "Bristol" in 2023.
bristol_2023 = (col("year") == 2023) & (col("county") == "Bristol")
display(spark_county_race_df.where(bristol_2023))

county,year,race,population,total_population,%population_race,race_population_rank
Bristol,2023,W,488309,629004,77.63,1
Bristol,2023,H,58843,629004,9.35,2
Bristol,2023,B,23394,629004,3.72,3
Bristol,2023,ToM,15335,629004,2.44,4
Bristol,2023,A,14804,629004,2.35,5
Bristol,2023,W;H,11849,629004,1.88,6
Bristol,2023,O,9580,629004,1.52,7
Bristol,2023,B;H,2640,629004,0.42,8
Bristol,2023,N;H,141,629004,0.02,9
Bristol,2023,N,62,629004,0.01,10
