- Relational data model,
- Connect to Athena and query data,
- ETL job in python,
- Save results to S3,
- Glue deployment
- Build tables on Redshift,
- Copy data to Redshift

In [49]:
#imports
import boto3
import pandas as pd
from io import StringIO
import configparser
import time

In [50]:
# Setting connection to the config file, to keep all the credentials safe
config = configparser.ConfigParser()
CONFIG_PATH = '/Users/maciej/data_eng/aws/covid_project/cluster.config'
config.read_file(open(CONFIG_PATH))

In [51]:
conf_aws = config['AWS']
conf_s3 = config['S3']
conf_dwh = config['DWH']

In [52]:
# Setting connection to Athena
athena_client = boto3.client(
    service_name = 'athena',
    aws_access_key_id = conf_aws['KEY'],
    aws_secret_access_key = conf_aws['SECRET'],
    region_name = conf_aws['REGION']
)

In [53]:
Dict = {}
def download_and_load_query_results(
    client: boto3.client, query_response: Dict
) -> pd.DataFrame:

    while True:
        try:
            # This function only loads the first 1000 rows
            client.get_query_results(
                QueryExecutionId = query_response['QueryExecutionId']
            )
            break
        except Exception as err:
            if "not yet finished" in str(err):
                time.sleep(0.001)
            else:
                raise err

    temp_file_location: str = 'athena_query_results.csv'
    s3_client = boto3.client(
        service_name = 's3',
        aws_access_key_id = conf_aws['KEY'],
        aws_secret_access_key = conf_aws['SECRET'],
        region_name = conf_aws['REGION']
    )
    s3_client.download_file(
        conf_s3['BUCKET_NAME'],
        f"{conf_s3['OUTPUT_DIRECTORY']}/{query_response['QueryExecutionId']}.csv",
        temp_file_location
    )
    return pd.read_csv(temp_file_location)

In [90]:

response = athena_client.start_query_execution(
    QueryString = f'SELECT * FROM enigma_jhu',
    QueryExecutionContext = {'Database': conf_s3['SCHEMA_NAME']},
    ResultConfiguration = {
        'OutputLocation': conf_s3['STAGING_DIR'],
        'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}
    }
)

In [115]:
# Function to get response and next call download_and_load_query_results 
# to put table into pd df
def connect(table_name):
    response = athena_client.start_query_execution(
    QueryString = f"SELECT * FROM {table_name}",
    QueryExecutionContext = {'Database': conf_s3['SCHEMA_NAME']},
    ResultConfiguration = {
        'OutputLocation': conf_s3['STAGING_DIR'],
        'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}
    }
    )
    table = download_and_load_query_results(athena_client, response)
    return table 
    

In [116]:
enigma_jhu = connect('enigma_jhu')

In [117]:
enigma_jhu.head()

