In [149]:
"""
ACS Collection
Author: Dominic Ridley
"""
from builtins import any as b_any
import sphinx
import pandas as pd
pd.set_option("display.max_rows", None)
import requests
import pickle
import psycopg2
import sys
from psycopg2 import OperationalError, errorcodes, errors
from IPython.display import display 
import time
import numpy as np
import acs_functions
from config import config

In [136]:
class CensusAPI:
    """
    Contains methods for accessing Census API, loading geo ids of municipalities and retrieving Census tables.
    """
    
    def __init__(self, year): #
        """ Intializes API parameter variables """
        
        self.api_key = '766973dcdc26460a63ee43b8bfed1d1c4692486a'
        self.call = 'https://api.census.gov/data/2018/acs/acs5?get=group({4})&' \
                        + 'for=county%20subdivision:{2}&in=state:{0}%20county:{1}&key={3}'
        self.year = year
        
        """ Loads the census variables used for each table """
        self.ids_dict = pickle.load(open('table_to_censusid.p', "rb"))
        self.cols_dict = pickle.load(open('cols_ids_dict.p', "rb"))
        self.col_uni_dict = pickle.load(open('p_cols_dict.p', 'rb'))
        self.column_ids = pd.read_csv('column_ids.csv', index_col=[0,1], skipinitialspace=True)
        self.column_ids['ids'] = self.column_ids['ids'].apply(lambda x: set(str(x).zfill(3).replace(' ', '').split(',')))
        
    def load_table_censusvars(self, table):
        """ Loads the census variables used for table """
        
        dict_ = c.ids_dict[table]
        ids, names = list(map(list, zip(*dict_)))
        return ids, names
        
    def load_geoids(self):
        """ Loads geoids for geographical level (municipality, tract, block)"""
        
        full_table = pd.read_excel('K:\DataServices\Datasets\Data Keys\Census_MuniName_County_State_ID.xls', \
                                   dtype={'GEOID': str})
        return full_table[['GEOID', 'MUNI_ID', 'MUNI']]
    
    def connect_database(self, server):
        """ Connect to the PostgreSQL database server """
        
        conn = None
        cur = None
        try:
            # read connection parameters
            params = config(section=server)

            # connect to the PostgreSQL server
            #print('Connecting to the PostgreSQL database...')
            conn = psycopg2.connect(**params)
            print("Connected to " + params['database'])
            # create a cursor
            cur = conn.cursor()

        except (Exception, psycopg2.DatabaseError) as error:
            print(error)

        return conn, cur
    
    def call_api(self, table, group='B01001'):
        """ Calls Census API by table and group """
        
        #Loads GEOIDS to map to municipalities
        geoid_table = self.load_geoids()

        #Creates set of county codes to drop any duplicate values
        counties = set([str(x)[2:5] for x in geoid_table['GEOID']])
        
        #Intializes empty dataframe
        out_df = pd.DataFrame()
        
        col_names = []
        
        #Iterates through the counties, calls census api for pop. estimates and appends to dataframe
        for i in counties:
            
            #Calls Census API with state code: 25, county: i, subdivisions: *global and groups
            request_url = self.call.format('25', i, '*', self.api_key, group)
            out = requests.get(request_url).json()
            
            if out_df.empty:
                out_df = out_df.append(out[1:], ignore_index=True)
                col_names = out[0]
            else:
                out_df = out_df.append(out[1:], ignore_index=True)

        
        out_df.columns = col_names
        
        #display(out_df)
        e_m_columns = [i[-4:] for i in out_df.columns if (i[-1] in ['E', 'M']) & (i not in ['NAME'])]
        cols_to_rename = [i for i in out_df.columns if (i[-1] in ['E', 'M']) & (i not in ['NAME'])]
        
        out_df = out_df.rename(columns=dict(zip(cols_to_rename, e_m_columns)))
        out_df = out_df[['NAME', 'GEO_ID']+e_m_columns]
        
        out_df = out_df.rename(columns={'GEO_ID': 'GEOID'})
        
        out_df['GEOID'] = out_df['GEOID'].apply(lambda x: x[-10:])
        
        final_df = out_df.copy()
        
        
        #Joins tables on 'GEOID' to get municipality names
        final_df = final_df.merge(geoid_table, on='GEOID', how='inner')
        final_df = final_df[['MUNI_ID', 'MUNI', 'NAME']+e_m_columns].sort_values('MUNI_ID')
        return final_df

