In [3]:
import boto3
import pandas as pd
from io import StringIO

In [102]:
import configparser
config = configparser.ConfigParser()
config.read('cluster.config')

['cluster.config']

### 1.2 Configure AWS Credentials

In [103]:
AWS_ACCESS_KEY_ID = config['AWS']['KEY']
AWS_SECRET_KEY = config['AWS']['SECRET']

AWS_REGION = "us-east-1"
SCHEMA_NAME = "covid_19"
S3_STAGING_DIR = "s3://sai-athena-output-bucket/output"
S3_BUCKET_NAME = "sai-athena-output-bucket"
S3_OUTPUT_DIRECTORY= "output"

## Query Data from AWS Athena
### Initialize Athena Client



In [104]:
athena_client = boto3.client('athena', region_name=AWS_REGION, aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_KEY)

#### Function to download query results

In [105]:
Dict ={}

def download_load_query_results(
        client: boto3.client, query_response: Dict
) -> pd.DataFrame:
    while True:
        try:
            client.get_query_execution(
                QueryExecutionId=query_response["QueryExecutionId"]
            )
            break
        except Exception as err:
            if "not yet finished" in str(err):
                time.sleep(0.001)
            else:
                raise err
    
    temp_file_location: str="athena_query_results.csv"
    s3_client = boto3.client(
        "s3",
        aws_access_key_id = AWS_ACCESS_KEY_ID,
        aws_secret_access_key = AWS_SECRET_KEY,
        region_name = AWS_REGION,
    )
    s3_client.download_file(
        S3_BUCKET_NAME,
        f"{S3_OUTPUT_DIRECTORY}/{query_response['QueryExecutionId']}.csv",
        temp_file_location
    )
    return pd.read_csv(temp_file_location)

#### Execute Athena Queries


In [None]:
query = f"SELECT * FROM countrycode;"  #  

# Start Query Execution
response = athena_client.start_query_execution(
    QueryString=query,
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={"OutputLocation": S3_STAGING_DIR},
)

# Extract QueryExecutionId
query_execution_id = response["QueryExecutionId"]
print(f"✅ Query Execution Started: {query_execution_id}")


✅ Query Execution Started: 74499f5f-5280-4957-88fc-687fe9b08466


In [33]:
response

{'QueryExecutionId': 'e82dba61-4420-4eac-b2b0-bdd6f38d17f9',
 'ResponseMetadata': {'RequestId': '78ac5a9a-c73d-4dad-b52a-7fd6444ef7bb',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Fri, 14 Feb 2025 12:25:16 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '59',
   'connection': 'keep-alive',
   'x-amzn-requestid': '78ac5a9a-c73d-4dad-b52a-7fd6444ef7bb'},
  'RetryAttempts': 0}}

In [34]:
print(f"Trying to download from: s3://{S3_BUCKET_NAME}/{S3_OUTPUT_DIRECTORY}/{response['QueryExecutionId']}.csv")


Trying to download from: s3://sai-athena-output-bucket/output/e82dba61-4420-4eac-b2b0-bdd6f38d17f9.csv


In [36]:
query = f"SELECT * FROM countrycode;"  #  

# Start Query Execution
response = athena_client.start_query_execution(
    QueryString=query,
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={"OutputLocation": S3_STAGING_DIR},
)

# Extract QueryExecutionId
query_execution_id = response["QueryExecutionId"]
print(f"✅ Query Execution Started: {query_execution_id}")
countrycode = download_load_query_results(athena_client, response)

✅ Query Execution Started: e0a78c3f-b1e8-475d-9893-4419a10fb27a


In [55]:
query = f"SELECT * FROM state_abv;"  #  

# Start Query Execution
response = athena_client.start_query_execution(
    QueryString=query,
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={"OutputLocation": S3_STAGING_DIR},
)

# Extract QueryExecutionId
query_execution_id = response["QueryExecutionId"]
print(f"✅ Query Execution Started: {query_execution_id}")
state_abv = download_load_query_results(athena_client, response)

✅ Query Execution Started: 4da5a9e4-4285-4693-ab84-ef2afff975e5


In [42]:
state_abv.head()

Unnamed: 0,State,Abbreviation
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR
5,California,CA


In [41]:
#Make first row as header
state_abv.columns = state_abv.iloc[0]
state_abv = state_abv[1:]

In [56]:
query = f"SELECT * FROM us_states;"  #  

