In [None]:

import getpass
import os
import time
from io import StringIO

import boto3
import pandas as pd

In [None]:
# AWS credentials are read from the standard AWS environment variables so that
# secrets never have to be typed into (and saved with) the notebook. When the
# variables are unset the values fall back to "" exactly as before.
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID", "")
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "")
# Region used for both the Athena and the S3 clients created below.
AWS_REGION = "ap-south-1"
# Athena database that every query in this notebook runs against.
SCHEMA_NAME = "coviddata"
# S3 location handed to Athena as the query OutputLocation.
S3_STAGING_DIR ="s3://n-bucket/output/"
# Bucket and key prefix the result CSVs are downloaded from.
# NOTE(review): these presumably have to point at the same place as
# S3_STAGING_DIR; the bucket name is empty here - confirm before running.
S3_BUCKET_NAME = ""
S3_OUTPUT_DIRECTORY = "output"


In [None]:
# Athena client shared by every query cell; it is built from the region and
# credential constants defined in the configuration cell.
athena_client = boto3.client(
    service_name="athena",
    region_name=AWS_REGION,
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
)

In [None]:
#Gets the data from athena and converts it into pandas dataframe
Dict = {}  # plain dict object used as a loose annotation below; name kept so it stays defined
def download_and_load_query_results(
    client, query_response: Dict, poll_interval: float = 1.0
) -> pd.DataFrame:
    """Wait for an Athena query to finish and load its CSV result into pandas.

    Args:
        client: Athena client (as returned by ``boto3.client("athena", ...)``).
        query_response: Response of ``start_query_execution``; only its
            ``"QueryExecutionId"`` entry is read.
        poll_interval: Seconds to sleep between status checks while the query
            is still queued or running.

    Returns:
        The query result parsed with ``pd.read_csv``.

    Raises:
        RuntimeError: If the query ends in the ``FAILED`` or ``CANCELLED`` state.

    Side effects:
        Downloads ``<S3_OUTPUT_DIRECTORY>/<QueryExecutionId>.csv`` from
        ``S3_BUCKET_NAME`` to ``athena_query_results.csv`` in the working
        directory, overwriting any previous download.
    """
    query_id = query_response["QueryExecutionId"]

    # Poll the execution state instead of string-matching error messages: the
    # previous check (`"..." or "..." in str(err)`) was always true, so a failed
    # query - or any other error - was retried forever.
    while True:
        status = client.get_query_execution(QueryExecutionId=query_id)[
            "QueryExecution"
        ]["Status"]
        state = status["State"]
        if state == "SUCCEEDED":
            break
        if state in ("FAILED", "CANCELLED"):
            reason = status.get("StateChangeReason", "no reason reported")
            raise RuntimeError(
                f"Athena query {query_id} ended in state {state}: {reason}"
            )
        # Still QUEUED or RUNNING: wait before asking again.
        time.sleep(poll_interval)

    temp_file_location: str = "athena_query_results.csv"
    s3_client = boto3.client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY,
        aws_secret_access_key=AWS_SECRET_KEY,
        region_name=AWS_REGION,
    )
    s3_client.download_file(
        S3_BUCKET_NAME,
        f"{S3_OUTPUT_DIRECTORY}/{query_id}.csv",
        temp_file_location,
    )
    return pd.read_csv(temp_file_location)


