In [2]:

#The variables come from here:
# https://api.census.gov/data/2019/acs/acs1/variables.html

#importing packages
import pandas as pd
import censusdata
import gc
import re
from unittest.mock import inplace
import json
import os
import itertools
from datetime import date
import sys

def get_geoid_reference_df(geographies: dict, level:str) -> pd.DataFrame:
    """Build a GEOID-indexed reference frame from a geography lookup.

    Each value of ``geographies`` is converted with ``str()`` and is expected
    to look like ``'Summary level: 150, state:37> county:001> tract:020100>
    block group:1'``, i.e. ``name:code`` pairs separated by ``>``.  The code
    of every pair is the text after the last colon, so the summary-level
    prefix (whatever its number) never leaks into the state code.

    Parameters
    ----------
    geographies : dict
        Mapping whose values describe one geography each; the keys are ignored.
    level : str
        One of ``'county'``, ``'tract'`` or ``'blockgroup'``.

    Returns
    -------
    pd.DataFrame
        Indexed by ``GEOID`` (the concatenated codes) with one column per
        component: ``STATE``, ``COUNTY`` and, depending on ``level``,
        ``TRACT`` and ``BLOCK_GROUP``.

    Raises
    ------
    ValueError
        If ``level`` is not supported, or a geography has fewer components
        than ``level`` requires.
    """
    # Component columns per level, in GEOID concatenation order.
    level_columns = {
        'county': ['STATE', 'COUNTY'],
        'tract': ['STATE', 'COUNTY', 'TRACT'],
        'blockgroup': ['STATE', 'COUNTY', 'TRACT', 'BLOCK_GROUP'],
    }

    if level not in level_columns:
        raise ValueError('Invalid level')

    columns = level_columns[level]
    rows = []

    for geography in geographies.values():
        # "name:code" segments are separated by '>'; keeping only what follows
        # the last ':' also discards the leading "Summary level: NNN," text.
        codes = [
            segment.rsplit(':', 1)[-1].strip()
            for segment in str(geography).split('>')
        ]
        if len(codes) < len(columns):
            raise ValueError(
                f'Geography {str(geography)!r} has {len(codes)} component(s), '
                f'but level {level!r} needs {len(columns)}'
            )
        codes = codes[:len(columns)]
        rows.append([''.join(codes)] + codes)

    geoid_df = pd.DataFrame(rows, columns=['GEOID'] + columns)

    return geoid_df.set_index('GEOID')

    
def geoid_from_df(df: pd.DataFrame, level:str) -> pd.DataFrame:
    """Return a copy of ``df`` re-indexed by GEOID.

    Every index label is converted with ``str()`` and is expected to end in
    ``name:code`` pairs separated by ``>``, for example
    ``'Some County, Some State: Summary level: 050, state:37> county:001'``.
    The GEOID is the concatenation of the first 2 (county), 3 (tract) or
    4 (blockgroup) codes.  Codes are taken as the text after the last colon
    of each ``>``-separated segment, so extra colons earlier in the label
    (e.g. inside a place name) do not shift the result.

    Parameters
    ----------
    df : pd.DataFrame
        Frame whose index labels describe geographies; it is not modified.
        The index may be named or unnamed.
    level : str
        One of ``'county'``, ``'tract'`` or ``'blockgroup'``.

    Returns
    -------
    pd.DataFrame
        Same columns and row order as ``df``, with the index replaced by the
        GEOID strings and named ``GEOID``.

    Raises
    ------
    ValueError
        If ``level`` is not supported, or a label has fewer components than
        ``level`` requires.
    """
    # Number of leading geography codes that make up the GEOID at each level.
    level_depth = {'county': 2, 'tract': 3, 'blockgroup': 4}

    if level not in level_depth:
        raise ValueError('Invalid level')

    depth = level_depth[level]

    def _label_to_geoid(label) -> str:
        """Concatenate the first ``depth`` codes found in one index label."""
        codes = [
            segment.rsplit(':', 1)[-1].strip()
            for segment in str(label).split('>')
        ]
        if len(codes) < depth:
            raise ValueError(
                f'Index label {str(label)!r} has {len(codes)} component(s), '
                f'but level {level!r} needs {depth}'
            )
        return ''.join(codes[:depth])

    # Work on a copy so the caller's frame keeps its original index.
    result = df.copy()
    result.index = pd.Index(
        [_label_to_geoid(label) for label in df.index], name='GEOID'
    )

    return result


