In [24]:
import boto3
import pandas as pd
from io import StringIO # python3: python2: BytesIO
import configparser 
import time


In [4]:
# Read the config file
config = configparser.ConfigParser()
config.read('https://github.com/ridwanxyzcloud/aws-ETL-pipeline/blob/main/config/config.ini')

# Extract configuration details
aws_access_key = config['aws']['AWS_ACCESS_KEY']
aws_access_secret = config['aws']['AWS_ACCESS_SECRET']
aws_region = config['aws']['AWS_REGION']
schema_name = config['aws']['SCHEMA_NAME']
s3_staging_dir = config['aws']['S3_STAGING_DIR']
s3_bucket_name = config['aws']['S3_BUCKET_NAME']
s3_output_directory = config['aws']['S3_OUTPUT_DIRECTORY']
password = config['aws']['PASSWORD']
host = config['aws']['HOST']
port = config['aws']['PORT']
dwh_iam_role_arn = config['aws']['DWH_IAM_ROLE_ARN']

In [81]:
# Create the Athena client
athena_client = boto3.client(
    'athena',
    aws_access_key_id=aws_access_key,
    aws_secret_access_key=aws_access_secret,
    region_name=aws_region
)

In [16]:
# Athena Query function

def execute_athena_query(query, database, output_location):
    response = athena_client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': database
        },
        ResultConfiguration={
            'OutputLocation': output_location
        }
    )
    return response


In [15]:
# Function to get Query Result

def get_query_results(query_execution_id):
    import time
    
    while True:
        response = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
        state = response['QueryExecution']['Status']['State']
        if state == 'SUCCEEDED':
            break
        elif state == 'FAILED':
            raise Exception("Query failed")
        elif state == 'CANCELLED':
            raise Exception("Query was cancelled")
        time.sleep(2)
    result_response = athena_client.get_query_results(QueryExecutionId=query_execution_id)
    return result_response


#### it is important to give all required permissions
- access to athena 
- access to Amazon Glue
- access to s3

#### This code executes a query in AWS Athena, waits for it to finish, downloads the results from S3, and returns them as a pandas DataFrame. The response variable holds information about the query execution.

In [30]:
Dict = {}

# This function is defined to download the query results from Athena
# It takes two parameters 'client: boto3.client; and response
def download_and_query_results(client: boto3.client, query_response: Dict) -> pd.DataFrame:
    while True:
        try:
            # This function only loads the first 1000 rows
            client.get_query_results(
                QueryExecutionId=query_response["QueryExecutionId"]
            )
            break
        except Exception as err:
            if "not yet finished" in str(err):
                time.sleep(0.001)
            else:
                raise err
    
    temp_file_location: str = "athena_query_results.csv"
    s3_client = boto3.client(
        "s3",
        aws_access_key_id=aws_access_key,
        aws_secret_access_key=aws_access_secret,
        region_name=aws_region,
    )
    s3_client.download_file(
        s3_bucket_name,
        f"{s3_output_directory}/{query_response['QueryExecutionId']}.csv",
        temp_file_location,
    )
    return pd.read_csv(temp_file_location) 


In [32]:

response = athena_client.start_query_execution(
    QueryString="SELECT * FROM csv", 
    QueryExecutionContext={"Database": schema_name},
    ResultConfiguration={
        "OutputLocation": s3_staging_dir, 
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"}, 
    },
)

response

{'QueryExecutionId': '620b6fe4-67d5-4fe5-bab9-97b47fffe853',
 'ResponseMetadata': {'RequestId': 'ac20ec1f-36a1-4207-9f04-69e77cf4882d',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Mon, 27 May 2024 15:45:59 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '59',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'ac20ec1f-36a1-4207-9f04-69e77cf4882d'},
  'RetryAttempts': 0}}

In [33]:


enigma_jhu = download_and_query_results(athena_client, response)
enigma_jhu.head()

