In [None]:
import os
import time
from io import StringIO # python3; python2: BytesIO
from typing import Dict

import boto3
import pandas as pd

In [3]:
# Credentials come from the environment instead of being typed into the notebook.
# When a variable is unset the value is None, which makes boto3 fall back to its
# default credential chain (shared config/credentials files, instance role, ...).
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")
AWS_REGION = "ap-south-1"
SCHEMA_NAME = "covid_19"  # passed to Athena as the query "Database"
S3_BUCKET_NAME = "covid_data_analysis_project"
S3_OUTPUT_DIRECTORY = "output"
# Location Athena writes result files to; derived from the two parts above so
# the bucket/prefix used for writing and for downloading cannot drift apart.
S3_STAGING_DIR = f"s3://{S3_BUCKET_NAME}/{S3_OUTPUT_DIRECTORY}/"

In [None]:
# Athena client used by every start_query_execution call in this notebook.
athena_client = boto3.client(
    service_name="athena",
    region_name=AWS_REGION,
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
)

In [None]:
def download_and_load_query_results(
    client: boto3.client,
    query_response: Dict,
    poll_interval: float = 0.5,
) -> pd.DataFrame:
    """Wait for an Athena query to finish and load its result CSV into a DataFrame.

    Args:
        client: Athena client used to probe the query.
        query_response: Response of ``start_query_execution``; only its
            ``"QueryExecutionId"`` entry is read.
        poll_interval: Seconds to sleep between probes while the query is
            still running.

    Returns:
        The contents of ``<S3_OUTPUT_DIRECTORY>/<QueryExecutionId>.csv`` from
        ``S3_BUCKET_NAME`` as a DataFrame.

    Raises:
        Exception: Any error from ``get_query_results`` whose message does not
            contain "not yet finished" is re-raised unchanged; S3 download
            errors propagate as well.
    """
    execution_id = query_response["QueryExecutionId"]

    # get_query_results is called only as a completion probe: per the original
    # author's note it loads just the first 1000 rows, so the full result is
    # fetched from the CSV file in S3 below.
    while True:
        try:
            client.get_query_results(QueryExecutionId=execution_id)
            break
        except Exception as err:
            if "not yet finished" not in str(err):
                raise
            time.sleep(poll_interval)

    # The download must run after the loop has exited. It used to be indented
    # inside the loop, so a successful probe hit `break` and the function
    # returned None without downloading anything.
    temp_file_location: str = "athena_query_results.csv"
    s3_client = boto3.client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY,
        aws_secret_access_key=AWS_SECRET_KEY,
        region_name=AWS_REGION,
    )
    s3_client.download_file(
        S3_BUCKET_NAME,
        # No leading space in the key: " output/<id>.csv" is a different object.
        f"{S3_OUTPUT_DIRECTORY}/{execution_id}.csv",
        temp_file_location,
    )
    return pd.read_csv(temp_file_location)

In [None]:
# Start the Athena query for `enigma_jhud`; the response carries the
# QueryExecutionId that download_and_load_query_results waits on.
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM enigma_jhud",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        # Was "SSE_3", which is not an accepted EncryptionOption; every other
        # query in this notebook uses "SSE_S3".
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

In [None]:
# Display the raw start_query_execution response for inspection.
response

In [None]:
# Wait for the query started above and load its result CSV as a DataFrame.
enigma_jhud = download_and_load_query_results(athena_client, response)

In [None]:
# Preview the first rows of the loaded table.
enigma_jhud.head()

In [None]:
# Start the Athena query for `nytimes_data_in_usa_us_county`.
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM nytimes_data_in_usa_us_county",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

In [None]:
# Load the result of the query started in the previous cell.
nytimes_data_in_usa_us_county = download_and_load_query_results(
    athena_client, response
)

In [None]:
# Start the Athena query for `nytimes_data_in_usa_us_states`.
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM nytimes_data_in_usa_us_states",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

In [None]:
# Load the result of the query started in the previous cell.
nytimes_data_in_usa_us_states = download_and_load_query_results(
    athena_client, response
)

In [None]:
# Query `static_datacountrycode` through Athena, then load the result CSV.
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM static_datacountrycode",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)
static_datacountrycode = download_and_load_query_results(athena_client, response)

In [None]:
# Query `static_datacountypopulation` through Athena, then load the result CSV.
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM static_datacountypopulation",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)
static_datacountypopulation = download_and_load_query_results(athena_client, response)

In [None]:
# Load three `rearc_*` tables one after another. Each query is started and then
# awaited before the next one begins, because `response` is reused.
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM rearc_covid_19_testing_dataus_daily",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)
rearc_covid_19_testing_dataus_daily = download_and_load_query_results(athena_client, response)

response = athena_client.start_query_execution(
    QueryString="SELECT * FROM rearc_usa_hospital_beds",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    # The opening brace of this dict was missing, which made the whole cell a
    # SyntaxError.
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)
rearc_usa_hospital_beds = download_and_load_query_results(athena_client, response)

response = athena_client.start_query_execution(
    QueryString="SELECT * FROM rearc_covid_19_testing_dataus_total_latest",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)
rearc_covid_19_testing_dataus_total_latest = download_and_load_query_results(athena_client, response)

In [None]:
# Query `rearc_covid_19_testing_data_states_dailystates_daily` through Athena,
# then load the result CSV.
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM rearc_covid_19_testing_data_states_dailystates_daily",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)
rearc_covid_19_testing_data_states_dailystates_daily = download_and_load_query_results(
    athena_client, response
)

In [None]:
# Query `static_datastate_abv` through Athena, then load the result CSV.
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM static_datastate_abv",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)
static_datastate_abv = download_and_load_query_results(athena_client, response)

In [None]:
# Preview the table as loaded, before the header row is promoted below.
static_datastate_abv.head()

In [None]:
# Keep the first data row; a later cell installs it as the column labels.
new_header = static_datastate_abv.iloc[0, :]

In [None]:
# Drop the first row (positional slice). Re-running this cell drops another row.
static_datastate_abv = static_datastate_abv.iloc[1:]

In [None]:
# Use the previously saved first row as the column labels.
static_datastate_abv.columns = new_header

In [None]:
# Preview the table again to check the relabelled columns.
static_datastate_abv.head()

In [None]:
# Fact table: selected columns of `enigma_jhud` inner-joined on `fips` with
# selected columns of the states-daily testing table.
factCovid_1 = enigma_jhud[
    ['fips', 'province_state', 'country_region', 'confirmed', 'deaths', 'recovered', 'active']
]
factCovid_2 = rearc_covid_19_testing_data_states_dailystates_daily[
    ['fips', 'date', 'positive', 'negative', 'hospitalizedcurrently', 'hospitalized', 'hospitalizeddischarged']
]
factCovid = factCovid_1.merge(factCovid_2, on='fips', how='inner')

In [None]:
# Row/column count of the joined fact table.
factCovid.shape

In [None]:
# Region dimension: location columns of `enigma_jhud` inner-joined on `fips`
# with county/state columns of the NYT county table.
dimRegion_1 = enigma_jhud[
    ['fips', 'province_state', 'country_region', 'latitude', 'longitude']
]
dimRegion_2 = nytimes_data_in_usa_us_county[['fips', 'county', 'state']]
dimRegion = dimRegion_1.merge(dimRegion_2, on='fips', how='inner')

In [None]:
# Hospital dimension: a column subset of `rearc_usa_hospital_beds`.
# ('longtitude' is spelled as in the CREATE TABLE statement later in the notebook.)
dimHospital = rearc_usa_hospital_beds[
    [
        'fips',
        'state_name',
        'latitude',
        'longtitude',
        'hq_address',
        'hospital_name',
        'hospital_type',
        'hq_city',
        'hq_state',
    ]
]

In [None]:
# Date dimension seed: `fips` and `date` from the states-daily testing table.
# Take an explicit copy: later cells assign new columns to `dimDate`, and doing
# that on a column slice of another frame triggers pandas' SettingWithCopyWarning.
dimDate = rearc_covid_19_testing_data_states_dailystates_daily[['fips', 'date']].copy()

In [None]:
# Preview `dimDate` before the date column is parsed.
dimDate.head()

In [None]:
# Parse the `date` column as datetimes using the YYYYMMDD format.
dimDate['date'] = pd.to_datetime(dimDate['date'], format='%Y%m%d')

In [None]:
# Preview `dimDate` after parsing to check the converted dates.
dimDate.head()

In [None]:
# Derive calendar parts from the parsed `date` column.
_date_accessor = dimDate['date'].dt
dimDate['year'] = _date_accessor.year
dimDate['month'] = _date_accessor.month
dimDate['day_of_week'] = _date_accessor.dayofweek

In [None]:
# Preview `dimDate` with the derived year / month / day_of_week columns.
dimDate.head()

In [None]:
# Target bucket for the transformed CSVs. Reuses the config constant (same
# value) instead of repeating the literal, so the bucket is defined in one place.
bucket_name = S3_BUCKET_NAME

In [None]:
# In-memory text buffer that receives the CSV rendering of a DataFrame.
csv_buffer = StringIO()

In [None]:
# Serialise factCovid (index included) into a fresh buffer. The buffer is
# recreated here, as the other upload cells do, so that re-running this cell
# cannot append a second copy of the CSV to an already-filled buffer.
csv_buffer = StringIO()
factCovid.to_csv(csv_buffer)

In [None]:
# S3 resource (default credentials); reused by the remaining upload cells.
s3_resource = boto3.resource('s3')
# Upload the buffered CSV text as transformed/output/factCovid.csv.
_fact_covid_object = s3_resource.Object(bucket_name, 'transformed/output/factCovid.csv')
_fact_covid_object.put(Body=csv_buffer.getvalue())

In [None]:
# Show only the start of the CSV text: echoing the full buffer would embed the
# whole fact table in the notebook output.
csv_buffer.getvalue()[:500]

In [None]:
# Serialise dimRegion (index included) and upload it to S3.
csv_buffer = StringIO()
dimRegion.to_csv(path_or_buf=csv_buffer)
_dim_region_object = s3_resource.Object(bucket_name, 'transformed/output/dimRegion.csv')
_dim_region_object.put(Body=csv_buffer.getvalue())

In [None]:
# Serialise dimDate (index included) and upload it to S3.
csv_buffer = StringIO()
dimDate.to_csv(path_or_buf=csv_buffer)
_dim_date_object = s3_resource.Object(bucket_name, 'transformed/output/dimDate.csv')
_dim_date_object.put(Body=csv_buffer.getvalue())

In [None]:
# Serialise dimHospital (index included) and upload it to S3.
csv_buffer = StringIO()
dimHospital.to_csv(path_or_buf=csv_buffer)
_dim_hospital_object = s3_resource.Object(bucket_name, 'transformed/output/dimHospital.csv')
_dim_hospital_object.put(Body=csv_buffer.getvalue())

In [None]:
# Generate a CREATE TABLE statement for dimHospital (index exposed as a column).
dimHospitalsql = pd.io.sql.get_schema(dimHospital.reset_index(), 'dimHospital')
# ''.join() over a string returns the same string, so print it directly.
print(dimHospitalsql)

In [None]:
# Generate a CREATE TABLE statement for dimDate (index exposed as a column).
dimDatesql = pd.io.sql.get_schema(dimDate.reset_index(), 'dimDate')
# ''.join() over a string returns the same string, so print it directly.
print(dimDatesql)

In [None]:
# Generate a CREATE TABLE statement for dimRegion (index exposed as a column).
dimRegionsql = pd.io.sql.get_schema(dimRegion.reset_index(), 'dimRegion')
# ''.join() over a string returns the same string, so print it directly.
print(dimRegionsql)

In [None]:
# Generate a CREATE TABLE statement for factCovid (index exposed as a column).
factCovidsql = pd.io.sql.get_schema(factCovid.reset_index(), 'factCovid')
# ''.join() over a string returns the same string, so print it directly.
print(factCovidsql)

In [None]:
import redshift_connector

In [None]:
# Redshift connection settings are read from the environment so that no host
# name or password is stored in the notebook. The port is converted to an int
# and defaults to 5439, Redshift's standard port.
# NOTE(review): no `database` argument is passed here (as in the original);
# confirm whether the target cluster requires one.
conn = redshift_connector.connect(
    host=os.environ.get("REDSHIFT_HOST", ""),
    port=int(os.environ.get("REDSHIFT_PORT", "5439")),
    user=os.environ.get("REDSHIFT_USER", ""),
    password=os.environ.get("REDSHIFT_PASSWORD", ""),
)

In [None]:
# Enable autocommit on the connection so no explicit commit calls are needed.
conn.autocommit = True

In [None]:
# Annotated assignment. The previous chained `a = b = c` form also rebound
# `redshift_connector.Cursor` (the class) to this cursor instance.
cursor: redshift_connector.Cursor = conn.cursor()

In [None]:
# DDL for the dimDate table, held in a named string and then executed.
_DIM_DATE_DDL = """ CREATE TABLE "dimDate" (
    "index" INTEGER,
    "fips" INTEGER,
    "date" TIMESTAMP,
    "year" INTEGER,
    "month" INTEGER,
    "day_of_week" INTEGER
)
"""
cursor.execute(_DIM_DATE_DDL)

In [None]:
# DDL for the dimHospital and factCovid tables, executed in that order.
_DIM_HOSPITAL_DDL = """CREATE TABLE "dimHospital" (
    "index" INTEGER,
    "fips" REAL,
    "state_name" TEXT, "latitude" REAL,
    "longtitude" REAL,
    "hq_address" TEXT,
    "hospital_name" TEXT,
    "hospital_type" TEXT,
    "hq_city" TEXT,
    "hq_state" TEXT)"""

_FACT_COVID_DDL = """CREATE TABLE "factCovid" (
    "index" INTEGER,
    "fips" REAL,
    "province_state" TEXT,
    "country_region" TEXT,
    "confirmed" REAL,
    "deaths" REAL,
    "recovered" REAL,
    "active" REAL,
    "date" INTEGER,
    "positive" REAL,
    "negative" REAL,
    "hospitalizedcurrently" REAL,
    "hospitalized" REAL,
    "hospitalizeddischarged" REAL)"""

cursor.execute(_DIM_HOSPITAL_DDL)
cursor.execute(_FACT_COVID_DDL)

In [None]:
# DDL for the dimRegion table, held in a named string and then executed.
_DIM_REGION_DDL = """CREATE TABLE "dimRegion" (
    "index" INTEGER,
    "fips" REAL,
    "province_state" TEXT,
    "country_region" TEXT,
    "latitude" REAL,
    "longitude" REAL,
    "county" TEXT,
    "state" TEXT)"""
cursor.execute(_DIM_REGION_DDL)

In [None]:
# Bulk-load factCovid.csv from S3 into the factCovid table, skipping the header row.
# `csv` makes COPY honour the double quotes that DataFrame.to_csv puts around
# fields containing commas; plain delimited mode would split such fields.
cursor.execute("""
copy factCovid from 's3://covid_data_analysis_project/transformed/output/factCovid.csv'
credentials 'aws_iam_role=arn:aws:iam::206986907456:role/redshift-s3-access'
csv
delimiter ','
region 'ap-south-1'
IGNOREHEADER 1""")

In [None]:
# Bulk-load dimRegion.csv from S3 into the dimRegion table, skipping the header row.
# `csv` makes COPY honour the double quotes that DataFrame.to_csv puts around
# fields containing commas; plain delimited mode would split such fields.
cursor.execute("""
copy dimRegion from 's3://covid_data_analysis_project/transformed/output/dimRegion.csv'
credentials 'aws_iam_role=arn:aws:iam::206986907456:role/redshift-s3-access'
csv
delimiter ','
region 'ap-south-1'
IGNOREHEADER 1""")

In [None]:
# Bulk-load dimDate.csv from S3 into the dimDate table, skipping the header row.
# `csv` makes COPY honour the double quotes that DataFrame.to_csv puts around
# fields containing commas; plain delimited mode would split such fields.
cursor.execute("""
copy dimDate from 's3://covid_data_analysis_project/transformed/output/dimDate.csv'
credentials 'aws_iam_role=arn:aws:iam::206986907456:role/redshift-s3-access'
csv
delimiter ','
region 'ap-south-1'
IGNOREHEADER 1""")

In [None]:
# Bulk-load dimHospital.csv from S3 into the dimHospital table, skipping the header row.
# `csv` makes COPY honour the double quotes that DataFrame.to_csv puts around
# fields containing commas (this table has free-text address and name columns);
# plain delimited mode would split such fields.
cursor.execute("""
copy dimHospital from 's3://covid_data_analysis_project/transformed/output/dimHospital.csv'
credentials 'aws_iam_role=arn:aws:iam::206986907456:role/redshift-s3-access'
csv
delimiter ','
region 'ap-south-1'
IGNOREHEADER 1""")