In [None]:
import sagemaker
import boto3

# AWS region of the default boto3 session; used below for the Athena connection and client.
region = boto3.Session().region_name

# SageMaker session plus the notebook's execution role and default S3 bucket
# (the bucket hosts the Athena staging directory).
sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
bucket = sagemaker_session.default_bucket()

## Download the CSV files from S3 into the local Data folder

In [3]:
# Copy only the .csv objects under s3://raw-olist-ecommerce/olist_data/ into ./Data/.
# --exclude "*" drops everything, then --include "*.csv" re-admits the CSV files.
!aws s3 cp s3://raw-olist-ecommerce/olist_data/ ./Data/ --recursive --exclude "*" --include "*.csv"

download: s3://raw-olist-ecommerce/olist_data/order_payments_dataset.csv to Data/order_payments_dataset.csv
download: s3://raw-olist-ecommerce/olist_data/product_category_name_translation.csv to Data/product_category_name_translation.csv
download: s3://raw-olist-ecommerce/olist_data/customers_dataset.csv to Data/customers_dataset.csv
download: s3://raw-olist-ecommerce/olist_data/sellers_dataset.csv to Data/sellers_dataset.csv
download: s3://raw-olist-ecommerce/olist_data/order_reviews_dataset.csv to Data/order_reviews_dataset.csv
download: s3://raw-olist-ecommerce/olist_data/products_dataset.csv to Data/products_dataset.csv
download: s3://raw-olist-ecommerce/olist_data/order_items_dataset.csv to Data/order_items_dataset.csv
download: s3://raw-olist-ecommerce/olist_data/geolocation_dataset.csv to Data/geolocation_dataset.csv
download: s3://raw-olist-ecommerce/olist_data/orders_dataset.csv to Data/orders_dataset.csv


## Set up the connection to the Athena database

In [9]:
# PyAthena provides `connect`, which is imported and used below. Uncomment to install it if missing.
# %pip (rather than !pip) installs into the environment of the running kernel.
# %pip install --disable-pip-version-check -q PyAthena==2.1.0

In [3]:
import pandas as pd
from pyathena import connect

In [4]:
# Flag recording whether the Athena database was verified. This is the name that the
# verification cell sets and that `%store` persists, so initialise that name here.
ingest_create_athena_db_passed = False
# Legacy name, kept for backward compatibility; it is not read anywhere in this notebook.
create_athena_db_passed = ingest_create_athena_db_passed
# Athena database (schema) that all raw tables are created in.
database_name = "ads_508_team_5"

In [7]:
# Athena needs an S3 prefix for query results. Define it here, before connecting, so this
# cell does not depend on the staging-dir cell below having run first (fresh-kernel Run All).
s3_staging_dir = "s3://{0}/athena/staging".format(bucket)
conn = connect(region_name=region, s3_staging_dir=s3_staging_dir)

In [6]:
# Temporary S3 prefix in the session's default bucket where Athena writes query results.
s3_staging_dir = f"s3://{bucket}/athena/staging"

In [7]:
# IF NOT EXISTS avoids an error when the database has already been created.
statement = f"CREATE DATABASE IF NOT EXISTS {database_name}"
print(statement)

CREATE DATABASE IF NOT EXISTS ads_508_team_5


In [8]:
# Execute the CREATE DATABASE statement through the PyAthena connection.
pd.read_sql(sql=statement, con=conn)

## Verify the Database Has Been Created Successfully

In [9]:
# List every database visible to Athena; the result is checked in the next cell.
statement = "SHOW DATABASES"
db_show = pd.read_sql(sql=statement, con=conn)
db_show.head(n=5)

Unnamed: 0,database_name
0,ads_508_team_5
1,default


In [10]:
# Record whether SHOW DATABASES listed our database. Assign the check's result directly,
# rather than only setting True inside an `if`, so the flag always exists for the
# `%store` below and is False when the database is missing.
ingest_create_athena_db_passed = database_name in db_show.values

In [11]:
# Persist the flag in IPython's store so other notebooks can restore it with `%store -r`.
%store ingest_create_athena_db_passed

Stored 'ingest_create_athena_db_passed' (bool)


## Create the Athena tables dynamically

* We register the Olist CSV files stored in S3 (the same files downloaded to the Data folder above) as external tables in Athena, so we can query them via SQL in the Refined-layer notebook.

In [19]:
# Low-level boto3 Athena client, used further down to submit DDL via start_query_execution.
athena_client = boto3.client(service_name="athena", region_name=region)

In [10]:
# S3 prefix holding the raw Olist CSV files (the olist_data/ folder, not the whole bucket).
s3_path_olist = "s3://raw-olist-ecommerce/olist_data/"
# Athena database name, re-declared so this section can be run on its own.
database_name = "ads_508_team_5"

