In [30]:
import configparser
import io
import os
import time
from io import StringIO
from typing import Any, Dict, Optional
from urllib.parse import urlparse

import boto3
import pandas as pd

In [31]:
# Read AWS credentials and the Glue/Athena/S3 locations from cluster.config.
config = configparser.ConfigParser()
# Use a context manager so the file handle is closed; read_file(open(...)) left it open.
with open('cluster.config') as config_file:
    config.read_file(config_file)

AWS_ACCESS_KEY = config.get("AWS","KEY")
AWS_SECRET_KEY = config.get("AWS","SECRET")
AWS_REGION = config.get("AWS","REGION")
SCHEMA_NAME = config.get("DB","SCHEMA")
S3_STAGING_DIR = config.get("S3","S3_STAGING_DIR")
S3_BUCKET_NAME = config.get("S3","S3_BUCKET_NAME")
S3_OUTPUT_DIRECTORY = config.get("S3","S3_OUTPUT_DIRECTORY")

# Get the list of tables from Glue, download each table's query results, and load them into a list of dictionaries

In [219]:
# Use boto3 to page through every table registered in an AWS Glue Data Catalog database.
from botocore.exceptions import ClientError

def paginate_through_tables(database_name, max_items=None):
    """Return the names of the tables in a Glue Data Catalog database.

    Args:
        database_name: Glue database whose tables are listed.
        max_items: Upper bound on the number of table names returned across
            all pages; ``None`` returns every table.

    Returns:
        List of table names, in the order Glue returns them.

    Raises:
        Exception: wrapping any botocore ``ClientError`` raised by Glue.
    """
    session = boto3.session.Session()
    glue_client = session.client('glue', aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_KEY, region_name=AWS_REGION)
    # A single get_tables call returns only one page; the paginator follows NextToken
    # so no table is silently dropped, and MaxItems honours the max_items argument.
    paginator = glue_client.get_paginator('get_tables')
    pagination_config = {} if max_items is None else {'MaxItems': max_items}
    tables = []
    try:
        for page in paginator.paginate(DatabaseName=database_name, PaginationConfig=pagination_config):
            tables.extend(table["Name"] for table in page["TableList"])
    except ClientError as e:
        raise Exception("boto3 client error in paginate_through_tables: " + str(e)) from e
    return tables

tables = paginate_through_tables("covid19", 20)
tables

['enigma_jhud',
 'enigma_nytimes_csv',
 'rearc_covid_testing_data_states_daily',
 'rearc_rearc_usa_hospital_beds',
 'static_datasets_countrycode',
 'static_datasets_countypopulation',
 'static_datasets_state_abv']

In [220]:
def download_and_load_query_results(
    client: "botocore.client.BaseClient",
    query_response: Dict[str, Any],
    s3_client: Optional[Any] = None,
    poll_interval: float = 0.5,
    timeout: float = 600.0,
) -> pd.DataFrame:
    """Wait for an Athena query to finish and return its full CSV result as a DataFrame.

    Args:
        client: Athena client used to poll the query's execution status.
        query_response: Response of ``start_query_execution``; only its
            ``"QueryExecutionId"`` key is read.
        s3_client: S3 client used to fetch the result file. Defaults to a new
            client built from the notebook's AWS credentials.
        poll_interval: Seconds to sleep between status checks.
        timeout: Maximum number of seconds to wait for the query to finish.

    Returns:
        All rows of the query result, read from the CSV Athena wrote to S3.

    Raises:
        RuntimeError: if the query ends in the FAILED or CANCELLED state.
        TimeoutError: if the query has not finished within ``timeout`` seconds.
    """
    query_execution_id = query_response["QueryExecutionId"]
    deadline = time.monotonic() + timeout
    # Poll the execution status explicitly instead of retrying get_query_results and
    # string-matching its error text every millisecond.
    while True:
        execution = client.get_query_execution(QueryExecutionId=query_execution_id)["QueryExecution"]
        state = execution["Status"]["State"]
        if state == "SUCCEEDED":
            break
        if state in ("FAILED", "CANCELLED"):
            reason = execution["Status"].get("StateChangeReason", "no reason given")
            raise RuntimeError(f"Athena query {query_execution_id} {state}: {reason}")
        if time.monotonic() > deadline:
            raise TimeoutError(f"Athena query {query_execution_id} still {state} after {timeout} s")
        time.sleep(poll_interval)

    # Read the result from the location Athena reports it actually wrote to, rather than
    # re-deriving the key from separate config values that could disagree with S3_STAGING_DIR.
    output_location = urlparse(execution["ResultConfiguration"]["OutputLocation"])
    if s3_client is None:
        s3_client = boto3.client("s3", aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_KEY, region_name=AWS_REGION)
    result_object = s3_client.get_object(Bucket=output_location.netloc, Key=output_location.path.lstrip("/"))
    # Parse in memory: no temp file is left in the working directory if parsing fails.
    return pd.read_csv(io.BytesIO(result_object["Body"].read()))