In [119]:
column_ids = pd.read_csv('column_ids.csv', index_col=[0,1], skipinitialspace=True)

In [138]:
ids = c.column_ids.loc['b01001_population_by_age_gender_acs_m'].to_dict(orient='index')
[idx + 'E' for idx in ids['pop']['ids']]


['001E']

In [121]:
column_ids['ids'] = column_ids['ids'].apply(lambda x: set(str(x).zfill(3).replace(' ', '').split(',')))

In [122]:
column_ids.loc['b01001_population_by_age_gender_acs_m']

Unnamed: 0,ids,moe,percent
pop,{001},False,False
popm,{001},True,False
pop_u5,"{003, 027}",False,False
pop_u5m,"{003, 027}",True,False
pop_5_9,"{028, 004}",False,False
pop_5_9m,"{028, 004}",True,False
pop1014,"{005, 029}",False,False
pop1014m,"{005, 029}",True,False
pop1517,"{030, 006}",False,False
pop1517m,"{030, 006}",True,False


In [137]:
c = CensusAPI('2018')

In [67]:
conn, cur = c.connect_database('sdvm')

Connected to ds


In [68]:
cur.callproc('acs_est_pct', ([1], [1]))
cur.fetchall()

[(Decimal('100.00'),)]

In [156]:
hello = c.call_api('b01001_population_by_age_gender_acs_m')

In [160]:
pickle.dump(hello, open('test_table.p', 'wb'))

In [159]:
hello.iloc[:, 3:] = hello.iloc[:, 3:].applymap(lambda x: float(x)).clip(lower=0)

In [174]:
hello.loc[144, :]

MUNI_ID                                               204
MUNI                                            NEW SALEM
NAME       New Salem town, Franklin County, Massachusetts
001E                                                 1036
001M                                                  109
002E                                                  591
002M                                                   76
003E                                                   55
003M                                                   32
004E                                                   28
004M                                                   13
005E                                                   31
005M                                                   13
006E                                                   14
006M                                                   10
007E                                                   21
007M                                                   17
008E          

In [132]:
c.column_ids.loc['b01001_population_by_age_gender_acs_m']

Unnamed: 0,ids,moe,percent
pop,{{'001'}},False,False
popm,{{'001'}},True,False
pop_u5,"{{'003', '027'}}",False,False
pop_u5m,"{{'003', '027'}}",True,False
pop_5_9,"{{'028', '004'}}",False,False
pop_5_9m,"{{'028', '004'}}",True,False
pop1014,"{'005'}, {'029'}",False,False
pop1014m,"{'005'}, {'029'}",True,False
pop1517,"{'030'}, {'006'}",False,False
pop1517m,"{'030'}, {'006'}",True,False


In [72]:
#ids, names = c.load_table_censusvars('b01001_population_by_age_gender_acs_m')
keys = list(c.cols_dict['b01001_population_by_age_gender_acs_m'].keys())


In [73]:
#list(c.cols_dict[tbl_name].keys())

