In [20]:
# SQLAlchemy: create_engine builds the engines for both the PostgreSQL and MySQL connections
from sqlalchemy import create_engine

# PyMySQL: register it under the MySQLdb name so the driver-less "mysql://" URLs
# used for the cloud database are served by PyMySQL
import pymysql
pymysql.install_as_MySQLdb()

# Config variables: connection settings are read from the separate config module
# rather than being written into the notebook
from config import remote_db_endpoint, remote_db_port
from config import remote_db_name, remote_db_user, remote_db_pwd
from config import local_db_user, local_db_pwd, local_db_endpoint, local_db_port, local_db_name

# Import Pandas
import pandas as pd

# psycopg2: PostgreSQL driver behind the "postgresql://" engine URLs
import psycopg2

# Extract CSVs into DataFrames

In [2]:
# Load the per-state COVID case counts and preview the first rows.
# NOTE(review): 'unicode_escape' also interprets backslash sequences in the data;
# confirm the file's real encoding (possibly latin-1 / cp1252) and use that instead.
covid_cases_file = 'covid-cases-by-state.csv'
covid_cases = pd.read_csv(
    covid_cases_file,
    encoding='unicode_escape',
)
covid_cases.head()

Unnamed: 0,#,State,Total Cases,New Cases,Total Deaths,New Deaths,Total Recovered,Active Cases,Total Cases per million,Total Deaths per million,Tests,Tests per million,Total Population
0,1.0,California,20653265,35919.0,356743,298.0,12182160,8114362,62396,1078,254907318,770107,39512223
1,2.0,Texas,2345657,,26294,,971395,1347968,59365,665,33058311,836660,28995881
2,3.0,Florida,1784569,,28506,,1453639,302424,61546,983,15832807,546036,21477737
3,4.0,New York,1323315,,21673,,735381,566261,61613,1009,15703599,731157,19453561
4,5.0,Illinois,1031053,,38145,,478718,514190,53001,1961,25504313,1311036,12671821


In [3]:
# Load the HHS Provider Relief Fund payments (one row per provider) and preview them.
# NOTE(review): 'unicode_escape' also interprets backslash sequences in the data;
# confirm the file's real encoding and use that instead.
relief_funds_file = 'HHS_Provider_Relief_Fund.csv'
relief_funds = pd.read_csv(
    relief_funds_file,
    encoding='unicode_escape',
)
relief_funds.head()

Unnamed: 0,Provider Name,State,City,Payment
0,BRANDON ASTIN DMD LLC,AK,ANCHOR POINT,"$19,003"
1,ELIZABETH WATNEY,AK,ANCHOR POINT,$724
2,A HAND UP BEHAVIOR SERVICES,AK,ANCHORAGE,"$1,191"
3,A JOINT EFFORT PHYSICAL THERAPY,AK,ANCHORAGE,"$11,641"
4,"AA PAIN CLINIC, INC.",AK,ANCHORAGE,"$69,976"


In [4]:
# Load the state name / abbreviation / two-letter code lookup table.
# 'utf-8-sig' drops a leading byte-order mark if the file has one.
state_abbreviations_file = 'state_abbreviations.csv'
state_abbreviations = pd.read_csv(
    state_abbreviations_file,
    encoding='utf-8-sig',
)
state_abbreviations.head()


Unnamed: 0,State,Abbrev,Code
0,Alabama,Ala.,AL
1,Alaska,Alaska,AK
2,Arizona,Ariz.,AZ
3,Arkansas,Ark.,AR
4,California,Calif.,CA


# Transform DataFrames

In [5]:
# Keep only the columns relevant to the analysis (drops the rank and the
# new/recovered/active columns shown in the raw preview).
covid_cases = covid_cases.loc[
    :,
    [
        'State',
        'Total Cases',
        'Total Deaths',
        'Total Cases per million',
        'Total Deaths per million',
        'Tests',
        'Tests per million',
        'Total Population',
    ],
]
covid_cases

Unnamed: 0,State,Total Cases,Total Deaths,Total Cases per million,Total Deaths per million,Tests,Tests per million,Total Population
0,California,20653265,356743,62396,1078,254907318,770107,39512223
1,Texas,2345657,26294,59365,665,33058311,836660,28995881
2,Florida,1784569,28506,61546,983,15832807,546036,21477737
3,New York,1323315,21673,61613,1009,15703599,731157,19453561
4,Illinois,1031053,38145,53001,1961,25504313,1311036,12671821
...,...,...,...,...,...,...,...,...
60,Grand Princess Ship,23429,813,,,204307,,
61,Wuhan Repatriated,122,7,,,,,
62,Diamond Princess Ship,3,,,,3,,
63,Total:,46,,,,46,,