In [221]:
#athena_client.get_query_results(QueryExecutionId=response['QueryExecutionId'])

In [222]:
# Athena client used to submit the per-table SELECT queries.
athena_client = boto3.client(
    service_name="athena",
    region_name=AWS_REGION,
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
)

In [223]:
# Run `SELECT *` on every Glue table through Athena and keep each result as a DataFrame,
# in the same order as `tables`.
# NOTE(review): the tables were listed from the "covid19" Glue database but are queried in
# SCHEMA_NAME — confirm both name the same database.
all_data = []
for table_name in tables:
    query = "SELECT * FROM " + table_name
    time.sleep(0.1)  # short gap between submissions, presumably to avoid Athena API throttling
    response = athena_client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={"Database": SCHEMA_NAME},
        ResultConfiguration={
            "OutputLocation": S3_STAGING_DIR,
            "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
        },
    )
    # Build the entry as a literal: naming it `dict` shadowed the builtin for the whole kernel.
    # Errors from the download propagate unchanged (the old try/except only re-raised).
    all_data.append({
        "table_name": table_name,
        "QueryExecutionId": response["QueryExecutionId"],
        "df": download_and_load_query_results(athena_client, response),
    })

In [224]:
# Expect exactly one query result per Glue table.
assert len(all_data) == len(tables), f"got {len(all_data)} results for {len(tables)} tables"
len(all_data)

7

In [225]:
# Look the entry up by name; a positional index silently picks another table if the table list changes.
state_abv_entry = next(entry for entry in all_data if entry["table_name"] == "static_datasets_state_abv")
print(state_abv_entry["table_name"])
print(state_abv_entry["QueryExecutionId"])
print(state_abv_entry["df"].shape)

static_datasets_state_abv
30a7f849-f37b-4ec9-b946-13cccc760040
(52, 2)


In [226]:
# The state-abbreviation result carries its real header in its first row; promote it to column names.
state_abv_entry = next(entry for entry in all_data if entry["table_name"] == "static_datasets_state_abv")
print(state_abv_entry["table_name"])
state_abv_raw = state_abv_entry["df"]
# Build a new frame rather than assigning .columns on the raw frame: that mutated the
# DataFrame stored in all_data, so re-running the cell promoted a data row to the header.
state_abv = state_abv_raw.iloc[1:].copy()
state_abv.columns = state_abv_raw.iloc[0]
#state_abv

static_datasets_state_abv


# Create fact and dimension tables

In [255]:
# Glue table names fetched earlier; all_data holds their query results in this same order.
tables

['enigma_jhud',
 'enigma_nytimes_csv',
 'rearc_covid_testing_data_states_daily',
 'rearc_rearc_usa_hospital_beds',
 'static_datasets_countrycode',
 'static_datasets_countypopulation',
 'static_datasets_state_abv']

In [256]:
# table enigma_jhud
# Look source tables up by name: positional indices into all_data silently pick the wrong
# table if the list of Glue tables changes.
frames = {entry["table_name"]: entry["df"] for entry in all_data}
# table enigma_jhud
factCovid_1_final = frames["enigma_jhud"][['fips', 'province_state', 'country_region', 'confirmed', 'deaths', 'recovered', 'active']]
# table rearc_covid_testing_data_states_daily
factCovid_2_final = frames["rearc_covid_testing_data_states_daily"][['fips', 'date', 'positive', 'negative', 'hospitalizedcurrently', 'hospitalized', 'hospitalizeddischarged']]
# NOTE(review): the testing data's fips values look state-level (1-2 digits in the dimDate preview);
# confirm enigma_jhud uses the same granularity, otherwise this inner join pairs unrelated rows.
factCovid = pd.merge(factCovid_1_final, factCovid_2_final, on='fips', how='inner')
factCovid.shape

(27992, 13)

In [257]:
# Keep only fact rows that carry a fips code.
factCovid = factCovid.dropna(subset=['fips'])

