In [1]:
import boto3
import pandas as pd
from io import StringIO
import time

In [2]:
from dotenv import load_dotenv
import os
load_dotenv()
AWS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY')
AWS_SECRET_KEY = os.getenv('AWS_SECRET_KEY')
AWS_REGION = os.getenv('AWS_REGION')
SCHEMA_NAME = os.getenv('SCHEMA_NAME')
S3_STAGING_DIR = os.getenv('S3_STAGING_DIR')
S3_BUCKET_NAME = os.getenv('S3_BUCKET_NAME')
S3_OUTPUT_DIRETORY = os.getenv('S3_OUTPUT_DIRETORY')
S3_DIMENSION_CSV= os.getenv('S3_DIMENSION_CSV')

In [3]:
SCHEMA_NAME

'covid19_database'

In [4]:
S3_STAGING_DIR

's3://khanhnv-covid19-test-bucket/output_result/'

In [5]:
# creates an Athena client object with AWS credentials and region information.
athena_client = boto3.client(
    'athena',
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION
)

In [6]:
dict = {} 
def download_and_load_query_results (
    client: boto3.client, query_response: dict, table_name
) -> pd.DataFrame:
    """
    Download query results from Athena and load into a Pandas DataFrame.
    
    Args:
        client: boto3 client.
        query_response (dict): query response
        table_name: name of table in database
    Returns:
        pd.DataFrame: Pandas DataFrame containing the query results.
    """
    while True:
        try:
            # This func only loads the first 1000 rows
            client.get_query_results (
                QueryExecutionId=query_response["QueryExecutionId"]
            )
            break
        except Exception as err:
            if "not yet finished" in str(err):
                time.sleep(0.001)
            else:
                raise err
    temp_file_location: str = f"./query_result/{table_name}_results.csv"
    s3_client = boto3.client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY,
        aws_secret_access_key=AWS_SECRET_KEY,
        region_name=AWS_REGION,
    )
    s3_client.download_file(
        S3_BUCKET_NAME,
        f"{S3_OUTPUT_DIRETORY}/{query_response['QueryExecutionId']}.csv",
        temp_file_location,
    )
    return pd.read_csv(temp_file_location)

In [8]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM cdc_pfizer_vaccine_distribution",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

In [9]:
response

{'QueryExecutionId': 'f3fcb88b-92e8-4cd9-8ba7-9212838cd8aa',
 'ResponseMetadata': {'RequestId': '459b9ee8-3677-4164-b517-7a2c35903ad7',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Tue, 25 Apr 2023 11:22:58 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '59',
   'connection': 'keep-alive',
   'x-amzn-requestid': '459b9ee8-3677-4164-b517-7a2c35903ad7'},
  'RetryAttempts': 0}}

In [10]:
def get_table_names_in_database():
    """
    Retrieves the names of all tables in the Athena database specified by the SCHEMA_NAME environment variable.

    Returns:

    table_names (list): A list of table names in the Athena database.
    """
    table_response = athena_client.start_query_execution(
        QueryString=f"SHOW TABLES IN {SCHEMA_NAME}",
        QueryExecutionContext={"Database": SCHEMA_NAME},
        ResultConfiguration={
            "OutputLocation": S3_STAGING_DIR,
            "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
        },
    )
    # Get the query execution ID
    query_execution_id = table_response['QueryExecutionId']

    # Wait for the query to complete
    while True:
        status = athena_client.get_query_execution(QueryExecutionId=query_execution_id)['QueryExecution']['Status']['State']
        if status in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
            break

    # Get the query results
    results = athena_client.get_query_results(QueryExecutionId=query_execution_id)

    # Parse the table names
    table_names = []
    for row in results['ResultSet']['Rows'][0:]:
        table_names.append(row['Data'][0]['VarCharValue'])

    # Print the table names
    return table_names

In [11]:
def response_result_athena_query(table_name):
    """
    Executes a query to retrieve all rows from a specified table in the Athena database specified by the SCHEMA_NAME environment variable.

    Args:
        table_name (str): Name of the table to query.
    Returns:
        csv file: A csv file will be stored in query_result folder.
    """
    response = athena_client.start_query_execution(
        QueryString=f"SELECT * FROM {table_name}",
        QueryExecutionContext={"Database": SCHEMA_NAME},
        ResultConfiguration={
            "OutputLocation": S3_STAGING_DIR,
            "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
        },
    )
    return download_and_load_query_results(athena_client, response, table_name)


In [12]:
query_result_dict = {}
table_names = get_table_names_in_database()
table_names

['cdc_moderna_vaccine_distribution',
 'cdc_pfizer_vaccine_distribution',
 'enigma_jhu',
 'nytimes_data_in_usa_us_county',
 'nytimes_data_in_usa_us_state',
 'rearc_covid19_testing_states_daily',
 'rearc_covid19_testing_us_daily',
 'rearc_covid19_testing_us_total_latest',
 'rearc_usa_hospital_beds',
 'static_dataset_countrycode',
 'static_dataset_countypopulation',
 'static_dataset_state_abv']

In [15]:
for table_name in table_names:
    query_result_dict[f"{table_name}"] = response_result_athena_query(table_name)

  return pd.read_csv(temp_file_location)


In [16]:
query_result_dict.keys()

dict_keys(['cdc_moderna_vaccine_distribution', 'cdc_pfizer_vaccine_distribution', 'enigma_jhu', 'nytimes_data_in_usa_us_county', 'nytimes_data_in_usa_us_state', 'rearc_covid19_testing_states_daily', 'rearc_covid19_testing_us_daily', 'rearc_covid19_testing_us_total_latest', 'rearc_usa_hospital_beds', 'static_dataset_countrycode', 'static_dataset_countypopulation', 'static_dataset_state_abv'])