# Start Query Execution
response = athena_client.start_query_execution(
    QueryString=query,
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={"OutputLocation": S3_STAGING_DIR},
)

# Extract QueryExecutionId
query_execution_id = response["QueryExecutionId"]
print(f"✅ Query Execution Started: {query_execution_id}")
us_states = download_load_query_results(athena_client, response)

✅ Query Execution Started: 571c4703-dd8a-4959-a279-829e32fca139


In [57]:
query = f"SELECT * FROM us_total_latest;"  #  

# Start Query Execution
response = athena_client.start_query_execution(
    QueryString=query,
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={"OutputLocation": S3_STAGING_DIR},
)

# Extract QueryExecutionId
query_execution_id = response["QueryExecutionId"]
print(f"✅ Query Execution Started: {query_execution_id}")
us_total_latest = download_load_query_results(athena_client, response)

✅ Query Execution Started: 34c161a8-240a-4d34-b569-00a84a764335


In [58]:
query = f"SELECT * FROM us_daily;"  #  

# Start Query Execution
response = athena_client.start_query_execution(
    QueryString=query,
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={"OutputLocation": S3_STAGING_DIR},
)

# Extract QueryExecutionId
query_execution_id = response["QueryExecutionId"]
print(f"✅ Query Execution Started: {query_execution_id}")
us_daily = download_load_query_results(athena_client, response)

✅ Query Execution Started: 52aef6d0-788b-4630-8c5c-2fe23eb0f576


In [59]:
query = f"SELECT * FROM us_county;"  #  

# Start Query Execution
response = athena_client.start_query_execution(
    QueryString=query,
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={"OutputLocation": S3_STAGING_DIR},
)

# Extract QueryExecutionId
query_execution_id = response["QueryExecutionId"]
print(f"✅ Query Execution Started: {query_execution_id}")
us_county = download_load_query_results(athena_client, response)

✅ Query Execution Started: 68f7e56e-b81d-4871-b765-a478cac6bf94


In [146]:
query = f"SELECT * FROM states_daily;"  #  

# Start Query Execution
response = athena_client.start_query_execution(
    QueryString=query,
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={"OutputLocation": S3_STAGING_DIR},
)

# Extract QueryExecutionId
query_execution_id = response["QueryExecutionId"]
print(f"✅ Query Execution Started: {query_execution_id}")
states_daily = download_load_query_results(athena_client, response)

✅ Query Execution Started: 7f25f046-fe08-44e1-97ec-234a71cf4131


ClientError: An error occurred (404) when calling the HeadObject operation: Not Found

In [76]:
query = f"SELECT * FROM rearc_usa_hospital_beds;"  #  

# Start Query Execution
response = athena_client.start_query_execution(
    QueryString=query,
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={"OutputLocation": S3_STAGING_DIR},
)

# Extract QueryExecutionId
query_execution_id = response["QueryExecutionId"]
print(f"✅ Query Execution Started: {query_execution_id}")
rearc_usa_hospital_beds = download_load_query_results(athena_client, response)

✅ Query Execution Started: cdb610d5-a0e1-4d55-89ac-2fc79aece01a


In [None]:
import pandas as pd
import time

# Define Query
query = "SELECT * FROM csv;"  #  

# Start Query Execution
response = athena_client.start_query_execution(
    QueryString=query,
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={"OutputLocation": S3_STAGING_DIR},
)

# Extract QueryExecutionId
query_execution_id = response["QueryExecutionId"]
print(f"✅ Query Execution Started: {query_execution_id}")

# Function to wait for query completion
def wait_for_query_completion(athena_client, query_execution_id):
    while True:
        query_execution = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
        status = query_execution["QueryExecution"]["Status"]["State"]
        
        if status in ["SUCCEEDED", "FAILED", "CANCELLED"]:
            return status
        time.sleep(2)  # Wait before retrying

# Wait for the query to complete
status = wait_for_query_completion(athena_client, query_execution_id)

if status == "SUCCEEDED":
    # Fetch query results
    results_paginator = athena_client.get_paginator("get_query_results")
    results_pages = results_paginator.paginate(QueryExecutionId=query_execution_id)

    # Extract column names from the first page
    first_page = next(iter(results_pages))
    column_info = first_page["ResultSet"]["ResultSetMetadata"]["ColumnInfo"]
    column_names = [col["Name"] for col in column_info]

    # Extract data rows
    rows = []
    for page in results_pages:
        for row in page["ResultSet"]["Rows"]:
            rows.append([field.get("VarCharValue", None) for field in row["Data"]])

    # Convert to DataFrame
    df = pd.DataFrame(rows[1:], columns=column_names)  # Exclude first row if it's headers
    print(f"✅ DataFrame Created with {len(df)} rows.")