In [260]:
# Region dimension: JHU location columns joined with NYT county/state names on fips.
frames = {entry["table_name"]: entry["df"] for entry in all_data}
dimRegion_1 = frames["enigma_jhud"][['fips', 'province_state', 'country_region', 'latitude', 'longitude']]
dimRegion_2 = frames["enigma_nytimes_csv"][['fips', 'county', 'state']]
# pandas merge matches NaN keys to each other, so drop rows without fips *before* joining;
# filtering afterwards gave the same rows but first built a NaN x NaN cross product.
dimRegion = pd.merge(
    dimRegion_1.dropna(subset=['fips']),
    dimRegion_2.dropna(subset=['fips']),
    on='fips',
    how='inner',
)

In [262]:
#dimRegion.shape

In [264]:
# Hospital dimension from the hospital-beds table, restricted to rows with a fips code.
# 'longtitude' is spelled exactly as the source column is.
frames = {entry["table_name"]: entry["df"] for entry in all_data}
dimHospital = (
    frames["rearc_rearc_usa_hospital_beds"][['fips', 'state_name', 'latitude', 'longtitude', 'hq_address', 'hospital_name', 'hospital_type', 'hq_city', 'hq_state']]
    .dropna(subset=['fips'])
)

In [265]:
# Date dimension derived from the daily testing data. Missing fips/date values are filled
# with 0 before casting; dates are parsed from YYYYMMDD numbers.
frames = {entry["table_name"]: entry["df"] for entry in all_data}
dimDate = (
    frames["rearc_covid_testing_data_states_daily"][['fips', 'date']]
    .fillna(0)
    .assign(
        fips=lambda df: df['fips'].astype(int),
        date=lambda df: pd.to_datetime(df['date'], format='%Y%m%d'),
    )
    .assign(
        year=lambda df: df['date'].dt.year,
        month=lambda df: df['date'].dt.month,
        day_of_week=lambda df: df['date'].dt.dayofweek,
    )
)

In [266]:
# Preview the first rows of the date dimension.
dimDate.head(10)

Unnamed: 0,fips,date,year,month,day_of_week
0,49,2021-02-20,2021,2,5
1,51,2021-02-20,2021,2,5
2,78,2021-02-20,2021,2,5
3,50,2021-02-20,2021,2,5
4,53,2021-02-20,2021,2,5
5,55,2021-02-20,2021,2,5
6,54,2021-02-20,2021,2,5
7,56,2021-02-20,2021,2,5
8,2,2021-02-19,2021,2,4
9,1,2021-02-19,2021,2,4


In [267]:
# Container for the {"table_name", "df"} entries of the fact/dimension tables to export.
star_schema = []

In [268]:
# One {"table_name", "df"} entry per fact/dimension table to export, in a fixed order.
# Rebuilding the list (rather than appending) keeps the cell re-runnable without duplicates,
# and avoids rebinding the builtin `dict`.
star_schema = [
    {"table_name": table_name, "df": frame}
    for table_name, frame in [
        ("factCovid", factCovid),
        ("dimRegion", dimRegion),
        ("dimHospital", dimHospital),
        ("dimDate", dimDate),
    ]
]

In [269]:
# List every table collected in star_schema (no hard-coded count).
for entry in star_schema:
    print(entry["table_name"])

factCovid
dimRegion
dimHospital
dimDate


In [270]:
#dimRegion.to_csv('dimRegion.csv')

# Save fact and dimension tables to S3

In [271]:
# Bucket that receives the exported CSV files.
S3_BUCKET_NAME

'sunny-general-bucket'

In [273]:
def load_csv_to_s3(star_schema: list, bucket: Optional[str] = None, prefix: str = 'output/', s3_resource=None):
    """Upload each DataFrame in ``star_schema`` to S3 as ``<prefix><table_name>.csv``.

    Args:
        star_schema: List of ``{"table_name": str, "df": DataFrame}`` entries.
        bucket: Target bucket; defaults to ``S3_BUCKET_NAME``.
        prefix: Key prefix prepended to every file name.
        s3_resource: Optional boto3 S3 resource (useful for testing); by default
            one is created from the notebook's AWS credentials.
    """
    if bucket is None:
        bucket = S3_BUCKET_NAME
    if s3_resource is None:
        s3_resource = boto3.resource('s3', aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_KEY, region_name=AWS_REGION)

    for entry in star_schema:
        csv_buffer = StringIO()
        # index=False keeps pandas' row index out of the CSV.
        entry["df"].to_csv(csv_buffer, index=False)
        key = prefix + entry["table_name"] + '.csv'
        s3_resource.Object(bucket, key).put(Body=csv_buffer.getvalue())