In [None]:
# Run `SELECT * FROM countrycode` in the SCHEMA_NAME database, writing the
# SSE-S3 encrypted result to S3_STAGING_DIR, then load it as a DataFrame.
countrycode_request = {
    "QueryString": "SELECT * FROM countrycode",
    "QueryExecutionContext": {"Database": SCHEMA_NAME},
    "ResultConfiguration": {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
}
response = athena_client.start_query_execution(**countrycode_request)

countrycode = download_and_load_query_results(athena_client, response)


In [None]:
# Run `SELECT * FROM countypopulation` in the SCHEMA_NAME database, writing the
# SSE-S3 encrypted result to S3_STAGING_DIR, then load it as a DataFrame.
countypopulation_request = {
    "QueryString": "SELECT * FROM countypopulation",
    "QueryExecutionContext": {"Database": SCHEMA_NAME},
    "ResultConfiguration": {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
}
response = athena_client.start_query_execution(**countypopulation_request)

countypopulation = download_and_load_query_results(athena_client, response)


In [None]:
# Run `SELECT * FROM state_abv` in the SCHEMA_NAME database, writing the
# SSE-S3 encrypted result to S3_STAGING_DIR, then load it as a DataFrame.
state_abv_request = {
    "QueryString": "SELECT * FROM state_abv",
    "QueryExecutionContext": {"Database": SCHEMA_NAME},
    "ResultConfiguration": {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
}
response = athena_client.start_query_execution(**state_abv_request)

state_abv = download_and_load_query_results(athena_client, response)


In [None]:
# Keep the first data row aside (it is applied as the header in the next cell)
# and drop it from the frame. Not idempotent: re-running removes another row.
newheader, state_abv = state_abv.iloc[0], state_abv.iloc[1:]


In [None]:
# Use the row saved in `newheader` as the column labels, then preview the
# first rows of the relabelled frame.
state_abv.columns = newheader
state_abv.head()

In [None]:
# Run `SELECT * FROM states_daily` in the SCHEMA_NAME database, writing the
# SSE-S3 encrypted result to S3_STAGING_DIR, then load and display it.
states_daily_request = {
    "QueryString": "SELECT * FROM states_daily",
    "QueryExecutionContext": {"Database": SCHEMA_NAME},
    "ResultConfiguration": {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
}
response = athena_client.start_query_execution(**states_daily_request)

states_daily = download_and_load_query_results(athena_client, response)
states_daily


In [None]:
# Run `SELECT * FROM us_county` in the SCHEMA_NAME database, writing the
# SSE-S3 encrypted result to S3_STAGING_DIR, then load it as a DataFrame.
us_county_request = {
    "QueryString": "SELECT * FROM us_county",
    "QueryExecutionContext": {"Database": SCHEMA_NAME},
    "ResultConfiguration": {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
}
response = athena_client.start_query_execution(**us_county_request)

us_county = download_and_load_query_results(athena_client, response)


In [None]:
# Run `SELECT * FROM us_daily` in the SCHEMA_NAME database, writing the
# SSE-S3 encrypted result to S3_STAGING_DIR, then load and display it.
us_daily_request = {
    "QueryString": "SELECT * FROM us_daily",
    "QueryExecutionContext": {"Database": SCHEMA_NAME},
    "ResultConfiguration": {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
}
response = athena_client.start_query_execution(**us_daily_request)

us_daily = download_and_load_query_results(athena_client, response)
us_daily

In [None]:
# Run `SELECT * FROM us_states` in the SCHEMA_NAME database, writing the
# SSE-S3 encrypted result to S3_STAGING_DIR, then load it as a DataFrame.
us_states_request = {
    "QueryString": "SELECT * FROM us_states",
    "QueryExecutionContext": {"Database": SCHEMA_NAME},
    "ResultConfiguration": {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
}
response = athena_client.start_query_execution(**us_states_request)

us_states = download_and_load_query_results(athena_client, response)


In [None]:
# Run `SELECT * FROM us_total_latest` in the SCHEMA_NAME database, writing the
# SSE-S3 encrypted result to S3_STAGING_DIR, then load it as a DataFrame.
us_total_latest_request = {
    "QueryString": "SELECT * FROM us_total_latest",
    "QueryExecutionContext": {"Database": SCHEMA_NAME},
    "ResultConfiguration": {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
}
response = athena_client.start_query_execution(**us_total_latest_request)

us_total_latest = download_and_load_query_results(athena_client, response)


In [None]:
# Run `SELECT * FROM csv` (the Athena table is literally named `csv`) in the
# SCHEMA_NAME database and load the result as the `enigma_jhud` DataFrame.
enigma_jhud_request = {
    "QueryString": "SELECT * FROM csv",
    "QueryExecutionContext": {"Database": SCHEMA_NAME},
    "ResultConfiguration": {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
}
response = athena_client.start_query_execution(**enigma_jhud_request)
enigma_jhud = download_and_load_query_results(athena_client, response)

In [None]:
# Display the enigma_jhud DataFrame loaded in the previous cell.
enigma_jhud

In [None]:
# Run `SELECT * FROM rearc_usa_hospital_beds` in the SCHEMA_NAME database,
# writing the SSE-S3 encrypted result to S3_STAGING_DIR, then load and display it.
hospital_beds_request = {
    "QueryString": "SELECT * FROM rearc_usa_hospital_beds",
    "QueryExecutionContext": {"Database": SCHEMA_NAME},
    "ResultConfiguration": {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
}
response = athena_client.start_query_execution(**hospital_beds_request)

rearc_usa_hospital_beds = download_and_load_query_results(athena_client, response)
rearc_usa_hospital_beds

In [None]:
# Build the fact table: case counts from enigma_jhud joined to the testing and
# hospitalisation columns of states_daily. The inner join on `fips` keeps only
# fips values present in both frames.
jhu_fact_columns = [
    'fips', 'province_state', 'country_region',
    'confirmed', 'deaths', 'recovered', 'active',
]
daily_fact_columns = [
    'fips', 'date', 'positive', 'negative',
    'hospitalizedcurrently', 'hospitalized', 'hospitalizeddischarged',
]
factCovid_1 = enigma_jhud[jhu_fact_columns]
factCovid_2 = states_daily[daily_fact_columns]
factCovid = factCovid_1.merge(factCovid_2, how='inner', on='fips')

In [None]:
# Show (rows, columns) of the merged fact table.
factCovid.shape

In [None]:
# Build the region dimension: location columns from enigma_jhud joined to the
# county/state columns of us_county with an inner join on `fips`.
region_location_columns = ['fips', 'province_state', 'country_region', 'latitude', 'longitude']
region_county_columns = ['fips', 'county', 'state']
dimRegion_1 = enigma_jhud[region_location_columns]
dimRegion_2 = us_county[region_county_columns]
dimRegion = dimRegion_1.merge(dimRegion_2, how='inner', on='fips')
dimRegion

In [None]:
# Build the hospital dimension as a column subset of rearc_usa_hospital_beds.
# `longtitude` is selected with that exact spelling from the loaded frame.
hospital_columns = [
    'fips', 'state_name', 'latitude', 'longtitude', 'hq_address',
    'hospital_name', 'hospital_type', 'hq_city', 'hq_state',
]
dimHospital = rearc_usa_hospital_beds[hospital_columns]
dimHospital

In [None]:
# Build the date dimension from states_daily. An explicit copy is taken so the
# column assignments below modify an independent frame instead of a selection
# of states_daily (avoids pandas' SettingWithCopyWarning and keeps the source
# frame untouched).
dimDate = states_daily[['fips','date']].copy()
# `date` is parsed with the YYYYMMDD format, then split into calendar parts.
dimDate['date'] = pd.to_datetime(dimDate['date'],format="%Y%m%d")
dimDate['year'] = dimDate['date'].dt.year
dimDate['month'] = dimDate['date'].dt.month
dimDate['dayofweek'] = dimDate['date'].dt.dayofweek

dimDate

In [None]:
# Serialise dimDate (index included) to CSV in memory and upload it to S3.
# The key deliberately keeps its leading slash: the object ends up at
# s3://nani-bucket//covid-19-output/dimDate.csv.
bucket = "nani-bucket"
s3_resources = boto3.resource("s3")
csv_buffer = StringIO()
dimDate.to_csv(path_or_buf=csv_buffer)
dim_date_object = s3_resources.Object(bucket,'/covid-19-output/dimDate.csv')
dim_date_object.put(Body=csv_buffer.getvalue())

In [None]:
# Serialise dimHospital (index included) to CSV in memory and upload it to S3.
# The key deliberately keeps its leading slash: the object ends up at
# s3://nani-bucket//covid-19-output/dimHospital.csv.
bucket = "nani-bucket"
s3_resources = boto3.resource("s3")
csv_buffer = StringIO()
dimHospital.to_csv(path_or_buf=csv_buffer)
dim_hospital_object = s3_resources.Object(bucket,'/covid-19-output/dimHospital.csv')
dim_hospital_object.put(Body=csv_buffer.getvalue())

In [None]:
# Serialise dimRegion (index included) to CSV in memory and upload it to S3.
# The key deliberately keeps its leading slash: the object ends up at
# s3://nani-bucket//covid-19-output/dimRegion.csv.
bucket = "nani-bucket"
s3_resources = boto3.resource("s3")
csv_buffer = StringIO()
dimRegion.to_csv(path_or_buf=csv_buffer)
dim_region_object = s3_resources.Object(bucket,'/covid-19-output/dimRegion.csv')
dim_region_object.put(Body=csv_buffer.getvalue())

In [None]:
# Serialise factCovid (index included) to CSV in memory and upload it to S3.
# The key deliberately keeps its leading slash: the object ends up at
# s3://nani-bucket//covid-19-output/factCovid.csv.
bucket = "nani-bucket"
s3_resources = boto3.resource("s3")
csv_buffer = StringIO()
factCovid.to_csv(path_or_buf=csv_buffer)
fact_covid_object = s3_resources.Object(bucket,'/covid-19-output/factCovid.csv')
fact_covid_object.put(Body=csv_buffer.getvalue())

In [None]:
# Show the raw CSV text currently held in csv_buffer (the whole buffer is
# returned, so the output can be very large).
csv_buffer.getvalue()

In [None]:
# Generate a CREATE TABLE statement for factCovid, with the index promoted to a
# regular column by reset_index(), and print it.
fact_covid_with_index = factCovid.reset_index()
factCovidsql = pd.io.sql.get_schema(fact_covid_with_index, name='factCovid')
print("".join(factCovidsql))

In [None]:
# Generate a CREATE TABLE statement for dimRegion, with the index promoted to a
# regular column by reset_index(), and print it.
dim_region_with_index = dimRegion.reset_index()
dimRegionsql = pd.io.sql.get_schema(dim_region_with_index, name='dimRegion')
print("".join(dimRegionsql))

In [None]:
# Generate a CREATE TABLE statement for dimDate, with the index promoted to a
# regular column by reset_index(), and print it.
dim_date_with_index = dimDate.reset_index()
dimDatesql = pd.io.sql.get_schema(dim_date_with_index, name='dimDate')
print("".join(dimDatesql))

In [None]:
# Generate a CREATE TABLE statement for dimHospital, with the index promoted to
# a regular column by reset_index(), and print it.
dim_hospital_with_index = dimHospital.reset_index()
dimHospitalsql = pd.io.sql.get_schema(dim_hospital_with_index, name='dimHospital')
print("".join(dimHospitalsql))

In [None]:
import redshift_connector
import numpy

In [None]:
# Open the Redshift connection. The password is no longer stored in the
# notebook: it is taken from the REDSHIFT_PASSWORD environment variable, and if
# that is unset or empty the user is prompted for it without echo.
conn = redshift_connector.Connection(
    host="redshift-cluster.c3wx34pgvqip.ap-south-1.redshift.amazonaws.com",
    database='dev',
    user='awsuser',
    password=os.environ.get("REDSHIFT_PASSWORD") or getpass.getpass("Redshift password: "),
)

In [None]:
# Commit every statement immediately so the DDL and COPY commands below take
# effect without an explicit commit.
conn.autocommit = True
# Annotated assignment: the previous chained form
# `cursor = redshift_connector.Cursor = conn.cursor()` also rebound the
# library's Cursor class to this cursor instance.
cursor: redshift_connector.Cursor = conn.cursor()

In [None]:
# DDL for the date dimension; the column order matches the CSV written from
# dimDate (index first).
create_dim_date_sql = """
CREATE TABLE dimDate (
"index" INTEGER,
  "fips" REAL,
  "date" TIMESTAMP,
  "year" INTEGER,
  "month" INTEGER,
  "dayofweek" INTEGER
)
"""
cursor.execute(create_dim_date_sql)

In [None]:
# DDL for the region dimension; the column order matches the CSV written from
# dimRegion (index first). The longitude column was previously misspelled
# "longititude", which did not match the DataFrame's `longitude` column.
cursor.execute("""
CREATE TABLE "dimRegion" (
"index" INTEGER,
"fips" REAL,
"province_state" TEXT,
"country_region" TEXT,
"latitude" REAL,
"longitude" REAL,
"county" TEXT,
"state" TEXT
)
""")

In [None]:

# DDL for the fact table; the column order matches the CSV written from
# factCovid (index first).
create_fact_covid_sql = """
CREATE TABLE "factCovid" (
  "index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" INTEGER,
  "negative" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalized" REAL,
  "hospitalizeddischarged" REAL
)
"""
cursor.execute(create_fact_covid_sql)

In [None]:
# DDL for the hospital dimension; the column order matches the CSV written from
# dimHospital (index first). "longtitude" keeps the spelling of the DataFrame column.
create_dim_hospital_sql = """
CREATE TABLE "dimHospital" (
"index" INTEGER,
  "fips" REAL,
  "state_name" TEXT,
  "latitude" REAL,
  "longtitude" REAL,
  "hq_address" TEXT,
  "hospital_name" TEXT,
  "hospital_type" TEXT,
  "hq_city" TEXT,
  "hq_state" TEXT
)
"""
cursor.execute(create_dim_hospital_sql)

In [None]:
# Bulk-load the dimDate CSV uploaded earlier into the dimDate table, skipping
# the header row. The double slash in the S3 path matches the leading-slash key
# used at upload time.
copy_dim_date_sql = """
               COPY dimDate 
               FROM 's3://nani-bucket//covid-19-output/dimDate.csv'
               CREDENTIALS 'aws_iam_role=arn:aws:iam::345682574541:role/service-role/AmazonRedshift-CommandsAccessRole-20230703T030629' 
               DELIMITER ',' 
               REGION 'ap-south-1'  
               IGNOREHEADER 1
               """
cursor.execute(copy_dim_date_sql)

In [None]:
# Bulk-load the dimHospital CSV uploaded earlier into the dimHospital table,
# skipping the header row. The file is loaded with the CSV format option rather
# than a bare DELIMITER ',': pandas wraps text fields that contain commas in
# double quotes, and only CSV mode treats those quoted fields as one column.
# The double slash in the S3 path matches the leading-slash key used at upload.
cursor.execute("""
               COPY dimHospital
               FROM 's3://nani-bucket//covid-19-output/dimHospital.csv'
               CREDENTIALS 'aws_iam_role=arn:aws:iam::345682574541:role/service-role/AmazonRedshift-CommandsAccessRole-20230703T030629'
               CSV
               REGION 'ap-south-1'
               IGNOREHEADER 1
               """)

In [None]:
# Bulk-load the dimRegion CSV uploaded earlier into the dimRegion table,
# skipping the header row. Previously the `{}` bucket placeholder was never
# formatted and the IAM role was empty, so the statement could not run; both
# are now filled in, using the upload bucket and the same role as the other
# COPY commands in this notebook.
copy_iam_role = "arn:aws:iam::345682574541:role/service-role/AmazonRedshift-CommandsAccessRole-20230703T030629"
cursor.execute("""
               COPY dimRegion
               FROM 's3://{bucket}//covid-19-output/dimRegion.csv'
               CREDENTIALS 'aws_iam_role={iam_role}'
               DELIMITER ','
               REGION 'ap-south-1'
               IGNOREHEADER 1
               """.format(bucket=bucket, iam_role=copy_iam_role))

In [None]:
# Bulk-load the factCovid CSV uploaded earlier into the factCovid table,
# skipping the header row. Previously the `{}` bucket placeholder was never
# formatted and the IAM role was empty, so the statement could not run; both
# are now filled in, using the upload bucket and the same role as the other
# COPY commands in this notebook.
copy_iam_role = "arn:aws:iam::345682574541:role/service-role/AmazonRedshift-CommandsAccessRole-20230703T030629"
cursor.execute("""
               COPY factCovid
               FROM 's3://{bucket}//covid-19-output/factCovid.csv'
               CREDENTIALS 'aws_iam_role={iam_role}'
               DELIMITER ','
               REGION 'ap-south-1'
               IGNOREHEADER 1
               """.format(bucket=bucket, iam_role=copy_iam_role))

In [None]:
# Release the Redshift resources: close the cursor first, then the connection
# it was created from.
cursor.close()
conn.close()