In [155]:
class WriteTables:
    
    def __init__(self, year='2018'):
        self.year = year
        c = CensusAPI(year)
        self.con, self.cur = c.connect_database('sdvm')
        
    
    def initializeTable(self, tbl_name):
        
        #Grabs the column ids for the specified table
        tbl_dict = column_ids.loc[tbl_name].to_dict(orient='index')
        keys = list(tbl_dict.keys())
        
        default_cols = ['muni_id']
        
        #Initializes final table dataframe with final columns and no rows
        final_table = pd.DataFrame(columns=default_cols + keys)
        
        return final_table
    
    def mapCategories(self, row):
    """ 
        Maps the moe:bool, percent:bool properties of table columns to their category 
        (estimate, moe, percent or moe percent)

        Returns: string
    """
        
        #Gets ['moe', 'percent'] as row
        tpl = tuple(row)
        
        
        if tpl == (False, False): #moe: false, percent: false
            return 'est'
        
        elif tpl == (False, True): #moe: false, percent: true
            return 'pct'
        
        elif tpl == (True, False): #moe: true, percent: false
            return 'moe'
        
        elif tpl == (True, True): #moe: true, percent: true
            return 'moe_pct'
        
        else:
            return None
        

    def populateFullTable(self, tbl_name):
        
        final_table = self.initializeTable(tbl_name)
        
        #Grabs the muni uni, muni names and geoids
        init_table = c.load_geoids()
        
        #Populates raw data from Census Subject Tables
        census_table = c.call_api(tbl_name)
        
        #Convert values to float and also set a lower bound at 0
        census_table.iloc[:, 3:] = census_table.iloc[:, 3:].applymap(lambda x: float(x)).clip(lower=0)
        
        #List of census ids that will be used in the final table
        ids = c.column_ids.loc[tbl_name].to_dict(orient='index')
        
        #List of universe ids for percentage calculations
        col_uni = c.col_uni_dict[tbl_name]
        
        #List of columns with errors
        error_cols = set()
        
        mapped_cols = c.column_ids.loc[tbl_name][['moe', 'percent']].apply(self.mapCategories, axis=1)

        est_cols = mapped_cols[(mapped_cols == 'est')].index.values
        pct_cols = mapped_cols[(mapped_cols == 'pct')].index.values
        moe_cols = mapped_cols[(mapped_cols == 'moe')].index.values
        moe_pct_cols = mapped_cols[(mapped_cols == 'moe_pct')].index.values
        
        
        for i, row in census_table.iterrows(): #Iterate every municipality
            
            muni_id = row.index
            final_table.loc[i, 'muni_id'] = row['MUNI_ID']
            
            #Initialize sets for est, est_pct, moe_est, moe_est_pct
            est, est_pct, moe_est, moe_est_pct = set(), set(), set(), set()
            est_uni = set()
            visited_cols = set()


            est_df = c.column_ids.loc[tbl_name].loc[est_cols]
            for col, est_row in est_df.iterrows():
                
                if col in visited_cols: #Checks if value has already been inserted for column
                    continue
                #print(col)   
                
                try:
                    final_table, visited_cols = self.populateEst(i, col, tbl_name, census_table, final_table, visited_cols)
                    
                except Exception as ex:
                    error_cols.add(col)
                    #print(ex)
                    #display(final_table)
                    continue
                    
                
            pct_df = c.column_ids.loc[tbl_name].loc[pct_cols]
            for col, pct_row in pct_df.iterrows():
                
                if col in visited_cols: #Checks if value has already been inserted for column
                    continue
                    
                try:
                    final_table, visited_cols = self.populateEstP(i, col, tbl_name, census_table, final_table, visited_cols)
                        
                except Exception as ex:
                    error_cols.add(col)

                    #display(final_table)
                    continue
                    #return
            
            moe_df = c.column_ids.loc[tbl_name].loc[moe_cols]
            for col, moe_row in moe_df.iterrows():
                
                if col in visited_cols: #Checks if value has already been inserted for column
                    continue
                    
                try:
                    final_table, visited_cols = self.populateMoe(i, col, tbl_name, census_table, final_table, visited_cols)
                        
                except Exception as ex:
                    error_cols.add(col)

                    #display(final_table)
                    continue
                    #return
            
            moe_pct_df = c.column_ids.loc[tbl_name].loc[moe_pct_cols]
            for col, moe_pct_row in moe_pct_df.iterrows():
                
                if col in visited_cols: #Checks if value has already been inserted for column
                    continue
                    
                try:
                    final_table, visited_cols = self.populateMoeP(i, col, tbl_name, census_table, final_table, visited_cols)
                        
                except Exception as ex:
                    error_cols.add(col)

                    #display(final_table)
                    continue
                    #return
                    
            
        
        self.con.close()   
        final_table = final_table.set_index([final_table['muni_id'] - 1])
        #print("Error found at these columns:" + " ".join(list(error_cols)))
        #print(list(error_cols))
        
        return final_table
    
    def populateEst(self, idx, col, tbl_name, census_table, final_table, visited_cols):
        
        #List of census ids that will be used in the final table
        ids = c.column_ids.loc[tbl_name].to_dict(orient='index')

        
        #Append 'E' to find it in the Census dataframe 
        ids_ = [idx + 'E' for idx in ids[col]['ids']]


        #Find the sum if there's multiple values
        #vals = sum([int(census_table.loc[idx, k]) for k in ids_])

        #Add value to table
        #final_table.loc[idx, col] = vals
        final_table[col] = acs_functions.acs_est
        #Updates the visited columns set
        visited_cols.update(col)
        
        return final_table, visited_cols
        
    def populateEstP(self, idx, col, tbl_name, census_table, final_table, visited_cols):
        
        #List of universe ids for percentage calculations
        col_uni = c.col_uni_dict[tbl_name]
        
        
        
        #Append 'E' (for estimate) to find it in the Census dataframe 
        estimate = [e + 'E' for e in col_uni[col][0]['estimate']]
        universe = [u + 'E' for u in col_uni[col][1]['universe']]

        #Find the sum if there's multiple values
        est_vals = sum([int(census_table.loc[idx, k]) for k in estimate])
        uni_vals = sum([int(census_table.loc[idx, k]) for k in universe])
        
        #Calls the Postgres procedure 'acs_est_pct' and return the result
        self.cur.callproc('acs_est_pct', ([est_vals], [uni_vals]))
        val_out = self.cur.fetchone()

        #Updates value in final table
        final_table.loc[idx, col] = val_out[0]
        
        #Updates the visited columns set
        visited_cols.update(col)
        
        return final_table, visited_cols
            

    
    def populateMoe(self, idx, col, tbl_name, census_table, final_table, visited_cols):
        
        #List of census ids that will be used in the final table
        ids = c.column_ids.loc[tbl_name].to_dict(orient='index')
        
        #Append 'E' or 'M' to find the estimate or moe values in the Census dataframe, respectively
        est_ids = [idx + 'E' for idx in ids[col]['ids']]
        moe_ids = [idx + 'M' for idx in ids[col]['ids']]

        est_vals = [int(census_table.loc[idx, k]) for k in est_ids]
        moe_vals = [int(census_table.loc[idx, k]) for k in moe_ids]


        self.cur.callproc('acs_moe', (est_vals, moe_vals))

        val_out = self.cur.fetchone()
        
        #Add value to table
        final_table.loc[idx, col] = val_out[0]
        visited_cols.update(col)
        
        return final_table, visited_cols

    
    def populateMoeP(self, idx, col, tbl_name, census_table, final_table, visited_cols):
        
        #List of universe ids for percentage calculations
        col_uni = c.col_uni_dict[tbl_name]
        
        col_uni_key = col[:-2] + 'p'
        estimate = [e + 'E' for e in col_uni[col_uni_key][0]['estimate']]
        universe = [u + 'E' for u in col_uni[col_uni_key][1]['universe']]

        moe_est = [e + 'M' for e in col_uni[col_uni_key][0]['estimate']]
        moe_uni = [u + 'M' for u in col_uni[col_uni_key][1]['universe']]

        est_vals = [int(census_table.loc[idx, k]) for k in estimate]
        uni_vals = [int(census_table.loc[idx, k]) for k in universe]

        moe_vals = [int(census_table.loc[idx, k]) for k in moe_est]
        moe_uni_vals = [int(census_table.loc[idx, k]) for k in moe_uni]

        self.cur.callproc('acs_moe_pct', (est_vals, uni_vals, moe_vals, moe_uni_vals))
        val_out = self.cur.fetchone()

        final_table.loc[idx, col] = val_out[0]
        visited_cols.update(col)

        return final_table, visited_cols 
    
    def compareTables(self, tbl_name, df_in):
        
        #Connect to Postgres database 
        self.con, self.cur = c.connect_database('sdvm')
        
        #Get columns at a string list
        keys = list(c.cols_dict[tbl_name].keys())
        cols = ", ".join(keys).replace('mpop5559mmp', 'mpop5559mp')
        
        cols = 'muni_id, ' + cols
        
        query = \
        """
        SELECT {1} from tabular.{0} WHERE acs_year = '2014-18';
        """
        #Get table from Postgres as pandas dataframe
        pg_df = pd.read_sql(query.format(tbl_name, cols), self.con, coerce_float=True)
        pg_df.index = pg_df['muni_id'] - 1
        
        #Casts all values as floats except for the GEO_ID column
        df_in.iloc[:, 1:] = df_in.iloc[:, 1:].astype('float', errors='ignore')
        
        #Removes municipalities from return Postres dataframe that aren't in the comparison dataframe
        pg_df = pg_df[(pg_df['muni_id'].isin(df_in['muni_id'].values))]

        #Ensures that the column names match in both tables
        pg_df = pg_df.reindex(columns=df_in.columns.values)
        
        #Creates comparison table and returns it with the PG dataframe
        comparison_table = pg_df.compare(df_in, align_axis=0)
        
        return pg_df, comparison_table