In [274]:
# Upload every star-schema table to S3 as CSV.
load_csv_to_s3(star_schema)    

# Obtain DDL that can be used to create the tables on Redshift

In [275]:
# Name of the fourth star-schema entry.
star_schema[3]["table_name"]

'dimDate'

In [279]:
# Generate CREATE TABLE DDL for each exported table from its DataFrame dtypes
# (the index is excluded, matching the index=False CSV export).
# get_schema is called without a connection, so it emits pandas' generic column types
# (REAL/TEXT/INTEGER/TIMESTAMP); the CREATE TABLE statements run on Redshift later in
# this notebook were pasted from this output — re-check them if dtypes change.
schema_ddl = {
    entry["table_name"]: pd.io.sql.get_schema(entry["df"], entry["table_name"])
    for entry in star_schema
}
factCovidSQL = schema_ddl["factCovid"]
dimRegionSQL = schema_ddl["dimRegion"]
dimHospitalSQL = schema_ddl["dimHospital"]
dimDateSQL = schema_ddl["dimDate"]
for ddl in schema_ddl.values():
    print(ddl)

CREATE TABLE "factCovid" (
"fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" INTEGER,
  "negative" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalized" REAL,
  "hospitalizeddischarged" REAL
)
CREATE TABLE "dimRegion" (
"fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "county" TEXT,
  "state" TEXT
)
CREATE TABLE "dimHospital" (
"fips" INTEGER,
  "state_name" TEXT,
  "latitude" REAL,
  "longtitude" REAL,
  "hq_address" TEXT,
  "hospital_name" TEXT,
  "hospital_type" TEXT,
  "hq_city" TEXT,
  "hq_state" TEXT
)
CREATE TABLE "dimDate" (
"fips" INTEGER,
  "date" TIMESTAMP,
  "year" INTEGER,
  "month" INTEGER,
  "day_of_week" INTEGER
)


# Load the tables into Redshift with redshift_connector

In [280]:
import configparser

# Re-read cluster.config, adding the Redshift (DWH) settings used below.
config = configparser.ConfigParser()
# Use a context manager so the file handle is closed; read_file(open(...)) left it open.
with open('cluster.config') as config_file:
    config.read_file(config_file)

AWS_ACCESS_KEY = config.get("AWS","KEY")
AWS_SECRET_KEY = config.get("AWS","SECRET")
AWS_REGION = config.get("AWS","REGION")
SCHEMA_NAME = config.get("DB","SCHEMA")
DWH_CLUSTER_TYPE = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE = config.get("DWH","DWH_NODE_TYPE")
DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB = config.get("DWH","DWH_DB")
DWH_DB_USER = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD = config.get("DWH","DWH_DB_PASSWORD")
DWH_IAM_ROLE_NAME = config.get("DWH","DWH_IAM_ROLE_NAME")

In [281]:
import pandas as pd
import redshift_connector

In [198]:
# IAM and Redshift clients, using the configured region like every other client in this
# notebook instead of a hard-coded "us-east-2".
iam = boto3.client('iam', region_name=AWS_REGION, aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_KEY)
redshift = boto3.client('redshift', region_name=AWS_REGION, aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_KEY)

def prettyRedshiftProps(props):
    """Tabulate the interesting fields of a Redshift cluster description.

    Sets pandas' ``display.max_colwidth`` to unlimited so nested values such as the
    endpoint are shown in full, then returns a two-column ("key", "Value") frame with
    one row per selected field, in the order the fields appear in ``props``.
    """
    pd.set_option('display.max_colwidth', None)
    wanted_fields = {
        "ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername",
        "DBName", "Endpoint", "IamRoles", "VpcId",
    }
    rows = [(field, value) for field, value in props.items() if field in wanted_fields]
    return pd.DataFrame(rows, columns=["key", "Value"])

# Fetch this cluster's description (endpoint, IAM roles, status, ...) and display the key fields.
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

Unnamed: 0,key,Value
0,ClusterIdentifier,sunny-redshift
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,awsuser
4,DBName,testload
5,Endpoint,"{'Address': 'sunny-redshift.czyamnxhlqou.us-east-2.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-0e4182b61e5736a09
7,IamRoles,"[{'IamRoleArn': 'arn:aws:iam::724295183885:role/service-role/AmazonRedshift-CommandsAccessRole-20230212T180621', 'ApplyStatus': 'in-sync'}]"


