### Common setup

In [0]:
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col, unix_timestamp, when

# Table names in the cells below are built as f"{CATALOG}.{<schema>}.<table>".
CATALOG = "olist_ecommerce"
# Schema the cells below read their input tables from.
BRONZE_SCHEMA = "bronze"
# Schema the cells below write their cleaned tables to.
SILVER_SCHEMA = "silver"

### Customers Table

In [0]:
# Load the customers table from the Bronze schema.
customers_df = spark.table(f"{CATALOG}.{BRONZE_SCHEMA}.customers")

# Keep exactly one row per non-null customer_id.
customers_keyed = (
    customers_df
    .where(col("customer_id").isNotNull())
    .dropDuplicates(["customer_id"])
)

# Add the normalised columns next to the raw ones:
#   city     -> lower-cased, then trimmed
#   state    -> upper-cased
#   zip_code -> zip prefix cast to int
customers_normalised = (
    customers_keyed
    .withColumn("city", F.trim(F.lower(col("customer_city"))))
    .withColumn("state", F.upper(col("customer_state")))
    .withColumn("zip_code", col("customer_zip_code_prefix").cast("int"))
)

# The raw source columns are no longer needed once the normalised ones exist.
customers_clean = customers_normalised.drop(
    "customer_city",
    "customer_state",
    "customer_zip_code_prefix",
)

# Replace the Silver customers table with the cleaned result.
(
    customers_clean.write
    .mode("overwrite")
    .saveAsTable(f"{CATALOG}.{SILVER_SCHEMA}.customers")
)

### Orders Table

In [0]:
# Load the orders table from the Bronze schema.
orders_df = spark.table(f"{CATALOG}.{BRONZE_SCHEMA}.orders")

# Keep exactly one row per non-null order_id.
orders_keyed = (
    orders_df
    .where(col("order_id").isNotNull())
    .dropDuplicates(["order_id"])
)

# Typed column name -> raw Bronze column it is cast from.
# Insertion order matters: it fixes the order in which the new columns are appended.
order_timestamp_sources = {
    "purchase_timestamp": "order_purchase_timestamp",
    "approved_at": "order_approved_at",
    "delivered_carrier_date": "order_delivered_carrier_date",
    "delivered_customer_date": "order_delivered_customer_date",
    "estimated_delivery_date": "order_estimated_delivery_date",
}

orders_typed = orders_keyed
for typed_name, raw_name in order_timestamp_sources.items():
    orders_typed = orders_typed.withColumn(typed_name, col(raw_name).cast("timestamp"))

# Only these four statuses are carried into Silver; any other status
# (or a null status) is dropped here.
orders_typed = orders_typed.where(
    col("order_status").isin("delivered", "shipped", "processing", "canceled")
)

# Derived columns
seconds_until_approval = unix_timestamp("approved_at") - unix_timestamp("purchase_timestamp")
delivered_after_estimate = col("delivered_customer_date") > col("estimated_delivery_date")

orders_enriched = (
    orders_typed
    .withColumn("approval_delay_hours", seconds_until_approval / 3600)
    .withColumn("actual_delivery_days", F.datediff("delivered_customer_date", "purchase_timestamp"))
    .withColumn("estimated_delivery_days", F.datediff("estimated_delivery_date", "purchase_timestamp"))
    # If either timestamp is null the comparison is not true, so the flag is 0.
    .withColumn("is_late_delivery", when(delivered_after_estimate, 1).otherwise(0))
    .withColumn("is_delivered_flag", when(col("order_status") == "delivered", 1).otherwise(0))
    .withColumn("is_canceled_flag", when(col("order_status") == "canceled", 1).otherwise(0))
)

# The raw timestamp columns have typed replacements, so drop them.
orders_clean = orders_enriched.drop(*order_timestamp_sources.values())

# Replace the Silver orders table with the cleaned result.
(
    orders_clean.write
    .mode("overwrite")
    .saveAsTable(f"{CATALOG}.{SILVER_SCHEMA}.orders")
)

### Order_Items Table

In [0]:
# Load the order_items table from the Bronze schema.
order_items_df = spark.table(f"{CATALOG}.{BRONZE_SCHEMA}.order_items")

