In [115]:
# Imports
import boto3
import configparser
import os
import pandas as pd
import pyspark
import pyspark.sql.functions as F
import time

from helper_functions import create_session
from io import StringIO
from pyspark.sql import SparkSession

In [7]:
# Open one boto3 session, then derive a client from it for each AWS service used below
session = create_session()
glue, athena, s3 = (session.client(service) for service in ('glue', 'athena', 's3'))

In [6]:
# Get the names of every table in the project's Glue database
database_name = 'aws-covid-project'
bucket_name = 'kc-covid-project'
# search_tables() is not restricted to one database and a single call only returns
# one page of results, so list the tables of `database_name` explicitly and follow
# pagination until every table name has been collected.
table_tuple = tuple(
    table['Name']
    for page in glue.get_paginator('get_tables').paginate(DatabaseName=database_name)
    for table in page['TableList']
)

In [15]:
# Query schemas for tables using Athena, save to S3
try:
    # Build the IN (...) list by hand. Interpolating the Python tuple directly yields
    # invalid SQL for a single table -- ('name',) -- and for no tables at all -- ().
    quoted_names = ', '.join("'" + name.replace("'", "''") + "'" for name in table_tuple)
    if not quoted_names:
        raise ValueError('No Glue tables found; nothing to query.')
    query = f'SELECT * FROM INFORMATION_SCHEMA.columns WHERE table_name IN ({quoted_names})'
    res = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': database_name},
        ResultConfiguration={
            # Same location as before, derived from bucket_name instead of repeating it
            'OutputLocation': f's3://{bucket_name}/athena_outputs/',
            'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'},
        },
    )
    # A 200 here only means the query was accepted, not that it has finished running
    print(f'Status Code: {res["ResponseMetadata"]["HTTPStatusCode"]}')
except Exception as e:
    print(e)

Status Code: 200


In [59]:
# Create the output folder (if needed) and an empty csv file to save the schema file to.
# Done in Python rather than with the Windows-only `type nul > ...` shell command so the
# cell also runs on Linux/macOS and does not fail when the folder is missing.
os.makedirs('athena_outputs', exist_ok=True)
open('athena_outputs/table_schemas.csv', 'w').close()

In [None]:
# Download schema file from s3 bucket so it can be loaded into Pandas
try:
    listed = s3.list_objects(
        Bucket=bucket_name,
        Prefix='athena_outputs'
    )
    # The prefix may also hold objects that are not query results (for example
    # metadata companions or a folder placeholder), so keep only the .csv keys.
    csv_objects = [obj for obj in listed.get('Contents', []) if obj['Key'].endswith('.csv')]
    if not csv_objects:
        raise FileNotFoundError(f'No .csv objects found under s3://{bucket_name}/athena_outputs')
    # Take the most recently written result instead of whichever key happens to sort first
    schema_csv = max(csv_objects, key=lambda obj: obj['LastModified'])['Key']
    # download_file returns None and raises on failure, so there is no response
    # dictionary to read a status code from.
    s3.download_file(
        Bucket=bucket_name,
        Key=schema_csv,
        Filename='athena_outputs/table_schemas.csv'
    )
    print(f'Downloaded s3://{bucket_name}/{schema_csv} to athena_outputs/table_schemas.csv')
except Exception as e:
    print(e)

In [14]:
# Read the downloaded schema file into a Pandas DataFrame; the displayed frame below is
# the reference used to design the data model.
# Alternatively, generate DDL from tables in Glue database
table_schemas = pd.read_csv(
    'athena_outputs/table_schemas.csv',
    index_col=None,
)
table_schemas

Unnamed: 0,table_catalog,table_schema,table_name,column_name,ordinal_position,column_default,is_nullable,data_type,comment,extra_info
0,awsdatacatalog,aws-covid-project,enigma_nyt_usa_states,date,1,,YES,varchar,,
1,awsdatacatalog,aws-covid-project,enigma_nyt_usa_states,state,2,,YES,varchar,,
2,awsdatacatalog,aws-covid-project,enigma_nyt_usa_states,fips,3,,YES,bigint,,
3,awsdatacatalog,aws-covid-project,enigma_nyt_usa_states,cases,4,,YES,bigint,,
4,awsdatacatalog,aws-covid-project,enigma_nyt_usa_states,deaths,5,,YES,bigint,,
...,...,...,...,...,...,...,...,...,...,...
153,awsdatacatalog,aws-covid-project,rearc_usa_latest_total,hospitalized,14,,YES,bigint,,
154,awsdatacatalog,aws-covid-project,rearc_usa_latest_total,total,15,,YES,bigint,,
155,awsdatacatalog,aws-covid-project,rearc_usa_latest_total,totaltestresults,16,,YES,bigint,,
156,awsdatacatalog,aws-covid-project,rearc_usa_latest_total,posneg,17,,YES,bigint,,