In [None]:
def _csv_table_ddl(database, table, columns, location):
    """Return a CREATE EXTERNAL TABLE statement for a folder of headered, comma-delimited files.

    database : str -- Athena database the table is created in.
    table    : str -- table name.
    columns  : sequence of (name, athena_type) pairs, in the files' column order. Rows are
               split on ',' and fields are mapped to columns by position, so the order
               (and count) must match the CSV exactly.
    location : str -- S3 prefix for this table; Athena reads every object under it, so it
               should contain only this table's file(s).

    The statement uses IF NOT EXISTS and skips the first (header) line of each file.
    """
    column_sql = ",\n          ".join("{} {}".format(name, col_type) for name, col_type in columns)
    return """CREATE EXTERNAL TABLE IF NOT EXISTS {database}.{table}(
          {columns}
          )
          ROW FORMAT DELIMITED
          FIELDS TERMINATED BY ','
          STORED AS TEXTFILE
          LOCATION '{location}'
          TBLPROPERTIES ("skip.header.line.count"="1");""".format(
        database=database, table=table, columns=column_sql, location=location
    )


customer_table_statement = _csv_table_ddl(
    database_name,
    "customers_table",
    [
        ("customer_id", "string"),
        ("customer_unique_id", "string"),
        ("customer_zip_code_prefix", "int"),
        ("customer_city", "string"),
        ("customer_state", "string"),
    ],
    "s3://raw-olist-ecommerce/customers_data/",
)

geolocation_table_statement = _csv_table_ddl(
    database_name,
    "geolocation_table",
    [
        ("geolocation_zip_code_prefix", "int"),
        ("geolocation_lat", "double"),
        ("geolocation_lng", "double"),
        ("geolocation_city", "string"),
        ("geolocation_state", "string"),
    ],
    "s3://raw-olist-ecommerce/geolocation_dataset/",
)

order_items_table_statement = _csv_table_ddl(
    database_name,
    "order_items_table",
    [
        ("order_id", "string"),
        ("order_item_id", "int"),
        ("product_id", "string"),
        ("seller_id", "string"),
        ("shipping_limit_date", "timestamp"),
        ("price", "double"),
        ("freight_value", "double"),
    ],
    "s3://raw-olist-ecommerce/order_items_data/",
)

# The reviews file has a review_comment_title column between review_score and
# review_comment_message. Without it, every later field lands one column to the left.
# NOTE(review): review text may contain commas or line breaks, which a plain
# comma-delimited row format cannot handle -- consider OpenCSVSerde for this table.
order_reviews_table_statement = _csv_table_ddl(
    database_name,
    "order_reviews_table",
    [
        ("review_id", "string"),
        ("order_id", "string"),
        ("review_score", "int"),
        ("review_comment_title", "string"),
        ("review_comment_message", "string"),
        ("review_creation_date", "timestamp"),
        ("review_answer_timestamp", "timestamp"),
    ],
    "s3://raw-olist-ecommerce/order_reviews_data/",
)

orders_table_statement = _csv_table_ddl(
    database_name,
    "orders_table",
    [
        ("order_id", "string"),
        ("customer_id", "string"),
        ("order_status", "string"),
        ("order_purchase_timestamp", "timestamp"),
        ("order_approved_at", "timestamp"),
        ("order_delivered_carrier_date", "timestamp"),
        ("order_delivered_customer_date", "timestamp"),
        ("order_estimated_delivery_date", "timestamp"),
    ],
    "s3://raw-olist-ecommerce/orders_data/",
)

# Prefix corrected from the misspelled 'prder_payments_data/' to match the '<name>_data/'
# pattern of the other tables. NOTE(review): confirm this prefix exists in the bucket.
order_payments_statement = _csv_table_ddl(
    database_name,
    "order_payments_table",
    [
        ("order_id", "string"),
        ("payment_sequential", "int"),
        ("payment_type", "string"),
        ("payment_installments", "int"),
        ("payment_value", "double"),
    ],
    "s3://raw-olist-ecommerce/order_payments_data/",
)

product_category_statement = _csv_table_ddl(
    database_name,
    "product_category_table",
    [
        ("product_category_name", "string"),
        ("product_category_name_english", "string"),
    ],
    "s3://raw-olist-ecommerce/product_cat_name_data/",
)

# The products table previously reused the two category-translation columns. These are
# the products file's own columns ('lenght' is spelled as in the source CSV header).
products_statement = _csv_table_ddl(
    database_name,
    "product_table",
    [
        ("product_id", "string"),
        ("product_category_name", "string"),
        ("product_name_lenght", "int"),
        ("product_description_lenght", "int"),
        ("product_photos_qty", "int"),
        ("product_weight_g", "int"),
        ("product_length_cm", "int"),
        ("product_height_cm", "int"),
        ("product_width_cm", "int"),
    ],
    "s3://raw-olist-ecommerce/products_data/",
)

sellers_statement = _csv_table_ddl(
    database_name,
    "sellers_table",
    [
        ("seller_id", "string"),
        ("seller_zip_code_prefix", "int"),
        ("seller_city", "string"),
        ("seller_state", "string"),
    ],
    "s3://raw-olist-ecommerce/sellers_data/",
)

