In [None]:
import time
import configparser
from io import StringIO

import boto3
import psycopg2
import pandas as pd
import numpy as np
from botocore.exceptions import ClientError

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

In [None]:
# Create (or reuse) the Spark session for this notebook, running with the
# `local[*]` master under the application name "Join".
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Join")
    .getOrCreate()
)

In [None]:
# Load pipeline settings from the local config file. The file is opened with a
# context manager so the handle is closed as soon as parsing finishes.
config = configparser.ConfigParser()
with open('covid19-analytics.config') as config_file:
    config.read_file(config_file)

# AWS credentials (kept in the config file rather than in the notebook).
KEY = config.get('AWS', 'KEY')
SECRET = config.get('AWS', 'SECRET')

# S3 output location and local scratch directory.
TARGET_OUTPUT_BUCKET = config.get('S3', 'TARGET_OUTPUT_BUCKET')
TARGET_OUTPUT_S3 = config.get('S3', 'TARGET_OUTPUT_S3')
TARGET_OUTPUT_DIR = config.get('S3', 'TARGET_OUTPUT_DIR')
TARGET_REGION = config.get('S3', 'TARGET_REGION')
TMP_DIR = config.get('FILE_PATHS', 'TMP_DIR')

# Redshift ("DWH") cluster settings. All values are read as strings; the port
# is converted with int() where it is used.
DWH_CLUSTER_TYPE = config.get('DWH', 'DWH_CLUSTER_TYPE')
DWH_NUM_NODES = config.get('DWH', 'DWH_NUM_NODES')
DWH_NODE_TYPE = config.get('DWH', 'DWH_NODE_TYPE')
DWH_CLUSTER_IDENTIFIER = config.get('DWH', 'DWH_CLUSTER_IDENTIFIER')
DWH_DB = config.get('DWH', 'DWH_DB')
DWH_DB_USER = config.get('DWH', 'DWH_DB_USER')
DWH_DB_PASSWORD = config.get('DWH', 'DWH_DB_PASSWORD')
DWH_PORT = config.get('DWH', 'DWH_PORT')
DWH_IAM_ROLE_NAME = config.get('DWH', 'DWH_IAM_ROLE_NAME')

In [None]:
# Every AWS handle below is created with the same region and credentials.
_aws_connection_kwargs = dict(
    region_name=TARGET_REGION,
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
)

OUTPUT_S3_CLIENT = boto3.client('s3', **_aws_connection_kwargs)
redshift_client = boto3.client('redshift', **_aws_connection_kwargs)
# Note: despite its name this is a boto3 *resource*, not a client.
ec2_client = boto3.resource('ec2', **_aws_connection_kwargs)
iam_client = boto3.client('iam', **_aws_connection_kwargs)

In [None]:
# Build the fact table from the two source extracts in TMP_DIR.
enigma_jhu = pd.read_csv(f'{TMP_DIR}/enigma_jhu.csv')
testing_data_states_daily = pd.read_csv(f'{TMP_DIR}/testing-datastates_daily.csv')

jhu_fact_columns = [
    'fips', 'province_state', 'country_region',
    'confirmed', 'deaths', 'recovered', 'active',
]
testing_fact_columns = [
    'fips', 'date', 'positive', 'negative',
    'hospitalizedcurrently', 'hospitalized', 'hospitalizeddischarged',
]

factCovid_1 = enigma_jhu[jhu_fact_columns]
factCovid_2 = testing_data_states_daily[testing_fact_columns]

# NOTE(review): the only join key is 'fips'; if both inputs carry several rows
# per fips, every pairing is produced — confirm this is intended.
factCovid = factCovid_1.merge(factCovid_2, on='fips', how='inner')
print(len(factCovid))

# Remove rows that are exact duplicates across all columns.
factCovid = factCovid.drop_duplicates()

In [None]:
# Build the hospital dimension from the hospital-beds extract.
dimHospital = pd.read_csv(f'{TMP_DIR}/hospital-bedsjson.csv')
dimHospital = dimHospital[['fips', 'state_name', 'latitude', 'longtitude', 'hq_address', 'hospital_name', 'hospital_type', 'hq_city', 'hq_state']]
# The source file misspells the column name; normalise it.
dimHospital = dimHospital.rename(columns={'longtitude': 'longitude'})

