In [113]:
import time
import pandas as pd
from io import StringIO
import boto3

AWS_ACCESS_KEY = ""
AWS_SECRET_KEY = ""
AWS_REGION = "us-east-2"
SCHEMA_NAME ="covid-db"
S3_STAGING_DIR="s3://nitish-athena-output-bucket/output/"
S3_BUCKET_NAME= "nitish-athena-output-bucket"
S3_OUTPUT_DIRECTORY="output"

athena_client = boto3.client("athena",aws_access_key_id = AWS_ACCESS_KEY,
                             aws_secret_access_key = AWS_SECRET_KEY,
                             region_name = AWS_REGION)
Dict ={}

def download_and_load_query_results(
     client: boto3.client, query_response: Dict
    )-> pd.DataFrame:
    while True:
        try:
            response = client.get_query_execution(QueryExecutionId=query_response["QueryExecutionId"])
            #if 'QueryExecution' in response and 'Status' in response['QueryExecution'] and 'State' in response['QueryExecution']['Status']:
            state = response['QueryExecution']['Status']['State']
            if state == 'FAILED':
                return False
            elif state == 'SUCCEEDED':
                client.get_query_results(QueryExecutionId=query_response["QueryExecutionId"])
            break
        except Exception as err:
            if "not yet finished" in str(err):
                time.sleep(5)
            else:
                raise err
    temp_file_location: str="aathena_query_results.csv"
    s3_client = boto3.client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY,
        aws_secret_access_key=AWS_SECRET_KEY,
        region_name=AWS_REGION,
    )
    s3_client.download_file(
        S3_BUCKET_NAME,
        f"{S3_OUTPUT_DIRECTORY}/{query_response['QueryExecutionId']}.csv",
        temp_file_location,
    )
    return pd.read_csv(temp_file_location)