In [6]:
# Convert Payment from display text such as "$19,003" into a float.
# One raw-string character class removes every '$', ',' and ' ' in a single pass;
# the raw string avoids the invalid '\$' escape sequence that a plain literal
# triggers a warning for. Series.replace (rather than .str.replace) is kept so
# that re-running this cell on the already-numeric column is a harmless no-op.
relief_funds['Payment'] = (
    relief_funds['Payment']
    .replace({r'[$, ]': ''}, regex=True)
    .astype(float)
)
relief_funds.dtypes

Provider Name     object
State             object
City              object
Payment          float64
dtype: object

In [7]:
# Total the relief payments per state code, then render each total as a
# "$1,234.0"-style string.
# NOTE(review): after this step Payment is text, not a number, so the table that
# is loaded into the databases cannot be summed or sorted numerically.
payments_by_state = (
    relief_funds[['State', 'Payment']]
    .groupby('State')
    .sum()
    .reset_index()
    .assign(Payment=lambda totals: totals['Payment'].map("${:,}".format))
)
payments_by_state.head()

Unnamed: 0,State,Payment
0,AK,"$222,894,177.0"
1,AL,"$1,304,513,045.0"
2,AR,"$925,881,569.0"
3,AS,"$5,950,917.0"
4,AZ,"$1,473,716,024.0"


# Create Local Database Connection

In [8]:
# Build a SQLAlchemy engine for the local PostgreSQL server.
# The URL deliberately names no database: this connection is only used to
# issue CREATE DATABASE before reconnecting to the project database.
local_engine = create_engine(
    f"postgresql://{local_db_user}:{local_db_pwd}@{local_db_endpoint}:{local_db_port}"
)

# Open the server-level connection
local_conn = local_engine.connect()

In [9]:
# Create the local database. This is best-effort on purpose: on a re-run the
# database already exists, so the error is printed and the notebook carries on.
try:
    local_conn.execution_options(isolation_level="AUTOCOMMIT").execute(f"CREATE DATABASE {local_db_name}")
except Exception as e:
    print(e)
finally:
    # Release the server-level connection and its pool; they are replaced below
    # and would otherwise stay open for the life of the kernel.
    local_conn.close()
    local_engine.dispose()

# Connect to the local project database.
# A failure here is NOT swallowed: if it were, the cells below would go on
# using a connection that does not point at the project database.
local_engine = create_engine(f"postgresql://{local_db_user}:{local_db_pwd}@{local_db_endpoint}:{local_db_port}/{local_db_name}")
local_conn = local_engine.connect()

# Confirm which tables already exist
local_engine.table_names()

(psycopg2.errors.DuplicateDatabase) database "etl_project_db" already exists