# An item row is identified by (order_id, order_item_id): rows missing either
# part are removed, then one row is kept per pair.
order_item_key = ["order_id", "order_item_id"]

order_items_typed = (
    order_items_df
    .na.drop(subset=order_item_key)
    .dropDuplicates(order_item_key)
    .withColumn("shipping_limit_date", col("shipping_limit_date").cast("timestamp"))
    .withColumn("price", col("price").cast("double"))
    .withColumn("freight_value", col("freight_value").cast("double"))
)

# Derived columns
item_price = col("price")
item_freight = col("freight_value")

order_items_clean = (
    order_items_typed
    .withColumn("item_total_value", item_price + item_freight)
    # The ratio is 0 whenever price is not greater than 0 (this includes a null price).
    .withColumn("freight_ratio", when(item_price > 0, item_freight / item_price).otherwise(0))
)

# Replace the Silver order_items table with the cleaned result.
(
    order_items_clean.write
    .mode("overwrite")
    .saveAsTable(f"{CATALOG}.{SILVER_SCHEMA}.order_items")
)

### Products Table

In [0]:
# Load the products table from the Bronze schema.
products_df = spark.table(f"{CATALOG}.{BRONZE_SCHEMA}.products")

# Keep exactly one row per non-null product_id.
products_keyed = (
    products_df
    .where(col("product_id").isNotNull())
    .dropDuplicates(["product_id"])
)

# Lower-case the category name, then cast weight and dimensions to long.
products_typed = products_keyed.withColumn(
    "product_category_name", F.lower(col("product_category_name"))
)
for measure_column in (
    "product_weight_g",
    "product_length_cm",
    "product_height_cm",
    "product_width_cm",
):
    products_typed = products_typed.withColumn(
        measure_column, col(measure_column).cast("long")
    )

# Derived columns
box_volume = col("product_length_cm") * col("product_height_cm") * col("product_width_cm")

products_enriched = (
    products_typed
    .withColumn("product_volume_cm3", box_volume)
    # Density is 0 whenever the volume is not greater than 0 (this includes a null volume).
    .withColumn(
        "product_density",
        when(
            col("product_volume_cm3") > 0,
            col("product_weight_g") / col("product_volume_cm3"),
        ).otherwise(0),
    )
)

# Columns not carried into Silver. The "lenght" spelling is intentional here:
# these strings must match the existing column names exactly.
products_clean = products_enriched.drop(
    "product_name_lenght",
    "product_description_lenght",
    "product_photos_qty",
)

# Replace the Silver products table with the cleaned result.
(
    products_clean.write
    .mode("overwrite")
    .saveAsTable(f"{CATALOG}.{SILVER_SCHEMA}.products")
)

### Product Category Table

In [0]:
# Load the category translation table from the Bronze schema.
category_df = spark.table(f"{CATALOG}.{BRONZE_SCHEMA}.product_category_en")

# Lower-case both name columns in place; no rows are filtered or de-duplicated here.
category_clean = category_df
for label_column in ("product_category_name", "product_category_name_english"):
    category_clean = category_clean.withColumn(label_column, F.lower(col(label_column)))

# Replace the Silver product_category_en table with the formatted result.
(
    category_clean.write
    .mode("overwrite")
    .saveAsTable(f"{CATALOG}.{SILVER_SCHEMA}.product_category_en")
)

### Sellers Table

In [0]:
# Load the sellers table from the Bronze schema.
sellers_df = spark.table(f"{CATALOG}.{BRONZE_SCHEMA}.sellers")

# Keep exactly one row per non-null seller_id.
sellers_keyed = (
    sellers_df
    .where(F.col("seller_id").isNotNull())
    .dropDuplicates(["seller_id"])
)

# City and state are normalised in place; the zip prefix is replaced by an
# int-typed seller_zip_code column.
sellers_normalised = (
    sellers_keyed
    .withColumn("seller_city", F.trim(F.lower(F.col("seller_city"))))
    .withColumn("seller_state", F.upper(F.col("seller_state")))
    .withColumn("seller_zip_code", F.col("seller_zip_code_prefix").cast("int"))
)
sellers_clean = sellers_normalised.drop("seller_zip_code_prefix")

