In [1]:
import numpy as np
import pandas as pd
import urllib
import pyodbc
from pathlib import Path
import requests
import zipfile
import sqlalchemy

In [2]:
conn_string = "DRIVER={ODBC Driver 17 for SQL Server}; SERVER=AWS-PROD-SQL\Sockeye; DATABASE=Elmer; trusted_connection=yes"
sql_conn = pyodbc.connect(conn_string)
params = urllib.parse.quote_plus(conn_string)
engine = sqlalchemy.create_engine("mssql+pyodbc:///?odbc_connect=%s" % params)

In [3]:
data_dir = Path('..\\..\\data\\PUMS\\')
year = 2019
pums_csv_name_persons = 'psam_p53.csv'
pums_csv_name_housing = 'hsam_p53.csv'

In [4]:
pums_base_url = 'https://www2.census.gov/programs-surveys/acs/data/pums/{}/1-Year/'.format(str(year))
pums_doc_url = 'https://www2.census.gov/programs-surveys/acs/tech_docs/pums/data_dict/PUMS_Data_Dictionary_2019.csv'

In [5]:
person_zip = 'csv_pwa.zip'
pums_person_url = pums_base_url + person_zip
pums_person_file = data_dir / person_zip
housing_zip = 'csv_hwa.zip'
pums_housing_url = pums_base_url + housing_zip
pums_housing_file = data_dir / housing_zip

In [6]:
r=requests.get(pums_person_url)
with open(pums_person_file, 'wb') as f:
    f.write(r.content)
with zipfile.ZipFile(pums_person_file, 'r') as zip_ref:
    zip_ref.extractall(data_dir)  

In [12]:
r=requests.get(pums_housing_url)
with open(pums_housing_file, 'wb') as f:
    f.write(r.content)
with zipfile.ZipFile(pums_housing_file, 'r') as zip_ref:
    zip_ref.extractall(data_dir)   

In [7]:
data_dict_cols = ['dd_type','colname','datatype','length', 'col_e','col_f','col_g']
df_dd = pd.read_csv(pums_doc_url, header=None, names=data_dict_cols)

In [8]:
df_colnames = df_dd[df_dd.dd_type =='NAME']
df_vals = df_dd[df_dd.dd_type == 'VAL']

In [130]:
person_csv_path = data_dir / 'psam_p53.csv'
df_persons = pd.read_csv(person_csv_path, dtype=object)

In [13]:
housing_csv_path = data_dir / 'psam_h53.csv'
df_housing = pd.read_csv(housing_csv_path, dtype=object)

In [14]:
def lookup_df(pums_col):
    df = df_vals[df_vals.colname == pums_col][['colname','datatype','length','col_e','col_f','col_g']]
    return df

In [15]:
def data_type(pums_col):
    data_type = df_colnames[df_colnames.colname == pums_col].datatype.unique()[0]
    range_cols = df_vals[df_vals.col_e != df_vals.col_f].colname.unique()
    if data_type == 'C' and pums_col not in range_cols:
        returned_data_type = 'char_lookup'
    else:
        returned_data_type = data_type
    return returned_data_type

In [16]:
def decode_char_column(pums_df, col_name):
    df_temp = lookup_df(col_name)
    #df_temp = df_housing.merge(df_access, how='left', left_on='ACCESS', right_on='col_e')
    pums_df[col_name] = pums_df.merge(df_temp, how='left', left_on=col_name, right_on='col_e')[['col_g']]
    return pums_df

In [17]:
def decode_columns(pums_df):
    working_df = pums_df.copy(deep=True)
    for col in working_df.columns:
        col_type = data_type(col)
        if col_type == 'char_lookup':
            working_df_df = decode_char_column(working_df, col)  
    return working_df

In [22]:
df_housing_decoded = decode_columns(df_housing)

In [247]:
df_persons_decoded = decode_columns(df_persons)

In [18]:
def castable(df, col_name, dtype):
    try:
        df[col_name].astype(dtype)
        return True
    
    except:
        return False

In [19]:
def get_stage_table_col_types(pums_df):
    col_types = {}
    for c in pums_df.columns:
        if castable(pums_df, c, np.int16):
            dtype = np.int16
        elif castable(pums_df, c, np.int32):
            dtype = np.int32
        elif castable(pums_df, c, np.int64):
            dtype = np.int64
        elif castable(pums_df, c, np.float64 ):
            dtype = np.float64
        else:
            dtype = object
        col_types[c] = dtype
    return col_types  

In [20]:
def recast_coltypes(df):
    new_dtypes = get_stage_table_col_types(df)
    for col in new_dtypes.keys():
        df[col] = df[col].astype(new_dtypes[col])

RT           object
SERIALNO     object
DIVISION     object
SPORDER       int16
PUMA          int16
REGION       object
ST           object
ADJINC       object
PWGTP         int16
AGEP          int16
CIT          object
CITWP       float64
COW          object
DDRS         object
DEAR         object
DEYE         object
DOUT         object
DPHY         object
DRAT         object
DRATX        object
DREM         object
ENG          object
FER          object
GCL          object
GCM          object
GCR          object
HIMRKS       object
HINS1        object
HINS2        object
HINS3        object
             ...   
PWGTP51       int16
PWGTP52       int16
PWGTP53       int16
PWGTP54       int16
PWGTP55       int16
PWGTP56       int16
PWGTP57       int16
PWGTP58       int16
PWGTP59       int16
PWGTP60       int16
PWGTP61       int16
PWGTP62       int16
PWGTP63       int16
PWGTP64       int16
PWGTP65       int16
PWGTP66       int16
PWGTP67       int16
PWGTP68       int16
PWGTP69       int16


In [21]:
def df_to_staging(df, table_name):
    recast_coltypes(df)
    df.to_sql(name=table_name, schema='stg', con=engine)

In [303]:
df_to_staging(df_persons_decoded, 'psam_p53_2019')

In [23]:
df_to_staging(df_housing_decoded, 'psam_h53_2019')

In [None]:
crsr = sql_conn.cursor()
crsr.execute('exec census.merge_pums_persons_2019')
sql_conn.commit()

In [24]:
df_colnames.to_sql(name='pums_persons_dd', schema='stg', con=engine)

ValueError: Table 'pums_persons_dd' already exists.

In [25]:
crsr = sql_conn.cursor()
sql_statement = '''
	with dd as (
		select distinct colname, col_e as col_definition
		from stg.pums_persons_dd
	)
	update meta.[columns]
	set [description] = dd.col_definition
	from meta.[columns] c
		join meta.[tables] t ON c.table_id = t.table_id
		join dd on c.[name] = dd.colname
	where t.[name] = 'pums_persons_2019'
'''
crsr.execute(sql_statement)
sql_conn.commit()