[SQL: CREATE DATABASE etl_project_db]
(Background on this error at: http://sqlalche.me/e/13/f405)


['relief_provider',
 'relief_payments_by_state',
 'covid_cases',
 'state_abbreviations',
 'merged_df']

# Create Cloud Database Connection

In [25]:
# Build a SQLAlchemy engine for the cloud MySQL server hosted on AWS.
# The URL deliberately names no database: this connection is only used to
# issue CREATE DATABASE before reconnecting to the project database.
cloud_engine = create_engine(
    f"mysql://{remote_db_user}:{remote_db_pwd}@{remote_db_endpoint}:{remote_db_port}"
)

# Open the server-level connection
cloud_conn = cloud_engine.connect()

In [26]:
# Create the cloud database. This is best-effort on purpose: on a re-run the
# database already exists, so the error is printed and the notebook carries on.
try:
    cloud_conn.execution_options(isolation_level="AUTOCOMMIT").execute(f"CREATE DATABASE {remote_db_name}")
except Exception as e:
    print(e)
finally:
    # Release the server-level connection and its pool; they are replaced below
    # and would otherwise stay open for the life of the kernel.
    cloud_conn.close()
    cloud_engine.dispose()

# Connect to the cloud project database.
# A failure here is NOT swallowed: if it were, the cells below would go on
# using a connection that does not point at the project database.
cloud_engine = create_engine(f"mysql://{remote_db_user}:{remote_db_pwd}@{remote_db_endpoint}:{remote_db_port}/{remote_db_name}")
cloud_conn = cloud_engine.connect()

# Confirm which tables already exist
cloud_engine.table_names()

(pymysql.err.ProgrammingError) (1007, "Can't create database 'etl_project_db'; database exists")
[SQL: CREATE DATABASE etl_project_db]
(Background on this error at: http://sqlalche.me/e/13/f405)


[]

# Load DataFrames into Local Database

In [13]:
# Write each prepared DataFrame to the local database, replacing any table
# that already exists under the same name. The DataFrame index is not stored.
for table_name, frame in (
    ('relief_provider', relief_funds),
    ('relief_payments_by_state', payments_by_state),
    ('covid_cases', covid_cases),
    ('state_abbreviations', state_abbreviations),
):
    frame.to_sql(name=table_name, con=local_conn, if_exists='replace', index=False)

In [14]:
# Confirm the load: list the table names now present in the local database
local_engine.table_names()

['merged_df',
 'relief_provider',
 'relief_payments_by_state',
 'covid_cases',
 'state_abbreviations']

# Load DataFrames into Cloud Database

In [28]:
# Write each prepared DataFrame to the cloud database, replacing any table
# that already exists under the same name. The DataFrame index is not stored.
for table_name, frame in (
    ('relief_provider', relief_funds),
    ('relief_payments_by_state', payments_by_state),
    ('covid_cases', covid_cases),
    ('state_abbreviations', state_abbreviations),
):
    frame.to_sql(name=table_name, con=cloud_conn, if_exists='replace', index=False)

In [29]:
# Confirm the load: list the table names now present in the cloud database
cloud_engine.table_names()

['covid_cases',
 'relief_payments_by_state',
 'relief_provider',
 'state_abbreviations']

# Create Merged Table and Load to Databases

In [17]:
# Build the merged table from the tables loaded into the local database:
# covid_cases is joined to state_abbreviations on the full state name, and
# the per-state payment totals are joined on the two-letter state code.
# Both joins are inner joins, so rows without a match on either side are dropped.
merge_query = '''
                            select 
                                state_abbreviations."Code" as "State Code",
                                state_abbreviations."State" as state,
                                covid_cases."Total Cases", 
                                covid_cases."Total Deaths",
                                relief_payments_by_state."Payment"

                            from
                                covid_cases 

                            inner join
                                state_abbreviations 

                            on 
                                covid_cases."State" = state_abbreviations."State"

                            inner join
                                relief_payments_by_state

                            on
                                relief_payments_by_state."State" = state_abbreviations."Code"
                        '''
merged_df = pd.read_sql(sql=merge_query, con=local_engine)

merged_df.head()

Unnamed: 0,State Code,state,Total Cases,Total Deaths,Payment
0,AL,Alabama,375178,12423,"$1,304,513,045.0"
1,AK,Alaska,58064,930,"$222,894,177.0"
2,AZ,Arizona,539545,6748,"$1,473,716,024.0"
3,AR,Arkansas,229556,2879,"$925,881,569.0"
4,CA,California,20653265,356743,"$8,436,326,765.0"


In [None]:
# NOTE(review): leftover scratch cell. This bare expression only evaluates to the
# pd.read_sql function object and nothing uses the result; it looks safe to delete.
pd.read_sql

In [18]:
# Store the merged result as the 'merged_df' table in the local database,
# replacing any earlier copy. The DataFrame index is not stored.
merged_df.to_sql('merged_df', local_conn, if_exists='replace', index=False)

# Confirm the load: list the table names now present locally
local_engine.table_names()

['relief_provider',
 'relief_payments_by_state',
 'covid_cases',
 'state_abbreviations',
 'merged_df']

In [30]:
# Store the merged result as the 'merged_df' table in the cloud database,
# replacing any earlier copy. The DataFrame index is not stored.
merged_df.to_sql('merged_df', cloud_conn, if_exists='replace', index=False)

# Confirm the load: list the table names now present in the cloud database
cloud_engine.table_names()

['covid_cases',
 'merged_df',
 'relief_payments_by_state',
 'relief_provider',
 'state_abbreviations']