In [3]:
import boto3
import pandas as pd
from io import StringIO
import time
import botocore.exceptions
import uuid

In [4]:
AWS_ACCESS_KEY = "******************"
AWS_SECRET_KEY = "******************"
AWS_REGION = "us-east-1"
SCHEMA_NAME = "covid_19"
S3_STAGING_DIR = "s3://******************/output/"
S3_BUCKET_NAME = "******************"
S3_OUTPUT_DIRECTORY = "output"

In [3]:
athena_client = boto3.client(
    "athena",
    aws_access_key_id = AWS_ACCESS_KEY,
    aws_secret_access_key = AWS_SECRET_KEY,
    region_name = AWS_REGION,
)

In [4]:
Dict = {}

def download_and_load_query_results(
    client: boto3.client, query_response: Dict, table_name: str
) -> pd.DataFrame:
    while True:
        try:
            client.get_query_results(
                QueryExecutionId=query_response["QueryExecutionId"]
            )
            break
        except Exception as err:
            if "not yet finished" in str(err):
                time.sleep(0.001)
            else:
                raise err
    temp_file_location: str = f"{table_name}_athena_query_results.csv"
    s3_client = boto3.client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY,
        aws_secret_access_key=AWS_SECRET_KEY,
        region_name=AWS_REGION,
    )
    s3_client.download_file(
        S3_BUCKET_NAME,
        f"{S3_OUTPUT_DIRECTORY}/{query_response['QueryExecutionId']}.csv",
        temp_file_location,
    )
    return pd.read_csv(temp_file_location)

In [None]:
tables = [
    "nytimes_data_in_usa_us_county",
    "nytimes_data_in_usa_us_states",
    "rearc_covid_19_testing_data_states_daily",
    "rearc_covid_19_testing_data_us_daily",
    "rearc_covid_19_testing_data_us_total_latest",
    "static_datasets_countrycode",
    "static_datasets_countypopulation",
    "static_datasets_state_abv"
]

for table in tables:
    response = athena_client.start_query_execution(
        QueryString=f"SELECT * FROM {table} limit 30000",
        QueryExecutionContext={"Database": SCHEMA_NAME},
        ResultConfiguration={
            "OutputLocation": S3_STAGING_DIR,
            "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
        },
    )
    dataframe = download_and_load_query_results(athena_client, response, table)
    exec(f"{table} = dataframe")

# static_datasets_state_abv table with issue in columns name

In [70]:
static_datasets_state_abv.rename(columns=static_datasets_state_abv.iloc[0], inplace=True)

In [72]:
static_datasets_state_abv = static_datasets_state_abv[1:]

In [73]:
static_datasets_state_abv.head()

Unnamed: 0,State,Abbreviation
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR
5,California,CA


# Data Modeling

In [77]:
factCovid_1 = enigma_jhud[['fips', 'province_state','country_region','confirmed','deaths','recovered','active']]
factCovid_2 = rearc_covid_19_testing_data_states_daily[['fips', 'date', 'positive', 'negative', 'hospitalizedcurrently', 'hospitalized', 'hospitalizeddischarged']]
factCovid = pd.merge(factCovid_1, factCovid_2, on='fips', how='inner')

In [80]:
dimRegion_1 = enigma_jhud[['fips', 'province_state','country_region','latitude','longitude']]
dimRegion_2 = nytimes_data_in_usa_us_county[['fips','county','state']]
dimRegion = pd.merge(dimRegion_1, dimRegion_2, on='fips', how='inner')

In [83]:
dimDate = rearc_covid_19_testing_data_states_daily[['fips','date']]

In [87]:
dimDate['date'] = pd.to_datetime(dimDate['date'], format='%Y%m%d')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['date'] = pd.to_datetime(dimDate['date'], format='%Y%m%d')


In [89]:
dimDate['year'] = dimDate['date'].dt.year
dimDate['month'] = dimDate['date'].dt.month
dimDate['day_of_week'] = dimDate['date'].dt.dayofweek

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['year'] = dimDate['date'].dt.year
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['month'] = dimDate['date'].dt.month
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['day_of_week'] = dimDate['date'].dt.dayofweek


In [102]:
bucket = '*************'

def send_dataframe_to_s3(dataframe, bucket, s3_key):
    csv_buffer = StringIO()
    dataframe.to_csv(csv_buffer, index=False)
    s3_resource = boto3.resource('s3')
    s3_resource.Object(bucket, s3_key).put(Body=csv_buffer.getvalue())

send_dataframe_to_s3(factCovid, bucket, 'output/factCovid.csv')
send_dataframe_to_s3(dimRegion, bucket, 'output/dimRegion.csv')
send_dataframe_to_s3(dimDate, bucket, 'output/dimDate.csv')

In [110]:
dimDatesql = pd.io.sql.get_schema(dimDate.reset_index(), 'dimDate')
print(''.join(dimDatesql))

CREATE TABLE "dimDate" (
"index" INTEGER,
  "fips" REAL,
  "date" TIMESTAMP,
  "year" INTEGER,
  "month" INTEGER,
  "day_of_week" INTEGER
)


In [112]:
factCovidsql = pd.io.sql.get_schema(factCovid.reset_index(), 'factCovid')
print(''.join(factCovidsql))

CREATE TABLE "factCovid" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" INTEGER,
  "negative" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalized" REAL,
  "hospitalizeddischarged" REAL
)


In [113]:
dimRegionsql = pd.io.sql.get_schema(dimRegion.reset_index(), 'dimRegion')
print(''.join(dimRegionsql))

CREATE TABLE "dimRegion" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "county" TEXT,
  "state" TEXT
)


In [5]:
import psycopg2

host = '**********************'
port = 5439 
database = 'dev'
user = ''**********************''
password = ''**********************''

# Connection to Redshift
conn = psycopg2.connect(
    host=host,
    port=port,
    user=user,
    password=password,
    database=database
)

In [6]:
conn.autocommit = True

In [7]:
cursor = conn.cursor()

In [136]:
cursor.execute("""
CREATE TABLE "dimDate" (
  "index" INTEGER,
  "fips" REAL,
  "date" TIMESTAMP,
  "year" INTEGER,
  "month" INTEGER,
  "day_of_week" INTEGER
  )
  """)

cursor.execute("""
CREATE TABLE "factCovid" (
  "index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" INTEGER,
  "negative" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalized" REAL,
  "hospitalizeddischarged" REAL
)
  """)

cursor.execute("""
CREATE TABLE "dimRegion" (
  "index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "county" TEXT,
  "state" TEXT
)
  """)

In [None]:
cursor.execute("""
COPY dimDate FROM 's3://**************/output/dimDate.csv'
CREDENTIALS 'aws_iam_role=arn:aws:iam::**************:role/redshift-s3-access'
DELIMITER ','
REGION 'us-east-1'
IGNOREHEADER 1;
""")

cursor.execute("""
COPY dimRegion FROM 's3://**************/output/dimRegion.csv'
CREDENTIALS 'aws_iam_role=arn:aws:iam::**************:role/redshift-s3-access'
DELIMITER ','
REGION 'us-east-1'
IGNOREHEADER 1;
""")

cursor.execute("""
COPY factCovid FROM 's3://**************/output/factCovid.csv'
CREDENTIALS 'aws_iam_role=arn:aws:iam::**************:role/redshift-s3-access'
DELIMITER ','
REGION 'us-east-1'
IGNOREHEADER 1;
""")