# Olist bronze ingestion

In [60]:
%%sh

ls /var/lib/ngods/stage/olist

olist_customers_dataset.csv
olist_geolocation_dataset.csv
olist_order_items_dataset.csv
olist_order_payments_dataset.csv
olist_order_reviews_dataset.csv
olist_orders_dataset.csv
olist_products_dataset.csv
olist_sellers_dataset.csv
product_category_name_translation.csv


In [61]:
%%sql
show databases;

namespace
bronze
default
silver


In [62]:
%%sql
use database bronze;

## Table: customer

In [63]:
customer_df = (spark
     .read
     .option("header", "true")
     .csv("/var/lib/ngods/stage/olist/olist_customers_dataset.csv"))

In [64]:
customer_df.describe()

                                                                                

DataFrame[summary: string, customer_id: string, customer_unique_id: string, customer_zip_code_prefix: string, customer_city: string, customer_state: string]

In [65]:
customer_df.show()

+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                   09790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                   01151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|                   08775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|                   13056|            campinas|            SP|
|879864dab9bc30475...|4c93744516667ad3b...|                   89254|      jaragua do sul|            SC|
|fd826e7cf63160e53...|addec96d2e059c80c...|            

In [66]:
customer_df.createOrReplaceTempView("temp_customer")

In [67]:
%%sql
CREATE TABLE IF NOT EXISTS bronze.olist_customer (
    customer_id string,
    customer_unique_id string,
    customer_zip_code_prefix string,
    customer_city string,
    customer_state string
)
USING iceberg;

In [70]:
%%sql 
MERGE INTO bronze.olist_customer t
USING (
    SELECT
    customer_id,
    customer_unique_id,
    customer_zip_code_prefix,
    customer_city,
    customer_state
    FROM temp_customer) s
ON t.customer_unique_id = s.customer_unique_id
WHEN NOT MATCHED THEN INSERT *

                                                                                

In [72]:
%%sql
select count(*) from bronze.olist_customer
UNION ALL
select count(*) from temp_customer
;

count(1)
99441
99441


## Table: Geolocation

In [73]:
geo_df = (spark
     .read
     .option("header", "true")
     .csv("/var/lib/ngods/stage/olist/olist_geolocation_dataset.csv"))

In [74]:
geo_df.describe()

                                                                                

DataFrame[summary: string, geolocation_zip_code_prefix: string, geolocation_lat: string, geolocation_lng: string, geolocation_city: string, geolocation_state: string]

In [75]:
geo_df.show()

+---------------------------+-------------------+-------------------+----------------+-----------------+
|geolocation_zip_code_prefix|    geolocation_lat|    geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+-------------------+-------------------+----------------+-----------------+
|                      01037| -23.54562128115268| -46.63929204800168|       sao paulo|               SP|
|                      01046|-23.546081127035535| -46.64482029837157|       sao paulo|               SP|
|                      01046| -23.54612896641469| -46.64295148361138|       sao paulo|               SP|
|                      01041|  -23.5443921648681| -46.63949930627844|       sao paulo|               SP|
|                      01035|-23.541577961711493| -46.64160722329613|       sao paulo|               SP|
|                      01012|-23.547762303364266| -46.63536053788448|       são paulo|               SP|
|                      01047|-23.546273112412678| -46.6

In [77]:
geo_df.createOrReplaceTempView("temp_geo")

In [80]:
%%sql
CREATE TABLE IF NOT EXISTS bronze.olist_geolocation (
    geolocation_zip_code_prefix string,
    geolocation_lat string,
    geolocation_lng string,
    geolocation_city string,
    geolocation_state string
)
USING iceberg;

In [81]:
%%sql 
MERGE INTO bronze.olist_geolocation t
USING (
    SELECT
    geolocation_zip_code_prefix,
    geolocation_lat,
    geolocation_lng,
    geolocation_city,
    geolocation_state
    FROM temp_geo) s
ON t.geolocation_zip_code_prefix = s.geolocation_zip_code_prefix
AND t.geolocation_lat = s.geolocation_lat
AND t.geolocation_lng = s.geolocation_lng
AND t.geolocation_city = s.geolocation_city
AND t.geolocation_state = s.geolocation_state
WHEN NOT MATCHED THEN INSERT *

                                                                                

