In [1]:
import configparser
import time
from typing import Any, Dict

import boto3
import pandas as pd
import psycopg2

In [None]:
class Data:
    """Small wrapper around a psycopg2 connection and cursor.

    The connection is opened in autocommit mode, so every statement passed
    to ``execute`` is committed immediately.
    """

    # Class-level defaults so close_conn() is safe to call even when
    # __init__ did not get as far as creating the connection or the cursor.
    conn = None
    cur = None

    def __init__(self, database, user, password,
                 host="moecovid19.cukozics74gm.eu-west-3.redshift.amazonaws.com",
                 port=5439):
        """Open the connection, switch it to autocommit and create a cursor.

        Args:
            database: Name of the database to connect to.
            user: Database user name.
            password: Password for ``user``.
            host: Server host name; defaults to the cluster endpoint that was
                previously hard-coded in the connection string.
            port: Server port; defaults to 5439.

        Raises:
            psycopg2.Error: If connecting, configuring the session or creating
                the cursor fails. The error is printed first and anything that
                was already opened is closed again.
        """
        try:
            # Keyword arguments instead of a hand-concatenated DSN string:
            # values containing spaces or quotes (typically the password)
            # would otherwise corrupt the connection string.
            self.conn = psycopg2.connect(
                host=host,
                dbname=database,
                user=user,
                password=password,
                port=port,
            )
            self.conn.set_session(autocommit=True)
            self.cur = self.conn.cursor()
        except psycopg2.Error as e:
            print(e)
            # Do not leave a half-initialised object behind: release whatever
            # was opened and surface the real database error to the caller.
            self.close_conn()
            raise

    def close_conn(self):
        """Close the cursor and the connection.

        Each resource is closed independently, so a failure while closing the
        cursor does not prevent the connection from being closed. Errors are
        printed, not raised. Resources that were never created are skipped.
        """
        for resource in (self.cur, self.conn):
            if resource is None:
                continue
            try:
                resource.close()
            except psycopg2.Error as e:
                print(e)

    def execute(self, query, param=""):
        """Run ``query`` on the cursor, printing (not raising) database errors.

        Args:
            query: SQL text to execute.
            param: Query parameters, forwarded unchanged to ``cursor.execute``.
        """
        try:
            self.cur.execute(query, param)
        except psycopg2.Error as e:
            print(e)

    def result(self):
        """Print every remaining row of the cursor's current result set."""
        row = self.cur.fetchone()
        # fetchone() returns None once the result set is exhausted.
        while row is not None:
            print(row)
            row = self.cur.fetchone()

In [3]:
# Load cluster and credential settings from cluster.config.
config = configparser.ConfigParser()
# Use a context manager so the file handle is closed as soon as it is parsed.
with open("cluster.config") as config_file:
    config.read_file(config_file)

# [AWS] section
KEY = config.get("AWS", "KEY")
SECTRET = config.get("AWS", "SECTRET")  # option name is spelled this way in the lookup; keep it in sync with the file

# [DWH] section
DWH_CLUSTER_TYPE = config.get("DWH", "DWH_CLUSTER_TYPE")
DWH_NUM_NODES = config.get("DWH", "DWH_NUM_NODES")
DWH_NODE_TYPE = config.get("DWH", "DWH_NODE_TYPE")
DWH_CLUSTER_IDENTIFIER = config.get("DWH", "DWH_CLUSTER_IDENTIFIER")
DWH_DB = config.get("DWH", "DWH_DB")
DWH_DB_USER = config.get("DWH", "DWH_DB_USER")
DWH_DB_PASSWORD = config.get("DWH", "DWH_DB_PASSWORD")
DWH_PORT = config.get("DWH", "DWH_PORT")
DWH_IAM_ROLE_NAME = config.get("DWH", "DWH_IAM_ROLE_NAME")

# NOTE(review): other cells of this notebook reference AWS_ACCESS_KEY,
# AWS_SECRET_KEY, AWS_REGION, S3_BUCKET_NAME, SCHEMA_NAME and S3_STAGING_DIR,
# none of which are assigned here. They presumably belong in this
# configuration cell; confirm where they are supposed to come from.