In [42]:
# Filter for athena query outputs, look for file to download
# s3_resource = session.resource('s3')
# project_bucket = s3_resource.Bucket(bucket_name)
# objs = project_bucket.objects.filter(Prefix='athena_outputs')
# for obj in objs:
#     pass

In [15]:
# Fetch the 'static-state-codes' object from the project bucket into the local output folder
s3.download_file(
    Bucket=bucket_name,
    Key='static-state-codes',
    Filename='athena_outputs/state_codes.csv',
)

# Relational Modeling

### Fact Table

 - Contains keys and COVID-19 related information.

    - Fips (primary key for most of the datasets)
    - State & region keys, hospitals, dates (dimension tables)
    - Statistics from datasets regarding COVID-19 (positive cases, negative cases, deaths, etc.)

- Fields for data modeling are from Athena INFORMATION_SCHEMA.columns based on shared keys & STAR schema data modeling concepts.

    - Fact Table
        - fips
        - date
        - states
        - regions
        - confirmed
        - deaths
        - recovered
        - active
        - positive
        - negative
        - hospitalizedcurrently
        - hospitalized
        - hospitalizeddischarged

### Dimension tables
 - Contains dimensional keys and dimension-specific information
 
    - Region Dimension Table
        - fips
        - region
        - state
        - county
        - state_abb
        - lat
        - long

    - Hospital Dimension Table
        - fips
        - state
        - hos_lat
        - hos_long
        - hq_address
        - hospital_type
        - hospital_name
        - hq_city
        - hq_state

    - Date Dimension Table
        - fips
        - date
        - month
        - year
        - is_weekend

### Process
 1. Build relational schema for above
 2. Connect to Athena and query table data
 3. Write ETL jobs in Python
 4. Build the table schemas in the Redshift warehouse (WH)
 5. Copy the data into the Redshift WH

In [5]:
# Store Database Variables
parser = configparser.ConfigParser()
# Read the config through a context manager so the file handle is closed after parsing
with open('relational_modeling.config') as config_file:
    parser.read_file(config_file)
# All settings live in the [DB] section of relational_modeling.config
AWS_REGION = parser.get('DB', 'AWS_REGION')
SCHEMA_NAME = parser.get('DB', 'SCHEMA_NAME')
S3_STAGING_DIR = parser.get('DB', 'S3_STAGING_DIR')
S3_BUCKET_NAME = parser.get('DB', 'S3_BUCKET_NAME')
S3_OUTPUT_DIRECTORY = parser.get('DB', 'S3_OUTPUT_DIRECTORY')

In [None]:
# Create (or reuse) a local SparkSession for this project
spark = (
    SparkSession.builder
    .master('local')
    .appName('awscovidproject')
    .getOrCreate()
)

In [26]:
# Query table data from Athena and load it into Spark DataFrames; verify process scripts work as intended