Unnamed: 0,fips,admin2,province_state,country_region,last_update,latitude,longitude,confirmed,deaths,recovered,active,combined_key
0,,,Anhui,China,2020-01-22T17:00:00,31.826,117.226,1.0,,,,"""Anhui"
1,,,Beijing,China,2020-01-22T17:00:00,40.182,116.414,14.0,,,,"""Beijing"
2,,,Chongqing,China,2020-01-22T17:00:00,30.057,107.874,6.0,,,,"""Chongqing"
3,,,Fujian,China,2020-01-22T17:00:00,26.079,117.987,1.0,,,,"""Fujian"
4,,,Gansu,China,2020-01-22T17:00:00,36.061,103.834,,,,,"""Gansu"


In [36]:
# confirm imported data
enigma_jhu.shape

(222804, 12)

#### Repeat the same for every other tables too 

In [37]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM us_county", 
    QueryExecutionContext={"Database": schema_name},
    ResultConfiguration={
        "OutputLocation": s3_staging_dir, 
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"}, 
    },
)

us_county = download_and_query_results(athena_client, response)
us_county.shape

(129747, 6)

In [38]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM us_states", 
    QueryExecutionContext={"Database": schema_name},
    ResultConfiguration={
        "OutputLocation": s3_staging_dir, 
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"}, 
    },
)

us_states = download_and_query_results(athena_client, response)
us_states.shape

(3754, 5)

In [51]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM state_abv", 
    QueryExecutionContext={"Database": schema_name},
    ResultConfiguration={
        "OutputLocation": s3_staging_dir, 
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"}, 
    },
)

static_state_abv = download_and_query_results(athena_client, response)
static_state_abv.shape

(52, 2)

In [56]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM countrycode", 
    QueryExecutionContext={"Database": schema_name},
    ResultConfiguration={
        "OutputLocation": s3_staging_dir, 
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"}, 
    },
)

static_countrycode = download_and_query_results(athena_client, response)
static_countrycode.shape

(256, 6)

In [57]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM countypopulation", 
    QueryExecutionContext={"Database": schema_name},
    ResultConfiguration={
        "OutputLocation": s3_staging_dir, 
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"}, 
    },
)

static_countypopulation = download_and_query_results(athena_client, response)
static_countypopulation.shape

(3220, 5)

In [55]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM us_daily", 
    QueryExecutionContext={"Database": schema_name},
    ResultConfiguration={
        "OutputLocation": s3_staging_dir, 
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"}, 
    },
)

rearc_us_daily = download_and_query_results(athena_client, response)
rearc_us_daily.shape

(420, 25)

In [54]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM states_daily", 
    QueryExecutionContext={"Database": schema_name},
    ResultConfiguration={
        "OutputLocation": s3_staging_dir, 
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"}, 
    },
)

rearc_states_daily = download_and_query_results(athena_client, response)
rearc_states_daily.shape

(20780, 56)

In [53]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM json", 
    QueryExecutionContext={"Database": schema_name},
    ResultConfiguration={
        "OutputLocation": s3_staging_dir, 
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"}, 
    },
)

rearc_us_hospital_beds = download_and_query_results(athena_client, response)
rearc_us_hospital_beds.shape

(6637, 23)

In [52]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM us_total_latest", 
    QueryExecutionContext={"Database": schema_name},
    ResultConfiguration={
        "OutputLocation": s3_staging_dir, 
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"}, 
    },
)

rearc_us_total_latest = download_and_query_results(athena_client, response)
rearc_us_total_latest.shape

(1, 18)

### Inspect the data and do some cleaning or transforming 

In [58]:
# for table static_states_abv, the header is wrong
static_state_abv.head(3)

Unnamed: 0,col0,col1
0,State,Abbreviation
1,Alabama,AL
2,Alaska,AK


In [59]:
# use 'iloc' slicing to change the column header 
new_header = static_state_abv.iloc[0]
# assign it to the dataframe columns so the change is saved
static_state_abv.columns = new_header

static_state_abv.columns

Index(['State', 'Abbreviation'], dtype='object', name=0)

#### Extract the 'fact_covid' from the existing tables created 


In [61]:
factCovid_1 = enigma_jhu[['fips','province_state','country_region','confirmed','deaths','recovered','active']]
factCovid_2 = rearc_states_daily[['fips','date','positive','negative','hospitalizedcurrently','hospitalized','hospitalizeddischarged']]
#merge the two derivative tables
fact_covid = pd.merge(factCovid_1, factCovid_2, on='fips', how='inner')

fact_covid.shape

(26418, 13)

#### Extract 'dim_region'

In [None]:

dimRegion_1 = enigma_jhu[['fips','province_state','country_region','latitude','longtitude']]
dimRegion_2 = us_county[['fips','county','state']]
#merge 
dim_region = pd.merge(dimRegion_1,dimRegion_2, on='fips', how='inner')


In [82]:

dim_region.shape

(45101020, 7)

### Extract the 'dim_hospital' table

In [68]:


dim_hospital = rearc_us_hospital_beds[['fips','state_name','latitude','longtitude','hq_address','hospital_name','hospital_type','hq_city','hq_state']]
dim_hospital.shape

(6637, 9)

In [70]:
dim_date = rearc_states_daily[['fips','date']]

# transform 'dim_date' to extract needed data and columns
#split 
dim_date.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 20780 entries, 0 to 20779
Data columns (total 2 columns):
 #   Column  Non-Null Count  Dtype
---  ------  --------------  -----
 0   fips    20780 non-null  int64
 1   date    20780 non-null  int64
dtypes: int64(2)
memory usage: 324.8 KB


In [71]:
dim_date.head(2)

Unnamed: 0,fips,date
0,2,20210307
1,1,20210307


In [78]:
# Convert the 'date' column from int64 datatype to datetime format
dim_date['date'] = pd.to_datetime(dim_date['date'], format='%Y%m%d')


dim_date['year'] = dim_date['date'].dt.year
dim_date['month'] = dim_date['date'].dt.month
dim_date['day_of_week'] = dim_date['date'].dt.dayofweek

# redefine the columns and assign back to 'dim_date'
dim_date = dim_date[['fips', 'date', 'month', 'year', 'day_of_week']]

dim_date.shape

(20780, 5)

### save all outputs and transformed data to s3 bucket

In [None]:
# saving collectively
# DataFrames to save
data_frames = {
    'fact_covid': fact_covid,
    'dim_hospital': dim_hospital,
    'dim_region': dim_region,
    'dim_date': dim_date
}
# Create an S3 resource
s3 = boto3.resource('s3')

# Upload CSV files to S3 after buffering to binary format
for df_name, df in data_frames.items():
    csv_buffer = StringIO()
    df.to_csv(csv_buffer, index=False)
    s3.Object(s3_bucket_name, f"{s3_output_directory}/{df_name}.csv").put(Body=csv_buffer.getvalue())

print(f"DataFrames successfully uploaded to {s3_output_directory} on Amazon S3.")

# OR use this code 
### NOTE: The second code is faster and use less resources. 

In [83]:
import boto3
import pandas as pd
from io import StringIO

# Create S3 resource
s3 = boto3.resource('s3')

# Function to upload DataFrame to S3
def save_df_to_s3(df, bucket_name, file_path):
    csv_buffer = StringIO()
    df.to_csv(csv_buffer, index=False)
    s3.Object(bucket_name, file_path).put(Body=csv_buffer.getvalue())


In [84]:


#DataFrame to S3 
save_df_to_s3(fact_covid, s3_bucket_name, f"{s3_output_directory}/fact_covid.csv")
print(f"DataFrames successfully uploaded to {s3_output_directory} on Amazon S3.")

DataFrames successfully uploaded to output-data on Amazon S3.


In [85]:

save_df_to_s3(dim_hospital, s3_bucket_name, f"{s3_output_directory}/dim_hospital.csv")
print(f"DataFrames successfully uploaded to {s3_output_directory} on Amazon S3.")

DataFrames successfully uploaded to output-data on Amazon S3.


In [87]:

save_df_to_s3(dim_region, s3_bucket_name, f"{s3_output_directory}/dim_region.csv")
print(f"DataFrames successfully uploaded to {s3_output_directory} on Amazon S3.")

DataFrames successfully uploaded to output-data on Amazon S3.


In [86]:

save_df_to_s3(dim_date, s3_bucket_name, f"{s3_output_directory}/dim_date.csv")

print(f"DataFrames successfully uploaded to {s3_output_directory} on Amazon S3.")


DataFrames successfully uploaded to output-data on Amazon S3.


## Next step is to use our model and extracted schema to create a data warehouse 

In [90]:
# extract DataFrame Schema
fact_covid_sql = pd.io.sql.get_schema(fact_covid.reset_index(), 'factCovid')
dim_hospital_sql = pd.io.sql.get_schema(dim_hospital.reset_index(), 'dimHospital')
dim_region_sql = pd.io.sql.get_schema(dim_region.reset_index(), 'dimRegion')
dim_date_sql = pd.io.sql.get_schema(dim_date.reset_index(), 'dimDate')

print(''.join(fact_covid_sql))
print(''.join(dim_hospital_sql))
print(''.join(dim_region_sql))
print(''.join(dim_date_sql))

CREATE TABLE "factCovid" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" REAL,
  "negative" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalized" REAL,
  "hospitalizeddischarged" REAL
)
CREATE TABLE "dimHospital" (
"index" INTEGER,
  "fips" REAL,
  "state_name" TEXT,
  "latitude" REAL,
  "longtitude" REAL,
  "hq_address" TEXT,
  "hospital_name" TEXT,
  "hospital_type" TEXT,
  "hq_city" TEXT,
  "hq_state" TEXT
)
CREATE TABLE "dimRegion" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "county" TEXT,
  "state" TEXT
)
CREATE TABLE "dimDate" (
"index" INTEGER,
  "fips" INTEGER,
  "date" TIMESTAMP,
  "month" INTEGER,
  "year" INTEGER,
  "day_of_week" INTEGER
)


# Connect to Redshift and Create Table 

In [7]:
#
!pip install redshift_connector
import redshift_connector

In [35]:
# connect to redshift using the 'redshift_connector', create a cursor and create table using the extracted schema

conn = redshift_connector.connect(
    host = host,
    database = 'aws-etl-redshift',
    user ='admin',
    password=password
)
conn.autocommit = True
cur = conn.cursor()


In [99]:
cur.execute('''
  CREATE TABLE "dimDate" (
"index" INTEGER,
  "fips" INTEGER,
  "date" TIMESTAMP,
  "month" INTEGER,
  "year" INTEGER,
  "day_of_week" INTEGER
);  
''')

<redshift_connector.cursor.Cursor at 0x1354a0fd0>

In [100]:
cur.execute('''
CREATE TABLE "dimRegion" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "county" TEXT,
  "state" TEXT
);  
''')

<redshift_connector.cursor.Cursor at 0x1354a0fd0>

In [101]:
cur.execute('''
CREATE TABLE "dimHospital" (
"index" INTEGER,
  "fips" REAL,
  "state_name" TEXT,
  "latitude" REAL,
  "longtitude" REAL,
  "hq_address" TEXT,
  "hospital_name" TEXT,
  "hospital_type" TEXT,
  "hq_city" TEXT,
  "hq_state" TEXT
);  
''')

<redshift_connector.cursor.Cursor at 0x1354a0fd0>

In [102]:
cur.execute('''
CREATE TABLE "factCovid" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" REAL,
  "negative" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalized" REAL,
  "hospitalizeddischarged" REAL
);  
''')

<redshift_connector.cursor.Cursor at 0x1354a0fd0>

## Loading Data into Redshift
- Give all the required permission
- Load data using COPY command

In [106]:
dwh_iam_role_arn = 'arn:aws:iam::339713018722:role/redshift-role'

In [None]:
# Define the COPY commands for each table
copy_factCovid = f"""
    COPY factCovid
    FROM 's3://covid-lake-bucket/output-data/fact_covid.csv'
    IAM_ROLE '{dwh_iam_role_arn}'
    CSV
    IGNOREHEADER 1;
"""

copy_dimHospital = f"""
    COPY dimHospital
    FROM 's3://covid-lake-bucket/output-data/dim_hospital.csv'
    IAM_ROLE '{dwh_iam_role_arn}'
    CSV
    IGNOREHEADER 1;
"""

copy_dimDate = f"""
    COPY dimDate
    FROM 's3://covid-lake-bucket/output-data/dim_date.csv'
    IAM_ROLE '{dwh_iam_role_arn}'
    CSV
    IGNOREHEADER 1;
"""

copy_dimRegion = f"""
    COPY dimRegion
    FROM 's3://covid-lake-bucket/output-data/dim_region.csv'
    IAM_ROLE '{dwh_iam_role_arn}'
    CSV
    IGNOREHEADER 1;
"""

# Execute the COPY commands
cur.execute(copy_factCovid)
cur.execute(copy_dimHospital)
cur.execute(copy_dimDate)
cur.execute(copy_dimRegion)

# Close cursor and connection
cur.close()
conn.close()
print('Data loaded into Redshift Successfully')

# Data Loaded Successfully 