## Table: Order Items

In [82]:
order_items_df = (spark
     .read
     .option("header", "true")
     .csv("/var/lib/ngods/stage/olist/olist_order_items_dataset.csv"))

In [83]:
order_items_df.describe()

                                                                                

DataFrame[summary: string, order_id: string, order_item_id: string, product_id: string, seller_id: string, shipping_limit_date: string, price: string, freight_value: string]

In [84]:
order_items_df.show()

+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date| price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.90|        13.29|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13|239.90|        19.93|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30|199.00|        17.87|
|00024acbcdf0a6daa...|            1|7634da152a4610f15...|9d7a1d34a50524090...|2018-08-15 10:10:18| 12.99|        12.79|
|00042b26cf59d7ce6...|            1|ac6c3623068f30de0...|df560393f3a51e745...|2017-02-13 13:57:51|199.90|        18.14|
|00048cc3ae777c65d...|            1|ef92

In [85]:
order_items_df.createOrReplaceTempView("temp_order_items")

In [87]:
%%sql

CREATE TABLE IF NOT EXISTS bronze.olist_order_items (
    order_id string,
    order_item_id string,
    product_id string,
    seller_id string,
    shipping_limit_date string,
    price string,
    freight_value string
)
USING iceberg;

In [90]:
%%sql
MERGE INTO bronze.olist_order_items t
USING (
    SELECT
    order_id,
    order_item_id,
    product_id,
    seller_id,
    shipping_limit_date,
    price,
    freight_value
    FROM temp_order_items) s
ON t.order_id = s.order_id
AND t.order_item_id = s.order_item_id
WHEN NOT MATCHED THEN INSERT *

                                                                                

## Table: Order Payments

In [91]:
order_payments_df = (spark
     .read
     .option("header", "true")
     .csv("/var/lib/ngods/stage/olist/olist_order_payments_dataset.csv"))

In [92]:
order_payments_df.createOrReplaceTempView("temp_order_payments")

In [93]:
order_payments_df.describe()

                                                                                

DataFrame[summary: string, order_id: string, payment_sequential: string, payment_type: string, payment_installments: string, payment_value: string]

In [96]:
%%sql
CREATE TABLE IF NOT EXISTS bronze.olist_order_payments (
    order_id string,
    payment_sequential string,
    payment_type string,
    payment_installments string,
    payment_value string
)
USING iceberg;

In [99]:
%%sql
MERGE INTO bronze.olist_order_payments t
USING (
    SELECT
    order_id,
    payment_sequential,
    payment_type,
    payment_installments,
    payment_value
    FROM temp_order_payments) s
ON t.order_id = s.order_id
WHEN NOT MATCHED THEN INSERT *



## Table: Order Reviews

In [100]:
order_reviews_df = (spark
     .read
     .option("header", "true")
     .csv("/var/lib/ngods/stage/olist/olist_order_reviews_dataset.csv"))

In [101]:
order_reviews_df.createOrReplaceTempView("temp_order_reviews")

In [102]:
order_reviews_df.describe()

                                                                                

DataFrame[summary: string, review_id: string, order_id: string, review_score: string, review_comment_title: string, review_comment_message: string, review_creation_date: string, review_answer_timestamp: string]

In [103]:
%%sql
CREATE TABLE IF NOT EXISTS bronze.olist_order_reviews (
    review_id string,
    order_id string,
    review_score string,
    review_comment_title string,
    review_comment_message string,
    review_creation_date string,
    review_answer_timestamp string
)
USING iceberg;

In [104]:
%%sql
MERGE INTO bronze.olist_order_reviews t
USING (
    SELECT
    review_id,
    order_id,
    review_score,
    review_comment_title,
    review_comment_message,
    review_creation_date,
    review_answer_timestamp
    FROM temp_order_reviews) s
ON t.order_id = s.order_id
AND t.review_id = s.review_id
WHEN NOT MATCHED THEN INSERT *

                                                                                

## Table: Orders

In [105]:
orders_df = (spark
     .read
     .option("header", "true")
     .csv("/var/lib/ngods/stage/olist/olist_orders_dataset.csv"))

In [111]:
orders_df.createOrReplaceTempView("temp_orders")