def get_acs_data(acs_data_dict: dict, state_code:str, year:int, level:str, acs_type:str, key=None) -> pd.DataFrame:
    """Download ACS variables for one state and return them indexed by GEOID.

    The variables are requested in chunks of roughly one tenth of the
    dictionary (never fewer than one variable per request).  Each chunk is
    re-indexed by GEOID and joined column-wise onto a reference frame holding
    the STATE/COUNTY/(TRACT)/(BLOCK_GROUP) codes.

    Parameters
    ----------
    acs_data_dict : dict
        Maps ACS variable codes to the column names used in the output.
    state_code : str
        State FIPS code passed to ``censusdata.censusgeo``.
    year : int
        Survey year, 2010 through 2020.
    level : str
        One of ``'county'``, ``'tract'`` or ``'blockgroup'``.
    acs_type : str
        ``'acs5'`` or ``'acs1'``; the source passed to ``censusdata.download``.
    key : str, optional
        Census API key forwarded to ``censusdata``; ``None`` sends no key.

    Returns
    -------
    pd.DataFrame
        GEOID-indexed frame with the geography code columns, the renamed ACS
        columns, and constant ``year`` and ``level`` columns.

    Raises
    ------
    ValueError
        If ``acs_type``, ``level`` or ``year`` is unsupported, or if
        ``acs_data_dict`` is empty.
    """
    if acs_type not in ['acs5', 'acs1']:
        raise ValueError('Invalid acs type')

    # Wildcarded geography components below the state, per level.
    level_hierarchy = {
        'county': ['county'],
        'tract': ['county', 'tract'],
        'blockgroup': ['county', 'tract', 'block group'],
    }
    if level not in level_hierarchy:
        raise ValueError('Invalid level')

    if year not in (2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020):
        raise ValueError('Invalid year')

    # Fail before any network call rather than on an empty concat afterwards.
    if not acs_data_dict:
        raise ValueError('acs_data_dict must contain at least one variable')

    variables = list(acs_data_dict)

    # About ten requests in total; the floor of 1 keeps dictionaries with
    # fewer than ten variables from producing a chunk size of zero.
    chunk_size = max(1, len(variables) // 10)

    census = censusdata.censusgeo(
        [('state', state_code)] + [(part, '*') for part in level_hierarchy[level]]
    )

    # NOTE(review): the geography list is always read from 'acs5', even when
    # acs_type is 'acs1' -- presumably so every geography appears in the
    # reference frame; confirm this is intended.
    boundaries = censusdata.geographies(census, 'acs5', year, key=key)

    # Reference frame of geography codes that the data chunks are joined onto.
    geoid_df = get_geoid_reference_df(boundaries, level)

    census_df_list = []

    for start in range(0, len(variables), chunk_size):
        chunk = variables[start:start + chunk_size]

        # Download this chunk and swap the variable codes for readable names.
        census_data_df = censusdata.download(
            acs_type, year, census, chunk, key=key
        ).rename(columns=acs_data_dict)

        # Replace the geography index with GEOID so the chunks align.
        census_df_list.append(geoid_from_df(census_data_df, level))

    census_df = pd.concat([geoid_df, pd.concat(census_df_list, axis=1)], axis=1)

    census_df['year'] = year

    census_df['level'] = level

    return census_df


def get_aggregated_acs_data(acs_dd:dict, st_fips_df:pd.DataFrame, year:int, level:str, acs_type:str, key:str) -> pd.DataFrame:
    """Download ACS data for every state in ``st_fips_df`` and stack the results.

    ``get_acs_data`` is called once per state (and once per geography level
    when ``level == 'all'``); the per-state frames are concatenated row-wise.

    Parameters
    ----------
    acs_dd : dict
        Maps ACS variable codes to output column names.
    st_fips_df : pd.DataFrame
        Must have a ``STATE`` column holding the state codes to download.
    year : int
        Survey year, forwarded to ``get_acs_data``.
    level : str
        ``'county'``, ``'tract'``, ``'blockgroup'`` or ``'all'``.
    acs_type : str
        ACS product, forwarded to ``get_acs_data``.
    key : str
        Census API key, forwarded to ``get_acs_data``.

    Returns
    -------
    pd.DataFrame or tuple of pd.DataFrame
        For a single level, one frame covering all states.  For ``'all'``, a
        tuple ``(tract_df, block_group_df, county_df)``.

    Raises
    ------
    ValueError
        If ``level`` is unsupported, ``st_fips_df`` has no ``STATE`` column,
        or the ``STATE`` column is empty.
    """
    if level not in ['county', 'tract', 'blockgroup', 'all']:
        raise ValueError('Invalid level')

    try:
        # Read the codes from the frame that was passed in, not a global.
        state_codes = list(st_fips_df['STATE'])
    except (KeyError, TypeError) as err:
        raise ValueError('State code column not found, make sure state code column in CSV is labeled as STATE') from err

    if not state_codes:
        raise ValueError('No state codes found in the STATE column')

    # For 'all', this order is also the order of the returned tuple.
    if level == 'all':
        requested_levels = ['tract', 'blockgroup', 'county']
    else:
        requested_levels = [level]

    # One list of per-state frames for each requested level.
    frames_by_level = {name: [] for name in requested_levels}

    for state in state_codes:
        for name in requested_levels:
            frames_by_level[name].append(
                get_acs_data(acs_dd, state, year, name, acs_type, key)
            )

    combined = [pd.concat(frames_by_level[name], axis=0) for name in requested_levels]

    if level == 'all':
        return tuple(combined)

    return combined[0]

In [3]:
# Project root used for every read and write below: the parent of the
# directory the notebook is run from.
abspath = os.path.dirname(os.path.normpath(os.path.abspath(os.path.dirname(''))))

# ACS variable code -> formatted column name, from the source config folder.
with open(os.path.join(abspath, 'src', 'refs', 'acs_dd.json')) as acs_dd_file:
    acs_dd = json.load(acs_dd_file)

# State FIPS codes; read as strings so the codes are not converted to numbers.
st_fips = pd.read_csv(os.path.join(abspath, 'src', 'refs', 'state_fips.csv'), dtype=str)

# The Census API key comes from config.json only. Never paste a key into the
# notebook itself: it would be committed and shared along with the code.
with open(os.path.join(abspath, 'config.json')) as config_file:
    config = json.load(config_file)

key = config['key'][0]

todays_date = date.today()

# The year is pinned to 2019 rather than derived from todays_date.
year = 2019

# State FIPS code kept for single-state pulls with get_acs_data.
state_code = '37'

# Download block-group data for every state listed in st_fips and save it.
all_50_grp_df = get_aggregated_acs_data(acs_dd, st_fips, year, 'blockgroup', 'acs5', key)

all_50_grp_df.to_csv(os.path.join(abspath, 'data', 'raw', f'acs_all_50_{year}_raw.csv'))
