In [None]:
#Importing libraries needed
import boto3
import pandas as pd
from io import StringIO
import time

In [None]:
import os

# Settings to connect with AWS services.
# Credentials and region come from the standard AWS environment variables.
# When they are unset the values are None, which lets boto3 fall back to its
# default credential/region chain (~/.aws config, role credentials, ...)
# instead of trying to authenticate with empty strings.
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")
AWS_REGION = os.environ.get("AWS_REGION") or os.environ.get("AWS_DEFAULT_REGION")
# Athena database and S3 locations: set the environment variables or fill in the defaults.
SCHEMA_NAME = os.environ.get("SCHEMA_NAME", "")
S3_STAGING_DIR = os.environ.get("S3_STAGING_DIR", "")  # Athena OutputLocation, an s3:// URI
S3_BUCKET_NAME = os.environ.get("S3_BUCKET_NAME", "")
S3_OUTPUT_DIRECTORY = os.environ.get("S3_OUTPUT_DIRECTORY", "output")

In [None]:
# Athena client used to run the SQL queries in this notebook; the credentials
# and region are taken from the settings cell.
athena_settings = dict(
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION,
)
athena_client = boto3.client("athena", **athena_settings)

In [None]:
from typing import Dict
from urllib.parse import urlparse


def download_and_load_query_results(
        client: boto3.client, query_response: Dict,
        poll_interval: float = 0.5, timeout: float = 600.0) -> pd.DataFrame:
    '''
    Wait for an Athena query to finish, download its CSV result from S3 and
    load it into a DataFrame.

    Parameters
    ----------
    client : boto3 Athena client that started the query.
    query_response : dict returned by ``start_query_execution``; only its
        ``"QueryExecutionId"`` entry is read.
    poll_interval : seconds to wait between status checks (default 0.5).
    timeout : seconds to wait for the query before giving up (default 600).

    Returns
    -------
    pd.DataFrame parsed from the query's result CSV. The CSV is also written
    to ``athena_query_results.csv`` in the working directory and is
    overwritten by the next call.

    Raises
    ------
    RuntimeError : the query ended in the FAILED or CANCELLED state.
    TimeoutError : the query was still not finished after ``timeout`` seconds.
    '''
    query_id = query_response["QueryExecutionId"]
    deadline = time.monotonic() + timeout

    # Poll the execution state rather than retrying get_query_results on
    # error. A failed or cancelled query is reported straight away, and the
    # interval keeps the loop from hammering the Athena API.
    while True:
        execution = client.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]
        status = execution["Status"]
        state = status["State"]
        if state == "SUCCEEDED":
            break
        if state in ("FAILED", "CANCELLED"):
            reason = status.get("StateChangeReason", "no reason given")
            raise RuntimeError(f"Athena query {query_id} {state}: {reason}")
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Athena query {query_id} still {state} after {timeout} seconds"
            )
        time.sleep(poll_interval)

    # Download from where Athena actually wrote the result (the execution's
    # OutputLocation, an s3://bucket/key URI) rather than rebuilding the key
    # from S3_BUCKET_NAME/S3_OUTPUT_DIRECTORY. The rebuilt key is only right
    # when those settings happen to match the staging dir given to the query.
    output_location = urlparse(execution["ResultConfiguration"]["OutputLocation"])
    result_bucket = output_location.netloc
    result_key = output_location.path.lstrip("/")

    temp_file_location: str = "athena_query_results.csv"
    s3_client = boto3.client(
        "s3",
        aws_access_key_id = AWS_ACCESS_KEY,
        aws_secret_access_key = AWS_SECRET_KEY,
        region_name = AWS_REGION
    )
    s3_client.download_file(result_bucket, result_key, temp_file_location)
    return pd.read_csv(temp_file_location)

# Loading the source tables into the notebook for transformation

In [None]:
# Query each table of the dataset through Athena and load the results using
# the download_and_load_query_results function declared above.
def run_athena_query(query: str) -> pd.DataFrame:
    """Run ``query`` in Athena and return its result as a DataFrame.

    Every query uses SCHEMA_NAME as the default database. Results are written
    to S3_STAGING_DIR with SSE-S3 encryption.
    """
    response = athena_client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={"Database": SCHEMA_NAME},
        ResultConfiguration={
            "OutputLocation": S3_STAGING_DIR,
            "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
        },
    )
    return download_and_load_query_results(athena_client, response)