else:
    print(f"❌ Query failed with status: {status}")
    df = None  # Set DataFrame to None in case of failure


✅ Query Execution Started: 970ff64c-0957-45a3-94e9-9aefa96fbe75
✅ DataFrame Created with 222804 rows.


In [147]:
import pandas as pd
import time

# Define Query
query = "SELECT * FROM states_daily;"  #  

# Start Query Execution
response = athena_client.start_query_execution(
    QueryString=query,
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={"OutputLocation": S3_STAGING_DIR},
)

# Extract QueryExecutionId
query_execution_id = response["QueryExecutionId"]
print(f"✅ Query Execution Started: {query_execution_id}")

# Function to wait for query completion
def wait_for_query_completion(athena_client, query_execution_id):
    while True:
        query_execution = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
        status = query_execution["QueryExecution"]["Status"]["State"]
        
        if status in ["SUCCEEDED", "FAILED", "CANCELLED"]:
            return status
        time.sleep(2)  # Wait before retrying

# Wait for the query to complete
status = wait_for_query_completion(athena_client, query_execution_id)

if status == "SUCCEEDED":
    # Fetch query results
    results_paginator = athena_client.get_paginator("get_query_results")
    results_pages = results_paginator.paginate(QueryExecutionId=query_execution_id)

    # Extract column names from the first page
    first_page = next(iter(results_pages))
    column_info = first_page["ResultSet"]["ResultSetMetadata"]["ColumnInfo"]
    column_names = [col["Name"] for col in column_info]

    # Extract data rows
    rows = []
    for page in results_pages:
        for row in page["ResultSet"]["Rows"]:
            rows.append([field.get("VarCharValue", None) for field in row["Data"]])

    # Convert to DataFrame
    states_daily = pd.DataFrame(rows[1:], columns=column_names)  # Exclude first row if it's headers
    print(f"✅ DataFrame Created with {len(states_daily)} rows.")
else:
    print(f"❌ Query failed with status: {status}")
    states_daily = None  # Set DataFrame to None in case of failure


✅ Query Execution Started: aa577bb8-f1d6-41c6-b10e-c90161e253c8
✅ DataFrame Created with 20780 rows.


In [97]:
import boto3

# Initialize Athena Client
athena_client = boto3.client(
    "athena",
    region_name=AWS_REGION,
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_KEY
)

# Test Query
query = "SELECT 1;"

try:
    response = athena_client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={"Database": SCHEMA_NAME},
        ResultConfiguration={"OutputLocation": S3_STAGING_DIR},
    )
    print("✅ Athena Connection Successful! Query Started.")
    print(response)

except Exception as e:
    print("🚨 Athena Connection Failed:", e)


✅ Athena Connection Successful! Query Started.
{'QueryExecutionId': '671403bb-25bc-4b7a-84f2-6279711aa047', 'ResponseMetadata': {'RequestId': '29642a3c-ae7e-4b28-8664-0eac35ebd37c', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Fri, 14 Feb 2025 17:32:15 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '59', 'connection': 'keep-alive', 'x-amzn-requestid': '29642a3c-ae7e-4b28-8664-0eac35ebd37c'}, 'RetryAttempts': 0}}


In [98]:
import boto3

s3_client = boto3.client("s3", region_name=AWS_REGION)

try:
    response = s3_client.head_bucket(Bucket=S3_BUCKET_NAME)
    print(f"✅ S3 Bucket '{S3_BUCKET_NAME}' exists and is accessible.")
except Exception as e:
    print(f"🚨 Error: S3 Bucket '{S3_BUCKET_NAME}' does not exist or you lack permissions.")
    print(e)


✅ S3 Bucket 'sai-athena-output-bucket' exists and is accessible.


countrycode, state_abv, us_states,us_total_latest, us_daily,us_county,states_daily, rearc_usa_hospital_beds


## ETL

In [200]:
rearc_usa_hospital_beds.columns