In [107]:
orders_df.describe()

                                                                                

DataFrame[summary: string, order_id: string, customer_id: string, order_status: string, order_purchase_timestamp: string, order_approved_at: string, order_delivered_carrier_date: string, order_delivered_customer_date: string, order_estimated_delivery_date: string]

In [109]:
%%sql
CREATE TABLE IF NOT EXISTS bronze.olist_orders (
    order_id string,
    customer_id string,
    order_status string,
    order_purchase_timestamp string,
    order_approved_at string,
    order_delivered_carrier_date string,
    order_delivered_customer_date string,
    order_estimated_delivery_date string
)
USING iceberg;

In [113]:
%%sql
MERGE INTO bronze.olist_orders t
USING (
    SELECT
    order_id,
    customer_id,
    order_status,
    order_purchase_timestamp,
    order_approved_at,
    order_delivered_carrier_date,
    order_delivered_customer_date,
    order_estimated_delivery_date
    FROM temp_orders) s
ON t.order_id = s.order_id
WHEN NOT MATCHED THEN INSERT *

                                                                                

## Table: Products

In [114]:
products_df = (spark
     .read
     .option("header", "true")
     .csv("/var/lib/ngods/stage/olist/olist_products_dataset.csv"))

In [115]:
products_df.createOrReplaceTempView("temp_products")

In [116]:
products_df.describe()

                                                                                

DataFrame[summary: string, product_id: string, product_category_name: string, product_name_lenght: string, product_description_lenght: string, product_photos_qty: string, product_weight_g: string, product_length_cm: string, product_height_cm: string, product_width_cm: string]

In [118]:
%%sql
CREATE TABLE IF NOT EXISTS bronze.olist_products (
    product_id string,
    product_category_name string,
    product_name_lenght string,
    product_description_lenght string,
    product_photos_qty string,
    product_weight_g string,
    product_length_cm string,
    product_height_cm string,
    product_width_cm string
)
USING iceberg;

In [119]:
%%sql
MERGE INTO bronze.olist_products t
USING (
    SELECT
    product_id,
    product_category_name,
    product_name_lenght,
    product_description_lenght,
    product_photos_qty,
    product_weight_g,
    product_length_cm,
    product_height_cm,
    product_width_cm
    FROM temp_products) s
ON t.product_id = s.product_id
WHEN NOT MATCHED THEN INSERT *

## Table: Sellers

In [124]:
sellers_df = (spark
     .read
     .option("header", "true")
     .csv("/var/lib/ngods/stage/olist/olist_sellers_dataset.csv"))

In [125]:
sellers_df.createOrReplaceTempView("temp_sellers")

In [126]:
sellers_df.describe()

DataFrame[summary: string, seller_id: string, seller_zip_code_prefix: string, seller_city: string, seller_state: string]

In [128]:
%%sql
CREATE TABLE IF NOT EXISTS bronze.olist_sellers (
    seller_id string,
    seller_zip_code_prefix string,
    seller_city string,
    seller_state string
)
USING iceberg;

In [132]:
%%sql
MERGE INTO bronze.olist_sellers t
USING (
    SELECT
    seller_id,
    seller_zip_code_prefix,
    seller_city,
    seller_state
    FROM temp_sellers) s
ON t.seller_id = s.seller_id
WHEN NOT MATCHED THEN INSERT *

## Table: Product Category Name Translation

In [133]:
pcnt_df = (spark
     .read
     .option("header", "true")
     .csv("/var/lib/ngods/stage/olist/product_category_name_translation.csv"))

In [134]:
pcnt_df.createOrReplaceTempView("temp_pcnt")

In [135]:
pcnt_df.describe()

DataFrame[summary: string, product_category_name: string, product_category_name_english: string]

In [136]:
%%sql
CREATE TABLE IF NOT EXISTS bronze.olist_product_category_name_translation (
    product_category_name string,
    product_category_name_english string
)
USING iceberg;

In [137]:
%%sql
MERGE INTO bronze.olist_product_category_name_translation t
USING (
    SELECT
    product_category_name,
    product_category_name_english
    FROM temp_pcnt) s
ON t.product_category_name = s.product_category_name
WHEN NOT MATCHED THEN INSERT *