In [17]:
query_result_dict['static_dataset_state_abv'].head()

Unnamed: 0,col0,col1
0,col0,col1
1,State,Abbreviation
2,Alabama,AL
3,Alaska,AK
4,Arizona,AZ


In [18]:
query_result_dict['static_dataset_state_abv'] = query_result_dict['static_dataset_state_abv'].drop(0)

In [19]:
query_result_dict['static_dataset_state_abv'].head()

Unnamed: 0,col0,col1
1,State,Abbreviation
2,Alabama,AL
3,Alaska,AK
4,Arizona,AZ
5,Arkansas,AR


In [20]:
# grab the first row for the new header
new_header = query_result_dict['static_dataset_state_abv'].iloc[0]
new_header

col0           State
col1    Abbreviation
Name: 1, dtype: object

In [21]:
# take the data less the header row
query_result_dict['static_dataset_state_abv'] = query_result_dict['static_dataset_state_abv'][1:]
query_result_dict['static_dataset_state_abv'].head()

Unnamed: 0,col0,col1
2,Alabama,AL
3,Alaska,AK
4,Arizona,AZ
5,Arkansas,AR
6,California,CA


In [22]:
# set the header row as the df header
query_result_dict['static_dataset_state_abv'].columns = new_header
query_result_dict['static_dataset_state_abv'].head()

1,State,Abbreviation
2,Alabama,AL
3,Alaska,AK
4,Arizona,AZ
5,Arkansas,AR
6,California,CA


In [24]:
query_result_dict['cdc_moderna_vaccine_distribution'] = query_result_dict['cdc_moderna_vaccine_distribution'].drop(0)
query_result_dict['cdc_moderna_vaccine_distribution'].head()

Unnamed: 0,jurisdiction,week_of_allocations,_1st_dose_allocations,_2nd_dose_allocations
1,Connecticut,2021-06-21T00:00:00.000,41220,41220
2,Maine,2021-06-21T00:00:00.000,15800,15800
3,Massachusetts,2021-06-21T00:00:00.000,79500,79500
4,New Hampshire,2021-06-21T00:00:00.000,15800,15800
5,Rhode Island,2021-06-21T00:00:00.000,12480,12480


In [25]:
query_result_dict['cdc_pfizer_vaccine_distribution'] = query_result_dict['cdc_pfizer_vaccine_distribution'].drop(0)
query_result_dict['cdc_pfizer_vaccine_distribution'].head()

Unnamed: 0,jurisdiction,week_of_allocations,_1st_dose_allocations,_2nd_dose_allocations
1,Connecticut,2021-06-21T00:00:00.000,54360,54360
2,Maine,2021-06-21T00:00:00.000,21420,21420
3,Massachusetts,2021-06-21T00:00:00.000,104580,104580
4,New Hampshire,2021-06-21T00:00:00.000,21420,21420
5,Rhode Island,2021-06-21T00:00:00.000,17280,17280


In [30]:
query_result_dict['enigma_jhu'] = query_result_dict['enigma_jhu'].drop(0)
query_result_dict['enigma_jhu'].head()

Unnamed: 0,county,province_state,country_region,last_update,latitude,longitude,confirmed,active,combined_key,recovered,deaths,county_fips
1,San Benito,California,US,2020-02-03T03:53:02,36.576,-120.988,2,0,"San Benito, California, US",0,0,6069
2,King,Washington,US,2020-03-02T20:23:16,47.548,-121.984,14,0,"King, Washington, US",1,5,53033
3,King,Washington,US,2020-03-04T19:53:02,47.548,-121.984,31,0,"King, Washington, US",1,9,53033
4,Snohomish,Washington,US,2020-03-04T19:53:02,48.033,-121.834,8,0,"Snohomish, Washington, US",0,1,53061
5,Contra Costa,California,US,2020-03-06T19:43:02,37.853,-121.902,3,0,"Contra Costa, California, US",0,0,6013


### Handle missing value 

In [31]:
# check through dataframes include in query_result_dict
# whether dataframe has columns that contain null/NaN value
# then replace with 0
for df_name, df in query_result_dict.items():
    if df.isnull().values.any():
        df = df.fillna(0)
        query_result_dict[df_name] = df

In [32]:
for df_name, df in query_result_dict.items():
    query_result_dict[df_name].to_csv(f'./query_result/{df_name}_results.csv', index=False)

#### Handle quotes symbol in data

In [33]:
for key in query_result_dict:
    df = query_result_dict[key]
    df = df.applymap(lambda x: x.replace('"', '') if isinstance(x, str) else x)
    query_result_dict[key] = df

In [34]:
query_result_dict['enigma_jhu'].head()

Unnamed: 0,county,province_state,country_region,last_update,latitude,longitude,confirmed,active,combined_key,recovered,deaths,county_fips
1,San Benito,California,US,2020-02-03T03:53:02,36.576,-120.988,2,0,"San Benito, California, US",0,0,6069
2,King,Washington,US,2020-03-02T20:23:16,47.548,-121.984,14,0,"King, Washington, US",1,5,53033
3,King,Washington,US,2020-03-04T19:53:02,47.548,-121.984,31,0,"King, Washington, US",1,9,53033
4,Snohomish,Washington,US,2020-03-04T19:53:02,48.033,-121.834,8,0,"Snohomish, Washington, US",0,1,53061
5,Contra Costa,California,US,2020-03-06T19:43:02,37.853,-121.902,3,0,"Contra Costa, California, US",0,0,6013


In [35]:
# write down to csv
for df_name, df in query_result_dict.items():
    query_result_dict[df_name].to_csv(f'./query_result/{df_name}_results.csv', index=False)