In [2]:
from IPython.core.interactiveshell import InteractiveShell

# Display the value of every expression statement in a cell, not only the last one.
InteractiveShell.ast_node_interactivity = "all"

import sys
import os

# Extend the import path so the `src` package imported below resolves.
# The path is relative, so it depends on the kernel's working directory.
sys.path.append("../../../")
from src.athena import Athena
from src.utils import create_session

## Global Configuration

In [3]:
# --- Execution flags passed to the Athena helper calls in later cells ---
wait = True
ctas_approach = False

# --- Target database, its tables, and the directory holding their DDL scripts ---
database = "data_bank"
tables = ["regions", "customer_nodes", "customer_transactions"]
sql_path = "../sql/"

# --- AWS session built from the "dev" profile and the Athena access role ---
boto3_session = create_session(
    profile_name="dev",
    role_arn="arn:aws:iam::722696965592:role/athena-full-access-role",
)

# The S3 output location is read from the ATHENA_S3_OUTPUT environment variable
# (os.getenv yields None when the variable is unset).
athena = Athena(
    boto3_session=boto3_session,
    s3_output=os.getenv("ATHENA_S3_OUTPUT"),
)
athena

Athena(boto3_session=Session(region_name='us-east-1'), s3_output=s3://sql-case-studies/query_results)

## Create Database & Tables

In [5]:
# Create the target database; `wait` is forwarded to the helper.
response = athena.create_database(database=database, wait=wait)

# Bare expression so the notebook echoes the response (nothing is shown if it is None).
response

Query executed successfully


In [6]:
# Load the DDL script of every table in `tables` into `ddls` (keyed by table
# name) and echo each script for review. Each script is expected at
# <sql_path>/<table>_ddl.sql.
ddls = {}
for table in tables:
    ddl_path = os.path.join(sql_path, f"{table}_ddl.sql")
    # Explicit encoding so the read does not depend on the platform's locale default.
    with open(ddl_path, "r", encoding="utf-8") as ddl_file:
        ddls[table] = ddl_file.read()

    # Echo the script followed by a separator rule and blank lines.
    print(ddls[table])
    print("-" * 50)
    print("\n")

CREATE EXTERNAL TABLE IF NOT EXISTS data_bank.regions (
  region_id INT,
  region_name VARCHAR(9)
)
COMMENT 'Regions table'
STORED AS PARQUET
LOCATION 's3://sql-case-studies/data_bank/regions/'
TBLPROPERTIES ('classification'='parquet', 'parquet.compress'='SNAPPY');

--------------------------------------------------


CREATE EXTERNAL TABLE IF NOT EXISTS data_bank.customer_nodes (
  customer_id INT,
  region_id INT,
  node_id INT,
  start_date TIMESTAMP,
  end_date TIMESTAMP
)
COMMENT 'Customer nodes table'
STORED AS PARQUET
LOCATION 's3://sql-case-studies/data_bank/customer_nodes/'
TBLPROPERTIES ('classification'='parquet', 'parquet.compress'='SNAPPY');

--------------------------------------------------


CREATE EXTERNAL TABLE IF NOT EXISTS data_bank.customer_transactions (
  customer_id INT,
  txn_date TIMESTAMP,
  txn_type VARCHAR(10),
  txn_amount DOUBLE
)
COMMENT 'Customer transactions table'
STORED AS PARQUET
LOCATION 's3://sql-case-studies/data_bank/customer_transactions/'
TBLP

In [7]:
# Submit each DDL statement collected in `ddls`, one table at a time.
for ddl in ddls.values():
    response = athena.create_table(
        database=database,
        query=ddl,
        wait=wait,
    )
    # Bare expression: lets the notebook echo the response if it is not None.
    response

Query executed successfully
Query executed successfully
Query executed successfully


In [8]:
# Preview up to 10 rows of every table. The query call is left as a bare
# expression so its result is rendered by the notebook for each iteration
# (this relies on ast_node_interactivity = "all" set in the first cell).
for table in tables:
    athena.query(
        database=database,
        query=f""" 
                SELECT
                    *
                FROM
                    {database}.{table} 
                LIMIT 10;
              """,
        ctas_approach=ctas_approach,
    )

Unnamed: 0,region_id,region_name
0,1,Australia
1,2,America
2,3,Africa
3,4,Asia
4,5,Europe


Unnamed: 0,customer_id,region_id,node_id,start_date,end_date
0,1,3,4,2020-01-02,2020-01-03
1,2,3,5,2020-01-03,2020-01-17
2,3,5,4,2020-01-27,2020-02-18
3,4,5,4,2020-01-07,2020-01-19
4,5,3,3,2020-01-15,2020-01-23
5,6,1,1,2020-01-11,2020-02-06
6,7,2,5,2020-01-20,2020-02-04
7,8,1,2,2020-01-15,2020-01-28
8,9,4,5,2020-01-21,2020-01-25
9,10,3,4,2020-01-13,2020-01-14


Unnamed: 0,customer_id,txn_date,txn_type,txn_amount
0,1,2020-01-02,deposit,312.0
1,1,2020-03-19,purchase,664.0
2,1,2020-03-17,deposit,324.0
3,1,2020-03-05,purchase,612.0
4,2,2020-03-24,deposit,61.0
5,2,2020-01-03,deposit,549.0
6,3,2020-03-05,withdrawal,213.0
7,3,2020-02-22,purchase,965.0
8,3,2020-04-12,deposit,493.0
9,3,2020-03-19,withdrawal,188.0


## Drop Database & Tables

In [9]:
# Clean-up: drop every table listed in `tables` before the database itself is dropped.
for table in tables:
    athena.drop_table(database=database, table=table, wait=wait)

Query executed successfully
Query executed successfully
Query executed successfully


In [10]:
# Clean-up: drop the database once its tables have been removed in the previous cell.
athena.drop_database(database=database, wait=wait)

Query executed successfully
