# ELT Banxico & INEGI data to build Athena's database

Using the `fx`, `tiie` and `inpc`, create a database in Athena to analyze the behavior of the Mexican economy.

@roman avj

18 mar 24

---
# Settings

In [55]:
import os
import boto3
from pyathena import connect
import awswrangler as wr
import pandas as pd
import yaml
from dotenv import load_dotenv
from datetime import datetime
from tqdm import tqdm

In [56]:
# load environment variables
load_dotenv()

# get config file
with open('../config.yaml', 'r') as file:
    config = yaml.safe_load(file)

In [6]:
# AWS Settings
session = boto3.Session(profile_name="arquitectura")
s3 = session.client('s3')

# set region east-1 to boto3
os.environ['AWS_DEFAULT_REGION'] = 'us-east-1'

# Bucket
BUCKET_NAME = config['aws']['bucket']

# Sub Bucket
SUB_BUCKET = config['aws']['sub-bucket']

# Bucket Query
BUCKET_QUERY = config['aws']['bucket_queries']

---
# Configure Database at Athena

## Configure

### Create database

In [42]:
# create glue client with profile session
glue_client = session.client('glue')

In [43]:
# Create Database
# create database
response = glue_client.create_database(
    DatabaseInput={
        'Name': config['aws']['database'],
        'Description': 'Database for the banxico-inegi Database'
    }
)

print(response)

{'ResponseMetadata': {'RequestId': '091cb7a9-465d-4b04-aaaa-a5f6fc43e2d2', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Thu, 21 Mar 2024 14:40:16 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '2', 'connection': 'keep-alive', 'x-amzn-requestid': '091cb7a9-465d-4b04-aaaa-a5f6fc43e2d2'}, 'RetryAttempts': 0}}


### Configure Athena

In [14]:
# Create Query Bucket
response = s3.create_bucket(Bucket=BUCKET_QUERY)
print(response)

{'ResponseMetadata': {'RequestId': 'FD2B1Q0F95N8N2D9', 'HostId': 'VP5VjlBzsGi661B4KBlQq+O7OpsDwqoKjGiv3NTbRjNOdQ68OSc+x2k7WOILYOvLacAlmLTSW86MguJ2CQMEFA==', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'VP5VjlBzsGi661B4KBlQq+O7OpsDwqoKjGiv3NTbRjNOdQ68OSc+x2k7WOILYOvLacAlmLTSW86MguJ2CQMEFA==', 'x-amz-request-id': 'FD2B1Q0F95N8N2D9', 'date': 'Thu, 21 Mar 2024 14:17:52 GMT', 'location': '/itam-analytics-roman-queries', 'server': 'AmazonS3', 'content-length': '0'}, 'RetryAttempts': 0}, 'Location': '/itam-analytics-roman-queries'}


## Create Tables

### Create Athena Table

In [39]:
# read files from s3
def read_from_s3(folder, filename, s3_session):
    # remove .csv from filename
    filename_path = filename.replace('.csv', '')

    df = wr.s3.read_csv(
        path=f"s3://{BUCKET_NAME}/{SUB_BUCKET}/{folder}/{filename_path}/{filename}",
        boto3_session=s3_session
    )
    # set date as datetime
    df['date'] = pd.to_datetime(df['date'])

    return df

# read files from s3
dict_df = {}
for key, value in tqdm(config['aws']['filenames'].items()):
    dict_df[key] = read_from_s3('raw', filename=value, s3_session=session)

100%|██████████| 3/3 [00:01<00:00,  1.55it/s]


In [47]:
# look dataframes
for key, value in dict_df.items():
    print(f"\nDataframe: {key}")
    print(value.info())


Dataframe: inpc
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 75 entries, 0 to 74
Data columns (total 2 columns):
 #   Column  Non-Null Count  Dtype         
---  ------  --------------  -----         
 0   date    75 non-null     datetime64[ns]
 1   inpc    74 non-null     float64       
dtypes: datetime64[ns](1), float64(1)
memory usage: 1.3 KB
None

Dataframe: dollar_fx
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1623 entries, 0 to 1622
Data columns (total 2 columns):
 #   Column     Non-Null Count  Dtype         
---  ------     --------------  -----         
 0   date       1623 non-null   datetime64[ns]
 1   dollar_fx  1623 non-null   float64       
dtypes: datetime64[ns](1), float64(1)
memory usage: 25.5 KB
None

Dataframe: tiie
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1623 entries, 0 to 1622
Data columns (total 2 columns):
 #   Column  Non-Null Count  Dtype         
---  ------  --------------  -----         
 0   date    1623 non-null   datetime64[ns]
 1   