In [None]:
# Submit each CREATE EXTERNAL TABLE statement through the PyAthena connection,
# in the same order the statements were defined above.
write_cust_table = pd.read_sql(sql=customer_table_statement, con=conn)
write_geo_table = pd.read_sql(sql=geolocation_table_statement, con=conn)
write_order_items_table = pd.read_sql(sql=order_items_table_statement, con=conn)
write_order_reviews_table = pd.read_sql(sql=order_reviews_table_statement, con=conn)
write_orders_table = pd.read_sql(sql=orders_table_statement, con=conn)
write_order_payments_table = pd.read_sql(sql=order_payments_statement, con=conn)
write_prod_cat_table = pd.read_sql(sql=product_category_statement, con=conn)
write_product_table = pd.read_sql(sql=products_statement, con=conn)
write_seller_table = pd.read_sql(sql=sellers_statement, con=conn)

In [26]:
# Register one Athena table per entry in `schemas`, rendering each from the `base_ddl` template.
# NOTE(review): `schemas` (table name -> column definition text) and `base_ddl` (a template
# with {database}, {table}, {columns} and {s3_path} fields) are not defined anywhere in this
# notebook, so this cell raises NameError on a fresh kernel -- define them in an earlier cell.
# NOTE(review): every table is rendered with the same s3_path_olist prefix. If `base_ddl` uses
# it as the LOCATION, each table reads every CSV under olist_data/. The saved customers_dataset
# preview further down (category-translation values under customer_* columns) is consistent
# with that. Consider giving each table its own prefix.
import time

query_execution_ids = {}
for table, columns in schemas.items():
    ddl_statement = base_ddl.format(
        database=database_name,
        table=table,
        columns=columns.strip(),  # drop leading/trailing whitespace around the column list
        s3_path=s3_path_olist,
    )
    # start_query_execution only submits the query; completion is checked below.
    response = athena_client.start_query_execution(
        QueryString=ddl_statement,
        QueryExecutionContext={"Database": database_name},
        # Athena writes result/metadata files to OutputLocation. Use the staging prefix rather
        # than the raw-data prefix so those files never end up alongside the table data.
        ResultConfiguration={"OutputLocation": s3_staging_dir},
    )
    query_execution_ids[table] = response["QueryExecutionId"]

# Poll each submitted DDL until it reaches a terminal state, and collect any that did not
# succeed, so failures are reported instead of being silently discarded.
failed_tables = {}
for table, execution_id in query_execution_ids.items():
    while True:
        status = athena_client.get_query_execution(QueryExecutionId=execution_id)["QueryExecution"]["Status"]
        if status["State"] in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(1)
    if status["State"] != "SUCCEEDED":
        failed_tables[table] = status.get("StateChangeReason", status["State"])

if failed_tables:
    raise RuntimeError("Athena DDL failed for: {}".format(failed_tables))

In [11]:
# List the tables registered in our database to confirm the DDL above took effect.
tables_check = f"SHOW TABLES in {database_name}"
tables_show = pd.read_sql(sql=tables_check, con=conn)
tables_show

Unnamed: 0,tab_name
0,customers_dataset
1,geolocation_dataset
2,order_items_dataset
3,order_payments_dataset
4,order_reviews_dataset
5,orders_dataset
6,product_category_name_translation
7,products_dataset
8,sellers_dataset


In [12]:
# Preview a few rows of one table to sanity-check its contents. The preview gets its own
# variable so the SHOW TABLES listing held in `tables_show` is not overwritten.
# NOTE(review): the saved preview of customers_dataset shows product-category translation
# values (e.g. beleza_saude / health_beauty) under the customer_* columns, which suggests
# the table's LOCATION also covers other CSV files -- confirm.
table_name = 'customers_dataset'
query = "SELECT * FROM {}.{} LIMIT 5".format(database_name, table_name)
table_preview = pd.read_sql(query, conn)
table_preview

Unnamed: 0,customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state
0,beleza_saude,health_beauty,,,
1,informatica_acessorios,computers_accessories,,,
2,automotivo,auto,,,
3,cama_mesa_banho,bed_bath_table,,,
4,moveis_decoracao,furniture_decor,,,


In [13]:
%%html

<!-- Auto-shutdown: renders a hidden "Shutdown Kernel" button and clicks it from the script.
     The data-commandlinker-command binding is presumably provided by the SageMaker Studio
     front end -- confirm. -->
<!-- NOTE(review): cells placed after this one (e.g. the %%javascript save cell) presumably
     need a running kernel and will not execute once the kernel has been shut down. -->
<p><b>Shutting down your kernel for this notebook to release resources.</b></p>
<button class="sm-command-button" data-commandlinker-command="kernelmenu:shutdown" style="display:none;">Shutdown Kernel</button>
        
<script>
try {
    // Click the first (hidden) command button to trigger the kernel shutdown command.
    els = document.getElementsByClassName("sm-command-button");
    els[0].click();
}
catch(err) {
    // NoOp: errors (e.g. button or command unavailable in this front end) are ignored.
}    
</script>

In [None]:
%%javascript

// Save a checkpoint and delete the kernel session through the `Jupyter` JS global.
// NOTE(review): that global presumably exists only in the classic Notebook UI; elsewhere
// the call throws and the catch below ignores the error.
try {
    Jupyter.notebook.save_checkpoint();
    Jupyter.notebook.session.delete();
}
catch(err) {
    // NoOp
}