# Create function
def dl_load_query(athena_client, s3_client, spark, dataset, poll_interval=0.5):
    '''
    Run `SELECT * FROM <dataset>` in Athena, download the CSV result from S3 and
    load it into a Spark DataFrame.

    Reads the module-level settings SCHEMA_NAME, S3_STAGING_DIR, S3_BUCKET_NAME and
    S3_OUTPUT_DIRECTORY. The result is saved locally as athena_outputs/<dataset>.csv.

    ARGUMENTS:
        athena_client: AWS boto3 client object for connecting to AWS Athena service.
        s3_client: AWS boto3 client object for connecting to s3
        spark: SparkSession for PySpark
        dataset: Name of dataset in Covid Database to query data from
        poll_interval: Seconds to wait between checks of the query state
    RETURNS:
        Spark DataFrame read from the downloaded CSV, using its first row as header
    RAISES:
        RuntimeError: if the Athena query ends in the FAILED or CANCELLED state.
        Errors raised by the Athena or S3 client calls propagate unchanged.
    '''
    # Submit the query; Athena writes the result file to S3_STAGING_DIR
    start_res = athena_client.start_query_execution(
        QueryString=f'SELECT * FROM {dataset}',
        QueryExecutionContext={
            'Database': SCHEMA_NAME,
        },
        ResultConfiguration={
            'OutputLocation': S3_STAGING_DIR,
            'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}
        }
    )
    query_id = start_res['QueryExecutionId']

    # Poll the execution state instead of fetching results and string-matching the
    # "not yet finished" error: this avoids pulling result pages that are never used
    # and reports a failed or cancelled query with its reason.
    while True:
        status = athena_client.get_query_execution(
            QueryExecutionId=query_id
        )['QueryExecution']['Status']
        state = status['State']
        if state == 'SUCCEEDED':
            break
        if state in ('FAILED', 'CANCELLED'):
            reason = status.get('StateChangeReason', 'no reason given')
            raise RuntimeError(f'Athena query {query_id} for {dataset} {state}: {reason}')
        time.sleep(poll_interval)

    file_save_path = f'athena_outputs/{dataset}.csv'
    # Make sure the local target folder exists before downloading into it
    os.makedirs(os.path.dirname(file_save_path), exist_ok=True)
    # The result object is looked up as <S3_OUTPUT_DIRECTORY>/<query id>.csv
    s3_client.download_file(
        Bucket=S3_BUCKET_NAME,
        Key=f'{S3_OUTPUT_DIRECTORY}/{query_id}.csv',
        Filename=file_save_path
    )
    return spark.read.csv(file_save_path, header=True)


In [40]:
# Store all Athena queried data as Spark DataFrames, keyed by table name.
# A table whose csv was already downloaded is read from disk instead of being queried again.
os.makedirs('athena_outputs', exist_ok=True)
# List only the tables of the database that dl_load_query runs its queries against,
# following pagination so no table is missed.
datasets = [
    table['Name']
    for page in glue.get_paginator('get_tables').paginate(DatabaseName=SCHEMA_NAME)
    for table in page['TableList']
]
dfs = {}
# Downloaded files are named '<table>.csv', so compare against that, not the bare table name
cached_files = set(os.listdir('athena_outputs'))
for table in datasets:
    csv_name = f'{table}.csv'
    if csv_name in cached_files:
        dfs[table] = spark.read.csv(f'athena_outputs/{csv_name}', header=True)
    else:
        # dl_load_query takes (athena_client, s3_client, spark, dataset)
        dfs[table] = dl_load_query(athena, s3, spark, table)

In [130]:
# Load the locally saved static_state_codes csv, treating its first line as the header
dfs['static_state_codes'] = (
    spark.read
    .option('header', True)
    .csv('athena_outputs/static_state_codes.csv')
)

In [131]:
# Addressing static_state_codes DataFrame. All columns are strings and the first row
# carries the column names, so read that row as the headers, drop it from the data,
# then rename each column to its header value.
headers = dfs['static_state_codes'].first()[:]
is_data_row = (F.col('col0') != 'State') & (F.col('col1') != 'Abbreviation')
state_codes = dfs['static_state_codes'].filter(is_data_row)
for col, new_col in zip(state_codes.columns, headers):
    state_codes = state_codes.withColumnRenamed(col, new_col)
dfs['static_state_codes'] = state_codes

In [132]:
# Preview the cleaned state-code lookup (first 20 rows, long values truncated)
dfs['static_state_codes'].show(n=20, truncate=True)

+--------------------+------------+
|               State|Abbreviation|
+--------------------+------------+
|             Alabama|          AL|
|              Alaska|          AK|
|             Arizona|          AZ|
|            Arkansas|          AR|
|          California|          CA|
|            Colorado|          CO|
|         Connecticut|          CT|
|            Delaware|          DE|
|District of Columbia|          DC|
|             Florida|          FL|
|             Georgia|          GA|
|              Hawaii|          HI|
|               Idaho|          ID|
|            Illinois|          IL|
|             Indiana|          IN|
|                Iowa|          IA|
|              Kansas|          KS|
|            Kentucky|          KY|
|           Louisiana|          LA|
|               Maine|          ME|
+--------------------+------------+
only showing top 20 rows



