### Importing Required Libraries

- **`boto3`**: Used to interact with AWS services such as S3 and Athena.
- **`pandas`**: A powerful library for data manipulation and analysis, especially for working with tabular data.
- **`StringIO`**: Enables handling in-memory string-based file operations, useful for working with CSV data.
- **`time`**: Provides functionality for adding delays, such as waiting for AWS Athena queries to complete.
- **`Dict`**: A type annotation from the `typing` module for defining dictionary structures in functions or variables.

In [129]:
import boto3
import pandas as pd
from io import StringIO #python3; python2: BytesIO
import time
from typing import Dict

### AWS Configuration Variables

- **`AWS_ACCESS_KEY`**: Your AWS access key used for authentication with AWS services.
- **`AWS_SECRET_KEY`**: Your AWS secret key, paired with the access key for secure authentication.
- **`AWS_REGION`**: Specifies the AWS region where resources (e.g., Athena, S3) are hosted. In this case, it's `us-east-2`.
- **`SCHEMA_NAME`**: Name of the Athena database schema being queried (`surya-covid19`).
- **`S3_STAGING_DIR`**: Path to the S3 directory where Athena query results are stored (`s3://covid19-athena-bucket/output/`).
- **`S3_BUCKET_NAME`**: Name of the S3 bucket used for storing query outputs and data (`covid19-athena-bucket`).
- **`S3_OUTPUT_DIRECTORY`**: Subdirectory inside the S3 bucket for storing results (`output`).

In [130]:
AWS_ACCESS_KEY = "AKIA2ZIOMX2VUBB32KFF"
AWS_SECRET_KEY = "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"
AWS_REGION = "us-east-2"
SCHEMA_NAME = "surya-covid19"
S3_STAGING_DIR = "s3://covid19-athena-bucket/output/"
S3_BUCKET_NAME = "covid19-athena-bucket"
S3_OUTPUT_DIRECTORY = "output"

### Initializing Athena Client

- **`athena_client`**: Creates a connection to AWS Athena using the `boto3` library.
  - **`"athena"`**: Specifies that the client will interact with the Athena service.
  - **`aws_access_key_id`**: The AWS access key for authentication.
  - **`aws_secret_access_key`**: The corresponding AWS secret key for secure authentication.
  - **`region_name`**: Specifies the AWS region where Athena is hosted (in this case, `us-east-2`).
- This client will be used to execute queries and interact with Athena.

In [131]:
athena_client = boto3.client(
    "athena",
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION,
)

### Function to Download and Load Athena Query Results

- **Purpose**: This function downloads the results of an Athena query from S3 and loads it into a Pandas DataFrame.

#### Function Definition
def download_and_load_query_results(client: boto3.client, query_response: Dict) -> pd.DataFrame

1. **Check Query Status**: It continuously checks if the query execution is complete using `get_query_results`.
2. **Download Results**: Once the query is complete, it downloads the results from S3 to a temporary file.
3. **Load Data**: The downloaded CSV is read into a Pandas DataFrame.

**Returns**: A Pandas DataFrame containing the query results.

In [4]:
Dict ={}
# Function to download and load query results from AWS Athena
def download_and_load_query_results(
    client: boto3.client, query_response: Dict
) -> pd.DataFrame:
    """
    Downloads the results of an Athena query execution from S3 and loads it into a pandas DataFrame.

    Args:
        client (boto3.client): Boto3 client for Athena.
        query_response (Dict): Query response containing the QueryExecutionId.

    Returns:
        pd.DataFrame: The query results as a pandas DataFrame.
    """
    while True:
        try:
            # Check query execution status (this function only loads the first 1000 rows)
            client.get_query_results(
                QueryExecutionId=query_response["QueryExecutionId"]
            )
            break  # Exit the loop if query execution is complete
        except Exception as err:
            if "not yet finished" in str(err):
                time.sleep(0.001)  # Wait before retrying
            else:
                raise err  # Raise other exceptions

    # Temporary file location for storing query results
    temp_file_location: str = "athena_query_results.csv"

    # Create an S3 client to download the results
    s3_client = boto3.client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY,
        aws_secret_access_key=AWS_SECRET_KEY,
        region_name=AWS_REGION,
    )

    # Download the file from the specified S3 bucket and directory
    s3_client.download_file(
        S3_BUCKET_NAME,
        f"{S3_OUTPUT_DIRECTORY}/{query_response['QueryExecutionId']}.csv",
        temp_file_location,
    )

    # Load the downloaded file into a pandas DataFrame
    return pd.read_csv(temp_file_location)

### Starting Athena Query Execution for Multiple Tables

This code executes an Athena query on each of the specified tables (e.g., `enigma_jhud`, `nytimes_data_in_usa_us_county`, etc.).