dimHospital = dimHospital.drop_duplicates()

# Coerce the coordinates to numeric (unparseable values become NaN).
# pd.to_numeric returns a new Series, so the result has to be stored back;
# previously it was computed and thrown away.
dimHospital = dimHospital.assign(
    latitude=pd.to_numeric(dimHospital['latitude'], errors='coerce'),
    longitude=pd.to_numeric(dimHospital['longitude'], errors='coerce'),
)

In [None]:
# Build the date dimension from the daily state testing extract.
dimDate = pd.read_csv(f'{TMP_DIR}/testing-datastates_daily.csv')
dimDate = dimDate[['fips', 'date']]

# Parse the YYYYMMDD values once and derive the calendar parts from the
# parsed Series. `.assign` returns a new frame, which avoids writing columns
# onto a column slice of the frame that was read from disk.
parsed_date = pd.to_datetime(dimDate['date'], format='%Y%m%d')
dimDate = dimDate.assign(
    date=parsed_date,
    year=parsed_date.dt.year,
    month=parsed_date.dt.month,
    day_of_week=parsed_date.dt.dayofweek,
)

dimDate = dimDate.drop_duplicates()

# 'date' is already datetime64 at this point, so no second conversion is
# needed; only 'fips' still has to be cast.
dimDate = dimDate.assign(fips=dimDate['fips'].astype(float))


In [None]:
# Re-read two extracts with Spark (first row as header, column types inferred).
# Note that `enigma_jhu` is rebound here from a pandas frame to a Spark frame.
_spark_csv_options = dict(header=True, inferSchema=True)

enigma_jhu = spark.read.csv(f'{TMP_DIR}/enigma_jhu.csv', **_spark_csv_options)
ny_times_us_county = spark.read.csv(f'{TMP_DIR}/us_county.csv', **_spark_csv_options)

In [None]:
# Left side of the region join: location attributes, repartitioned on 'fips'.
dimRegion_1 = (
    enigma_jhu
    .select('fips', 'province_state', 'country_region', 'latitude', 'longitude')
    .repartition(4, 'fips')
)

# Right side: county/state names, repartitioned on 'fips'. The key is then
# renamed to 'fips2' so the two key columns have distinct names.
dimRegion_2 = (
    ny_times_us_county
    .select('fips', 'county', 'state')
    .repartition(4, 'fips')
    .withColumnRenamed('fips', 'fips2')
)

In [None]:
# Inner-join the two region inputs where fips == fips2.
region_join_condition = dimRegion_1["fips"] == dimRegion_2["fips2"]
dimRegion = dimRegion_1.join(dimRegion_2, on=region_join_condition, how="inner")

In [None]:
# Drop the helper join key, then de-duplicate; the row count is printed
# before and after so the effect of distinct() is visible.
dimRegion = dimRegion.drop('fips2')
rows_before_distinct = dimRegion.count()
print(rows_before_distinct)

dimRegion = dimRegion.distinct()
rows_after_distinct = dimRegion.count()
print(rows_after_distinct)

In [None]:
# Materialise the Spark result as a pandas frame and normalise column types.
dimRegion = dimRegion.toPandas()

# pd.to_numeric returns a new Series, so its result must be stored back;
# previously the coordinate conversions were computed and discarded.
dimRegion = dimRegion.assign(
    fips=dimRegion['fips'].astype(float),
    latitude=pd.to_numeric(dimRegion['latitude'], errors='coerce'),
    longitude=pd.to_numeric(dimRegion['longitude'], errors='coerce'),
)

In [None]:
# In-memory text buffer bound to `csv_buffer`, used for CSV output before upload.
csv_buffer = StringIO()

In [None]:
# Serialise factCovid to CSV in memory and upload it to S3.
# Start from an empty buffer: writing into a buffer that already holds data
# (an earlier table, or a previous run of this cell) would append to it and
# the uploaded object would contain more than this one table.
csv_buffer = StringIO()
factCovid.to_csv(csv_buffer)