IndentationError: expected an indented block (<ipython-input-155-0bc61189b20b>, line 23)

In [148]:
p = WriteTables()

Connected to ds


In [150]:
start = time.time()
df = p.populateFullTable('b01001_population_by_age_gender_acs_m')
end = time.time()
print(end-start)

430.69747972488403


In [154]:
len(df.columns)

382

In [152]:
pg_df, comparison_table = p.compareTables('b01001_population_by_age_gender_acs_m', df)

Connected to ds


In [197]:
pg_df.loc[0, 'pop2534mp']

2.11

In [168]:
df[['pop8084mp']][:10], pg_df[['pop8084mp']][:10]

(        pop8084mp
 muni_id          
 0            0.37
 1            0.32
 2            0.62
 3            0.86
 4            0.79
 5            1.88
 6            0.64
 7            0.28
 8            0.45
 9            0.42,
          pop8084mp
 muni_id           
 0             0.59
 1             0.63
 2             0.77
 3             1.50
 4             1.53
 5             2.46
 6             1.70
 7             0.60
 8             1.15
 9             1.36)

In [153]:
comparison_table

Unnamed: 0_level_0,Unnamed: 1_level_0,pop20m,pop2224m,pop85om,pop1824m,pop3564m,pop65ovm,pop1834m,fpop_u18m,fpop1824m,fpop65ovm,fpop1834m,mpop1824m,mpop3564m,mpop65ovm,mpop1834m,pop8084mp,mpop7579mp
muni_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1
0,self,,,,,,,,,,,,,,,,0.59,0.42
0,other,,,,,,,,,,,,,,,,0.37,
1,self,,,,,,,,,,,,,,,,0.63,0.51
1,other,,,,,,,,,,,,,,,,0.32,
2,self,,,,,,,,,,,,,,,,0.77,0.78
2,other,,,,,,,,,,,,,,,,0.62,
3,self,,,,,,,,,,,,,,,,1.5,0.46
3,other,,,,,,,,,,,,,,,,0.86,
4,self,,,,,,,,,,,,,,,,1.53,0.46
4,other,,,,,,,,,,,,,,,,0.79,


error_dict = {}
cols = comparison_table.columns

for idx, row in comparison_table.iterrows():
    #print(idx)
    #break
    error_dict[str(idx)] = [i for i in cols if not np.isnan(comparison_table[i].loc[idx])]

error_dict
#[i for i in cols if not np.isnan(list(comparison_table[i].values)).all()]

# Appendix

## class: CensusAPI 
    - Initializes the Census API parameters
    - Gets Census ids that correspond to different columns names
   ### functions:
   ### variables:
    
## class: WriteTables
   ### functions:
   ### variables:
