In [1]:
# Error logging
import logging

# data
import pandas as pd

# for database connections
from sqlalchemy import create_engine, inspect, text, event
from sqlalchemy.engine import URL

# Postgres credentials
import config as cfg

In [2]:
# Pull the PostgreSQL connection settings out of the local config module
# so the credentials never appear in the notebook itself.
SQL_USERNAME, SQL_PASSWORD = cfg.SQL_USERNAME, cfg.SQL_PASSWORD
SQL_IP, SQL_PORT = cfg.SQL_IP, cfg.SQL_PORT
DATABASE = cfg.DATABASE

In [3]:
# Install the default root handler, then raise the SQLAlchemy engine logger
# to INFO so every emitted SQL statement is echoed in the cell output.
logging.basicConfig()
sqlalchemy_engine_logger = logging.getLogger('sqlalchemy.engine')
sqlalchemy_engine_logger.setLevel(logging.INFO)

In [4]:
# Connect to PostgreSQL server
# Build the URL from its components rather than an f-string so that special
# characters in the credentials (e.g. '@', ':' or '/') are escaped instead of
# corrupting the connection URL.
connection_url = URL.create(
    drivername='postgresql+psycopg2',
    username=SQL_USERNAME,
    password=SQL_PASSWORD,
    host=SQL_IP,
    port=SQL_PORT,
    database=DATABASE,
)

# Keep the string form available under its original name.
# It contains the real password, so do not print or log it.
connection_string = connection_url.render_as_string(hide_password=False)

engine = create_engine(connection_url)

## Upload the state, income, jobs, employment and unemployment tables

In [5]:
# Create a reflection inspector for the engine and list the tables that
# already exist in the database, to confirm the target tables are present.
inspector = inspect(engine)
existing_tables = inspector.get_table_names()
print(existing_tables)

INFO:sqlalchemy.engine.Engine:select pg_catalog.version()
INFO:sqlalchemy.engine.Engine:[raw sql] {}
INFO:sqlalchemy.engine.Engine:select current_schema()
INFO:sqlalchemy.engine.Engine:[raw sql] {}
INFO:sqlalchemy.engine.Engine:show standard_conforming_strings
INFO:sqlalchemy.engine.Engine:[raw sql] {}
INFO:sqlalchemy.engine.Engine:BEGIN (implicit)
INFO:sqlalchemy.engine.Engine:SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s]) AND pg_catalog.pg_class.relpersistence != %(relpersistence_1)s AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != %(nspname_1)s
INFO:sqlalchemy.engine.Engine:[generated in 0.00420s] {'param_1': 'r', 'param_2': 'p', 'relpersistence_1': 't', 'nspname_1': 'pg_catalog'}
INFO:sqlalchemy.engine.Engine:ROLLBACK


['income', 'state', 'jobs', 'unemployment', 'employment']


In [6]:
# Print every table's column names and types so the database schema can be
# compared against the CSV columns before any data is loaded.

# Names of the tables currently in the database
tables = inspector.get_table_names()

# One section per table: a header line, one "name type" line per column,
# then a blank separator line.
for table in tables:
    print(f"Table name: {table}")
    for column_info in inspector.get_columns(table):
        print(column_info["name"], column_info["type"])

    print()

INFO:sqlalchemy.engine.Engine:BEGIN (implicit)
INFO:sqlalchemy.engine.Engine:SELECT pg_catalog.pg_attribute.attname AS name, pg_catalog.format_type(pg_catalog.pg_attribute.atttypid, pg_catalog.pg_attribute.atttypmod) AS format_type, (SELECT pg_catalog.pg_get_expr(pg_catalog.pg_attrdef.adbin, pg_catalog.pg_attrdef.adrelid) AS pg_get_expr_1 
FROM pg_catalog.pg_attrdef 
WHERE pg_catalog.pg_attrdef.adrelid = pg_catalog.pg_attribute.attrelid AND pg_catalog.pg_attrdef.adnum = pg_catalog.pg_attribute.attnum AND pg_catalog.pg_attribute.atthasdef) AS "default", pg_catalog.pg_attribute.attnotnull AS not_null, pg_catalog.pg_class.relname AS table_name, pg_catalog.pg_description.description AS comment, pg_catalog.pg_attribute.attgenerated AS generated, (SELECT json_build_object(%(json_build_object_2)s, pg_catalog.pg_attribute.attidentity = %(attidentity_1)s, %(json_build_object_3)s, pg_catalog.pg_sequence.seqstart, %(json_build_object_4)s, pg_catalog.pg_sequence.seqincrement, %(json_build_object_5

Table name: income
fips VARCHAR(5)
econ_state VARCHAR(2)
county VARCHAR(40)
median_hh_inc_acs NUMERIC
percapita_inc NUMERIC
poverty_rate_0_17_acs NUMERIC
poverty_rate_acs NUMERIC
deep_pov_all NUMERIC
deep_pov_children NUMERIC
num_allinpov_acs NUMERIC
pct_pov_0_17 NUMERIC
pov_0_17 NUMERIC
med_hh_inc NUMERIC
pov_all NUMERIC
pct_pov_all NUMERIC
num_in_pov_0_17_acs NUMERIC
last_update TIMESTAMP

Table name: state
econ_state VARCHAR(2)
latitude DOUBLE PRECISION
longitude DOUBLE PRECISION
name VARCHAR(25)
last_update TIMESTAMP

Table name: jobs
fips VARCHAR(5)
econ_state VARCHAR(2)
county VARCHAR(40)
pctemp_agriculture NUMERIC
pctemp_mining NUMERIC
pctemp_construction NUMERIC
pctemp_manufacturing NUMERIC
pctemp_trade NUMERIC
pctemp_trans NUMERIC
pctemp_information NUMERIC
pctemp_fire NUMERIC
pctemp_services NUMERIC
pctemp_government NUMERIC
num_civ_employed NUMERIC
last_update TIMESTAMP

Table name: unemployment
id VARCHAR(8)
fips VARCHAR(5)
econ_year VARCHAR(4)
unemp_rate NUMERIC
num_unempl

In [7]:
# Read the state latitude/longitude CSV into a pandas DataFrame and preview the first rows
state_df = pd.read_csv(filepath_or_buffer='Resources/statelatlong.csv')
state_df.head(n=5)

Unnamed: 0,econ_state,latitude,longitude,name
0,AK,63.588753,-154.493062,Alaska
1,AL,32.318231,-86.902298,Alabama
2,AR,35.20105,-91.831833,Arkansas
3,AZ,34.048928,-111.093731,Arizona
4,CA,36.778261,-119.417932,California


In [None]:
# Read the cleaned income CSV into a pandas DataFrame and preview the first rows
income_df = pd.read_csv(filepath_or_buffer='Resources/income_cleaned.csv')
income_df.head(n=5)

In [8]:
# Read the cleaned jobs CSV into a pandas DataFrame and preview the first rows
jobs_df = pd.read_csv(filepath_or_buffer='Resources/jobs_cleaned.csv')
jobs_df.head(n=5)

Unnamed: 0,fips,econ_state,county,pctemp_agriculture,pctemp_mining,pctemp_construction,pctemp_manufacturing,pctemp_trade,pctemp_trans,pctemp_information,pctemp_fire,pctemp_services,pctemp_government,num_civ_employed
0,0,US,United States,1.259202,0.512723,6.592262,10.108008,13.745334,5.363914,2.011223,6.55584,49.244129,4.607366,154842185
1,1000,AL,Alabama,0.99319,0.39821,6.60499,14.332569,14.083735,5.454652,1.519607,5.523166,45.678045,5.411837,2097384
2,1001,AL,Autauga,0.517902,0.354783,6.072099,12.951635,12.445967,6.797977,1.362042,5.978305,44.082864,9.436424,24522
3,1003,AL,Baldwin,0.952772,0.257648,8.58546,9.249035,16.4779,5.003628,1.525907,7.520165,45.203016,5.224469,95091
4,1005,AL,Barbour,5.717342,0.0,6.810888,23.047664,12.813503,6.632592,0.606205,3.720433,33.638417,7.012956,8413


In [17]:
# Read the cleaned employment CSV into a pandas DataFrame and preview the last rows
employment_df = pd.read_csv(filepath_or_buffer='Resources/employment_cleaned.csv')
employment_df.tail(n=5)

Unnamed: 0,id,fips,econ_year,num_civ_labor_force,num_employed
22941,45888,72145,2017,13414,11746
22942,45889,72147,2017,3027,2589
22943,45890,72149,2017,7557,6090
22944,45891,72151,2017,9000,7512
22945,45892,72153,2017,10160,8409


In [18]:
# Show the column dtypes and non-null counts of the employment data as read from the CSV
employment_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 22946 entries, 0 to 22945
Data columns (total 5 columns):
 #   Column               Non-Null Count  Dtype
---  ------               --------------  -----
 0   id                   22946 non-null  int64
 1   fips                 22946 non-null  int64
 2   econ_year            22946 non-null  int64
 3   num_civ_labor_force  22946 non-null  int64
 4   num_employed         22946 non-null  int64
dtypes: int64(5)
memory usage: 896.5 KB


In [19]:
# Convert the identifier columns from integers to strings before loading.
# Presumably this matches VARCHAR columns in the employment table — TODO confirm
# against the table definition.
# NOTE(review): these columns were parsed as int64, so any leading zeros in the
# CSV (e.g. in fips) are not restored by this cast — confirm that is intended.
for key_column in ('id', 'fips', 'econ_year'):
    employment_df[key_column] = employment_df[key_column].astype(str)

In [20]:
# Re-check the dtypes: id, fips and econ_year should now be object (string) columns
employment_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 22946 entries, 0 to 22945
Data columns (total 5 columns):
 #   Column               Non-Null Count  Dtype 
---  ------               --------------  ----- 
 0   id                   22946 non-null  object
 1   fips                 22946 non-null  object
 2   econ_year            22946 non-null  object
 3   num_civ_labor_force  22946 non-null  int64 
 4   num_employed         22946 non-null  int64 
dtypes: int64(2), object(3)
memory usage: 896.5+ KB


In [24]:
# Read the cleaned unemployment CSV into a pandas DataFrame and preview the first rows
unemployment_df = pd.read_csv(filepath_or_buffer='Resources/unemployment_cleaned.csv')
unemployment_df.head(n=5)

Unnamed: 0,id,fips,econ_year,unemp_rate,num_unemployed
0,1,0,2020,8.1,12933704
1,2,1000,2020,5.9,131056
2,3,1001,2020,4.9,1262
3,4,1003,2020,5.6,5425
4,5,1005,2020,7.0,605


In [25]:
# Show the column dtypes and non-null counts of the unemployment data as read from the CSV
unemployment_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 22946 entries, 0 to 22945
Data columns (total 5 columns):
 #   Column          Non-Null Count  Dtype  
---  ------          --------------  -----  
 0   id              22946 non-null  int64  
 1   fips            22946 non-null  int64  
 2   econ_year       22946 non-null  int64  
 3   unemp_rate      22946 non-null  float64
 4   num_unemployed  22946 non-null  int64  
dtypes: float64(1), int64(4)
memory usage: 896.5 KB


In [26]:
# Convert the identifier columns from integers to strings before loading;
# the unemployment table declares id, fips and econ_year as VARCHAR columns.
# NOTE(review): these columns were parsed as int64, so any leading zeros in the
# CSV (e.g. in fips) are not restored by this cast — confirm that is intended.
for key_column in ('id', 'fips', 'econ_year'):
    unemployment_df[key_column] = unemployment_df[key_column].astype(str)

In [27]:
# Re-check the dtypes: id, fips and econ_year should now be object (string) columns
unemployment_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 22946 entries, 0 to 22945
Data columns (total 5 columns):
 #   Column          Non-Null Count  Dtype  
---  ------          --------------  -----  
 0   id              22946 non-null  object 
 1   fips            22946 non-null  object 
 2   econ_year       22946 non-null  object 
 3   unemp_rate      22946 non-null  float64
 4   num_unemployed  22946 non-null  int64  
dtypes: float64(1), int64(1), object(3)
memory usage: 896.5+ KB


In [None]:
# Append the state data to public.state, but only if that table already exists.
# A fresh inspector is created for the check because an Inspector caches its
# reflection results, so the long-lived `inspector` may not see tables created
# after its first lookup. The check also names the same schema that is written to.
if inspect(engine).has_table('state', schema='public'):
    state_df.to_sql('state', schema='public', con=engine, index=False, if_exists='append', method='multi')
else:
    # Report the skip instead of silently loading nothing.
    logging.warning("Table public.state was not found; state data was not loaded")

In [None]:
# Append the income data to public.income, but only if that table already exists.
# A fresh inspector is created for the check because an Inspector caches its
# reflection results, so the long-lived `inspector` may not see tables created
# after its first lookup. The check also names the same schema that is written to.
if inspect(engine).has_table('income', schema='public'):
    income_df.to_sql('income', schema='public', con=engine, index=False, if_exists='append', method='multi')
else:
    # Report the skip instead of silently loading nothing.
    logging.warning("Table public.income was not found; income data was not loaded")

In [10]:
# Append the jobs data to public.jobs, but only if that table already exists.
# A fresh inspector is created for the check because an Inspector caches its
# reflection results, so the long-lived `inspector` may not see tables created
# after its first lookup. The check also names the same schema that is written to.
if inspect(engine).has_table('jobs', schema='public'):
    jobs_df.to_sql('jobs', schema='public', con=engine, index=False, if_exists='append', method='multi')
else:
    # Report the skip instead of silently loading nothing.
    logging.warning("Table public.jobs was not found; jobs data was not loaded")

INFO:sqlalchemy.engine.Engine:BEGIN (implicit)
INFO:sqlalchemy.engine.Engine:SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relname = %(table_name)s AND pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, %(param_4)s, %(param_5)s]) AND pg_catalog.pg_namespace.nspname = %(nspname_1)s
INFO:sqlalchemy.engine.Engine:[generated in 0.00227s] {'table_name': 'jobs', 'param_1': 'r', 'param_2': 'p', 'param_3': 'f', 'param_4': 'v', 'param_5': 'm', 'nspname_1': 'public'}
INFO:sqlalchemy.engine.Engine:INSERT INTO public.jobs (fips, econ_state, county, pctemp_agriculture, pctemp_mining, pctemp_construction, pctemp_manufacturing, pctemp_trade, pctemp_trans, pctemp_information, pctemp_fire, pctemp_services, pctemp_government, num_civ_employed) VALUES (%(fips_m0)s, %(econ_state_m0)s, %(county_m0)s, %(pctemp_agriculture_m0)s, %(pctemp_mining_

In [21]:
# Append the employment data to public.employment, but only if that table already exists.
# A fresh inspector is created for the check because an Inspector caches its
# reflection results, so the long-lived `inspector` may not see tables created
# after its first lookup. The check also names the same schema that is written to.
if inspect(engine).has_table('employment', schema='public'):
    employment_df.to_sql('employment', schema='public', con=engine, index=False, if_exists='append', method='multi')
else:
    # Report the skip instead of silently loading nothing.
    logging.warning("Table public.employment was not found; employment data was not loaded")

INFO:sqlalchemy.engine.Engine:BEGIN (implicit)
INFO:sqlalchemy.engine.Engine:SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relname = %(table_name)s AND pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, %(param_4)s, %(param_5)s]) AND pg_catalog.pg_namespace.nspname = %(nspname_1)s
INFO:sqlalchemy.engine.Engine:[cached since 217.4s ago] {'table_name': 'employment', 'param_1': 'r', 'param_2': 'p', 'param_3': 'f', 'param_4': 'v', 'param_5': 'm', 'nspname_1': 'public'}
INFO:sqlalchemy.engine.Engine:INSERT INTO public.employment (id, fips, econ_year, num_civ_labor_force, num_employed) VALUES (%(id_m0)s, %(fips_m0)s, %(econ_year_m0)s, %(num_civ_labor_force_m0)s, %(num_employed_m0)s), (%(id_m1)s, %(fips_m1)s, %(econ_year_m1)s, %(num_civ_labor_force_m1)s, %(num_employed_m1)s), (%(id_m2)s, %(fips_m2)s, %(econ_year_m2)s, %(num_civ_l

In [28]:
# Append the unemployment data to public.unemployment, but only if that table already exists.
# A fresh inspector is created for the check because an Inspector caches its
# reflection results, so the long-lived `inspector` may not see tables created
# after its first lookup. The check also names the same schema that is written to.
if inspect(engine).has_table('unemployment', schema='public'):
    unemployment_df.to_sql('unemployment', schema='public', con=engine, index=False, if_exists='append', method='multi')
else:
    # Report the skip instead of silently loading nothing.
    logging.warning("Table public.unemployment was not found; unemployment data was not loaded")

INFO:sqlalchemy.engine.Engine:BEGIN (implicit)
INFO:sqlalchemy.engine.Engine:SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relname = %(table_name)s AND pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, %(param_4)s, %(param_5)s]) AND pg_catalog.pg_namespace.nspname = %(nspname_1)s
INFO:sqlalchemy.engine.Engine:[cached since 636.9s ago] {'table_name': 'unemployment', 'param_1': 'r', 'param_2': 'p', 'param_3': 'f', 'param_4': 'v', 'param_5': 'm', 'nspname_1': 'public'}
INFO:sqlalchemy.engine.Engine:INSERT INTO public.unemployment (id, fips, econ_year, unemp_rate, num_unemployed) VALUES (%(id_m0)s, %(fips_m0)s, %(econ_year_m0)s, %(unemp_rate_m0)s, %(num_unemployed_m0)s), (%(id_m1)s, %(fips_m1)s, %(econ_year_m1)s, %(unemp_rate_m1)s, %(num_unemployed_m1)s), (%(id_m2)s, %(fips_m2)s, %(econ_year_m2)s, %(unemp_rate_m2)s, %(num_une