# NOTE(review): enigma_jhud is capped at 7880 rows; the reason is not recorded
# here. Confirm the limit is intended.
enigma_jhud = run_athena_query("SELECT * FROM covid_dataset.enigma_jhud LIMIT 7880;")
countrycode = run_athena_query("SELECT * FROM covid_dataset.countrycode;")
countypopulation = run_athena_query("SELECT * FROM covid_dataset.countypopulation;")
rearc_usa_hospital_beds = run_athena_query("SELECT * FROM covid_dataset.rearc_usa_hospital_beds;")
state_abv = run_athena_query("SELECT * FROM covid_dataset.state_abv;")
states_daily = run_athena_query("SELECT * FROM covid_dataset.states_daily;")
us_county = run_athena_query("SELECT * FROM covid_dataset.us_county;")
us_daily = run_athena_query("SELECT * FROM covid_dataset.us_daily;")
us_states = run_athena_query("SELECT * FROM covid_dataset.us_states;")
us_total_latest = run_athena_query("SELECT * FROM covid_dataset.us_total_latest;")

In [None]:
# state_abv arrives with its real header as the first data row. Promote that
# row to the column labels and keep the remaining rows as an independent frame.
# NOTE: this reassigns state_abv, so running the cell twice would promote a
# second data row. Re-run the query cell first.
header_row = state_abv.iloc[0]
state_abv = (
    state_abv.iloc[1:]
    # list(...) drops the header row's own label, so it doesn't leak in as
    # the name of the column index.
    .set_axis(list(header_row), axis=1)
    # An explicit copy, so later column assignments don't act on a slice.
    .copy()
)

# Building Dim and Fact tables

In [None]:
# Fact table: joins enigma_jhud's count columns (confirmed, deaths, recovered,
# active) to the states_daily columns on fips, keeping only matching fips.
# NOTE(review): the join key is fips alone. If either side has several rows
# per fips, the inner join multiplies rows; check the shape shown below.
case_columns = ['fips', 'province_state', 'country_region', 'confirmed', 'deaths', 'recovered', 'active']
daily_columns = ['fips', 'date', 'positive', 'negative', 'hospitalizedcurrently', 'hospitalized', 'hospitalizeddischarged']
factCovid1 = enigma_jhud[case_columns]
factCovid2 = states_daily[daily_columns]
factCovid = factCovid1.merge(factCovid2, how='inner', on='fips')
factCovid.shape

In [None]:
# Region dimension: location columns from enigma_jhud joined on fips to the
# county/state names from us_county; only fips present in both are kept.
location_columns = ['fips', 'province_state', 'country_region', 'latitude', 'longitude']
county_columns = ['fips', 'county', 'state']
dimRegion1 = enigma_jhud[location_columns]
dimRegion2 = us_county[county_columns]
dimRegion = dimRegion1.merge(dimRegion2, how='inner', on='fips')
dimRegion.shape

In [None]:
# Date dimension: one row per states_daily row with its fips and date. The
# date is parsed from its YYYYMMDD form and split into calendar parts.
# Built with .assign on the column subset so no column is written into a
# slice of states_daily (avoids SettingWithCopyWarning).
parsed_dates = pd.to_datetime(states_daily['date'], format='%Y%m%d')
dimDate = states_daily[['fips', 'date']].assign(
    date=parsed_dates,
    year=parsed_dates.dt.year,
    month=parsed_dates.dt.month,
    day=parsed_dates.dt.day,
    day_of_week=parsed_dates.dt.dayofweek,  # Monday=0 ... Sunday=6
)

# Hospital dimension. The column is spelled 'longtitude' here and in the
# Redshift DDL; presumably that is how the source table names it.
dimHospital = rearc_usa_hospital_beds[['fips','state_name','latitude', 'longtitude', 'hq_address', 'hospital_name', 'hospital_type', 'hq_city', 'hq_state']]

## Saving the Dim and Fact tables to S3

In [None]:
# Serialise factCovid to CSV text (index included) in an in-memory buffer.
# StringIO holds text, not binary data.
csv_buffer = StringIO()
factCovid.to_csv(path_or_buf=csv_buffer)
# Target bucket for the uploads.
bucket = S3_BUCKET_NAME

In [None]:
# boto3 resource-level S3 interface, built from the settings cell's
# credentials and region.
s3_resource = boto3.resource(
    service_name='s3',
    region_name=AWS_REGION,
    aws_secret_access_key=AWS_SECRET_KEY,
    aws_access_key_id=AWS_ACCESS_KEY,
)


In [None]:
# Save the dim and fact tables to S3 as CSV text, one object per table at
# output/<table>.csv in `bucket`. to_csv's defaults write a header row and the
# pandas index as the first column.
tables_to_upload = {
    'factCovid': factCovid,
    'dimRegion': dimRegion,
    'dimDate': dimDate,
    'dimHospital': dimHospital,
}
for table_name, table_df in tables_to_upload.items():
    # A fresh buffer per table, serialised right here, so each upload reflects
    # the DataFrame as it is now rather than a buffer filled in another cell.
    table_buffer = StringIO()
    table_df.to_csv(table_buffer)
    s3_resource.Object(bucket, f'output/{table_name}.csv').put(Body=table_buffer.getvalue())