response = athena_client.start_query_execution(
    QueryString="SELECT * FROM enigma_jhud ",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
         "OutputLocation": S3_STAGING_DIR,
         "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)


enigma_jhud = download_and_load_query_results(athena_client, response)
enigma_jhud.head()

Unnamed: 0,fips,admin2,province_state,country_region,last_update,latitude,longitude,confirmed,deaths,recovered,active,combined_key
0,,,Anhui,China,2020-01-22T17:00:00,31.826,117.226,1.0,,,,"""Anhui"
1,,,Beijing,China,2020-01-22T17:00:00,40.182,116.414,14.0,,,,"""Beijing"
2,,,Chongqing,China,2020-01-22T17:00:00,30.057,107.874,6.0,,,,"""Chongqing"
3,,,Fujian,China,2020-01-22T17:00:00,26.079,117.987,1.0,,,,"""Fujian"
4,,,Gansu,China,2020-01-22T17:00:00,36.061,103.834,,,,,"""Gansu"


In [120]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM country_code",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
         "OutputLocation": S3_STAGING_DIR,
         "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)
country_code = download_and_load_query_results(athena_client, response)
country_code.head()

Unnamed: 0,country,alpha-2 code,alpha-3 code,numeric code,latitude,longitude
0,Afghanistan,AF,AFG,4.0,33.0,65.0
1,Albania,AL,ALB,8.0,41.0,20.0
2,Algeria,DZ,DZA,12.0,28.0,3.0
3,American Samoa,AS,ASM,16.0,-14.3333,-170.0
4,Andorra,AD,AND,20.0,42.5,1.6


In [121]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM county_population",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
         "OutputLocation": S3_STAGING_DIR,
         "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)
county_population = download_and_load_query_results(athena_client, response)
county_population.head()

Unnamed: 0,id,id2,county,state,population estimate 2018
0,0500000US01001,1001,Autauga,Alabama,55601
1,0500000US01003,1003,Baldwin,Alabama,218022
2,0500000US01005,1005,Barbour,Alabama,24881
3,0500000US01007,1007,Bibb,Alabama,22400
4,0500000US01009,1009,Blount,Alabama,57840


In [122]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM covid_19_testing_data_us_total",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
         "OutputLocation": S3_STAGING_DIR,
         "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)
covid_19_testing_data_us_total = download_and_load_query_results(athena_client, response)
covid_19_testing_data_us_total.head()

Unnamed: 0,positive,negative,pending,hospitalizedcurrently,hospitalizedcumulative,inicucurrently,inicucumulative,onventilatorcurrently,onventilatorcumulative,recovered,hash,lastmodified,death,hospitalized,total,totaltestresults,posneg,notes
0,1061101,5170081,2775,53793,111955,9486,4192,4712,373,153947,95064ba29ccbc20dbec397033dfe4b1f45137c99,2020-05-01T09:12:31.891Z,57266,111955,6233957,6231182,6231182,"""NOTE: """"total"""""


In [123]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM covid_19_testing_datastates_daily",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
         "OutputLocation": S3_STAGING_DIR,
         "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)
covid_19_testing_datastates_daily = download_and_load_query_results(athena_client, response)
covid_19_testing_datastates_daily.head()

Unnamed: 0,date,state,positive,probablecases,negative,pending,totaltestresultssource,totaltestresults,hospitalizedcurrently,hospitalizedcumulative,...,dataqualitygrade,deathincrease,hospitalizedincrease,hash,commercialscore,negativeregularscore,negativescore,positivescore,score,grade
0,20210307,AK,56886.0,,,,totalTestsViral,1731628.0,33.0,1293.0,...,,0,0,dc4bccd4bb885349d7e94d6fed058e285d4be164,0,0,0,0,0,
1,20210307,AL,499819.0,107742.0,1931711.0,,totalTestsPeopleViral,2323788.0,494.0,45976.0,...,,-1,0,997207b430824ea40b8eb8506c19a93e07bc972e,0,0,0,0,0,
2,20210307,AR,324818.0,69092.0,2480716.0,,totalTestsViral,2736442.0,335.0,14926.0,...,,22,11,50921aeefba3e30d31623aa495b47fb2ecc72fae,0,0,0,0,0,
3,20210307,AS,0.0,,2140.0,,totalTestsViral,2140.0,,,...,,0,0,96d23f888c995b9a7f3b4b864de6414f45c728ff,0,0,0,0,0,
4,20210307,AZ,826454.0,56519.0,3073010.0,,totalTestsViral,7908105.0,963.0,57907.0,...,,5,44,0437a7a96f4471666f775e63e86923eb5cbd8cdf,0,0,0,0,0,


In [124]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM covid_19_testing_dataus_daily",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
         "OutputLocation": S3_STAGING_DIR,
         "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)
covid_19_testing_dataus_daily = download_and_load_query_results(athena_client, response)
covid_19_testing_dataus_daily.head()

Unnamed: 0,date,states,positive,negative,pending,hospitalizedcurrently,hospitalizedcumulative,inicucurrently,inicucumulative,onventilatorcurrently,...,lastmodified,recovered,total,posneg,deathincrease,hospitalizedincrease,negativeincrease,positiveincrease,totaltestresultsincrease,hash
0,20210307,56,28755524.0,74579770.0,11808.0,40212.0,878613.0,8137.0,45475.0,2801.0,...,2021-03-07T24:00:00Z,,0,0,839,726,130414,41265,1156241,8b26839690cd05c0cef69cb9ed85641a76b5e78e
1,20210306,56,28714259.0,74449356.0,11783.0,41401.0,877887.0,8409.0,45453.0,2811.0,...,2021-03-06T24:00:00Z,,0,0,1674,503,142201,59620,1409138,d0c0482ea549c9d5c04a7c86acb6fc6a8095a592
2,20210305,56,28654639.0,74307155.0,12213.0,42541.0,877384.0,8634.0,45373.0,2889.0,...,2021-03-05T24:00:00Z,,0,0,2221,2781,271917,68787,1744417,a35ea4289cec4bb55c9f29ae04ec0fd5ac4e0222
3,20210304,56,28585852.0,74035238.0,12405.0,44172.0,874603.0,8970.0,45293.0,2973.0,...,2021-03-04T24:00:00Z,,0,0,1743,1530,177957,65487,1590984,a19ad6379a653834cbda3093791ad2c3b9fab5ff
4,20210303,56,28520365.0,73857281.0,11778.0,45462.0,873073.0,9359.0,45214.0,3094.0,...,2021-03-03T24:00:00Z,,0,0,2449,2172,267001,66836,1406795,9e1d2afda1b0ec243060d6f68a7134d011c0cb2a


In [126]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM nytimes_data_in_us_county",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
         "OutputLocation": S3_STAGING_DIR,
         "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)
nytimes_data_in_us_county = download_and_load_query_results(athena_client, response)
nytimes_data_in_us_county.head()

Unnamed: 0,date,county,state,fips,cases,deaths
0,2020-01-21,Snohomish,Washington,53061.0,1,0
1,2020-01-22,Snohomish,Washington,53061.0,1,0
2,2020-01-23,Snohomish,Washington,53061.0,1,0
3,2020-01-24,Cook,Illinois,17031.0,1,0
4,2020-01-24,Snohomish,Washington,53061.0,1,0


In [127]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM nytimes_data_in_us_states",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
         "OutputLocation": S3_STAGING_DIR,
         "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)
nytimes_data_in_us_states = download_and_load_query_results(athena_client, response)
nytimes_data_in_us_states.head()

Unnamed: 0,date,state,fips,cases,deaths
0,2020-01-21,Washington,53,1,0
1,2020-01-22,Washington,53,1,0
2,2020-01-23,Washington,53,1,0
3,2020-01-24,Illinois,17,1,0
4,2020-01-24,Washington,53,1,0


In [128]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM rearc_usa_hospital_beds",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
         "OutputLocation": S3_STAGING_DIR,
         "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)
rearc_usa_hospital_beds = download_and_load_query_results(athena_client, response)
rearc_usa_hospital_beds.head()

Unnamed: 0,objectid,hospital_name,hospital_type,hq_address,hq_address1,hq_city,hq_state,hq_zip_code,county_name,state_name,...,num_licensed_beds,num_staffed_beds,num_icu_beds,adult_icu_beds,pedi_icu_beds,bed_utilization,avg_ventilator_usage,potential_increase_in_bed_capac,latitude,longtitude
0,1,Phoenix VA Health Care System (AKA Carl T Hayd...,VA Hospital,650 E Indian School Rd,,Phoenix,AZ,85012,Maricopa,Arizona,...,129.0,129.0,0,0,,,0.0,0,33.495498,-112.066157
1,2,Southern Arizona VA Health Care System,VA Hospital,3601 S 6th Ave,,Tucson,AZ,85723,Pima,Arizona,...,295.0,295.0,2,2,,,2.0,0,32.181263,-110.965885
2,3,VA Central California Health Care System,VA Hospital,2615 E Clinton Ave,,Fresno,CA,93703,Fresno,California,...,57.0,57.0,2,2,,,2.0,0,36.773324,-119.779742
3,4,VA Connecticut Healthcare System - West Haven ...,VA Hospital,950 Campbell Ave,,West Haven,CT,6516,New Haven,Connecticut,...,216.0,216.0,1,1,,,2.0,0,41.2844,-72.95761
4,5,Wilmington VA Medical Center,VA Hospital,1601 Kirkwood Hwy,,Wilmington,DE,19805,New Castle,Delaware,...,60.0,60.0,0,0,,,1.0,0,39.740206,-75.606532


In [129]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM states_abv",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
         "OutputLocation": S3_STAGING_DIR,
         "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)
states_abv = download_and_load_query_results(athena_client, response)
states_abv.head()

Unnamed: 0,col0,col1
0,State,Abbreviation
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR


In [130]:
new_header = states_abv.iloc[0] #grabbing first row for the header
new_header

col0           State
col1    Abbreviation
Name: 0, dtype: object

In [131]:
states_abv = states_abv[1:] #slice the df from 2nd row onwards
states_abv

Unnamed: 0,col0,col1
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR
5,California,CA
6,Colorado,CO
7,Connecticut,CT
8,Delaware,DE
9,District of Columbia,DC
10,Florida,FL


In [132]:
states_abv.columns = new_header

In [133]:
states_abv.head()

Unnamed: 0,State,Abbreviation
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR
5,California,CA


In [143]:
fact_covid1 = enigma_jhud[['fips','province_state','country_region','confirmed','deaths','recovered','active']]

In [135]:
fact_covid2 = covid_19_testing_datastates_daily[['fips','date','positive','negative','hospitalizedcurrently','hospitalized','hospitalizeddischarged']]

In [136]:
fact_covid = pd.merge(fact_covid1,fact_covid2,on='fips',how='inner')

In [137]:
fact_covid.head()

Unnamed: 0,fips,province_state,country_region,confirmed,deaths,recovered,active,date,positive,negative,hospitalizedcurrently,hospitalized,hospitalizeddischarged
0,72.0,Puerto Rico,US,3.0,0.0,0.0,,20210307,101327.0,305972.0,147.0,,
1,72.0,Puerto Rico,US,3.0,0.0,0.0,,20210306,101327.0,305972.0,147.0,,
2,72.0,Puerto Rico,US,3.0,0.0,0.0,,20210305,101066.0,305972.0,136.0,,
3,72.0,Puerto Rico,US,3.0,0.0,0.0,,20210304,100867.0,305972.0,171.0,,
4,72.0,Puerto Rico,US,3.0,0.0,0.0,,20210303,100765.0,305972.0,169.0,,


In [139]:
fact_covid.shape

(26418, 13)

In [147]:
dim_region1 = enigma_jhud[['fips','province_state','country_region','latitude','longitude']]
dim_region2 = nytimes_data_in_us_county[['fips','county','state']]
dim_region = pd.merge(dim_region1,dim_region2,on='fips',how='inner')
dim_region.head()

Unnamed: 0,fips,province_state,country_region,latitude,longitude,county,state
0,,Anhui,China,31.826,117.226,New York City,New York
1,,Anhui,China,31.826,117.226,Unknown,Rhode Island
2,,Anhui,China,31.826,117.226,New York City,New York
3,,Anhui,China,31.826,117.226,Unknown,Rhode Island
4,,Anhui,China,31.826,117.226,New York City,New York


In [146]:
dim_region.shape

(45101020, 7)

In [150]:
dim_hospital = rearc_usa_hospital_beds[['fips','state_name','latitude','longtitude','hq_address','hospital_name','hospital_type','hq_city','hq_state']]
dim_hospital.head()

Unnamed: 0,fips,state_name,latitude,longtitude,hq_address,hospital_name,hospital_type,hq_city,hq_state
0,4013.0,Arizona,33.495498,-112.066157,650 E Indian School Rd,Phoenix VA Health Care System (AKA Carl T Hayd...,VA Hospital,Phoenix,AZ
1,4019.0,Arizona,32.181263,-110.965885,3601 S 6th Ave,Southern Arizona VA Health Care System,VA Hospital,Tucson,AZ
2,6019.0,California,36.773324,-119.779742,2615 E Clinton Ave,VA Central California Health Care System,VA Hospital,Fresno,CA
3,9009.0,Connecticut,41.2844,-72.95761,950 Campbell Ave,VA Connecticut Healthcare System - West Haven ...,VA Hospital,West Haven,CT
4,10003.0,Delaware,39.740206,-75.606532,1601 Kirkwood Hwy,Wilmington VA Medical Center,VA Hospital,Wilmington,DE


In [160]:
dim_date=covid_19_testing_datastates_daily[['fips','date']]
dim_date.head()

Unnamed: 0,fips,date
0,2,20210307
1,1,20210307
2,5,20210307
3,60,20210307
4,4,20210307


In [161]:
dim_date['date']=pd.to_datetime(dim_date['date'],format='%Y%m%d')
dim_date.head()

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dim_date['date']=pd.to_datetime(dim_date['date'],format='%Y%m%d')


Unnamed: 0,fips,date
0,2,2021-03-07
1,1,2021-03-07
2,5,2021-03-07
3,60,2021-03-07
4,4,2021-03-07


In [162]:
dim_date['year'] = dim_date['date'].dt.year
dim_date['month'] = dim_date['date'].dt.month
dim_date['day_of_week'] = dim_date['date'].dt.dayofweek
dim_date.head()

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dim_date['year'] = dim_date['date'].dt.year
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dim_date['month'] = dim_date['date'].dt.month
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dim_date['day_of_week'] = dim_date['date'].dt.dayofweek


Unnamed: 0,fips,date,year,month,day_of_week
0,2,2021-03-07,2021,3,6
1,1,2021-03-07,2021,3,6
2,5,2021-03-07,2021,3,6
3,60,2021-03-07,2021,3,6
4,4,2021-03-07,2021,3,6


In [163]:
dim_date['is_weekend'] = dim_date['date'].dt.dayofweek > 4
dim_date.head()

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dim_date['is_weekend'] = dim_date['date'].dt.dayofweek > 4


Unnamed: 0,fips,date,year,month,day_of_week,is_weekend
0,2,2021-03-07,2021,3,6,True
1,1,2021-03-07,2021,3,6,True
2,5,2021-03-07,2021,3,6,True
3,60,2021-03-07,2021,3,6,True
4,4,2021-03-07,2021,3,6,True


In [170]:
bucket = 'nitish-covid-de-project'
csv_buffer = StringIO()
fact_covid.to_csv(csv_buffer)
s3_resource = boto3.resource('s3',aws_access_key_id = AWS_ACCESS_KEY,
                             aws_secret_access_key = AWS_SECRET_KEY)
s3_resource.Object(bucket, 'output/fact_covid.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'DED15G22285T0ZPH',
  'HostId': 'Ea06+LMwjRpLNLdnNo56xA8a5Xz/26mAm2wAKAZboDqL3WLkq7XXXzN6jIGbdrEHeP8OHtW53Ug=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'Ea06+LMwjRpLNLdnNo56xA8a5Xz/26mAm2wAKAZboDqL3WLkq7XXXzN6jIGbdrEHeP8OHtW53Ug=',
   'x-amz-request-id': 'DED15G22285T0ZPH',
   'date': 'Wed, 01 Mar 2023 07:18:59 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"1b814dd620bec707ed02a098af1354bc"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 1},
 'ETag': '"1b814dd620bec707ed02a098af1354bc"',
 'ServerSideEncryption': 'AES256'}

In [184]:
csv_buffer1 = StringIO()
dim_hospital.to_csv(csv_buffer1)
s3_resource.Object(bucket, 'output/dim_hospital.csv').put(Body=csv_buffer1.getvalue())

{'ResponseMetadata': {'RequestId': 'R87228JPS0KQW135',
  'HostId': 'VRskH8BIx1sAznotH0gJu1R6lLByAneMxnpGxITrWv4uYVAKPCDG1hZngm9/tt+BkMvc4pDXALnGRwHUzZq1EA==',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'VRskH8BIx1sAznotH0gJu1R6lLByAneMxnpGxITrWv4uYVAKPCDG1hZngm9/tt+BkMvc4pDXALnGRwHUzZq1EA==',
   'x-amz-request-id': 'R87228JPS0KQW135',
   'date': 'Wed, 01 Mar 2023 07:39:45 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"a26c4e35d128fe6f64955ba9aac1d221"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"a26c4e35d128fe6f64955ba9aac1d221"',
 'ServerSideEncryption': 'AES256'}

In [207]:
csv_buffer2 = StringIO()
dim_date.to_csv(csv_buffer2)
s3_resource.Object(bucket, 'output/dim_date.csv').put(Body=csv_buffer2.getvalue())

{'ResponseMetadata': {'RequestId': 'SQNQZDCWJ03XNX2M',
  'HostId': 'VaVCwvVhgjqLkEhvlCP17guIqwchGahIpMbM3axVUrBVSx9go6V9iw+Wb7Fi8Drmnf9bTap7qIY=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'VaVCwvVhgjqLkEhvlCP17guIqwchGahIpMbM3axVUrBVSx9go6V9iw+Wb7Fi8Drmnf9bTap7qIY=',
   'x-amz-request-id': 'SQNQZDCWJ03XNX2M',
   'date': 'Thu, 02 Mar 2023 06:56:35 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"9f3efffe072a10d4588101c2666ad520"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"9f3efffe072a10d4588101c2666ad520"',
 'ServerSideEncryption': 'AES256'}

In [197]:
from io import BytesIO

In [198]:
c_file = BytesIO()

In [200]:
import gzip

In [206]:
with gzip.GzipFile(fileobj = c_file,mode='wb',compresslevel = 6) as gz:
    buff = StringIO()
    dim_region.to_csv(buff)
    gz.write(buff.getvalue().encode('utf-8','replace'))
c_file.seek(0)
s3_resource.Object(bucket, 'output/dim_region.csv').put(Body=c_file)

{'ResponseMetadata': {'RequestId': '3QK2329GXMSMPQ5Q',
  'HostId': 'ym3Bi40YdQJcjaXCMt5mh3W0HJkkX/USC9v0ZWXX8bNqvSgdc3BpFxRzlRVpoi/akr0KPA3Wny8=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'ym3Bi40YdQJcjaXCMt5mh3W0HJkkX/USC9v0ZWXX8bNqvSgdc3BpFxRzlRVpoi/akr0KPA3Wny8=',
   'x-amz-request-id': '3QK2329GXMSMPQ5Q',
   'date': 'Thu, 02 Mar 2023 06:21:41 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"c92b402ef541d07f361cc5e92188c041"',
   'server': 'AmazonS3',
   'content-length': '0',
   'connection': 'close'},
  'RetryAttempts': 0},
 'ETag': '"c92b402ef541d07f361cc5e92188c041"',
 'ServerSideEncryption': 'AES256'}

In [214]:
dim_date_sql = pd.io.sql.get_schema(dim_date.reset_index(),'dim_date')
print(dim_date_sql)

CREATE TABLE "dim_date" (
"index" INTEGER,
  "fips" INTEGER,
  "date" TIMESTAMP,
  "year" INTEGER,
  "month" INTEGER,
  "day_of_week" INTEGER,
  "is_weekend" INTEGER
)


In [215]:
dim_hospital_sql = pd.io.sql.get_schema(dim_hospital.reset_index(),'dim_hospital')
print(dim_hospital_sql)

CREATE TABLE "dim_hospital" (
"index" INTEGER,
  "fips" REAL,
  "state_name" TEXT,
  "latitude" REAL,
  "longtitude" REAL,
  "hq_address" TEXT,
  "hospital_name" TEXT,
  "hospital_type" TEXT,
  "hq_city" TEXT,
  "hq_state" TEXT
)


In [216]:
dim_region_sql = pd.io.sql.get_schema(dim_region.reset_index(),'dim_region')
print(dim_region_sql)

CREATE TABLE "dim_region" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "county" TEXT,
  "state" TEXT
)


In [217]:
fact_covid_sql = pd.io.sql.get_schema(fact_covid.reset_index(),'fact_covid')
print(fact_covid_sql)

CREATE TABLE "fact_covid" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" REAL,
  "negative" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalized" REAL,
  "hospitalizeddischarged" REAL
)


In [266]:
import redshift_connector


In [267]:
conn = redshift_connector.connect(
    host='redshift-cluster-1.cgtd8karybop.us-east-2.redshift.amazonaws.com',
    database='dev',
    user='awsuser',
    password='Mypassword1'
 )

In [268]:
conn.autocommit = True

In [269]:
cursor=redshift_connector.Cursor=conn.cursor()

In [270]:
cursor.execute("""CREATE TABLE "dim_date" (
"index" INTEGER,
  "fips" INTEGER,
  "date" TIMESTAMP,
  "year" INTEGER,
  "month" INTEGER,
  "day_of_week" INTEGER,
  "is_weekend" BOOLEAN
)
""")

<redshift_connector.cursor.Cursor at 0x20c13cf5fd0>

In [271]:
cursor.execute("""CREATE TABLE "dim_hospital" (
"index" INTEGER,
  "fips" REAL,
  "state_name" TEXT,
  "latitude" REAL,
  "longtitude" REAL,
  "hq_address" TEXT,
  "hospital_name" TEXT,
  "hospital_type" TEXT,
  "hq_city" TEXT,
  "hq_state" TEXT
)
""")

cursor.execute("""CREATE TABLE "dim_region" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "county" TEXT,
  "state" TEXT
)
""")

cursor.execute("""CREATE TABLE "fact_covid" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" REAL,
  "negative" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalized" REAL,
  "hospitalizeddischarged" REAL
)
""")

<redshift_connector.cursor.Cursor at 0x20c13cf5fd0>

In [272]:
cursor.execute("""copy dim_date from 's3://nitish-covid-de-project/output/dim_date.csv'
credentials 'aws_iam_role=arn:aws:iam::688666572633:role/redshift-role'
delimiter ',' region 'us-east-2'
IGNOREHEADER 1
""")

<redshift_connector.cursor.Cursor at 0x20c13cf5fd0>

In [273]:
cursor.execute("""copy dim_hospital from 's3://nitish-covid-de-project/output/dim_hospital.csv'
credentials 'aws_iam_role=arn:aws:iam::688666572633:role/redshift-role'
delimiter ',' region 'us-east-2'
IGNOREHEADER 1
""")

cursor.execute("""copy fact_covid from 's3://nitish-covid-de-project/output/fact_covid.csv'
credentials 'aws_iam_role=arn:aws:iam::688666572633:role/redshift-role'
delimiter ',' region 'us-east-2'
IGNOREHEADER 1
""")

<redshift_connector.cursor.Cursor at 0x20c13cf5fd0>

In [274]:
cursor.execute("""copy dim_region from 's3://nitish-covid-de-project/output/dim_region.csv'
credentials 'aws_iam_role=arn:aws:iam::688666572633:role/redshift-role'
delimiter ',' region 'us-east-2'
IGNOREHEADER 1 gzip
""")

<redshift_connector.cursor.Cursor at 0x20c13cf5fd0>