In [None]:
import pandas as pd
import censusdata
import csv
from pathlib import Path
import os
import requests
import sys

# Add the directory two levels above the current working directory to the
# import search path so the `data_pipeline` imports below can be resolved.
module_path = os.path.abspath(os.path.join("../.."))
if module_path not in sys.path:
    sys.path.append(module_path)

from data_pipeline.etl.sources.census.etl_utils import get_state_fips_codes
from data_pipeline.etl.base import ExtractTransformLoad
from data_pipeline.config import settings


# Year used for the ACS queries in this notebook.
ACS_YEAR = 2019

# The data directory is a sibling of the current working directory.
DATA_PATH = Path.cwd().parent.joinpath("data")
FIPS_CSV_PATH = DATA_PATH.joinpath("fips_states_2010.csv")

# Column names shared across cells.
GEOID_FIELD_NAME = "GEOID10"
UNEMPLOYED_FIELD_NAME = "Unemployed Civilians (fraction)"

# Display tweaks: keep wide frames on one line and show floats with a
# precision of 2 so pandas output is easier to read.
pd.options.display.expand_frame_repr = False
pd.options.display.precision = 2

In [None]:
class Constants:
    """Field names, file locations, and Census API URLs used by this notebook.

    Args:
        geocorr_file_path: Location of the GEOCORR CSV export to read.
            Defaults to the path originally hard-coded in this notebook;
            pass a different path to run on another machine.
    """

    def __init__(
        self,
        geocorr_file_path: str = "/Users/lucas/Documents/usds/repos/lucasmbrown/misc/geocorr2014_all_states.csv",
    ):
        # TODO: redundant
        self.GEOID_FIELD_NAME: str = "GEOID10"
        self.DATA_PATH: Path = Path.cwd().parent / "data"
        # Derived from this instance's DATA_PATH so the class does not rely on
        # a module-level DATA_PATH having been defined by another cell.
        self.TMP_PATH: Path = self.DATA_PATH / "tmp"

        # Set constants for MSAs.
        self.GEOCORR_FILE_PATH: str = geocorr_file_path
        self.PLACE_FIELD_NAME: str = "Census Place Name"
        self.COUNTY_FIELD_NAME: str = "County Name"
        self.STATE_ABBREVIATION_FIELD_NAME: str = "State Abbreviation"
        self.MSA_FIELD_NAME: str = "Metropolitan/Micropolitan Statistical Area Name"
        self.MSA_ID_FIELD_NAME: str = "MSA ID"
        self.MSA_TYPE_FIELD_NAME: str = "MSA Type"

        # Constants for MSA median incomes
        self.ACS_YEAR: int = 2019
        self.MSA_MEDIAN_INCOME_URL: str = f"https://api.census.gov/data/{self.ACS_YEAR}/acs/acs5?get=B19013_001E&for=metropolitan%20statistical%20area/micropolitan%20statistical%20area"
        self.MSA_INCOME_FIELD_NAME: str = f"Median household income in the past 12 months (MSA; {self.ACS_YEAR} inflation-adjusted dollars)"

        # Constants for state median incomes
        self.STATE_MEDIAN_INCOME_URL: str = f"https://api.census.gov/data/{self.ACS_YEAR}/acs/acs5?get=B19013_001E&for=state"
        self.STATE_GEOID_FIELD_NAME: str = "GEOID2"
        self.STATE_MEDIAN_INCOME_FIELD_NAME: str = f"Median household income (State; {self.ACS_YEAR} inflation-adjusted dollars)"

        # Constants for output
        self.AMI_REFERENCE_FIELD_NAME: str = "AMI Reference"
        self.AMI_FIELD_NAME: str = "Area Median Income (State or metropolitan)"
        self.OUTPUT_PATH: Path = self.DATA_PATH / "dataset" / "census_msa_median_income"


constants = Constants()

In [None]:
# Load and clean GEOCORR data

# Note: this data is generated by https://mcdc.missouri.edu/applications/geocorr2014.html, at the advice of the Census. 
# The specific query used is the following, which takes a couple of minutes to run: 
# https://mcdc.missouri.edu/cgi-bin/broker?_PROGRAM=apps.geocorr2014.sas&_SERVICE=MCDC_long&_debug=0&state=Mo29&state=Al01&state=Ak02&state=Az04&state=Ar05&state=Ca06&state=Co08&state=Ct09&state=De10&state=Dc11&state=Fl12&state=Ga13&state=Hi15&state=Id16&state=Il17&state=In18&state=Ia19&state=Ks20&state=Ky21&state=La22&state=Me23&state=Md24&state=Ma25&state=Mi26&state=Mn27&state=Ms28&state=Mt30&state=Ne31&state=Nv32&state=Nh33&state=Nj34&state=Nm35&state=Ny36&state=Nc37&state=Nd38&state=Oh39&state=Ok40&state=Or41&state=Pa42&state=Ri44&state=Sc45&state=Sd46&state=Tn47&state=Tx48&state=Ut49&state=Vt50&state=Va51&state=Wa53&state=Wv54&state=Wi55&state=Wy56&g1_=state&g1_=county&g1_=placefp&g1_=tract&g1_=bg&g2_=cbsa10&g2_=cbsatype10&wtvar=pop10&nozerob=1&title=&csvout=1&namoptf=b&listout=1&lstfmt=html&namoptr=b&oropt=&counties=&metros=&places=&latitude=&longitude=&locname=&distance=&kiloms=0&nrings=&r1=&r2=&r3=&r4=&r5=&r6=&r7=&r8=&r9=&r10=&lathi=&latlo=&longhi=&longlo=