In [48]:
# Create Tables
def create_external_table_athena(df, database_name, table_name, s3_location, boto3_session):
    # Body Query
    query = """
        CREATE EXTERNAL TABLE IF NOT EXISTS `{database_name}`.`{table_name}` ({columns})
        COMMENT "{comment}"
        ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
        WITH SERDEPROPERTIES ('field.delim' = ',')
        STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
        LOCATION {s3_location}
        TBLPROPERTIES ('classification' = 'csv', "skip.header.line.count"="1")
    """

    # get columns
    dict_dtype_map = {'datetime64[ns]': 'DATE', 'int64': 'INT', 'float64': 'FLOAT', 'object': 'STRING'}

    # create columns
    columns = []
    for column, dtype in df.dtypes.items():
        columns.append(f"{column} {dict_dtype_map.get(str(dtype), 'STRING')}")
    columns_str = ',\n'.join(columns)

    # create comment
    comment_str = f"Table for the {table_name} dataset"

    # create query
    query = query.format(
        database_name=database_name,
        table_name=table_name,
        columns=columns_str,
        comment=comment_str,
        s3_location=f"'s3://{BUCKET_NAME}/{s3_location}'"
    )
    print(query)

    # execute query
    response = wr.athena.read_sql_query(
        sql=query,
        database=database_name,
        ctas_approach=False, 
        boto3_session=boto3_session
    )
    return response


In [49]:
# create tables
for key, value in dict_df.items():
    # get filename_key
    filename_key = config['aws']['filenames'][key].replace('.csv', '')

    print(f"\nCreating table for: {filename_key} {'='*20}")
    # create table
    response = create_external_table_athena(
        df=value,
        database_name=config['aws']['database'],
        table_name=filename_key,
        s3_location=f"{SUB_BUCKET}/raw/{filename_key}/",
        boto3_session=session
    )
    print(response)




        CREATE EXTERNAL TABLE IF NOT EXISTS `econ`.`inflacion` (date DATE,
inpc FLOAT)
        COMMENT "Table for the inflacion dataset"
        ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
        WITH SERDEPROPERTIES ('field.delim' = ',')
        STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
        LOCATION 's3://itam-analytics-roman/homeworks/banxico-inegi/raw/inflacion/'
        TBLPROPERTIES ('classification' = 'csv', "skip.header.line.count"="1")
    
Empty DataFrame
Columns: []
Index: []


        CREATE EXTERNAL TABLE IF NOT EXISTS `econ`.`tipo_de_cambio` (date DATE,
dollar_fx FLOAT)
        COMMENT "Table for the tipo_de_cambio dataset"
        ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
        WITH SERDEPROPERTIES ('field.delim' = ',')
        STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apach

---
# Extract & Load & Transform

In [57]:
# Query to join tables
query = open('sql/ELT.sql', 'r').read()

# add CREATE TABLE AS to query
query = "CREATE TABLE {database_name}.{table_names} AS (\n{query_str})".format(
    database_name=config['aws']['database'],
    table_names=config['aws']['elt_table_name'],
    query_str=query
)
print(query)

CREATE TABLE econ.banxico_inegi AS (
WITH monthly_fx AS (
    SELECT
        -- get year and month from date nad set it to the first day of the month
        DATE_TRUNC('month', date) AS date,
        dollar_fx,
        ROW_NUMBER() OVER (PARTITION BY EXTRACT(YEAR FROM date), EXTRACT(MONTH FROM date) ORDER BY date) AS row_num
    FROM econ.tipo_de_cambio
), monthly_interest_rate AS (
    SELECT
        -- get year and month from date nad set it to the first day of the month
        DATE_TRUNC('month', date) AS date,
        tiie,
        ROW_NUMBER() OVER (PARTITION BY EXTRACT(YEAR FROM date), EXTRACT(MONTH FROM date) ORDER BY date) AS row_num
    FROM econ.tasa_de_interes
)
SELECT
    i.date,
    i.inpc,
    fx.dollar_fx,
    ir.tiie
FROM econ.inflacion AS i
LEFT JOIN monthly_fx AS fx
    ON i.date = fx.date
    AND fx.row_num = 1
LEFT JOIN monthly_interest_rate AS ir
    ON i.date = ir.date
    AND ir.row_num = 1
)


In [58]:
# Run query in athena
response = wr.athena.read_sql_query(
    sql=query,
    database=config['aws']['database'],
    ctas_approach=False, 
    boto3_session=session
)
print(response)

Empty DataFrame
Columns: []
Index: []


In [59]:
# Download data
# download data
df_banxico_inegi = wr.athena.read_sql_query(
    sql=f"SELECT * FROM {config['aws']['database']}.{config['aws']['elt_table_name']}",
    database=config['aws']['database'],
    ctas_approach=False, 
    boto3_session=session
)

df_banxico_inegi

Unnamed: 0,date,inpc,dollar_fx,tiie
0,2018-01-01,98.794998,19.489901,7.6311
1,2018-02-01,99.171371,18.400400,7.6600
2,2018-03-01,99.492157,18.861000,7.8294
3,2018-04-01,99.154846,18.296700,7.8503
4,2018-05-01,98.994080,18.787800,7.8455
...,...,...,...,...
70,2023-11-01,131.445007,17.930500,11.5035
71,2023-12-01,132.373001,17.214300,11.5033
72,2024-01-01,133.554993,16.919001,11.5035
73,2024-02-01,133.681000,17.133499,11.5012