# Replace the Silver sellers table with the cleaned result.
(
    sellers_clean.write
    .mode("overwrite")
    .saveAsTable(f"{CATALOG}.{SILVER_SCHEMA}.sellers")
)

### Payments Table

In [0]:
# Load the payments table from the Bronze schema.
payments_df = spark.table(f"{CATALOG}.{BRONZE_SCHEMA}.payments")

# Column -> target type. Each column is cast in place.
# NOTE(review): unlike the other cells, no null-key filter or de-duplication
# is applied to payments — confirm this is intended.
payment_column_types = {
    "payment_sequential": "int",
    "payment_value": "double",
    "payment_installments": "int",
}

payments_typed = payments_df
for payment_column, target_type in payment_column_types.items():
    payments_typed = payments_typed.withColumn(
        payment_column, col(payment_column).cast(target_type)
    )

# Derived column: boolean, true when the payment has more than one installment.
payments_clean = payments_typed.withColumn(
    "is_installment", col("payment_installments") > 1
)

# Replace the Silver payments table with the cleaned result.
(
    payments_clean.write
    .mode("overwrite")
    .saveAsTable(f"{CATALOG}.{SILVER_SCHEMA}.payments")
)


### Reviews Table

In [0]:
# Load the reviews table from the Bronze schema.
reviews_df = spark.table(f"{CATALOG}.{BRONZE_SCHEMA}.reviews")

# A review row is identified by (review_id, order_id): rows missing either
# part are removed, then one row is kept per pair.
review_key = ["review_id", "order_id"]
reviews_keyed = reviews_df.na.drop(subset=review_key).dropDuplicates(review_key)

# Cast the score to int and both date columns to timestamp, in place.
reviews_clean = (
    reviews_keyed
    .withColumn("review_score", col("review_score").cast("int"))
    .withColumn("review_creation_date", col("review_creation_date").cast("timestamp"))
    .withColumn("review_answer_timestamp", col("review_answer_timestamp").cast("timestamp"))
)

# Replace the Silver reviews table with the cleaned result.
(
    reviews_clean.write
    .mode("overwrite")
    .saveAsTable(f"{CATALOG}.{SILVER_SCHEMA}.reviews")
)


### Geolocation Table

In [0]:
# Read from Bronze
geo_df = spark.table(f"{CATALOG}.{BRONZE_SCHEMA}.geolocation")

# Data cleaning: normalise city/state, derive an int zip_code, and drop the
# raw prefix and the coordinates.
# Rows whose zip code is null (or cannot be cast to int) are removed so that a
# null never ends up as a key in the Silver table, matching the null-key
# filters applied in the other cells.
geo_normalised = (
    geo_df
    .withColumn("geolocation_city", F.trim(F.lower("geolocation_city")))
    .withColumn("geolocation_state", F.upper("geolocation_state"))
    .withColumn("zip_code", col("geolocation_zip_code_prefix").cast("int"))
    .filter(col("zip_code").isNotNull())
    .drop("geolocation_zip_code_prefix", "geolocation_lat", "geolocation_lng")
)

# Keep one row per zip_code, chosen deterministically.
# A plain dropDuplicates(["zip_code"]) keeps whichever row happens to arrive
# first after the shuffle, so the surviving city/state could change between
# runs. Instead, keep the most frequent (city, state) pair for each zip code
# and break ties alphabetically by state, then city.
# NOTE(review): if Bronze carries columns beyond the ones referenced here,
# rows tied on (zip_code, city, state) may still differ in those columns —
# confirm against the Bronze schema.
same_place_window = Window.partitionBy("zip_code", "geolocation_city", "geolocation_state")
per_zip_ranking = Window.partitionBy("zip_code").orderBy(
    col("_place_count").desc(),
    col("geolocation_state"),
    col("geolocation_city"),
)

geo_clean = (
    geo_normalised
    .withColumn("_place_count", F.count(F.lit(1)).over(same_place_window))
    .withColumn("_place_rank", F.row_number().over(per_zip_ranking))
    .filter(col("_place_rank") == 1)
    # Helper columns are internal to the selection and are not written to Silver.
    .drop("_place_count", "_place_rank")
)

# Write to Silver
geo_clean.write.mode("overwrite").saveAsTable(f"{CATALOG}.{SILVER_SCHEMA}.geolocation")