For each table:
1. **Query Execution**: The `start_query_execution` function is called with the SQL query that selects all data (`SELECT * FROM <table_name> LIMIT 10000`).
2. **Query Configuration**:
   - **`QueryExecutionContext`**: Specifies the Athena database (`SCHEMA_NAME`).
   - **`ResultConfiguration`**: Defines the output location in S3 (`S3_STAGING_DIR`) and specifies encryption (`SSE_S3`).

In [5]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM enigma_jhud limit 10000",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

In [6]:
response

{'QueryExecutionId': 'c4b66ff3-125c-4afe-ac40-b6e2b9c0a4bb',
 'ResponseMetadata': {'RequestId': '9a86bdd7-99d3-4b3a-a4ee-b21744ab3d84',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Fri, 24 Jan 2025 20:26:57 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '59',
   'connection': 'keep-alive',
   'x-amzn-requestid': '9a86bdd7-99d3-4b3a-a4ee-b21744ab3d84'},
  'RetryAttempts': 0}}

### Downloading and Displaying Athena Query Results for `enigma_jhud`

- **Downloading Results**: The function `download_and_load_query_results` is called with the `athena_client` and `response` (the result of the Athena query).
- **Displaying Data**: After the data is loaded into the `enigma_jhud` DataFrame, the `.head()` function is used to display the first few rows of the data for inspection.

In [7]:
enigma_jhud = download_and_load_query_results(athena_client, response)

In [8]:
enigma_jhud.head()