In [4]:
# Athena client shared by all query cells of this notebook.
athena_client = boto3.client(
    service_name="athena",
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION,
)

In [5]:
def download_and_load_query_results(
    client: Any,
    query_response: Dict[str, Any],
    poll_interval: float = 1.0,
) -> pd.DataFrame:
    """Wait for an Athena query to finish and load its result CSV.

    Args:
        client: Athena client that started the query.
        query_response: Response of ``start_query_execution``; only its
            ``'QueryExecutionId'`` entry is read.
        poll_interval: Seconds to sleep between two status checks.

    Returns:
        The query result, read from the downloaded CSV file.

    Raises:
        RuntimeError: If the query ends in state ``FAILED`` or ``CANCELLED``.
    """
    query_id = query_response["QueryExecutionId"]

    # Poll the execution status instead of repeatedly fetching (and then
    # discarding) result pages and matching on exception text.
    while True:
        status = client.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]
        state = status["State"]
        if state == "SUCCEEDED":
            break
        if state in ("FAILED", "CANCELLED"):
            raise RuntimeError(
                "Athena query {} ended in state {}: {}".format(
                    query_id,
                    state,
                    status.get("StateChangeReason", "no reason reported"),
                )
            )
        time.sleep(poll_interval)

    # The result object is downloaded to a fixed local file name, so every
    # call overwrites the file written by the previous one.
    temp_file_location: str = "athena_query_results.csv"
    s3_client = boto3.client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY,
        aws_secret_access_key=AWS_SECRET_KEY,
        region_name=AWS_REGION
    )
    s3_client.download_file(
        S3_BUCKET_NAME,
        query_id + ".csv",
        temp_file_location
    )
    return pd.read_csv(temp_file_location)

In [6]:
# Start an Athena query that selects the whole enigma_jhud table.
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM enigma_jhud",
    QueryExecutionContext=dict(Database=SCHEMA_NAME),
    ResultConfiguration=dict(
        OutputLocation=S3_STAGING_DIR,
        EncryptionConfiguration=dict(EncryptionOption="SSE_S3"),
    ),
)

# Wait for the query and load its result into a DataFrame.
enigma_jhud = download_and_load_query_results(
    client=athena_client,
    query_response=response,
)

In [7]:
# Start an Athena query that selects the whole countrycode table.
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM countrycode",
    QueryExecutionContext=dict(Database=SCHEMA_NAME),
    ResultConfiguration=dict(
        OutputLocation=S3_STAGING_DIR,
        EncryptionConfiguration=dict(EncryptionOption="SSE_S3"),
    ),
)

# Wait for the query and load its result into a DataFrame.
countrycode = download_and_load_query_results(
    client=athena_client,
    query_response=response,
)

In [8]:
# Start an Athena query that selects the whole countypopulation table.
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM countypopulation",
    QueryExecutionContext=dict(Database=SCHEMA_NAME),
    ResultConfiguration=dict(
        OutputLocation=S3_STAGING_DIR,
        EncryptionConfiguration=dict(EncryptionOption="SSE_S3"),
    ),
)

# Wait for the query and load its result into a DataFrame.
countypopulation = download_and_load_query_results(
    client=athena_client,
    query_response=response,
)

In [9]:
# Start an Athena query that selects the whole rearc_usa_hospital_beds table.
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM rearc_usa_hospital_beds",
    QueryExecutionContext=dict(Database=SCHEMA_NAME),
    ResultConfiguration=dict(
        OutputLocation=S3_STAGING_DIR,
        EncryptionConfiguration=dict(EncryptionOption="SSE_S3"),
    ),
)

# Wait for the query and load its result into a DataFrame.
rearc_usa_hospital_beds = download_and_load_query_results(
    client=athena_client,
    query_response=response,
)

In [10]:
# Start an Athena query that selects the whole state_abv table.
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM state_abv",
    QueryExecutionContext=dict(Database=SCHEMA_NAME),
    ResultConfiguration=dict(
        OutputLocation=S3_STAGING_DIR,
        EncryptionConfiguration=dict(EncryptionOption="SSE_S3"),
    ),
)