OUTPUT_S3_CLIENT.put_object(
    Bucket=TARGET_OUTPUT_S3,
    Key=f'{TARGET_OUTPUT_DIR}/factCovid.csv',
    Body=csv_buffer.getvalue(),
    ContentType='text/csv'
)

In [None]:
# Serialise dimHospital to CSV in memory and upload it to S3.
# Start from an empty buffer: writing into a buffer that already holds data
# (an earlier table, or a previous run of this cell) would append to it and
# the uploaded object would contain more than this one table.
csv_buffer = StringIO()
dimHospital.to_csv(csv_buffer)

OUTPUT_S3_CLIENT.put_object(
    Bucket=TARGET_OUTPUT_S3,
    Key=f'{TARGET_OUTPUT_DIR}/dimHospital.csv',
    Body=csv_buffer.getvalue(),
    ContentType='text/csv'
)

In [None]:
# Serialise dimDate to CSV in memory and upload it to S3.
# Start from an empty buffer: writing into a buffer that already holds data
# (an earlier table, or a previous run of this cell) would append to it and
# the uploaded object would contain more than this one table.
csv_buffer = StringIO()
dimDate.to_csv(csv_buffer)

OUTPUT_S3_CLIENT.put_object(
    Bucket=TARGET_OUTPUT_S3,
    Key=f'{TARGET_OUTPUT_DIR}/dimDate.csv',
    Body=csv_buffer.getvalue(),
    ContentType='text/csv'
)

In [None]:
# Serialise dimRegion to CSV in memory and upload it to S3.
# Start from an empty buffer: writing into a buffer that already holds data
# (an earlier table, or a previous run of this cell) would append to it and
# the uploaded object would contain more than this one table.
csv_buffer = StringIO()
dimRegion.to_csv(csv_buffer)

OUTPUT_S3_CLIENT.put_object(
    Bucket=TARGET_OUTPUT_S3,
    Key=f'{TARGET_OUTPUT_DIR}/dimRegion.csv',
    Body=csv_buffer.getvalue(),
    ContentType='text/csv'
)

In [None]:
# %rm -r -f {TMP_DIR}/* # Cleanup tmp directory

In [None]:
# Derive the CREATE TABLE statements from the pandas frames. Each frame is
# reset_index()-ed first so the index becomes a regular column in the schema,
# and the same schema is used for the target table and its staging twin.
factCovid_indexed = factCovid.reset_index()
factCovid_sql = pd.io.sql.get_schema(factCovid_indexed, 'factCovid') + ';'
staging_factCovid_sql = pd.io.sql.get_schema(factCovid_indexed, 'staging_factCovid') + ';'
print(factCovid_sql)

dimHospital_indexed = dimHospital.reset_index()
dimHospital_sql = pd.io.sql.get_schema(dimHospital_indexed, 'dimHospital') + ';'
staging_dimHospital_sql = pd.io.sql.get_schema(dimHospital_indexed, 'staging_dimHospital') + ';'
print(dimHospital_sql)

dimDate_indexed = dimDate.reset_index()
dimDate_sql = pd.io.sql.get_schema(dimDate_indexed, 'dimDate') + ';'
staging_dimDate_sql = pd.io.sql.get_schema(dimDate_indexed, 'staging_dimDate') + ';'
print(dimDate_sql)

dimRegion_indexed = dimRegion.reset_index()
dimRegion_sql = pd.io.sql.get_schema(dimRegion_indexed, 'dimRegion') + ';'
staging_dimRegion_sql = pd.io.sql.get_schema(dimRegion_indexed, 'staging_dimRegion') + ';'
print(dimRegion_sql)

In [None]:
def get_redshift_props(redshift_client, cluster_identifier, retries=30, retry_delay=30):
    """Poll Redshift until the cluster is available and return its properties.

    Retries cover the case where cluster creation has not finished yet.

    Args:
        redshift_client: boto3 Redshift client (anything exposing
            ``describe_clusters`` and ``exceptions.ClusterNotFoundFault``).
        cluster_identifier: identifier of the cluster to describe.
        retries: maximum number of describe attempts.
        retry_delay: seconds to wait after an attempt that found the cluster
            but not yet available. A "not found" attempt waits a third of this.

    Returns:
        The cluster description dict once ``ClusterAvailabilityStatus`` is
        ``'Available'``, or ``None`` if it never became available within
        ``retries`` attempts.

    Raises:
        ClusterNotFoundFault: if the cluster is still not found on the last
            attempt.
    """
    # The "not found" branch deliberately polls faster; the message now
    # reports the delay that is actually slept.
    not_found_delay = retry_delay / 3

    for attempt in range(retries):
        is_last_attempt = attempt == retries - 1
        try:
            clusterProps = redshift_client.describe_clusters(
                ClusterIdentifier=cluster_identifier
            )['Clusters'][0]
        except redshift_client.exceptions.ClusterNotFoundFault:
            if is_last_attempt:
                raise  # Retries exhausted: surface the last exception
            print(f"Cluster '{cluster_identifier}' not found. Retrying in {not_found_delay:g} seconds...")
            time.sleep(not_found_delay)
            continue

        if clusterProps['ClusterAvailabilityStatus'] == 'Available':
            return clusterProps

        if not is_last_attempt:
            print(f"Cluster '{cluster_identifier}' not ready. Retrying in {retry_delay} seconds...")
            time.sleep(retry_delay)

    # Make the exhaustion outcome explicit instead of falling off the end.
    print(f"Cluster '{cluster_identifier}' did not become available after {retries} attempts.")
    return None

In [None]:
def pretty_redshift_props(props):
    """Return a two-column DataFrame with a selected subset of cluster properties.

    Args:
        props: mapping of cluster property names to values.

    Returns:
        DataFrame with columns ``Parameter`` and ``value``, one row per
        selected key present in ``props``, in ``props`` iteration order.

    Side effect: sets the pandas option ``display.max.colwidth`` to 0.
    """
    pd.set_option('display.max.colwidth', 0)
    wanted_keys = (
        'ClusterIdentifier', 'ClusterStatus', 'NodeType', 'NumberOfNodes',
        'DBName', 'MasterUsername', 'Endpoint', 'VpcId',
    )
    selected = []
    for name, value in props.items():
        if name in wanted_keys:
            selected.append((name, value))
    return pd.DataFrame(data=selected, columns=['Parameter', 'value'])

In [None]:
# Fetch the cluster description and extract what later cells need.
clusterProps = get_redshift_props(redshift_client, DWH_CLUSTER_IDENTIFIER)
if not clusterProps:
    # Fail here with a clear message; otherwise DWH_ENDPOINT, DWH_ROLE_ARN and
    # prettyClusterProps stay undefined and later cells fail with NameError.
    raise RuntimeError(
        f"Redshift cluster '{DWH_CLUSTER_IDENTIFIER}' is not available; cannot continue."
    )

prettyClusterProps = pretty_redshift_props(clusterProps)
DWH_ENDPOINT = clusterProps['Endpoint']['Address']
DWH_ROLE_ARN = clusterProps['IamRoles'][0]['IamRoleArn']

In [None]:
# Display the summarised cluster properties.
prettyClusterProps

In [None]:
# Open the Redshift port on the first security group of the cluster's VPC.
# NOTE(review): CidrIp '0.0.0.0/0' admits any IPv4 source — confirm this is
# acceptable outside a sandbox.
try:
    vpc = ec2_client.Vpc(id=clusterProps['VpcId'])
    default_SG = list(vpc.security_groups.all())[0]
    print(default_SG)

    redshift_port = int(DWH_PORT)
    default_SG.authorize_ingress(
        GroupName=default_SG.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=redshift_port,
        ToPort=redshift_port,
    )
except ClientError as e:
    # A duplicate-rule error means the ingress rule is already in place;
    # anything else is re-raised. Non-ClientError exceptions propagate as-is.
    if e.response['Error']['Code'] != 'InvalidPermission.Duplicate':
        raise
    print('Security group rule exists, no further actions required')

In [None]:
# Connect to the Redshift database and switch the session to autocommit.
try:
    conn = psycopg2.connect(
        host=DWH_ENDPOINT,
        dbname=DWH_DB,
        user=DWH_DB_USER,
        password=DWH_DB_PASSWORD,
        port=int(DWH_PORT)
    )
except Exception as e:
    print(e)
    # Re-raise: continuing would hit `conn.set_session` with `conn` either
    # undefined or left over from an earlier run of this cell.
    raise

conn.set_session(autocommit=True)

In [None]:
# Obtain a cursor on the open connection.
try:
    cur = conn.cursor()
except Exception as e:
    print("Error: Could not obtain database cursor")
    print(e)
    # Re-raise so later cells do not run against a missing or stale `cur`.
    raise

In [None]:
# Create each staging table followed by its target table. A failure for one
# pair is printed and does not stop the remaining pairs; if the staging
# statement fails, the target statement of that pair is skipped.
table_ddl_pairs = (
    (staging_factCovid_sql, factCovid_sql),
    (staging_dimHospital_sql, dimHospital_sql),
    (staging_dimDate_sql, dimDate_sql),
    (staging_dimRegion_sql, dimRegion_sql),
)

for staging_ddl, target_ddl in table_ddl_pairs:
    try:
        cur.execute(staging_ddl)
        cur.execute(target_ddl)
    except Exception as e:
        print(e)

In [None]:
# Preview the first two factCovid rows.
factCovid.head(2)

In [None]:
# Preview the first two dimDate rows.
dimDate.head(2)

In [None]:
# Preview the first two dimRegion rows.
dimRegion.head(2)

In [None]:
# Preview the first two dimHospital rows.
dimHospital.head(2)

In [None]:
# Bulk-load dimHospital.csv from S3 into its staging table.
# The file is written by pandas, which double-quotes any field containing a
# comma (e.g. addresses). `csv` makes COPY honour that quoting; with a bare
# `delimiter ','` such rows are split into too many columns.
# Errors are printed rather than raised so the notebook keeps running.
try:
    cur.execute(
    f"""
    copy staging_dimhospital
    from '{TARGET_OUTPUT_BUCKET}dimHospital.csv'
    credentials 'aws_iam_role={DWH_ROLE_ARN}'
    csv
    delimiter ','
    region '{TARGET_REGION}'
    IGNOREHEADER 1
    EMPTYASNULL
    BLANKSASNULL
    """
    )
except Exception as e:
    print(e)

In [None]:
# Bulk-load factCovid.csv from S3 into its staging table.
# The file is written by pandas, which double-quotes any field containing a
# comma. `csv` makes COPY honour that quoting; with a bare `delimiter ','`
# such rows are split into too many columns.
# Errors are printed rather than raised so the notebook keeps running.
try:
    cur.execute(
    f"""
    copy staging_factCovid
    from '{TARGET_OUTPUT_BUCKET}factCovid.csv'
    credentials 'aws_iam_role={DWH_ROLE_ARN}'
    csv
    delimiter ','
    region '{TARGET_REGION}'
    IGNOREHEADER 1
    EMPTYASNULL
    BLANKSASNULL
    """
    )
except Exception as e:
    print(e)

In [None]:
# Bulk-load dimDate.csv from S3 into its staging table.
# The file is written by pandas, which double-quotes any field containing a
# comma. `csv` makes COPY honour that quoting; with a bare `delimiter ','`
# such rows are split into too many columns.
# Errors are printed rather than raised so the notebook keeps running.
try:
    cur.execute(
    f"""
    copy staging_dimdate
    from '{TARGET_OUTPUT_BUCKET}dimDate.csv'
    credentials 'aws_iam_role={DWH_ROLE_ARN}'
    csv
    delimiter ','
    region '{TARGET_REGION}'
    IGNOREHEADER 1
    EMPTYASNULL
    BLANKSASNULL
    """
    )
except Exception as e:
    print(e)

In [None]:
# Bulk-load dimRegion.csv from S3 into its staging table.
# The file is written by pandas, which double-quotes any field containing a
# comma. `csv` makes COPY honour that quoting; with a bare `delimiter ','`
# such rows are split into too many columns.
# Errors are printed rather than raised so the notebook keeps running.
try:
    cur.execute(
    f"""
    copy staging_dimRegion
    from '{TARGET_OUTPUT_BUCKET}dimRegion.csv'
    credentials 'aws_iam_role={DWH_ROLE_ARN}'
    csv
    delimiter ','
    region '{TARGET_REGION}'
    IGNOREHEADER 1
    EMPTYASNULL
    BLANKSASNULL
    """
    )
except Exception as e:
    print(e)

In [None]:
# Preview the column lists used by the insert statements.
columns = [col for col in factCovid.columns if col != 'index']

# Plain comma-joined strings. The previous `{...}` wrappers were set literals
# (leftover f-string braces), so a one-element set was printed instead.
select_cols = ','.join(columns)
select_sub = ','.join(f'sub.{col}' for col in columns)

print(select_cols)
print(select_sub)

In [None]:
def _dedup_insert_sql(target_table, staging_table, columns):
    """Build an INSERT that copies de-duplicated rows from staging to target.

    Rows are considered duplicates when all of ``columns`` match; within each
    group the row with the lowest ``index`` is kept (row_number() = 1).

    Args:
        target_table: name of the table to insert into.
        staging_table: name of the staging table to read from.
        columns: column names to copy (the 'index' column is expected to be
            excluded by the caller; it is only used for ordering).

    Returns:
        The SQL statement as a string.
    """
    column_list = ','.join(columns)
    sub_column_list = ','.join(f'sub.{col}' for col in columns)
    return f"""
insert into {target_table} ({column_list})
select {sub_column_list}
from (
    select {column_list},
        row_number() over (partition by {column_list} order by index) as row_num
    from {staging_table}
) sub
where row_num = 1;
"""


# Insert unique 'dimHospital' records, using all columns except 'index' to decide uniqueness
columns = [col for col in dimHospital.columns if col != 'index']
insert_dimHospital = _dedup_insert_sql('dimHospital', 'staging_dimHospital', columns)

# Insert unique 'dimDate' records, using all columns except 'index' to decide uniqueness
columns = [col for col in dimDate.columns if col != 'index']
insert_dimDate = _dedup_insert_sql('dimDate', 'staging_dimDate', columns)

# Insert unique 'dimRegion' records, using all columns except 'index' to decide uniqueness
columns = [col for col in dimRegion.columns if col != 'index']
insert_dimRegion = _dedup_insert_sql('dimRegion', 'staging_dimRegion', columns)

# Insert unique 'factCovid' records, using all columns except 'index' to decide uniqueness.
# The hand-written version of this statement had lost its row_number() line,
# leaving a trailing comma and a filter on a non-existent row_num column.
columns = [col for col in factCovid.columns if col != 'index']
insert_factCovid = _dedup_insert_sql('factCovid', 'staging_factCovid', columns)

In [None]:
# Populate the target tables from staging. All four insert statements are
# run; previously only the fact insert was executed and the dimension tables
# were never filled. Each statement has its own try so one failure is printed
# without blocking the others.
for insert_sql in (insert_dimHospital, insert_dimDate, insert_dimRegion, insert_factCovid):
    try:
        cur.execute(insert_sql)
    except Exception as e:
        print(e)

In [None]:
# Column names of the most recent result set. `cur.description` is None when
# the last statement returned no rows (e.g. an INSERT), so fall back to [].
column_names = [desc[0] for desc in (cur.description or [])]

In [None]:
# Print up to ten rows of the most recent result set. fetchmany() raises when
# the last statement produced no result set (cur.description is None), so
# only fetch when there is something to read.
rows = cur.fetchmany(10) if cur.description is not None else []
print(column_names)
for row in rows:
    print(row)

In [None]:
# Run a query against a table named 'users'; failures are printed, not raised.
# NOTE(review): no 'users' table is created anywhere in the visible notebook —
# confirm whether this cell is a leftover.
try:
    cur.execute("select * from users;")
except Exception as e:
    print("Unable to select from 'users' table")
    print(e)

In [None]:
# Print up to ten rows from the cursor's current result set.
rows =cur.fetchmany(10)
for row in rows:
    print(row)

In [None]:
# The connection-close code below is wrapped in a string literal, so it is
# not executed; the cell only evaluates (and displays) the string.
"""
try:
    conn.close()
except psycopg2.Error as e:
    print(e)
"""

In [None]:
# redshift_client.delete_cluster(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER, SkipFinalClusterSnapshot=True)