### Libraries

#### Boto3: 
Python SDK for AWS. It allows you to directly create, update, and delete AWS resources from your Python scripts.

In [2]:
!pip install boto3

Collecting boto3
  Downloading boto3-1.26.96-py3-none-any.whl (135 kB)
     -------------------------------------- 135.5/135.5 KB 2.7 MB/s eta 0:00:00
Collecting botocore<1.30.0,>=1.29.96
  Downloading botocore-1.29.96-py3-none-any.whl (10.5 MB)
     ---------------------------------------- 10.5/10.5 MB 9.5 MB/s eta 0:00:00
Collecting s3transfer<0.7.0,>=0.6.0




  Downloading s3transfer-0.6.0-py3-none-any.whl (79 kB)
     ---------------------------------------- 79.6/79.6 KB ? eta 0:00:00
Installing collected packages: botocore, s3transfer, boto3
Successfully installed boto3-1.26.96 botocore-1.29.96 s3transfer-0.6.0



You should consider upgrading via the 'C:\Users\Acer\AppData\Local\Programs\Python\Python38\python.exe -m pip install --upgrade pip' command.


In [10]:
import boto3

In [11]:
import pandas as pd

#### io:
The io module in Python provides facilities for working with streams of data in memory or on disk. The StringIO class in the io module is used for creating a stream object that behaves like a file object, but is backed by a string buffer in memory instead of a physical file on disk.

In [12]:
from io import StringIO

### Access Case

In [13]:
AWS_ACCESS_KEY = 'your_access_key'
AWS_SECRET_KEY = 'your secret key'
AWS_REGION = 'ap-south-1'
SCHEMA_NAME = 'covid_dataset'
S3_STAGING_DIR = 's3 uri for staging output'
S3_BUCKET_NAME = 's3 bucket name'
S3_OUTPUT_DIRECTORY = 'ouput'

### Connect to Athena and Query Data

In [14]:
athena_client = boto3.client(
                "athena",
                aws_access_key_id=AWS_ACCESS_KEY,
                aws_secret_access_key=AWS_SECRET_KEY,
                region_name=AWS_REGION
)

### Function

In [15]:
import time

In [16]:
Dict = {}
def download_and_load_query_results(client: boto3.client, query_response: Dict) -> pd.DataFrame:
    while True:
        try:
            #This function only loads the first 1000 rows
            client.get_query_results(
                QueryExecutionId=query_response["QueryExecutionId"]
            )
            break
        except Exception as err:
            if "not yet finished" in str(err):
                time.sleep(0.001)
            else:
                raise err
    temp_file_location: str = "athena_query_results.csv"
    s3_client = boto3.client(
        "s3",
        aws_access_key_id = AWS_ACCESS_KEY,
        aws_secret_access_key = AWS_SECRET_KEY,
        region_name = AWS_REGION
    )
    s3_client.download_file(
        S3_BUCKET_NAME,
        f"{S3_OUTPUT_DIRECTORY}/{query_response['QueryExecutionId']}.csv",
        temp_file_location,
    )
    return pd.read_csv(temp_file_location)

### Query Response

In [17]:
response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM enigma_jhud",
    QueryExecutionContext = {"Database":SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    }
)

In [18]:
response

{'QueryExecutionId': 'ef9d52ae-8400-42af-830e-0a2f79f7106e',
 'ResponseMetadata': {'RequestId': 'a8b4b830-c156-44f8-a735-e95c77dc97a0',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Sat, 25 Mar 2023 15:02:38 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '59',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'a8b4b830-c156-44f8-a735-e95c77dc97a0'},
  'RetryAttempts': 0}}

In [19]:
enigma_jhud = download_and_load_query_results(athena_client, response)

In [20]:
enigma_jhud.head()