# Wait for the query and load its result into a DataFrame.
state_abv = download_and_load_query_results(
    client=athena_client,
    query_response=response,
)

In [11]:
# Start an Athena query that selects the whole states_daily table.
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM states_daily",
    QueryExecutionContext=dict(Database=SCHEMA_NAME),
    ResultConfiguration=dict(
        OutputLocation=S3_STAGING_DIR,
        EncryptionConfiguration=dict(EncryptionOption="SSE_S3"),
    ),
)

# Wait for the query and load its result into a DataFrame.
states_daily = download_and_load_query_results(
    client=athena_client,
    query_response=response,
)

In [12]:
# Start an Athena query that selects the whole us_county table.
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM us_county",
    QueryExecutionContext=dict(Database=SCHEMA_NAME),
    ResultConfiguration=dict(
        OutputLocation=S3_STAGING_DIR,
        EncryptionConfiguration=dict(EncryptionOption="SSE_S3"),
    ),
)

# Wait for the query and load its result into a DataFrame.
us_county = download_and_load_query_results(
    client=athena_client,
    query_response=response,
)

In [13]:
# Start an Athena query that selects the whole us_daily table.
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM us_daily",
    QueryExecutionContext=dict(Database=SCHEMA_NAME),
    ResultConfiguration=dict(
        OutputLocation=S3_STAGING_DIR,
        EncryptionConfiguration=dict(EncryptionOption="SSE_S3"),
    ),
)

# Wait for the query and load its result into a DataFrame.
us_daily = download_and_load_query_results(
    client=athena_client,
    query_response=response,
)

In [14]:
# Start an Athena query that selects the whole us_states table.
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM us_states",
    QueryExecutionContext=dict(Database=SCHEMA_NAME),
    ResultConfiguration=dict(
        OutputLocation=S3_STAGING_DIR,
        EncryptionConfiguration=dict(EncryptionOption="SSE_S3"),
    ),
)

# Wait for the query and load its result into a DataFrame.
us_states = download_and_load_query_results(
    client=athena_client,
    query_response=response,
)

In [15]:
# Start an Athena query that selects the whole us_total_latest table.
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM us_total_latest",
    QueryExecutionContext=dict(Database=SCHEMA_NAME),
    ResultConfiguration=dict(
        OutputLocation=S3_STAGING_DIR,
        EncryptionConfiguration=dict(EncryptionOption="SSE_S3"),
    ),
)

# Wait for the query and load its result into a DataFrame.
us_total_latest = download_and_load_query_results(
    client=athena_client,
    query_response=response,
)

In [16]:
# Promote the first row of state_abv to column labels and drop that row
# from the data.
new_header = state_abv.iloc[0]
state_abv = state_abv.iloc[1:]
state_abv.columns = new_header

In [17]:
# Fact table: case counts from enigma_jhud joined with the test results
# from states_daily on the shared 'fips' column.
factCovid_1 = enigma_jhud[
    ["fips", "province_state", "country_region", "confirmed", "deaths", "recovered"]
]
factCovid_2 = states_daily[
    ["fips", "date", "positive", "negative"]
]
factCovid = factCovid_1.merge(factCovid_2, how="inner", on="fips")

In [18]:
# Region dimension: location columns from enigma_jhud joined with the
# county/state columns of us_county on the shared 'fips' column.
dimRegion_1 = enigma_jhud[
    ["fips", "province_state", "country_region", "latitude", "longitude"]
]
dimRegion_2 = us_county[
    ["fips", "county", "state"]
]
dimRegion = dimRegion_1.merge(dimRegion_2, how="inner", on="fips")

In [19]:
# Hospital dimension: a column subset of rearc_usa_hospital_beds.
dimhospital = rearc_usa_hospital_beds[
    [
        "fips",
        "state_name",
        "latitude",
        "longtitude",
        "hq_address",
        "hospital_name",
        "hospital_type",
        "hq_city",
        "hq_state",
    ]
]

In [20]:
# Date dimension: starts as the 'fips' and 'date' columns of states_daily.
# NOTE(review): the following cells assign columns on this selection in
# place, which can make pandas emit SettingWithCopyWarning; consider taking
# an explicit copy of the selection here.
dimdate = states_daily[['fips', 'date']]