In [None]:
# Print the CREATE TABLE DDL pandas infers for each dim/fact table, to adapt
# for creating the Redshift tables. reset_index() turns the unnamed index
# into an "index" column so it appears in the DDL.
ddl_sources = [
    ("dimHospital", dimHospital),
    ("dimDate", dimDate),
    ("dimRegion", dimRegion),
    ("factCovid", factCovid),
]
generated_ddl = {}
for table_name, frame in ddl_sources:
    generated_ddl[table_name] = pd.io.sql.get_schema(frame.reset_index(), table_name)
    print(generated_ddl[table_name])
dimHospitalsql = generated_ddl["dimHospital"]
dimDatesql = generated_ddl["dimDate"]
dimRegionsql = generated_ddl["dimRegion"]
factCovidsql = generated_ddl["factCovid"]

# Connecting to Redshift to deploy the Dim and Fact tables

In [None]:
import getpass
import os

import redshift_connector

# Connection settings come from environment variables, with the original
# placeholders as defaults, so no secret is stored in the notebook. The
# password is prompted for when REDSHIFT_PASSWORD is not set.
conn = redshift_connector.connect(
    host=os.environ.get('REDSHIFT_HOST', 'HOST-REDSHIFT'),
    port=int(os.environ.get('REDSHIFT_PORT', '5439')),
    database=os.environ.get('REDSHIFT_DATABASE', 'dev'),
    user=os.environ.get('REDSHIFT_USER', 'awsuser'),
    password=os.environ.get('REDSHIFT_PASSWORD') or getpass.getpass('Redshift password: '),
)
# Commit each DDL/COPY statement as soon as it runs.
conn.autocommit = True
# A type annotation, not a chained assignment: assigning to
# redshift_connector.Cursor would replace the module's Cursor class with this
# cursor instance.
cursor: redshift_connector.Cursor = conn.cursor()

## Creating the Dim and Fact tables in the Redshift cluster

In [None]:
# Create the Dim and Fact tables in Redshift. The column lists mirror the DDL
# generated from each DataFrame with pd.io.sql.get_schema (including the
# materialised "index" column).
# IF NOT EXISTS makes the cell safe to re-run. Note that it will not alter an
# existing table whose columns differ. (CREATE OR ALTER TABLE is not Redshift
# syntax.)
# NOTE(review): Redshift stores TEXT columns as VARCHAR(256); confirm no text
# value is longer than that before loading.
cursor.execute("""
CREATE TABLE IF NOT EXISTS "dimHospital" (
"index" INTEGER,
  "fips" INTEGER,
  "state_name" TEXT,
  "latitude" REAL,
  "longtitude" REAL,
  "hq_address" TEXT,
  "hospital_name" TEXT,
  "hospital_type" TEXT,
  "hq_city" TEXT,
  "hq_state" TEXT
)
""")

cursor.execute("""CREATE TABLE IF NOT EXISTS "dimDate" (
"index" INTEGER,
  "fips" REAL,
  "date" TIMESTAMP,
  "year" INTEGER,
  "month" INTEGER,
  "day" INTEGER,
  "day_of_week" INTEGER
)""")

cursor.execute("""CREATE TABLE IF NOT EXISTS "dimRegion" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "county" TEXT,
  "state" TEXT
)""")

cursor.execute("""CREATE TABLE IF NOT EXISTS "factCovid" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" INTEGER,
  "negative" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalized" REAL,
  "hospitalizeddischarged" REAL
)""")

## Copying data from S3 into Redshift

In [None]:
# Load dimDate from the CSV uploaded to S3 into its Redshift table.
# Prerequisite: an IAM role that allows Redshift to read from S3, associated
# with the cluster. Put its ARN below.
REDSHIFT_IAM_ROLE_ARN = ""  # TODO: e.g. "arn:aws:iam::<account-id>:role/<role-name>"
dim_date_s3_uri = f"s3://{S3_BUCKET_NAME}/output/dimDate.csv"

# The CSV option makes COPY honour double-quoted fields (pandas quotes values
# that contain commas). IGNOREHEADER 1 skips the header row that to_csv
# writes. REGION is the region of the S3 bucket.
cursor.execute(f"""copy dimDate from '{dim_date_s3_uri}'
               iam_role '{REDSHIFT_IAM_ROLE_ARN}'
               csv
               delimiter ','
               region 'us-east-2'
               IGNOREHEADER 1;
""")