Index(['objectid', 'hospital_name', 'hospital_type', 'hq_address',
       'hq_address1', 'hq_city', 'hq_state', 'hq_zip_code', 'county_name',
       'state_name', 'state_fips', 'cnty_fips', 'fips', 'num_licensed_beds',
       'num_staffed_beds', 'num_icu_beds', 'adult_icu_beds', 'pedi_icu_beds',
       'bed_utilization', 'avg_ventilator_usage',
       'potential_increase_in_bed_capac', 'latitude', 'longtitude'],
      dtype='object')

In [83]:
 us_total_latest.head()

Unnamed: 0,positive,negative,pending,hospitalizedcurrently,hospitalizedcumulative,inicucurrently,inicucumulative,onventilatorcurrently,onventilatorcumulative,recovered,hash,lastmodified,death,hospitalized,total,totaltestresults,posneg,notes
0,1061101,5170081,2775,53793,111955,9486,4192,4712,373,153947,95064ba29ccbc20dbec397033dfe4b1f45137c99,2020-05-01T09:12:31.891Z,57266,111955,6233957,6231182,6231182,"""NOTE: """"total"""""


## Data Transformation using Pandas

#### Process Fact Table (COVID Cases)

In [250]:
factcovid_1 = df[['fips','province_state','country_region','confirmed','deaths','recovered','active']]

factcovid_2 = states_daily[['fips','date','positive','negative','hospitalizedcurrently','hospitalized','hospitalizeddischarged']]
#only drop fips na rows
factcovid_1 = factcovid_1.dropna(subset=['fips'])
factcovid_2 = factcovid_2.dropna(subset=['fips'])
#change fips to int

factcovid_1['fips'] = factcovid_1['fips'].astype(int)
factcovid_2['fips'] = factcovid_2['fips'].astype(int)
factcovid = pd.merge(factcovid_1, factcovid_2, on='fips', how='inner')

In [251]:
factcovid.shape

(26418, 13)

In [252]:
factcovid.head()

Unnamed: 0,fips,province_state,country_region,confirmed,deaths,recovered,active,date,positive,negative,hospitalizedcurrently,hospitalized,hospitalizeddischarged
0,72,Puerto Rico,US,3,0,0,,20210307,101327,305972,147,,
1,72,Puerto Rico,US,3,0,0,,20210306,101327,305972,147,,
2,72,Puerto Rico,US,3,0,0,,20210305,101066,305972,136,,
3,72,Puerto Rico,US,3,0,0,,20210304,100867,305972,171,,
4,72,Puerto Rico,US,3,0,0,,20210303,100765,305972,169,,


#### Process Dimension Tables

In [253]:
dimRegion_1 = df[['fips','province_state','country_region','latitude','longitude']]
#drop null in fips

dimRegion_1 = dimRegion_1.dropna(subset=['fips'])
dimRegion_2 = us_county[['fips','county','state']]
dimRegion_2 = dimRegion_2.dropna(subset=['fips'])
#change fips to int
dimRegion_1['fips'] = dimRegion_1['fips'].astype(int)
dimRegion_2['fips'] = dimRegion_2['fips'].astype(int)

dimRegion = pd.merge(dimRegion_1, dimRegion_2, on='fips', how='inner')

In [254]:
dimRegion.head()

Unnamed: 0,fips,province_state,country_region,latitude,longitude,county,state
0,6037,California,US,34.052,-118.244,Los Angeles,California
1,6037,California,US,34.052,-118.244,Los Angeles,California
2,6037,California,US,34.052,-118.244,Los Angeles,California
3,6037,California,US,34.052,-118.244,Los Angeles,California
4,6037,California,US,34.052,-118.244,Los Angeles,California


In [173]:
#drop duplicated rows

dimRegion = dimRegion.drop_duplicates()

dimRegion.shape

(2212, 7)

In [174]:
dimDate= states_daily[['fips','date']]

In [177]:
#create date dimension

dimDate['date'] = pd.to_datetime(dimDate['date'])
dimDate['day'] = dimDate['date'].dt.day

dimDate['month'] = dimDate['date'].dt.month
dimDate['day_of_week'] = dimDate['date'].dt.dayofweek


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['date'] = pd.to_datetime(dimDate['date'])
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['day'] = dimDate['date'].dt.day
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['month'] = dimDate['date'].dt.month
A value is trying to be set on a copy of a slice from a DataFrame.
Try 

In [255]:
dimDate

Unnamed: 0,fips,date,day,month,day_of_week
0,2,2021-03-07,7,3,6
1,1,2021-03-07,7,3,6
2,5,2021-03-07,7,3,6
3,60,2021-03-07,7,3,6
4,4,2021-03-07,7,3,6
...,...,...,...,...,...
20775,53,2020-01-17,17,1,4
20776,53,2020-01-16,16,1,3
20777,53,2020-01-15,15,1,2
20778,53,2020-01-14,14,1,1


In [256]:
dimHospital = rearc_usa_hospital_beds[['fips','hq_state','hospital_name', 'hospital_type', 'hq_address',
       'hq_address1', 'hq_city', 'hq_zip_code', 'county_name',
       'state_name', 'state_fips', 'cnty_fips', 'num_licensed_beds',
       'num_staffed_beds', 'num_icu_beds', 'adult_icu_beds', 'pedi_icu_beds',
       'bed_utilization', 'avg_ventilator_usage',
       'potential_increase_in_bed_capac', 'latitude', 'longtitude']]

In [257]:
# store this in aws s3 bucket

csv_buffer = StringIO()

csv_buffer

<_io.StringIO at 0x1e850bc6680>

In [183]:
bucket=  'sai-covid19-project'

In [258]:
factcovid.to_csv(csv_buffer, index=False)

s3_resource = boto3.resource('s3')
s3_resource.Object(bucket, 'output/factcovid.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'REHZXGAFQFX4H647',
  'HostId': 'WQQTyy26XF/siTvjak7OzsNb+u3gz84O32kEbxTg9/2sq1MHPEUfHgv7lLoUTZHoWfI/fvLQ/pV3q3g2XaZUxYCcoEnKX1/7',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'WQQTyy26XF/siTvjak7OzsNb+u3gz84O32kEbxTg9/2sq1MHPEUfHgv7lLoUTZHoWfI/fvLQ/pV3q3g2XaZUxYCcoEnKX1/7',
   'x-amz-request-id': 'REHZXGAFQFX4H647',
   'date': 'Fri, 14 Feb 2025 21:25:29 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"c7fae93a360a2c72852017280c5d94f4"',
   'x-amz-checksum-crc32': '0IisNw==',
   'x-amz-checksum-type': 'FULL_OBJECT',
   'content-length': '0',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'ETag': '"c7fae93a360a2c72852017280c5d94f4"',
 'ChecksumCRC32': '0IisNw==',
 'ChecksumType': 'FULL_OBJECT',
 'ServerSideEncryption': 'AES256'}

In [260]:
csv_buffer.getvalue()

'fips,province_state,country_region,confirmed,deaths,recovered,active,date,positive,negative,hospitalizedcurrently,hospitalized,hospitalizeddischarged\r\n72,Puerto Rico,US,3,0,0,,20210307,101327,305972,147,,\r\n72,Puerto Rico,US,3,0,0,,20210306,101327,305972,147,,\r\n72,Puerto Rico,US,3,0,0,,20210305,101066,305972,136,,\r\n72,Puerto Rico,US,3,0,0,,20210304,100867,305972,171,,\r\n72,Puerto Rico,US,3,0,0,,20210303,100765,305972,169,,\r\n72,Puerto Rico,US,3,0,0,,20210302,100735,305972,162,,\r\n72,Puerto Rico,US,3,0,0,,20210301,100584,305972,155,,\r\n72,Puerto Rico,US,3,0,0,,20210228,100297,305972,150,,\r\n72,Puerto Rico,US,3,0,0,,20210227,100044,305972,174,,\r\n72,Puerto Rico,US,3,0,0,,20210226,99860,305972,169,,\r\n72,Puerto Rico,US,3,0,0,,20210225,99619,305972,184,,\r\n72,Puerto Rico,US,3,0,0,,20210224,99519,305972,242,,\r\n72,Puerto Rico,US,3,0,0,,20210223,99476,305972,225,,\r\n72,Puerto Rico,US,3,0,0,,20210222,99257,305972,233,,\r\n72,Puerto Rico,US,3,0,0,,20210221,99084,305972,217,,\

## Store Transformed Data in Amazon S3
 


In [262]:
#upload remaining 
csv_buffer = StringIO()
dimRegion.to_csv(csv_buffer, index=False)
s3_resource.Object(bucket, 'output/dimRegion.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': '0WSESASMSH80DKEG',
  'HostId': 'v2zPWtF4LsT+zvVXgDZAuEccFVzkC+pBBlHE/by24mWXupKckwb3zdvs7TtQebZDBUKW6LkzTmA=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'v2zPWtF4LsT+zvVXgDZAuEccFVzkC+pBBlHE/by24mWXupKckwb3zdvs7TtQebZDBUKW6LkzTmA=',
   'x-amz-request-id': '0WSESASMSH80DKEG',
   'date': 'Fri, 14 Feb 2025 21:27:19 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"41848ea49228a914d556d2b55d417f2c"',
   'x-amz-checksum-crc32': 'TYuEsA==',
   'x-amz-checksum-type': 'FULL_OBJECT',
   'content-length': '0',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'ETag': '"41848ea49228a914d556d2b55d417f2c"',
 'ChecksumCRC32': 'TYuEsA==',
 'ChecksumType': 'FULL_OBJECT',
 'ServerSideEncryption': 'AES256'}

In [263]:
csv_buffer = StringIO()
dimDate.to_csv(csv_buffer, index=False)

s3_resource.Object(bucket, 'output/dimDate.csv').put(Body=csv_buffer.getvalue())


{'ResponseMetadata': {'RequestId': '8D240YC6T7ZG03RF',
  'HostId': '1jpllxdBQ/eIWFJrp5v7Y2rQM0nc/BLrrh/qo4XeW98UEg1h2v4IXy5l2UiXLaxOkG57ZUD0Hic=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': '1jpllxdBQ/eIWFJrp5v7Y2rQM0nc/BLrrh/qo4XeW98UEg1h2v4IXy5l2UiXLaxOkG57ZUD0Hic=',
   'x-amz-request-id': '8D240YC6T7ZG03RF',
   'date': 'Fri, 14 Feb 2025 21:27:34 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"e0cd2ff62032d34ec7abf89426ac6e47"',
   'x-amz-checksum-crc32': '3P+DQg==',
   'x-amz-checksum-type': 'FULL_OBJECT',
   'content-length': '0',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'ETag': '"e0cd2ff62032d34ec7abf89426ac6e47"',
 'ChecksumCRC32': '3P+DQg==',
 'ChecksumType': 'FULL_OBJECT',
 'ServerSideEncryption': 'AES256'}

In [264]:
csv_buffer = StringIO()
dimHospital.to_csv(csv_buffer, index=False)

s3_resource.Object(bucket, 'output/dimHospital.csv').put(Body=csv_buffer.getvalue())


{'ResponseMetadata': {'RequestId': 'KN4BRYTD2FV1TA3A',
  'HostId': 'l4W8cMwBZf8o6NU36DiaTWfGF+Vw0vSd4ItsQec/2mUy8A2j3WiSBmdPFgyyIxyUp5qDcuWEWDY=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'l4W8cMwBZf8o6NU36DiaTWfGF+Vw0vSd4ItsQec/2mUy8A2j3WiSBmdPFgyyIxyUp5qDcuWEWDY=',
   'x-amz-request-id': 'KN4BRYTD2FV1TA3A',
   'date': 'Fri, 14 Feb 2025 21:27:38 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"dc470abf485f72b0b986583f145ee5f2"',
   'x-amz-checksum-crc32': 'YRSkqg==',
   'x-amz-checksum-type': 'FULL_OBJECT',
   'content-length': '0',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'ETag': '"dc470abf485f72b0b986583f145ee5f2"',
 'ChecksumCRC32': 'YRSkqg==',
 'ChecksumType': 'FULL_OBJECT',
 'ServerSideEncryption': 'AES256'}

In [265]:
print(''.join(pd.io.sql.get_schema(factcovid.reset_index(), 'factcovid',)))


CREATE TABLE "factcovid" (
"index" INTEGER,
  "fips" INTEGER,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" TEXT,
  "deaths" TEXT,
  "recovered" TEXT,
  "active" TEXT,
  "date" TEXT,
  "positive" TEXT,
  "negative" TEXT,
  "hospitalizedcurrently" TEXT,
  "hospitalized" TEXT,
  "hospitalizeddischarged" TEXT
)


In [267]:
print(''.join(pd.io.sql.get_schema(dimDate.reset_index(), 'dimDate',)))


CREATE TABLE "dimDate" (
"index" INTEGER,
  "fips" TEXT,
  "date" TIMESTAMP,
  "day" INTEGER,
  "month" INTEGER,
  "day_of_week" INTEGER
)


In [268]:
print(''.join(pd.io.sql.get_schema(dimHospital.reset_index(), 'dimHospital',)))


CREATE TABLE "dimHospital" (
"index" INTEGER,
  "fips" REAL,
  "hq_state" TEXT,
  "hospital_name" TEXT,
  "hospital_type" TEXT,
  "hq_address" TEXT,
  "hq_address1" TEXT,
  "hq_city" TEXT,
  "hq_zip_code" INTEGER,
  "county_name" TEXT,
  "state_name" TEXT,
  "state_fips" REAL,
  "cnty_fips" REAL,
  "num_licensed_beds" REAL,
  "num_staffed_beds" REAL,
  "num_icu_beds" INTEGER,
  "adult_icu_beds" INTEGER,
  "pedi_icu_beds" REAL,
  "bed_utilization" REAL,
  "avg_ventilator_usage" REAL,
  "potential_increase_in_bed_capac" INTEGER,
  "latitude" REAL,
  "longtitude" REAL
)


In [269]:
print(''.join(pd.io.sql.get_schema(dimRegion.reset_index(), 'dimRegion',)))


CREATE TABLE "dimRegion" (
"index" INTEGER,
  "fips" INTEGER,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" TEXT,
  "longitude" TEXT,
  "county" TEXT,
  "state" TEXT
)


In [270]:
factcovid

Unnamed: 0,fips,province_state,country_region,confirmed,deaths,recovered,active,date,positive,negative,hospitalizedcurrently,hospitalized,hospitalizeddischarged
0,72,Puerto Rico,US,3,0,0,,20210307,101327,305972,147,,
1,72,Puerto Rico,US,3,0,0,,20210306,101327,305972,147,,
2,72,Puerto Rico,US,3,0,0,,20210305,101066,305972,136,,
3,72,Puerto Rico,US,3,0,0,,20210304,100867,305972,171,,
4,72,Puerto Rico,US,3,0,0,,20210303,100765,305972,169,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...
26413,72,Puerto Rico,US,3647,132,0,3515,20200320,14,114,,,
26414,72,Puerto Rico,US,3647,132,0,3515,20200319,5,56,,,
26415,72,Puerto Rico,US,3647,132,0,3515,20200318,5,31,,,
26416,72,Puerto Rico,US,3647,132,0,3515,20200317,5,13,,,


#### Load Data into Amazon Redshift

In [273]:
import redshift_connector

# Redshift Connection Details
REDSHIFT_HOST = "default-workgroup.503561422673.us-east-1.redshift-serverless.amazonaws.com"
REDSHIFT_PORT = 5439
REDSHIFT_DB = "dev"  # ✅ Correct database name
REDSHIFT_USER = "awsuser"
REDSHIFT_PASSWORD = "Passw0rd123"  # ✅ Ensure this is correct


In [274]:

try:
    # Establish Connection
    conn = redshift_connector.connect(
        host=REDSHIFT_HOST,
        port=REDSHIFT_PORT,
        database=REDSHIFT_DB,
        user=REDSHIFT_USER,
        password=REDSHIFT_PASSWORD
    )
    print("✅ Successfully connected to Redshift!")

    # Create a cursor object
    cursor = conn.cursor()

    # Execute a test query
    cursor.execute("SELECT version();")
    print("📊 Redshift Version:", cursor.fetchone())

except Exception as e:
    print("🚨 Redshift Connection Failed:", e)


🚨 Redshift Connection Failed: ('connection time out', TimeoutError(10060, 'A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond', None, 10060, None))


#### Create Tables in Redshift

In [248]:
cursor.execute("""CREATE TABLE "dimDate" (
"index" INTEGER,
  "fips" TEXT,
  "date" TIMESTAMP,
  "day" INTEGER,
  "month" INTEGER,
  "day_of_week" INTEGER
)""")

<redshift_connector.cursor.Cursor at 0x1e8544a1640>

In [249]:
cursor.execute("""
SELECT current_database();
""")

tables = cursor.fetchall()
print("Tables in Public Schema:", tables)


Tables in Public Schema: (['myfirstdb'],)


#### Load Data from S3 to Redshift

COPY dev.public.dimHospital FROM 's3://sai-covid19-project/output/dimHospital.csv' IAM_ROLE 'arn:aws:iam::503561422673:role/redshift-s3-access' FORMAT AS CSV DELIMITER ',' QUOTE '"' IGNOREHEADER 1 REGION AS 'us-east-1'