In [21]:
# Parse the 'date' column as dates written in YYYYMMDD form.
dimdate["date"] = pd.to_datetime(arg=dimdate["date"], format="%Y%m%d")

In [22]:
# Derive calendar parts from the parsed 'date' column.
# The insertion order matters: dimdate is later exported with
# to_csv(header=False), so columns are identified by position only.
dimdate['year'] = dimdate['date'].dt.year
dimdate['month'] = dimdate['date'].dt.month
dimdate['day_of_week'] = dimdate['date'].dt.dayofweek

In [23]:
# S3 bucket that the upload cells write the exported CSV files to.
bucket = "moe-covid-project"

In [24]:
# Replace missing 'negative' counts with 0.0.
# The filled column is assigned back explicitly: calling
# fillna(inplace=True) on the selected column is a chained in-place update
# and does not modify factCovid when pandas Copy-on-Write is active.
factCovid['negative'] = factCovid['negative'].fillna(0.0)

# Keep only rows whose province_state does not contain a comma. Rows with a
# missing province_state are dropped as well, because NaN == False is False.
factCovid = factCovid[factCovid["province_state"].str.contains(",") == False]
dimRegion = dimRegion[dimRegion["province_state"].str.contains(",") == False]

In [25]:
# S3 resource handle used by the upload cells.
s3_resource = boto3.resource(
    service_name="s3",
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION,
)

# Upload the fact table as header-less CSV text (the index is written too).
s3_resource.Object(bucket, "output/factCovid.csv").put(
    Body=factCovid.to_csv(header=False),
)

In [26]:
# Upload the date dimension as header-less CSV text.
s3_resource.Object(bucket, "output/dimdate.csv").put(
    Body=dimdate.to_csv(header=False),
)

In [27]:
# Upload the region and hospital dimensions as header-less CSV text.
s3_resource.Object(bucket, "output/dimRegion.csv").put(
    Body=dimRegion.to_csv(header=False),
)
s3_resource.Object(bucket, "output/dimhospital.csv").put(
    Body=dimhospital.to_csv(header=False),
)

In [28]:
# Generate CREATE TABLE statements from the frames. reset_index() turns the
# index into a regular column so the schema matches the exported CSV files,
# which include the index.
dimDate_schema = pd.io.sql.get_schema(frame=dimdate.reset_index(), name="dimDate")
dimHospital_schema = pd.io.sql.get_schema(frame=dimhospital.reset_index(), name="dimHospital")
dimRegion_schema = pd.io.sql.get_schema(frame=dimRegion.reset_index(), name="dimRegion")
factCovid_schema = pd.io.sql.get_schema(frame=factCovid.reset_index(), name="factCovid")

In [29]:
# Connect with the settings loaded from cluster.config instead of keeping the
# database name, user and password as literals in the notebook.
# NOTE(review): assumes the DWH_DB / DWH_DB_USER / DWH_DB_PASSWORD entries of
# cluster.config describe the same database and account that were previously
# hard-coded here; confirm before running. The old password was committed in
# this notebook and should be rotated.
conn = Data(DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD)

In [30]:
# Run every generated CREATE TABLE statement on the open connection.
schema = [
    dimDate_schema,
    dimHospital_schema,
    dimRegion_schema,
    factCovid_schema,
]
for ddl in schema:
    conn.execute(str(ddl))

In [31]:
# Load each exported CSV file from S3 into its table.
# The files are produced by DataFrame.to_csv, which wraps fields containing a
# comma in double quotes. The `csv` format option makes COPY honor that
# quoting; with only `delimiter ','` such a field is split into extra columns.
for table_name, s3_key in (
    ("dimHospital", "output/dimhospital.csv"),
    ("dimdate", "output/dimdate.csv"),
    ("dimregion", "output/dimRegion.csv"),
    ("factcovid", "output/factCovid.csv"),
):
    conn.execute(f"""
    copy {table_name} from 's3://moe-covid-project/{s3_key}'
    credentials 'aws_iam_role=arn:aws:iam::558689090956:role/moe-test'
    region 'eu-west-3'
    csv
    delimiter ','
    """
    )