# Expected length of a full census block group GEOID (county + tract + block group).
EXPECTED_GEOID_LENGTH = 12
# Upper bound on the number of CBG rows expected after de-duplication.
MAX_EXPECTED_CBG_COUNT = 220333

geocorr_df = pd.read_csv(
    filepath_or_buffer=constants.GEOCORR_FILE_PATH,
    # Skip second row, which has descriptions.
    skiprows=[1],
    # The following need to remain as strings for all of their digits, not get converted to numbers.
    dtype={"tract": "string", "county": "string", "state": "string", "bg": "string", "cbsa10": "string"},
    low_memory=False,
)

# Strip the unnecessary period from the tract ID:
geocorr_df["tract"] = geocorr_df["tract"].str.replace(".", "", regex=False)

# Create the full GEOID out of the component parts.
geocorr_df[constants.GEOID_FIELD_NAME] = (
    geocorr_df["county"] + geocorr_df["tract"] + geocorr_df["bg"]
)

# QA the combined field. Compare element-wise against the scalar length and
# treat missing GEOIDs as failures, so a bad file always surfaces as the
# ValueError below rather than as a length-mismatch or NA-truthiness error.
geoid_lengths = geocorr_df[constants.GEOID_FIELD_NAME].str.len()
if not geoid_lengths.eq(EXPECTED_GEOID_LENGTH).fillna(False).all():
    print(geoid_lengths.unique())
    raise ValueError("Some of the census BG data has the wrong length.")

# Rename some fields
geocorr_df = geocorr_df.rename(
    columns={
        "placenm": constants.PLACE_FIELD_NAME,
        "cbsaname10": constants.MSA_FIELD_NAME,
        "cntyname": constants.COUNTY_FIELD_NAME,
        "stab": constants.STATE_ABBREVIATION_FIELD_NAME,
        "cbsa10": constants.MSA_ID_FIELD_NAME,
        "cbsatype10": constants.MSA_TYPE_FIELD_NAME,
    },
    errors="raise",
)

# Remove duplicated rows.
# Some rows appear twice: once for the population within a CBG that's also within a census place,
# and once for the population that's within a CBG that's *not* within a census place.
# Drop the row that's not within a census place.
geocorr_df = (
    geocorr_df
    # Sort by place name; missing place names sort last by default, so a row
    # that has a place name comes first within each GEOID.
    .sort_values(by=constants.PLACE_FIELD_NAME, axis=0, ascending=True)
    # Keep only the first row per GEOID (the one with the place name, if any).
    .drop_duplicates(subset=[constants.GEOID_FIELD_NAME], keep="first")
    # Sort by GEOID again to put the dataframe back to original order.
    .sort_values(by=constants.GEOID_FIELD_NAME, axis=0, ascending=True)
)

if len(geocorr_df) > MAX_EXPECTED_CBG_COUNT:
    raise ValueError("Too many CBGs.")

geocorr_df

In [None]:
# Load and clean MSA income data
download = requests.get(constants.MSA_MEDIAN_INCOME_URL)
# Fail loudly on an HTTP error instead of trying to parse an error page as JSON.
download.raise_for_status()
# Decode with the response's own JSON parser; `json` is not imported in this notebook.
msa_median_incomes = download.json()

# Remove first list entry, which is the column names.
column_names = msa_median_incomes.pop(0)

msa_median_incomes_df = pd.DataFrame(data=msa_median_incomes, columns=column_names)

msa_median_incomes_df = msa_median_incomes_df.rename(
    columns={
        "B19013_001E": constants.MSA_INCOME_FIELD_NAME,
        "metropolitan statistical area/micropolitan statistical area": constants.MSA_ID_FIELD_NAME,
    },
    errors="raise",
)

# Convert MSA ID to str so it can be joined against the string MSA IDs in the GEOCORR data.
msa_median_incomes_df[constants.MSA_ID_FIELD_NAME] = msa_median_incomes_df[constants.MSA_ID_FIELD_NAME].astype(str)