Unnamed: 0,fips,admin2,province_state,country_region,last_update,latitude,longitude,confirmed,deaths,recovered,active,combined_key
0,,,Anhui,China,2020-01-22T17:00:00,31.826,117.226,1.0,,,,"""Anhui"
1,,,Beijing,China,2020-01-22T17:00:00,40.182,116.414,14.0,,,,"""Beijing"
2,,,Chongqing,China,2020-01-22T17:00:00,30.057,107.874,6.0,,,,"""Chongqing"
3,,,Fujian,China,2020-01-22T17:00:00,26.079,117.987,1.0,,,,"""Fujian"
4,,,Gansu,China,2020-01-22T17:00:00,36.061,103.834,,,,,"""Gansu"


In [118]:
#Rest of the tables
countrycode = connect('countrycode')
countypopulation = connect('countypopulation')
rearc_hospital_beds = connect('rearc_hospital_beds')
state_abv = connect('state_abv')
us_county = connect('us_county')
us_daily = connect('us_daily')
us_states = connect('us_states')
us_states_daily = connect('us_states_daily')
us_total_latest = connect('us_total_latest')


In [120]:
state_abv.head()

Unnamed: 0,col0,col1
0,State,Abbreviation
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR


In [124]:
state_abv = state_abv.iloc[1:,:].rename(columns={'col0': 'State', 'col1': 'Abbreviation'})
state_abv.head()

Unnamed: 0,State,Abbreviation
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR
5,California,CA
6,Colorado,CO


In [131]:
for column in enigma_jhu:
    print(column)

fips
admin2
province_state
country_region
last_update
latitude
longitude
confirmed
deaths
recovered
active
combined_key


In [133]:
factCovid = enigma_jhu[['fips', 'province_state', 'country_region', 'confirmed',
                    'deaths', 'recovered', 'active']]\
    .rename(columns={'province_state': 'state', 'country_region': 'region'})

In [134]:
factCovid.head()

Unnamed: 0,fips,state,region,confirmed,deaths,recovered,active
0,,Anhui,China,1.0,,,
1,,Beijing,China,14.0,,,
2,,Chongqing,China,6.0,,,
3,,Fujian,China,1.0,,,
4,,Gansu,China,,,,


In [148]:
factCovid_2 = us_states_daily[['date', 'positive', 'negative',
                    'hospitalizedcurrently', 'hospitalized', 'hospitalizedcumulative', 'fips']]


In [150]:
factCovid = factCovid.merge(factCovid_2, on ='fips', how='inner')

In [156]:
factCovid.shape

(26418, 13)

In [154]:
dimRegion_1 = enigma_jhu[['fips', 'province_state', 'country_region', 'latitude', 'longitude']]
dimRegion_2 = us_county[['fips', 'county', 'state']]
dimRegion = dimRegion_1.merge(dimRegion_2, on ='fips', how='inner')

In [158]:
dimHospital = rearc_hospital_beds[['fips', 'state_name', 'longtitude', 'latitude',
                        'hq_address', 'hospital_name', 'hospital_type', 'hq_city',
                        'hq_state']]

In [160]:
dimDate = us_states_daily[['fips', 'date']]

In [161]:
dimDate.head()

Unnamed: 0,fips,date
0,2,20210307
1,1,20210307
2,5,20210307
3,60,20210307
4,4,20210307


In [168]:
%%capture
dimDate['date'] = pd.to_datetime(dimDate['date'], format ='%Y%m%d')

In [163]:
dimDate.head()

Unnamed: 0,fips,date
0,2,2021-03-07
1,1,2021-03-07
2,5,2021-03-07
3,60,2021-03-07
4,4,2021-03-07


In [165]:
%%capture
dimDate['year'] = dimDate['date'].dt.year
dimDate['month'] = dimDate['date'].dt.month
dimDate['day'] = dimDate['date'].dt.dayofweek

In [169]:
dimDate.head()

Unnamed: 0,fips,date,year,month,day
0,2,2021-03-07,2021,3,6
1,1,2021-03-07,2021,3,6
2,5,2021-03-07,2021,3,6
3,60,2021-03-07,2021,3,6
4,4,2021-03-07,2021,3,6


In [185]:
#store output in the s3
csv_buffer = StringIO()
factCovid.to_csv(csv_buffer)
s3_resource = boto3.resource('s3')
s3_resource.Object(conf_s3['BUCKET_NAME'], 'pd-tables-output/factCovid.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'JZWJGAY66T22XJ34',
  'HostId': 'KAX9BYbgP598iTJu3cclbA2McAlAH4UVmrBB4ABzFbDQpRcK4NDTOPeKFNMT5d9qe8LGjrCP01w=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'KAX9BYbgP598iTJu3cclbA2McAlAH4UVmrBB4ABzFbDQpRcK4NDTOPeKFNMT5d9qe8LGjrCP01w=',
   'x-amz-request-id': 'JZWJGAY66T22XJ34',
   'date': 'Fri, 30 Sep 2022 16:05:41 GMT',
   'etag': '"62655c9369f3566b4f911b2662290160"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"62655c9369f3566b4f911b2662290160"'}

In [186]:
#store output in the s3
csv_buffer = StringIO()
dimRegion.to_csv(csv_buffer)
s3_resource = boto3.resource('s3')
s3_resource.Object(conf_s3['BUCKET_NAME'], 'pd-tables-output/dimRegion.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': '3YN5QTYTD5KR6PH2',
  'HostId': 'dBbtSPdIabmD4mpD1I8+EdMBVt1f/mMYie4RKmGPcTtc2BCnoFtrNsNT+baG3VrZL9ZDlWejoeZjSFnawdhuKQ==',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'dBbtSPdIabmD4mpD1I8+EdMBVt1f/mMYie4RKmGPcTtc2BCnoFtrNsNT+baG3VrZL9ZDlWejoeZjSFnawdhuKQ==',
   'x-amz-request-id': '3YN5QTYTD5KR6PH2',
   'date': 'Fri, 30 Sep 2022 16:07:16 GMT',
   'etag': '"7ea0062204825d5cb58b2e03d8df2cf1"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"7ea0062204825d5cb58b2e03d8df2cf1"'}

In [187]:
#store output in the s3
csv_buffer = StringIO()
dimHospital.to_csv(csv_buffer)
s3_resource = boto3.resource('s3')
s3_resource.Object(conf_s3['BUCKET_NAME'], 'pd-tables-output/dimHospital.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': '3QH64P54JJZRYEX4',
  'HostId': 'HCPEBsWfg83tr5ywlIGbW08u4txWyVMcPrzYVyrIrpt6zk/M6c8V1SyXBLbPe+rA5oxZQUvXzxg=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'HCPEBsWfg83tr5ywlIGbW08u4txWyVMcPrzYVyrIrpt6zk/M6c8V1SyXBLbPe+rA5oxZQUvXzxg=',
   'x-amz-request-id': '3QH64P54JJZRYEX4',
   'date': 'Fri, 30 Sep 2022 16:11:01 GMT',
   'etag': '"b280f8519b7739b1977a4efddeb44d3b"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"b280f8519b7739b1977a4efddeb44d3b"'}

In [188]:
#store output in the s3
csv_buffer = StringIO()
dimDate.to_csv(csv_buffer)
s3_resource = boto3.resource('s3')
s3_resource.Object(conf_s3['BUCKET_NAME'], 'pd-tables-output/dimDate.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'BRV4HJYC4ZYZT8SX',
  'HostId': 'h0OIjUoDM9AoLHcOXCuXcmGi63eZpVKGbruqg/BXO+T6vEmlXDauVQwV6Rs+gN4ifyypZYKaNbs=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'h0OIjUoDM9AoLHcOXCuXcmGi63eZpVKGbruqg/BXO+T6vEmlXDauVQwV6Rs+gN4ifyypZYKaNbs=',
   'x-amz-request-id': 'BRV4HJYC4ZYZT8SX',
   'date': 'Fri, 30 Sep 2022 16:11:03 GMT',
   'etag': '"91c7fdac0a9dc5b435ff8e6c2bda3764"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"91c7fdac0a9dc5b435ff8e6c2bda3764"'}

In [198]:
sqlschema = pd.io.sql.get_schema(dimDate.reset_index(), 'dimDate')
print(''.join(sqlschema))

CREATE TABLE "dimDate" (
"index" INTEGER,
  "fips" INTEGER,
  "date" TIMESTAMP,
  "year" INTEGER,
  "month" INTEGER,
  "day" INTEGER
)


In [200]:
sqlschema = pd.io.sql.get_schema(dimHospital.reset_index(), 'dimHospital')
print(''.join(sqlschema))

CREATE TABLE "dimHospital" (
"index" INTEGER,
  "fips" REAL,
  "state_name" TEXT,
  "longtitude" REAL,
  "latitude" REAL,
  "hq_address" TEXT,
  "hospital_name" TEXT,
  "hospital_type" TEXT,
  "hq_city" TEXT,
  "hq_state" TEXT
)


In [201]:
sqlschema = pd.io.sql.get_schema(dimRegion.reset_index(), 'dimRegion')
print(''.join(sqlschema))

CREATE TABLE "dimRegion" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "county" TEXT,
  "state" TEXT
)


In [202]:
sqlschema = pd.io.sql.get_schema(factCovid.reset_index(), 'factCovid')
print(''.join(sqlschema))

CREATE TABLE "factCovid" (
"index" INTEGER,
  "fips" REAL,
  "state" TEXT,
  "region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" REAL,
  "negative" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalized" REAL,
  "hospitalizedcumulative" REAL
)
