In [None]:
import boto3
import pandas as pd
from io import StringIO
import time

In [None]:
AWS_ACCESS_KEY = 'ACCESS_KEY'
AWS_SECRET_KEY = 'SECRET_KEY'
AWS_REGION = 'us-east-1'
SCHEMA_NAME = 'covid_19_database'
S3_STAGING_DIR = 's3://BUCKET_NAME/DIRECTORY_NAME/'
S3_BUCKET_NAME = 'BUCKET_NAME'
S3_OUTPUT_DIRECTORY = 'DIRECTORY_NAME'

In [None]:
athena_client = boto3.client(
    "athena",
    aws_access_key_id = AWS_ACCESS_KEY,
    aws_secret_access_key = AWS_SECRET_KEY,
    region_name = AWS_REGION
)

In [None]:
Dict = {}

def download_and_load_query_results(
        client: boto3.client, query_response: Dict
) -> pd.DataFrame:
    while True:
        try:
            # response = client.get_query_execution(QueryExecutionId = query_response["QueryExecutionId"])
            # print(results)
            
            client.get_query_results(
                QueryExecutionId = query_response["QueryExecutionId"]
            )
            break
        except Exception as err:
            if "not yet finished" in str(err):
                time.sleep(0.001)
            else:
                raise err
            
    temp_file_location: str = "athena_query_results.csv"
    s3_client = boto3.client(
        "s3",
        aws_access_key_id = AWS_ACCESS_KEY,
        aws_secret_access_key = AWS_SECRET_KEY,
        region_name = AWS_REGION,
    )
    s3_client.download_file(
        S3_BUCKET_NAME,
        f"{S3_OUTPUT_DIRECTORY}/{query_response['QueryExecutionId']}.csv",
        temp_file_location,
    )

    return pd.read_csv(temp_file_location)

In [None]:
response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM enigma_jhud LIMIT 10000",
    # QueryString = "SELECT * FROM nytimes_data_in_usa_countyus_county",
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)
# time.sleep(10)
# temp = athena_client.get_query_execution(QueryExecutionId = response["QueryExecutionId"])
# print(temp)
enigma_jhud = download_and_load_query_results(athena_client, response)

In [None]:
response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM nytimes_data_in_usa_countyus_county",
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

nytimes_data_in_usa_county = download_and_load_query_results(athena_client, response)

In [None]:
response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM nytimes_data_in_usa_stateus_states",
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

nytimes_data_in_usa_state = download_and_load_query_results(athena_client, response)

In [None]:
response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM rearc_covid_19_testing_data_states_daily_states_daily",
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

rearc_covid_19_testing_data_states_daily = download_and_load_query_results(athena_client, response)

In [None]:
response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM rearc_covid_19_testing_data_us_daily_us_daily",
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

rearc_covid_19_testing_data_us_daily = download_and_load_query_results(athena_client, response)

In [None]:
response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM rearc_covid_19_testing_data_us_total_latest_us_total_latest",
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

rearc_covid_19_testing_data_us_total_latest = download_and_load_query_results(athena_client, response)

In [None]:
# response = athena_client.start_query_execution(
#     QueryString = "SELECT * FROM rearc_usa_hospital_beds_rearc_usa_hospital_beds LIMIT 10",
#     QueryExecutionContext = {"Database": SCHEMA_NAME},
#     ResultConfiguration = {
#         "OutputLocation": S3_STAGING_DIR,
#         "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
#     },
# )

# rearc_usa_hospital_beds_rearc_usa_hospital_beds = download_and_load_query_results(athena_client, response)

In [None]:
response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM static_datacountrycode",
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

static_data_countrycode = download_and_load_query_results(athena_client, response)

In [None]:
response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM static_datacountypopulation",
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

static_data_countypopulation = download_and_load_query_results(athena_client, response)

In [None]:
response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM static_datastate_abv",
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

static_data_state_abv = download_and_load_query_results(athena_client, response)

In [None]:
static_data_state_abv.head()

In [None]:
new_header = static_data_state_abv.iloc[0]
static_data_state_abv.columns = new_header
static_data_state_abv = static_data_state_abv[1:]

In [None]:
static_data_state_abv.head()

In [None]:
enigma_jhud.columns

In [None]:
factCovid_1 = enigma_jhud[['fips', 'province_state', 'country_region', 'confirmed', 'deaths', 'recovered', 'active']]
factCovid_2 = rearc_covid_19_testing_data_states_daily[['fips', 'date', 'positive', 'negative', 'hospitalizedcurrently', 'hospitalized', 'hospitalizeddischarged']]

factCovid = pd.merge(factCovid_1, factCovid_2, on='fips', how='inner')

In [None]:
# print(factCovid.info())

In [None]:
dimRegion_1 = enigma_jhud[['fips', 'province_state', 'country_region','latitude', 'longitude']]
dimRegion_2 = nytimes_data_in_usa_county[['fips', 'county', 'state']]

dimRegion = pd.merge(dimRegion_1, dimRegion_2, on='fips', how='inner')

In [None]:
dimDate = rearc_covid_19_testing_data_states_daily[['fips', 'date']]

In [None]:
dimDate.head()

In [None]:
dimDate['date'] = pd.to_datetime(dimDate['date'], format = '%Y%m%d')

In [None]:
dimDate.head()

In [None]:
dimDate['year'] = dimDate['date'].dt.year
dimDate['month'] = dimDate['date'].dt.month
dimDate['day_of_week'] = dimDate['date'].dt.dayofweek

In [None]:
dimDate.head()

In [None]:
bucket = 'PRIMARY_BUCKET_NAME'  # DIFFERENT THEN PREVIOUS BUCKET

In [None]:
csv_buffer = StringIO()

In [None]:
factCovid.to_csv(csv_buffer)

In [None]:
s3_resource = boto3.resource('s3', aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key= AWS_SECRET_KEY)
s3_resource.Object(bucket, 'output/factCovid.csv').put(Body=csv_buffer.getvalue())

In [None]:
csv_buffer = StringIO()
dimRegion.to_csv(csv_buffer)
s3_resource = boto3.resource('s3', aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key= AWS_SECRET_KEY)
s3_resource.Object(bucket, 'output/dimRegion.csv').put(Body=csv_buffer.getvalue())

In [None]:
csv_buffer = StringIO()
dimDate.to_csv(csv_buffer)
s3_resource = boto3.resource('s3', aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key= AWS_SECRET_KEY)
s3_resource.Object(bucket, 'output/dimDate.csv').put(Body=csv_buffer.getvalue())

In [None]:
dimDatesql = pd.io.sql.get_schema(dimDate.reset_index(), 'dimDate')
print(''.join(dimDatesql))

In [None]:
factCovidsql = pd.io.sql.get_schema(factCovid.reset_index(), 'factCovid')
print(''.join(factCovidsql))

In [None]:
dimRegionsql = pd.io.sql.get_schema(dimRegion.reset_index(), 'dimRegion')
print(''.join(dimRegionsql))

In [None]:
# !pip install redshift_connector