print(msa_median_incomes_df)

In [None]:
# QA: compare total vs. unique counts of the join keys in both frames.

income_msa_ids = msa_median_incomes_df[constants.MSA_ID_FIELD_NAME]
print(f"Length of msa_median_incomes_df MSA IDs: {len(income_msa_ids)}")
print(f"Length of msa_median_incomes_df unique MSA IDs: {len(income_msa_ids.unique())}")

geocorr_msa_ids = geocorr_df[constants.MSA_ID_FIELD_NAME]
print(f"Length of geocorr_df MSA IDs: {len(geocorr_msa_ids)}")
print(f"Length of geocorr_df unique MSA IDs: {len(geocorr_msa_ids.unique())}")

geocorr_geoids = geocorr_df[constants.GEOID_FIELD_NAME]
print(f"Length of geocorr_df geoids: {len(geocorr_geoids)}")
print(f"Length of geocorr_df unique geoids: {len(geocorr_geoids.unique())}")

In [None]:
# Join CBGs on MSA incomes: a left join keeps every CBG row, including those
# whose MSA ID has no match in the income table.
merged_df = pd.merge(
    left=geocorr_df,
    right=msa_median_incomes_df,
    how="left",
    on=constants.MSA_ID_FIELD_NAME,
)

# Guard against the join producing more rows than the expected CBG ceiling.
if len(merged_df) > 220333:
    raise ValueError("Too many CBGs in join.")

In [None]:
# Load and clean state income data
download = requests.get(constants.STATE_MEDIAN_INCOME_URL)
# Fail loudly on an HTTP error instead of trying to parse an error page as JSON.
download.raise_for_status()
# Decode with the response's own JSON parser; `json` is not imported in this notebook.
state_median_income = download.json()

# Remove first list entry, which is the column names.
column_names = state_median_income.pop(0)

state_median_income_df = pd.DataFrame(data=state_median_income, columns=column_names)

state_median_income_df = state_median_income_df.rename(
    columns={
        "B19013_001E": constants.STATE_MEDIAN_INCOME_FIELD_NAME,
        "state": constants.STATE_GEOID_FIELD_NAME,
    },
    errors="raise",
)

state_median_income_df.head()

In [None]:
# Merge state income with CBGs
# The state identifier is the first two characters of the CBG GEOID.
merged_df[constants.STATE_GEOID_FIELD_NAME] = (
    merged_df[constants.GEOID_FIELD_NAME].astype(str).str[0:2]
)

# Join state data on CBG data:
merged_with_state_income_df = merged_df.merge(
    state_median_income_df,
    how="left",
    on=constants.STATE_GEOID_FIELD_NAME,
)

# Choose reference income: MSA if MSA type is Metro, otherwise use State.
merged_with_state_income_df[constants.AMI_REFERENCE_FIELD_NAME] = [
    "MSA" if msa_type == "Metro" else "State"
    for msa_type in merged_with_state_income_df[constants.MSA_TYPE_FIELD_NAME]
]

# Populate reference income: MSA income if reference income is MSA, state income if reference income is state.
# `Series.where` keeps the MSA income where the condition holds and takes the
# state income elsewhere, replacing a row-by-row `apply` with one vectorized call.
uses_msa_income = merged_with_state_income_df[constants.AMI_REFERENCE_FIELD_NAME] == "MSA"
merged_with_state_income_df[constants.AMI_FIELD_NAME] = merged_with_state_income_df[
    constants.MSA_INCOME_FIELD_NAME
].where(
    uses_msa_income,
    merged_with_state_income_df[constants.STATE_MEDIAN_INCOME_FIELD_NAME],
)

merged_with_state_income_df

In [None]:
# Write to disk
columns_to_keep = [
    constants.GEOID_FIELD_NAME,
    constants.PLACE_FIELD_NAME,
    constants.COUNTY_FIELD_NAME,
    constants.STATE_ABBREVIATION_FIELD_NAME,
    constants.MSA_FIELD_NAME,
    constants.MSA_ID_FIELD_NAME,
    constants.MSA_TYPE_FIELD_NAME,
    constants.MSA_INCOME_FIELD_NAME,
    constants.STATE_GEOID_FIELD_NAME,
    constants.STATE_MEDIAN_INCOME_FIELD_NAME,
    constants.AMI_REFERENCE_FIELD_NAME,
    constants.AMI_FIELD_NAME,
]


# Create the output directory (and any missing parents), then write the CSV
# into that same directory. `Constants` defines OUTPUT_PATH, not CSV_PATH.
constants.OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
merged_with_state_income_df[columns_to_keep].to_csv(
    path_or_buf=constants.OUTPUT_PATH / "usa.csv", index=False
)