In [199]:
# Host and port of the cluster endpoint, as reported by describe_clusters.
DWH_ENDPOINT, DWH_PORT = (myClusterProps["Endpoint"][field] for field in ("Address", "Port"))

In [200]:
# Show the target database and user, one per line.
print(DWH_DB, DWH_DB_USER, sep="\n")

testload
awsuser


In [201]:
# Open a connection to the Redshift cluster; autocommit makes each statement below commit on its own.
conn = redshift_connector.connect(
    host=DWH_ENDPOINT,
    port=DWH_PORT,
    database=DWH_DB,
    user=DWH_DB_USER,
    password=DWH_DB_PASSWORD,
)
conn.autocommit = True

In [202]:
# Annotate rather than chain-assign: `cursor = redshift_connector.Cursor = conn.cursor()`
# overwrote the module's Cursor class with this cursor instance.
cursor: redshift_connector.Cursor = conn.cursor()

In [203]:
# Create the fact table (DDL taken from the pandas get_schema output).
# IF NOT EXISTS keeps the cell re-runnable: with autocommit on, a second run otherwise fails
# because the table already exists.
# NOTE(review): IF NOT EXISTS does not alter an existing table whose columns differ — drop it after schema changes.
cursor.execute("""
CREATE TABLE IF NOT EXISTS "factCovid" (
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" INTEGER,
  "negative" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalized" REAL,
  "hospitalizeddischarged" REAL
)

""")

<redshift_connector.cursor.Cursor at 0x2296cddac70>

In [204]:
# Create the region dimension (DDL taken from the pandas get_schema output).
# IF NOT EXISTS keeps the cell re-runnable instead of failing once the table exists.
# NOTE(review): IF NOT EXISTS does not alter an existing table whose columns differ — drop it after schema changes.
cursor.execute("""
CREATE TABLE IF NOT EXISTS "dimRegion" (
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "county" TEXT,
  "state" TEXT
)

""")

<redshift_connector.cursor.Cursor at 0x2296cddac70>

In [205]:
# Create the hospital dimension (DDL taken from the pandas get_schema output).
# IF NOT EXISTS keeps the cell re-runnable instead of failing once the table exists.
# NOTE(review): Redshift stores TEXT as VARCHAR(256); confirm hq_address / hospital_name values
# fit, otherwise COPY will reject those rows.
cursor.execute("""
CREATE TABLE IF NOT EXISTS "dimHospital" (
  "fips" INTEGER,
  "state_name" TEXT,
  "latitude" REAL,
  "longtitude" REAL,
  "hq_address" TEXT,
  "hospital_name" TEXT,
  "hospital_type" TEXT,
  "hq_city" TEXT,
  "hq_state" TEXT
)
""")

<redshift_connector.cursor.Cursor at 0x2296cddac70>

In [206]:
# Create the date dimension (DDL taken from the pandas get_schema output).
# IF NOT EXISTS keeps the cell re-runnable instead of failing once the table exists.
# NOTE(review): Redshift presumably folds the quoted name to dimdate (the COPY targets dimdate) — confirm
# enable_case_sensitive_identifier is off.
cursor.execute("""
CREATE TABLE IF NOT EXISTS "dimDate" (
  "fips" INTEGER,
  "date" TIMESTAMP,
  "year" INTEGER,
  "month" INTEGER,
  "day_of_week" INTEGER
)
""")

<redshift_connector.cursor.Cursor at 0x2296cddac70>

In [207]:
# Load dimDate from the CSV exported to S3. Database, bucket and IAM role come from the config
# and the cluster description instead of hard-coded values (the role ARN embeds the account id).
# NOTE(review): uses the first IAM role attached to the cluster — confirm it is the one with S3 read access.
# NOTE(review): COPY appends, so re-running this cell duplicates rows unless dimdate is emptied first.
dwh_iam_role_arn = myClusterProps["IamRoles"][0]["IamRoleArn"]
cursor.execute(f"""
               COPY {DWH_DB}.public.dimdate (fips,date,year,month,day_of_week)
               FROM 's3://{S3_BUCKET_NAME}/output/dimDate.csv'
               IAM_ROLE '{dwh_iam_role_arn}'
               FORMAT AS CSV DELIMITER ',' QUOTE '"' IGNOREHEADER 1 DATEFORMAT 'yyyy-mm-dd' REGION AS 'us-east-2'
               """)

<redshift_connector.cursor.Cursor at 0x2296cddac70>