Unnamed: 0,fips,admin2,province_state,country_region,last_update,latitude,longitude,confirmed,deaths,recovered,active,combined_key,partition_0
0,,,,Austria,2020-03-18T14:53:05,47.516,14.55,1646.0,4.0,9.0,,Austria,csv
1,,,,Norway,2020-03-18T15:53:09,60.472,8.469,1550.0,6.0,1.0,,Norway,csv
2,,,,Belgium,2020-03-18T12:33:02,50.833,4.0,1486.0,14.0,31.0,,Belgium,csv
3,,,Guangdong,China,2020-03-18T08:53:03,23.342,113.424,1370.0,8.0,1313.0,,"""Guangdong",csv
4,,,,Sweden,2020-03-18T17:53:02,63.0,16.0,1279.0,10.0,1.0,,Sweden,csv


In [9]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM nytimes_data_in_usa_us_county limit 1000",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

nytimes_data_in_usa_us_county = download_and_load_query_results(athena_client, response)

In [10]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM nytimes_data_in_usa_us_states limit 1000",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

nytimes_data_in_usa_us_states = download_and_load_query_results(athena_client, response)

In [11]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM rearc_covid_19_testing_data_states_daily limit 1000",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

rearc_covid_19_testing_data_states_daily = download_and_load_query_results(athena_client, response)

In [12]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM rearc_covid_19_testing_data_us_daily limit 1000",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

rearc_covid_19_testing_data_us_daily = download_and_load_query_results(athena_client, response)

In [13]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM rearc_covid_19_testing_data_us_total_latest limit 1000",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

rearc_covid_19_testing_data_us_total_latest = download_and_load_query_results(athena_client, response)

In [26]:
import pandas as pd
import json

file_path = r'rearc-usa-hospital-beds/usa-hospital-beds.geojson.4cCa297c'

# Read the file line by line and load into a list
data = []
try:
    with open(file_path, 'r') as file:
        for line in file:
            try:
                data.append(json.loads(line))  # Parse each JSON line
            except json.JSONDecodeError as e:
                print(f"Error decoding JSON: {e}")
except FileNotFoundError:
    print(f"File not found: {file_path}")
except Exception as e:
    print(f"An unexpected error occurred: {e}")

# Convert the list of JSON objects into a Pandas DataFrame if data is loaded
if data:
    df = pd.DataFrame(data)
    print(df.head())  # Display the first few rows of the DataFrame
else:
    print("No data was loaded from the file.")

Error decoding JSON: Unterminated string starting at: line 1 column 393 (char 392)
   OBJECTID                                      HOSPITAL_NAME HOSPITAL_TYPE  \
0         1  Phoenix VA Health Care System (AKA Carl T Hayd...   VA Hospital   
1         2             Southern Arizona VA Health Care System   VA Hospital   
2         3           VA Central California Health Care System   VA Hospital   
3         4  VA Connecticut Healthcare System - West Haven ...   VA Hospital   
4         5                       Wilmington VA Medical Center   VA Hospital   

               HQ_ADDRESS HQ_ADDRESS1     HQ_CITY HQ_STATE HQ_ZIP_CODE  \
0  650 E Indian School Rd        None     Phoenix       AZ       85012   
1          3601 S 6th Ave        None      Tucson       AZ       85723   
2      2615 E Clinton Ave        None      Fresno       CA       93703   
3        950 Campbell Ave        None  West Haven       CT        6516   
4       1601 Kirkwood Hwy        None  Wilmington       DE       1

### Converting JSON Data to CSV

This code processes a **GeoJSON file** (`usa-hospital-beds.geojson.4cCa297c`), where it:
1. Reads each line and checks if it's valid JSON.
2. Saves valid JSON data into a **Pandas DataFrame**.
3. Saves the clean data into a CSV file (`beds.csv`).
4. Logs any invalid JSON lines in an error file (`error_lines.txt`).

In [49]:
import json
import pandas as pd

file_path = r'/Users/suryadeipreddy/Python/covid-19/rearc-usa-hospital-beds/usa-hospital-beds.geojson.4cCa297c'
valid_data = []
error_lines = []

# Read the file line by line
with open(file_path, 'r') as file:
    for i, line in enumerate(file, start=1):
        try:
            # Try to parse the JSON line
            valid_data.append(json.loads(line))
        except json.JSONDecodeError as e:
            # If there's an error, log the problematic line
            print(f"Error decoding JSON on line {i}: {e}")
            error_lines.append((i, line.strip()))

# Convert valid data to a DataFrame
if valid_data:
    df = pd.DataFrame(valid_data)
    print(f"Successfully loaded {len(valid_data)} rows.")
else:
    print("No valid data found in the file.")

# Save valid data to a cleaned CSV file
output_path = r'/Users/suryadeipreddy/Python/covid-19/beds.csv'
df.to_csv(output_path, index=False)
print(f"Cleaned data saved to: {output_path}")

# Save problematic lines for further inspection (optional)
if error_lines:
    with open(r'/Users/suryadeipreddy/Python/covid-19/error_lines.txt', 'w') as error_file:
        for line_num, line_content in error_lines:
            error_file.write(f"Line {line_num}: {line_content}\n")
    print(f"Problematic lines saved to: C:\\Users\\Hp\\Desktop\\s34\\error_lines.txt")

Error decoding JSON on line 830: Unterminated string starting at: line 1 column 393 (char 392)
Successfully loaded 829 rows.
Cleaned data saved to: /Users/suryadeipreddy/Python/covid-19/beds.csv
Problematic lines saved to: C:\Users\Hp\Desktop\s34\error_lines.txt


In [53]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM rearc_usa_hospital_beds limit 1000",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

rearc_usa_hospital_beds = download_and_load_query_results(athena_client, response)

In [31]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM static_datasets_countrycode limit 1000",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

static_datasets_countrycode = download_and_load_query_results(athena_client, response)

In [37]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM static_datasets_countypopulation limit 1000",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

static_datasets_countypopulation = download_and_load_query_results(athena_client, response)

In [35]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM static_datasets_state_abv limit 1000",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

static_datasets_state_abv = download_and_load_query_results(athena_client, response)

In [55]:
static_datasets_state_abv.head()

Unnamed: 0,col0,col1
0,State,Abbreviation
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR


### The dataset was not crawled perfectly as the Column Names are different from what it need to have!, so we are performing operations to change the entity names

In [56]:
new_header = static_datasets_state_abv.iloc[0]

static_datasets_state_abv = static_datasets_state_abv.iloc[1:]
static_datasets_state_abv.columns = new_header

In [57]:
static_datasets_state_abv.head()

Unnamed: 0,State,Abbreviation
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR
5,California,CA


### Creating Fact Table

**Fact Table Creation**:
   - We selected specific columns from the `enigma_jhud` and `rearc_covid_19_testing_data_states_daily` DataFrames to create a **fact table** (`factCovid`).
   - **`factCovid_1`**: Columns selected from `enigma_jhud` include:
     - `fips`, `province_state`, `country_region`, `confirmed`, `deaths`, `recovered`, `active`.
   - **`factCovid_2`**: Columns selected from `rearc_covid_19_testing_data_states_daily` include:
     - `fips`, `date`, `positive`, `negative`, `hospitalizedcurrently`, `hospitalized`, `hospitalizeddischarged`.
   - The two DataFrames are merged using `fips` as the key using an **inner join** to create the `factCovid` table.

In [58]:
print(enigma_jhud.columns)

Index(['fips', 'admin2', 'province_state', 'country_region', 'last_update',
       'latitude', 'longitude', 'confirmed', 'deaths', 'recovered', 'active',
       'combined_key', 'partition_0'],
      dtype='object')


In [59]:
print(rearc_covid_19_testing_data_states_daily.columns)

Index(['date', 'state', 'positive', 'probablecases', 'negative', 'pending',
       'totaltestresultssource', 'totaltestresults', 'hospitalizedcurrently',
       'hospitalizedcumulative', 'inicucurrently', 'inicucumulative',
       'onventilatorcurrently', 'onventilatorcumulative', 'recovered',
       'lastupdateet', 'datemodified', 'checktimeet', 'death', 'hospitalized',
       'hospitalizeddischarged', 'datechecked', 'totaltestsviral',
       'positivetestsviral', 'negativetestsviral', 'positivecasesviral',
       'deathconfirmed', 'deathprobable', 'totaltestencountersviral',
       'totaltestspeopleviral', 'totaltestsantibody', 'positivetestsantibody',
       'negativetestsantibody', 'totaltestspeopleantibody',
       'positivetestspeopleantibody', 'negativetestspeopleantibody',
       'totaltestspeopleantigen', 'positivetestspeopleantigen',
       'totaltestsantigen', 'positivetestsantigen', 'fips', 'positiveincrease',
       'negativeincrease', 'total', 'totaltestresultsincrease', 

In [61]:
# Select specific columns from the enigma_jhud DataFrame
factCovid_1 = enigma_jhud[['fips', 'province_state', 'country_region', 'confirmed', 'deaths', 'recovered', 'active']]

# Select specific columns from the rearc_covid_19_testing_data_states_daily DataFrame
factCovid_2 = rearc_covid_19_testing_data_states_daily[['fips', 'date', 'positive', 'negative', 'hospitalizedcurrently','hospitalized','hospitalizeddischarged']]

# Merge the two DataFrames on the 'fips' column using an inner join
factCovid = pd.merge(factCovid_1, factCovid_2, on='fips', how='right')

In [62]:
print(nytimes_data_in_usa_us_county.columns)

Index(['date', 'county', 'state', 'fips', 'cases', 'deaths'], dtype='object')


### Creating Dimension Tables

**Creating Dimension Tables**:
   - **`dimRegion`**: Merged data from `enigma_jhud` and `nytimes_data_in_usa_us_county`. Key columns: `fips`, `province_state`, `country_region`, `latitude`, `longitude`, `county`, `state`.
   - **`dimHospital`**: Merged data from `rearc_usa_hospital_beds`. Key columns: `fips`, `state_name`, `latitude`, `longitude`, `hospital_name`, `hospital_type`.
   - **`dimDate`**: Extracted date-related data from `rearc_covid_19_testing_data_states_daily`. Key columns: `fips`, `date`, `year`, `month`, `day_of_week`.

In [63]:
# Selecting specific columns from enigma_jhud and nytimes_data_in_usa_us_county
dimRegion_1 = enigma_jhud[['fips', 'province_state', 'country_region', 'latitude', 'longitude']]
dimRegion_2 = nytimes_data_in_usa_us_county[['fips', 'county', 'state']]

# Merging the two DataFrames on the 'fips' column
dimRegion = pd.merge(dimRegion_1, dimRegion_2, on='fips', how='inner')

In [64]:
print(rearc_usa_hospital_beds.columns)

Index(['objectid', 'hospital_name', 'hospital_type', 'hq_address',
       'hq_address1', 'hq_city', 'hq_state', 'hq_zip_code', 'county_name',
       'state_name', 'state_fips', 'cnty_fips', 'fips', 'num_licensed_beds',
       'num_staffed_beds', 'num_icu_beds', 'adult_icu_beds', 'pedi_icu_beds',
       'bed_utilization', 'avg_ventilator_usage',
       'potential_increase_in_bed_capac', 'latitude', 'longtitude'],
      dtype='object')


In [65]:
# Selecting specific columns from eds
dimHospital = rearc_usa_hospital_beds[['fips', 'state_name', 'latitude', 'longtitude', 'hq_address', 'hospital_name', 'hospital_type', 'hq_city', 'hq_state']]

In [66]:
# Selecting specific columns from rearc_covid_19_testing_data_states_daily
dimDate = rearc_covid_19_testing_data_states_daily[['fips', 'date']]
dimDate['date']=pd.to_datetime(dimDate['date'],format='%Y%m%d')

dimDate['year'] = dimDate['date'].dt.year
dimDate['month'] = dimDate['date'].dt.month
dimDate['Day_of_week'] = dimDate['date'].dt.dayofweek

dimDate.head()

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['date']=pd.to_datetime(dimDate['date'],format='%Y%m%d')
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['year'] = dimDate['date'].dt.year
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['month'] = dimDate['date'].dt.month
A value is trying to be set on a copy of a slice from a

Unnamed: 0,fips,date,year,month,Day_of_week
0,2,2021-03-07,2021,3,6
1,1,2021-03-07,2021,3,6
2,5,2021-03-07,2021,3,6
3,60,2021-03-07,2021,3,6
4,4,2021-03-07,2021,3,6


## Uploading Data to S3

This section of the code is responsible for saving the transformed **fact** and **dimension** tables into **CSV** format and uploading them to an **S3 bucket** for storage.

**Setting S3 Bucket Name**:
   - The S3 bucket (`'surya-covid-19-data'`) where the files will be stored is specified.

### Key Steps:
- Data is saved as CSV in memory using `StringIO`.
- The `boto3` S3 client uploads the CSV data to the specified S3 bucket paths.

In [67]:
bucket = 'surya-covid-19-data'

**Creating In-Memory CSV Buffer**:
   - The `StringIO()` function is used to create an in-memory buffer to store the CSV data temporarily before uploading it to S3.

In [68]:
csv_buffer = StringIO()

In [69]:
csv_buffer #Datatype

<_io.StringIO at 0x10c312a40>

**Saving Data to CSV Buffer**:
   - The **`factCovid`**, **`dimRegion`**, **`dimHospital`**, and **`dimDate`** DataFrames are saved as CSV into the in-memory buffer using `to_csv(csv_buffer)`.

In [70]:
factCovid.to_csv(csv_buffer)

**Uploading Data to S3**:
   - The **`boto3.resource()`** function is used to interact with the S3 service.
   - The `Object` method of the S3 resource is used to specify the destination path within the S3 bucket (`'output/factCovid.csv'`, `'output/dimRegion.csv'`, etc.).
   - The **`put()`** method is then used to upload the contents of the buffer to the specified location in the S3 bucket.

In [71]:
s3_resource = boto3.resource(
    's3',
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION

)

s3_resource.Object(bucket, 'output/factCovid.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'H02A39KMSPBMH5R1',
  'HostId': 'HeO8g9UzxWJrsCjcUPIs7yOAt/6r/meoDUiHu1qxo3ImPJHFggFLw2+yvcviOEsYjQar9VraHHk=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'HeO8g9UzxWJrsCjcUPIs7yOAt/6r/meoDUiHu1qxo3ImPJHFggFLw2+yvcviOEsYjQar9VraHHk=',
   'x-amz-request-id': 'H02A39KMSPBMH5R1',
   'date': 'Fri, 24 Jan 2025 20:58:48 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"885d80ffe48bc95e2c5561d467331bb3"',
   'content-length': '0',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'ETag': '"885d80ffe48bc95e2c5561d467331bb3"',
 'ServerSideEncryption': 'AES256'}

**Displaying the CSV Content**:
   - `csv_buffer.getvalue()` is called to print the contents of the buffer, showing what was uploaded.

In [72]:
csv_buffer.getvalue()

',fips,province_state,country_region,confirmed,deaths,recovered,active,date,positive,negative,hospitalizedcurrently,hospitalized,hospitalizeddischarged\n0,2.0,,,,,,,20210307,56886,,33.0,1293.0,\n1,1.0,,,,,,,20210307,499819,1931711.0,494.0,45976.0,\n2,5.0,,,,,,,20210307,324818,2480716.0,335.0,14926.0,\n3,60.0,,,,,,,20210307,0,2140.0,,,\n4,4.0,,,,,,,20210307,826454,3073010.0,963.0,57907.0,118932.0\n5,6.0,,,,,,,20210307,3501394,,4291.0,,\n6,8.0,,,,,,,20210307,436602,2199458.0,326.0,23904.0,23003.0\n7,9.0,,,,,,,20210307,285330,,428.0,12257.0,\n8,11.0,,,,,,,20210307,41419,,150.0,,\n9,10.0,,,,,,,20210307,88354,545070.0,104.0,,\n10,12.0,,,,,,,20210307,1909209,9339038.0,3307.0,82237.0,\n11,13.0,,,,,,,20210307,1023487,,2008.0,56797.0,\n12,66.0,,,,,,,20210307,7749,112887.0,2.0,,\n13,15.0,,,,,,,20210307,28699,,27.0,2226.0,\n14,19.0,,,,,,,20210307,282384,1044418.0,167.0,,\n15,16.0,,,,,,,20210307,172931,505964.0,150.0,7184.0,\n16,17.0,,,,,,,20210307,1198335,,1141.0,,\n17,18.0,,,,,,,20210307,667262,

In [73]:
dimRegion.to_csv(csv_buffer)

In [74]:
s3_resource.Object(bucket, 'output/dimRegion.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'G7EAKCN4C5R3PW20',
  'HostId': 'u/aOxZBSp2v/Ai19/Nj2Tpp0iOwaRtRl5djB30Dw/3dk+SXLG4QalLke6ly95PBqtVQlyeYC8/w=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'u/aOxZBSp2v/Ai19/Nj2Tpp0iOwaRtRl5djB30Dw/3dk+SXLG4QalLke6ly95PBqtVQlyeYC8/w=',
   'x-amz-request-id': 'G7EAKCN4C5R3PW20',
   'date': 'Fri, 24 Jan 2025 20:58:57 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"c9557088bc912b76f6fa39e69b8583c7"',
   'content-length': '0',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'ETag': '"c9557088bc912b76f6fa39e69b8583c7"',
 'ServerSideEncryption': 'AES256'}

In [75]:
rearc_usa_hospital_beds.to_csv(csv_buffer)

In [76]:
s3_resource.Object(bucket, 'output/dimHospital.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'GGDK7GCDWQD5CRAB',
  'HostId': 'w3/WfGgFfhvDOEEAG0GOGYnZVkKJXe6lsvzXpBno3kQciM8GRF6uF+cOlqWo07tR870HoUJiVxo=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'w3/WfGgFfhvDOEEAG0GOGYnZVkKJXe6lsvzXpBno3kQciM8GRF6uF+cOlqWo07tR870HoUJiVxo=',
   'x-amz-request-id': 'GGDK7GCDWQD5CRAB',
   'date': 'Fri, 24 Jan 2025 20:59:01 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"4eda24fb1ddc060230a7a5ce1a438136"',
   'content-length': '0',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'ETag': '"4eda24fb1ddc060230a7a5ce1a438136"',
 'ServerSideEncryption': 'AES256'}

In [77]:
dimDate.to_csv(csv_buffer)
s3_resource.Object(bucket, 'output/dimDate.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'WN2V902RDS0R33Q5',
  'HostId': 'ZuylhKaj010Nj/GgyAvSY5Bn7z19mCVJ6W0LO2Jq+z2wC91OGIe0arRE8+nIUL3acgGTRK1QUgs=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'ZuylhKaj010Nj/GgyAvSY5Bn7z19mCVJ6W0LO2Jq+z2wC91OGIe0arRE8+nIUL3acgGTRK1QUgs=',
   'x-amz-request-id': 'WN2V902RDS0R33Q5',
   'date': 'Fri, 24 Jan 2025 20:59:05 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"b8588ca5602f453b9b2b4f2ca6bbde1e"',
   'content-length': '0',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'ETag': '"b8588ca5602f453b9b2b4f2ca6bbde1e"',
 'ServerSideEncryption': 'AES256'}

### Generating SQL Schema for Tables

This code generates the SQL `CREATE TABLE` statements for the fact and dimension tables:

1. **Generate SQL for `dimDate`**: Creates the SQL schema for the `dimDate` DataFrame.
2. **Generate SQL for `dimRegion`**: Creates the SQL schema for the `dimRegion` DataFrame.
3. **Generate SQL for `dimHospital`**: Creates the SQL schema for the `dimHospital` table (from `rearc_usa_hospital_beds` DataFrame).
4. **Generate SQL for `factCovid`**: Creates the SQL schema for the `factCovid` DataFrame.

The schemas are printed, and they can be used to create tables in a relational database like Redshift.


In [78]:
dimdatesql = pd.io.sql.get_schema(dimDate.reset_index(),'dimDate')
print(''.join(dimdatesql))

CREATE TABLE "dimDate" (
"index" INTEGER,
  "fips" INTEGER,
  "date" TIMESTAMP,
  "year" INTEGER,
  "month" INTEGER,
  "Day_of_week" INTEGER
)


In [79]:
dimregionsql = pd.io.sql.get_schema(dimRegion.reset_index(),'dimRegion')
print(''.join(dimregionsql))

CREATE TABLE "dimRegion" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "county" TEXT,
  "state" TEXT
)


In [80]:
dimhospitalsql = pd.io.sql.get_schema(rearc_usa_hospital_beds.reset_index(),'dimHosiptal')
print(''.join(dimhospitalsql))

CREATE TABLE "dimHosiptal" (
"index" INTEGER,
  "objectid" INTEGER,
  "hospital_name" TEXT,
  "hospital_type" TEXT,
  "hq_address" TEXT,
  "hq_address1" TEXT,
  "hq_city" TEXT,
  "hq_state" TEXT,
  "hq_zip_code" INTEGER,
  "county_name" TEXT,
  "state_name" TEXT,
  "state_fips" INTEGER,
  "cnty_fips" INTEGER,
  "fips" INTEGER,
  "num_licensed_beds" REAL,
  "num_staffed_beds" REAL,
  "num_icu_beds" INTEGER,
  "adult_icu_beds" INTEGER,
  "pedi_icu_beds" REAL,
  "bed_utilization" REAL,
  "avg_ventilator_usage" REAL,
  "potential_increase_in_bed_capac" INTEGER,
  "latitude" REAL,
  "longtitude" REAL
)


In [81]:
factcovidsql = pd.io.sql.get_schema(factCovid.reset_index(),'factcovid')
print(''.join(factcovidsql))

CREATE TABLE "factcovid" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" INTEGER,
  "negative" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalized" REAL,
  "hospitalizeddischarged" REAL
)


# Connecting to Amazon Redshift

This section of the code establishes a connection to an **Amazon Redshift** cluster.

**Installing Redshift Connector**:
   - The `redshift_connector` package is installed using the `pip3` command.


In [84]:
!pip3 install redshift_connector

Collecting redshift_connector
  Downloading redshift_connector-2.1.5-py3-none-any.whl.metadata (69 kB)
Collecting scramp<1.5.0,>=1.2.0 (from redshift_connector)
  Downloading scramp-1.4.5-py3-none-any.whl.metadata (19 kB)
Collecting lxml>=4.6.5 (from redshift_connector)
  Downloading lxml-5.3.0-cp312-cp312-macosx_10_9_universal2.whl.metadata (3.8 kB)
Collecting asn1crypto>=1.5.1 (from scramp<1.5.0,>=1.2.0->redshift_connector)
  Downloading asn1crypto-1.5.1-py2.py3-none-any.whl.metadata (13 kB)
Downloading redshift_connector-2.1.5-py3-none-any.whl (138 kB)
Downloading lxml-5.3.0-cp312-cp312-macosx_10_9_universal2.whl (8.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.2/8.2 MB[0m [31m8.3 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hDownloading scramp-1.4.5-py3-none-any.whl (12 kB)
Downloading asn1crypto-1.5.1-py2.py3-none-any.whl (105 kB)
Installing collected packages: asn1crypto, scramp, lxml, redshift_connector
Successfully installed asn1c

**Importing the Connector**:
   - The `redshift_connector` library is imported for interacting with Redshift.

In [85]:
import redshift_connector

**Connecting to Redshift**:
   - A connection to the Redshift cluster is established using the `connect` method, which requires the following details:
     - `host`: The Redshift cluster endpoint.
     - `database`: The name of the Redshift database (`dev`).
     - `user`: The username for authentication (`awsuser`).
     - `password`: The password for the Redshift user.
     - `ssl`: Secure connection (`True`).

In [248]:
# Establish the connection
conn = redshift_connector.connect(
    host='redshift-cluster-1.cxrugpuskhnx.us-east-2.redshift.amazonaws.com',
    database='dev',
    user='awsuser',
    password='SuryaCovid19',  # Use your updated password
    ssl=True  # Secure connection
)

print("Connection established successfully!")

Connection established successfully!


**Autocommit**:
   - Autocommit mode is enabled, ensuring that SQL commands are automatically committed without needing explicit commit commands.

In [249]:
conn.autocommit = True

**Creating Cursor**:
   - A cursor is created using the connection object (`conn.cursor()`), which is used to execute SQL queries.

In [250]:
cursor = redshift_connector.Cursor = conn.cursor()

## Creating Tables in Redshift

This section of the code creates the necessary **tables** in Amazon Redshift to store the fact and dimension data.

**Creating `factcovid` Table**:
   - This table stores the COVID-19 data, including columns such as `fips`, `province_state`, `country_region`, `confirmed`, `deaths`, `recovered`, `active`, `positive`, `negative`, etc.


In [238]:
cursor.execute("""
CREATE TABLE "factcovid" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" INTEGER,
  "negative" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalized" REAL,
  "hospitalizeddischarged" REAL
)
""")


<redshift_connector.cursor.Cursor at 0x10a7a2570>

**Creating `dimDate` Table**:
   - This dimension table stores date-related information such as `fips`, `date`, `year`, `month`, and `day_of_week`.

In [235]:
cursor.execute("""
CREATE TABLE "dimDate" (
    "index" INTEGER,
    "fips" INTEGER,
    "date" TIMESTAMP,
    "year" INTEGER,
    "month" INTEGER,
    "day_of_week" INTEGER
)
""")

<redshift_connector.cursor.Cursor at 0x10a7a2570>

**Creating `dimRegion` Table**:
   - This dimension table contains regional information, including `fips`, `province_state`, `country_region`, `latitude`, `longitude`, `county`, and `state`.

In [236]:
cursor.execute("""
CREATE TABLE "dimRegion" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "county" TEXT,
  "state" TEXT
)
""")

<redshift_connector.cursor.Cursor at 0x10a7a2570>

**Creating `dimHospital` Table**:
   - This dimension table holds hospital-related information like `fips`, `state_name`, `latitude`, `longitude`, `hospital_name`, `hospital_type`, and `hq_address`.

In [237]:
cursor.execute("""
CREATE TABLE "dimHosiptal" (
"index" INTEGER,
  "fips" INTEGER,
  "state_name" TEXT,
  "latitude" REAL,
  "longtitude" REAL,
  "hq_address" TEXT,
  "hospital_name" TEXT,
  "hospital_type" TEXT,
  "hq_city" TEXT,
  "hq_state" TEXT
)
""")

<redshift_connector.cursor.Cursor at 0x10a7a2570>

## Loading Data into Redshift from S3

This section of the code uses the `COPY` command to load the data from **Amazon S3** into the previously created Redshift tables.

**Copying Data into `factcovid` Table**:
   - The `COPY` command loads data from the `factCovid.csv` file in S3 into the `factcovid` table in Redshift.
   - **`CREDENTIALS`**: Specifies the IAM role (`aws_iam_role`) with permissions to read from the S3 bucket.
   - **`DELIMITER ','`**: Indicates that the CSV file is comma-delimited.
   - **`IGNOREHEADER 1`**: Ignores the header row of the CSV file.
   - **`FILLRECORD`**: Ensures missing values are handled correctly.
   - **`NULL AS ''`**: Specifies that empty fields in the CSV file should be treated as `NULL`.

In [251]:
cursor.execute("""
COPY factcovid
FROM 's3://surya-covid-19-data/output/factCovid.csv'
CREDENTIALS 'aws_iam_role=arn:aws:iam::741448924843:role/s3-redshift-role'
DELIMITER ','
REGION 'us-east-2'
IGNOREHEADER 1
FILLRECORD
NULL AS ''
""")

<redshift_connector.cursor.Cursor at 0x10a7a1df0>

**Copying Data into `dimDate` Table**:
   - Similar to the `factcovid` table, the data from `dimDate.csv` is loaded into the `dimDate` table.

In [253]:
cursor.execute("""
COPY dimDate
FROM 's3://surya-covid-19-data/output/dimDate.csv'
CREDENTIALS 'aws_iam_role=arn:aws:iam::741448924843:role/s3-redshift-role'
DELIMITER ','
REGION 'us-east-2'
IGNOREHEADER 1
FILLRECORD
NULL AS ''
""")

ProgrammingError: {'S': 'ERROR', 'C': 'XX000', 'M': "Load into table 'dimdate' failed.  Check 'stl_load_errors' system table for details.", 'F': '/opt/brazil-pkg-cache/packages/RedshiftPADB/RedshiftPADB-1.0.6073.0/AL2_x86_64/generic-flavor/src/src/pg/src/backend/commands/commands_copy.c', 'L': '796', 'R': 'CheckMaxRowError'}

**Copying Data into `dimRegion` Table**:
   - The data from `dimRegion.csv` is loaded into the `dimRegion` table in Redshift.

In [233]:
cursor.execute("""
COPY dimRegion
FROM 's3://surya-covid-19-data/output/dimRegion.csv'
CREDENTIALS 'aws_iam_role=arn:aws:iam::741448924843:role/s3-redshift-role'
DELIMITER ','
REGION 'us-east-2'
IGNOREHEADER 1
FILLRECORD
NULL AS ''
""")

ProgrammingError: {'S': 'ERROR', 'C': 'XX000', 'M': 'Cannot COPY into nonexistent table dimregion', 'F': '/opt/brazil-pkg-cache/packages/RedshiftPADB/RedshiftPADB-1.0.6073.0/AL2_x86_64/generic-flavor/src/src/sys/compression_analyzer.cpp', 'L': '710', 'R': 'XenGetLoadTableId'}

**Copying Data into `dimHospital` Table**:
   - The `dimHospital.csv` file is loaded into the `dimHospital` table.

In [244]:
cursor.execute("""
COPY dimHospital
FROM 's3://surya-covid-19-data/output/dimHospital.csv'
CREDENTIALS 'aws_iam_role=arn:aws:iam::741448924843:role/s3-redshift-role'
DELIMITER ','
REGION 'us-east-2'
IGNOREHEADER 1
FILLRECORD
NULL AS ''
""")

ProgrammingError: {'S': 'ERROR', 'C': 'XX000', 'M': 'Cannot COPY into nonexistent table dimhospital', 'F': '/opt/brazil-pkg-cache/packages/RedshiftPADB/RedshiftPADB-1.0.6073.0/AL2_x86_64/generic-flavor/src/src/sys/compression_analyzer.cpp', 'L': '710', 'R': 'XenGetLoadTableId'}