Unnamed: 0,fips,admin2,province_state,country_region,last_update,latitude,longitude,confirmed,deaths,recovered,active,combined_key,partition_0
0,,,Anhui,China,2020-01-22T17:00:00,31.826,117.226,1.0,,,,"""Anhui",csv
1,,,Beijing,China,2020-01-22T17:00:00,40.182,116.414,14.0,,,,"""Beijing",csv
2,,,Chongqing,China,2020-01-22T17:00:00,30.057,107.874,6.0,,,,"""Chongqing",csv
3,,,Fujian,China,2020-01-22T17:00:00,26.079,117.987,1.0,,,,"""Fujian",csv
4,,,Gansu,China,2020-01-22T17:00:00,36.061,103.834,,,,,"""Gansu",csv


In [28]:
response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM nytimes_data_in_usa_us_county",
    QueryExecutionContext = {"Database":SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    }
)
nytimes_data_in_usa_us_county = download_and_load_query_results(athena_client, response)

In [29]:
response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM nytimes_data_in_usa_us_states",
    QueryExecutionContext = {"Database":SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    }
)
nytimes_data_in_usa_us_states = download_and_load_query_results(athena_client, response)

In [30]:
response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM static_datasets_countrycode",
    QueryExecutionContext = {"Database":SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    }
)
static_datasets_countrycode = download_and_load_query_results(athena_client, response)

In [31]:
response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM static_datasets_countypopulation",
    QueryExecutionContext = {"Database":SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    }
)
static_datasets_countypopulation = download_and_load_query_results(athena_client, response)

In [32]:
static_datasets_countypopulation.head()

Unnamed: 0,id,id2,county,state,population estimate 2018
0,0500000US01001,1001,Autauga,Alabama,55601
1,0500000US01003,1003,Baldwin,Alabama,218022
2,0500000US01005,1005,Barbour,Alabama,24881
3,0500000US01007,1007,Bibb,Alabama,22400
4,0500000US01009,1009,Blount,Alabama,57840


In [34]:
response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM rearc_covid_19_testing_data_states_daily",
    QueryExecutionContext = {"Database":SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    }
)
rearc_covid_19_testing_data_states_daily = download_and_load_query_results(athena_client, response)

In [35]:
response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM rearc_covid_19_testing_data_us_daily",
    QueryExecutionContext = {"Database":SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    }
)
rearc_covid_19_testing_data_us_daily = download_and_load_query_results(athena_client, response)

In [36]:
response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM rearc_covid_19_testing_data_us_total_latest",
    QueryExecutionContext = {"Database":SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    }
)
rearc_covid_19_testing_data_us_total_latest = download_and_load_query_results(athena_client, response)

In [38]:
response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM rearc_usa_hospital_beds",
    QueryExecutionContext = {"Database":SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    }
)
rearc_usa_hospital_beds = download_and_load_query_results(athena_client, response)

In [39]:
response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM static_datasets_state_abv",
    QueryExecutionContext = {"Database":SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    }
)
static_datasets_state_abv = download_and_load_query_results(athena_client, response)

### Data Storage Error

In [40]:
static_datasets_state_abv.head()

Unnamed: 0,col0,col1
0,State,Abbreviation
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR


In [41]:
new_header = static_datasets_state_abv.iloc[0] #grab the first row for the header

In [42]:
new_header

col0           State
col1    Abbreviation
Name: 0, dtype: object

#### Slicing Data Frame

In [43]:
static_datasets_state_abv = static_datasets_state_abv[1:] # take all the data except 0 index

In [44]:
static_datasets_state_abv.head()

Unnamed: 0,col0,col1
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR
5,California,CA


In [45]:
static_datasets_state_abv.columns = new_header #set the header row as the df header

In [46]:
static_datasets_state_abv.head()

Unnamed: 0,State,Abbreviation
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR
5,California,CA


### ETL job in python

#### Fact Table

In [55]:
factCovid_1 = enigma_jhud[['fips','province_state','country_region','confirmed','deaths','recovered','active']]
factCovid_2 = rearc_covid_19_testing_data_states_daily[['fips','date','positive','negative','hospitalizedcurrently','hospitalized','hospitalizeddischarged']]
factCovid = pd.merge(factCovid_1, factCovid_2, on='fips', how='inner')

In [56]:
factCovid.shape

(27992, 13)

#### Dimension Table (Region)

In [57]:
dimRegion_1 = enigma_jhud[['fips','province_state','country_region','latitude','longitude']]
dimRegion_2 = nytimes_data_in_usa_us_county[['fips','county','state']]
dimRegion = pd.merge(dimRegion_1, dimRegion_2, on='fips', how='inner')

#### Dimension Table (Hospital)

In [58]:
dimHospital = rearc_usa_hospital_beds[['fips','state_name','latitude','longtitude','hq_address','hospital_name','hospital_type','hq_city','hq_state']]

#### Dimension Table (Date)

In [59]:
dimDate = rearc_covid_19_testing_data_states_daily[['fips','date']]

In [60]:
dimDate.head()

Unnamed: 0,fips,date
0,2.0,20210307
1,1.0,20210307
2,5.0,20210307
3,60.0,20210307
4,4.0,20210307


In [61]:
# Providing proper Date format

dimDate['date'] = pd.to_datetime(dimDate['date'], format='%Y%m%d')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['date'] = pd.to_datetime(dimDate['date'], format='%Y%m%d')


In [62]:
dimDate.head()

Unnamed: 0,fips,date
0,2.0,2021-03-07
1,1.0,2021-03-07
2,5.0,2021-03-07
3,60.0,2021-03-07
4,4.0,2021-03-07


In [63]:
# Converting Date into year, month and day of week

dimDate['year'] = dimDate['date'].dt.year
dimDate['month'] = dimDate['date'].dt.month
dimDate['day_of_week'] = dimDate['date'].dt.dayofweek

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['year'] = dimDate['date'].dt.year
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['month'] = dimDate['date'].dt.month
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['day_of_week'] = dimDate['date'].dt.dayofweek


In [64]:
dimDate.head()


Unnamed: 0,fips,date,year,month,day_of_week
0,2.0,2021-03-07,2021,3,6
1,1.0,2021-03-07,2021,3,6
2,5.0,2021-03-07,2021,3,6
3,60.0,2021-03-07,2021,3,6
4,4.0,2021-03-07,2021,3,6


### Save results to S3

In [73]:
bucket = 'ravi-covid-de-project'

In [74]:
csv_buffer = StringIO()

In [75]:
csv_buffer

<_io.StringIO at 0x202c2941c10>

In [80]:
factCovid.to_csv(csv_buffer)

In [82]:
s3_resource = boto3.resource('s3',
                              aws_access_key_id=AWS_ACCESS_KEY,
                              aws_secret_access_key= AWS_SECRET_KEY)
s3_resource.Object(bucket, 'output/factCovid.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'KC9B9JGEMTCYH8S8',
  'HostId': '4WmmV1byEmLTPDa6ISEZyLMBYL+aH7tzIPbIziufpv6PpHQQPNN/SUDTS7kACVrw5AASfbNsYUY=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': '4WmmV1byEmLTPDa6ISEZyLMBYL+aH7tzIPbIziufpv6PpHQQPNN/SUDTS7kACVrw5AASfbNsYUY=',
   'x-amz-request-id': 'KC9B9JGEMTCYH8S8',
   'date': 'Wed, 22 Mar 2023 20:01:27 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"08b3e8102ca88949654bbde517e4b0b1"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 1},
 'ETag': '"08b3e8102ca88949654bbde517e4b0b1"',
 'ServerSideEncryption': 'AES256'}

In [86]:
csv_buffer = StringIO()
dimDate.to_csv(csv_buffer)
s3_resource = boto3.resource('s3',
                              aws_access_key_id=AWS_ACCESS_KEY,
                              aws_secret_access_key= AWS_SECRET_KEY)
s3_resource.Object(bucket, 'output/dimDate.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'KNN9J3AKFV6DRQ0G',
  'HostId': 'NyNcfn27mG/h8yjMxImoIjM6nWjy6+90PGBd73ejIEqI+oHI0gNL1Ncuv/zM+FcQKxR8f69uKXg=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'NyNcfn27mG/h8yjMxImoIjM6nWjy6+90PGBd73ejIEqI+oHI0gNL1Ncuv/zM+FcQKxR8f69uKXg=',
   'x-amz-request-id': 'KNN9J3AKFV6DRQ0G',
   'date': 'Wed, 22 Mar 2023 20:13:31 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"19eb0b77e7f7441c686829bc3fd1a906"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 1},
 'ETag': '"19eb0b77e7f7441c686829bc3fd1a906"',
 'ServerSideEncryption': 'AES256'}

In [87]:
csv_buffer = StringIO()
dimHospital.to_csv(csv_buffer)
s3_resource = boto3.resource('s3',
                              aws_access_key_id=AWS_ACCESS_KEY,
                              aws_secret_access_key= AWS_SECRET_KEY)
s3_resource.Object(bucket, 'output/dimHospital.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'E93E7DDSV6SWXDMY',
  'HostId': 'n8LaA0OtBbHc07upObu5QEVvsF4m8SsiHtUS4jMNzRcNEbiEmIU2qu3f/THTfJhdBXHrUietQh8=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'n8LaA0OtBbHc07upObu5QEVvsF4m8SsiHtUS4jMNzRcNEbiEmIU2qu3f/THTfJhdBXHrUietQh8=',
   'x-amz-request-id': 'E93E7DDSV6SWXDMY',
   'date': 'Wed, 22 Mar 2023 20:13:34 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"a26c4e35d128fe6f64955ba9aac1d221"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 1},
 'ETag': '"a26c4e35d128fe6f64955ba9aac1d221"',
 'ServerSideEncryption': 'AES256'}

In [88]:
csv_buffer = StringIO()
dimRegion.to_csv(csv_buffer)
s3_resource = boto3.resource('s3',
                              aws_access_key_id=AWS_ACCESS_KEY,
                              aws_secret_access_key= AWS_SECRET_KEY)
s3_resource.Object(bucket, 'output/dimRegion.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': '7V0KAW5B2CXHZRMM',
  'HostId': 'banSLdb0CUC2RbMur/XtkgenN0MHzARBPZKRavrUUnx/S0n6rhkAl3BoYgek7raxuq3gSRvVSIWUE8fA8LEDZQ==',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'banSLdb0CUC2RbMur/XtkgenN0MHzARBPZKRavrUUnx/S0n6rhkAl3BoYgek7raxuq3gSRvVSIWUE8fA8LEDZQ==',
   'x-amz-request-id': '7V0KAW5B2CXHZRMM',
   'date': 'Wed, 22 Mar 2023 20:16:03 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"2fdf65265c31f0f6e8c02b4bd066d670"',
   'server': 'AmazonS3',
   'content-length': '0',
   'connection': 'close'},
  'RetryAttempts': 1},
 'ETag': '"2fdf65265c31f0f6e8c02b4bd066d670"',
 'ServerSideEncryption': 'AES256'}

#### Extract Schema out of the DataFrame

In [89]:
dimDatesql = pd.io.sql.get_schema(dimDate.reset_index(), 'dimDate')
print(''.join(dimDatesql))

CREATE TABLE "dimDate" (
"index" INTEGER,
  "fips" REAL,
  "date" TIMESTAMP,
  "year" INTEGER,
  "month" INTEGER,
  "day_of_week" INTEGER
)


In [90]:
factCovidsql = pd.io.sql.get_schema(factCovid.reset_index(), 'factCovid')
print(''.join(factCovidsql))

CREATE TABLE "factCovid" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" INTEGER,
  "negative" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalized" REAL,
  "hospitalizeddischarged" REAL
)


In [91]:
dimRegionsql = pd.io.sql.get_schema(dimRegion.reset_index(), 'dimRegion')
print(''.join(dimRegionsql))

CREATE TABLE "dimRegion" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "county" TEXT,
  "state" TEXT
)


In [92]:
dimHospitalsql = pd.io.sql.get_schema(dimHospital.reset_index(), 'dimHospital')
print(''.join(dimHospitalsql))

CREATE TABLE "dimHospital" (
"index" INTEGER,
  "fips" REAL,
  "state_name" TEXT,
  "latitude" REAL,
  "longtitude" REAL,
  "hq_address" TEXT,
  "hospital_name" TEXT,
  "hospital_type" TEXT,
  "hq_city" TEXT,
  "hq_state" TEXT
)