### Fact Table Fields, Data Lineage, and Join Plan
    - Fact Table
        - fips -> enigma_jhu, rearc_states_daily_test
        - date -> rearc_states_daily_test
        - states -> enigma_jhu
        - regions -> enigma_jhu
        - confirmed -> enigma_jhu
        - deaths -> enigma_jhu
        - recovered -> enigma_jhu
        - active -> enigma_jhu
        - positive -> rearc_states_daily_test
        - negative -> rearc_states_daily_test
        - hospitalizedcurrently -> rearc_states_daily_test
        - hospitalized -> rearc_states_daily_test
        - hospitalizeddischarged -> rearc_states_daily_test

In [148]:
# Preview the enigma_jhu columns that feed the fact table
dfs['enigma_jhu'].select(
    'fips',
    'province_state',
    'country_region',
    'confirmed',
    'deaths',
    'recovered',
    'active',
).show()

+----+--------------+--------------+---------+------+---------+------+
|fips|province_state|country_region|confirmed|deaths|recovered|active|
+----+--------------+--------------+---------+------+---------+------+
|null|         Anhui|         China|        1|  null|     null|  null|
|null|       Beijing|         China|       14|  null|     null|  null|
|null|     Chongqing|         China|        6|  null|     null|  null|
|null|        Fujian|         China|        1|  null|     null|  null|
|null|         Gansu|         China|     null|  null|     null|  null|
|null|     Guangdong|         China|       26|  null|     null|  null|
|null|       Guangxi|         China|        2|  null|     null|  null|
|null|       Guizhou|         China|        1|  null|     null|  null|
|null|           Hai|         China|        4|  null|     null|  null|
|null|         Hebei|         China|        1|  null|     null|  null|
|null|  Heilongjiang|         China|     null|  null|     null|  null|
|null|

In [154]:
# Preview the rearc_states_daily_test columns that feed the fact table
dfs['rearc_states_daily_test'].select(
    'fips',
    'date',
    'positive',
    'negative',
    'hospitalizedcurrently',
    'hospitalized',
    'hospitalizeddischarged',
).show()

+----+--------+--------+--------+---------------------+------------+----------------------+
|fips|    date|positive|negative|hospitalizedcurrently|hospitalized|hospitalizeddischarged|
+----+--------+--------+--------+---------------------+------------+----------------------+
|  49|20210220|  366034| 1507875|                  260|       14421|                  null|
|  51|20210220|  561812|    null|                 1594|       23436|                 45667|
|  78|20210220|    2575|   43564|                 null|        null|                  null|
|  50|20210220|   14359|  309335|                   39|        null|                  null|
|  53|20210220|  332904|    null|                  608|       18969|                  null|
|  55|20210220|  611789| 2588501|                  370|       25716|                  null|
|  54|20210220|  129364|    null|                  292|        null|                  null|
|  56|20210220|   53683|  178648|                   31|        1367|            

In [185]:
# Join select DataFrame fields together. enigma_jhu to rearc_states_daily_test
# Strip leading zeros from the enigma_jhu fips strings before joining; the
# rearc_states_daily_test sample output shows its fips values without zero padding.
temp1 = (
    dfs['enigma_jhu']
    .withColumn('fips', F.regexp_replace(F.col('fips'), r'^[0]*', ''))
    .select('fips', 'province_state', 'country_region', 'confirmed', 'deaths', 'recovered', 'active')
)
temp2 = dfs['rearc_states_daily_test'].select(
    'fips', 'date', 'positive', 'negative', 'hospitalizedcurrently', 'hospitalized', 'hospitalizeddischarged'
)
# Join on the column name rather than `temp1.fips == temp2.fips`: the expression form
# keeps both fips columns, which makes any later reference to fact['fips'] ambiguous.
# Still an inner join, so the matched rows are unchanged.
fact = temp1.join(temp2, on='fips', how='inner')
fact.count()

26418

In [180]:
# Show the distinct fips values present in temp1 (first 20 rows)
temp1.select('fips').distinct().show(n=20, truncate=True)

DataFrame[fips: string]

In [183]:
# Show the distinct fips values in sorted order. temp1's fips column already has its
# leading zeros stripped where temp1 is built, so the regexp_replace is not repeated here.
temp1.select('fips').distinct().orderBy('fips').show()


+-----+
| fips|
+-----+
| null|
|10001|
|10003|
|10005|
| 1001|
| 1003|
| 1005|
| 1007|
| 1009|
| 1011|
| 1013|
| 1015|
| 1017|
| 1019|
| 1021|
| 1023|
| 1025|
| 1027|
| 1029|
| 1031|
+-----+
only showing top 20 rows

