In [0]:
# Databricks notebook source
from pyspark.sql import functions as F

In [0]:
# 1. Configuration
# Every two-part table name used below (bronze.*, silver.*) is resolved inside
# this catalog.
catalog = "olist_project"
spark.sql(f"USE CATALOG {catalog}")
# The cleaning cells write to silver.* right away, so the schema has to exist
# before they run. IF NOT EXISTS makes this a no-op on re-runs.
spark.sql("CREATE SCHEMA IF NOT EXISTS silver")

In [0]:
# 2. Timestamp helper (standardizes the usual Olist date/time columns)
def clean_olist_dates(df):
    """Convert every date-like column of ``df`` with ``F.to_timestamp``.

    A column counts as date-like when its name contains the substring
    "timestamp" or "date" (case-sensitive). Each such column is replaced, under
    the same name, by the result of parsing it with the pattern
    ``yyyy-MM-dd HH:mm:ss``. All other columns are left untouched.

    Args:
        df: Spark DataFrame to convert.

    Returns:
        The DataFrame with the matching columns converted; when no column
        matches, ``df`` itself is returned.
    """
    # NOTE(review): columns whose names contain neither substring (for example
    # an "*_at" column) are not converted -- confirm that this is intended.
    olist_format = "yyyy-MM-dd HH:mm:ss"  # Format for Olist is usually YYYY-MM-DD HH:MM:SS
    converted = df
    for name in df.columns:
        if not ("timestamp" in name or "date" in name):
            continue
        converted = converted.withColumn(name, F.to_timestamp(F.col(name), olist_format))
    return converted


CLEANING ORDERS

In [0]:
print("ðŸ§¼ Cleaning olist_orders...")
orders_df = spark.read.table("bronze.bronze_orders")

# Parse the date-like columns, then keep a single row per order_id.
orders_clean = clean_olist_dates(orders_df).dropDuplicates(["order_id"])

# Persist the result as the Silver orders table, replacing its current contents.
orders_writer = orders_clean.write.format("delta").mode("overwrite")
orders_writer.saveAsTable("silver.orders")

CLEANING CUSTOMERS

In [0]:
print("ðŸ§¼ Cleaning olist_customers...")
customers_df = spark.read.table("bronze.bronze_customers")

# Keep a single row per customer_id; no other cleaning is applied here.
customers_clean = customers_df.dropDuplicates(["customer_id"])

# Persist the result as the Silver customers table, replacing its current contents.
(
    customers_clean.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("silver.customers")
)

ENRICHMENT: JOIN ORDERS + CUSTOMERS

In [0]:
print("ðŸ”— Creating enriched_orders...")
# Left join on customer_id: every cleaned order row is kept, with the customer
# columns attached where a matching customer exists.
enriched_orders = orders_clean.join(customers_clean, "customer_id", "left")

# Save as a refined Silver table, replacing its current contents.
(
    enriched_orders.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("silver.enriched_orders")
)

print("âœ… Silver Layer Transformation Complete!")

In [0]:
# Databricks notebook source
from pyspark.sql import functions as F

# 1. Configuration
# Select the working catalog, then make sure the target schema exists.
catalog = "olist_project"
for setup_sql in (f"USE CATALOG {catalog}", "CREATE SCHEMA IF NOT EXISTS silver"):
    spark.sql(setup_sql)

# Everything the catalog currently lists under the Bronze schema; the
# automation loop further down walks this list.
tables = spark.catalog.listTables("bronze")

# 2. Generic Cleaning Function
def clean_table(df, table_name):
    """Apply the generic Silver cleaning steps to one table.

    Steps, in order:
      A. Lower-case every column name.
      B. Convert every column whose (lower-cased) name contains "timestamp" or
         "date" with ``F.to_timestamp`` and the pattern ``yyyy-MM-dd HH:mm:ss``.
      C. De-duplicate on the table's own id column when one can be identified.

    Key guessing for step C: a leading layer prefix ("bronze_" or "silver_") is
    removed from ``table_name``, a trailing "s" is dropped, and the column
    ``<entity>_id`` is used if it exists -- e.g. "bronze_orders" -> "order_id",
    "customers" -> "customer_id". Tables whose remaining name still contains an
    underscore (e.g. "order_items") are not de-duplicated.

    Args:
        df: Spark DataFrame holding the raw table.
        table_name: Catalog name of the table, with or without a layer prefix.
            It is only used to guess the key column.

    Returns:
        The cleaned DataFrame.
    """
    # A. Standardize Column Names (lower case)
    df = df.toDF(*[c.lower() for c in df.columns])

    # B. Fix Timestamps
    # Any column with 'timestamp' or 'date' in the name gets converted
    date_cols = [c for c in df.columns if "timestamp" in c or "date" in c]
    for date_col in date_cols:
        df = df.withColumn(date_col, F.to_timestamp(F.col(date_col), "yyyy-MM-dd HH:mm:ss"))

    # C. Basic Deduplication
    # The previous guess compared only the first "_"-separated token of the
    # table name against the column names. That token is "bronze" for
    # "bronze_orders" (never matches), "orders" for "orders" (not a substring
    # of "order_id"), and "order" for "order_items" (matches the parent key
    # "order_id", which would collapse the child rows).
    entity = table_name.lower()
    for layer_prefix in ("bronze_", "silver_"):
        if entity.startswith(layer_prefix):
            entity = entity[len(layer_prefix):]
            break

    # Only single-word entity tables get a guessed key. A compound name is
    # presumably a child/bridge table whose rows are not identified by a single
    # id column, so those are left as they are rather than risk dropping rows.
    if "_" not in entity:
        singular = entity[:-1] if entity.endswith("s") else entity
        key_col = f"{singular}_id"
        if key_col in df.columns:
            df = df.dropDuplicates([key_col])

    return df

# 3. Automation Loop
# Copies every Bronze table into the Silver schema under the same table name,
# applying clean_table() on the way. Each target is fully overwritten.
for t in tables:
    # spark.catalog.listTables() also reports session-scoped temporary views.
    # Those do not live in the bronze schema, so reading bronze.<name> would
    # fail for them; skip them.
    if t.isTemporary:
        continue

    source_table = f"bronze.{t.name}"
    target_table = f"silver.{t.name}"

    print(f"ðŸ§¹ Cleaning {source_table} -> {target_table}...")

    # Read, Clean, Write
    df_raw = spark.read.table(source_table)
    df_clean = clean_table(df_raw, t.name)

    df_clean.write.format("delta").mode("overwrite").saveAsTable(target_table)

# 4. Create the "Enriched" Master Table (Special Business Step)
# This joins orders, customers, and items into one big view for Gold layer prep
# NOTE(review): the automation loop in this cell writes each table as
# silver.<bronze table name>; confirm that silver.orders, silver.customers and
# silver.order_items exist under exactly these names before this step runs.
print("ðŸ”— Creating silver.enriched_orders...")
orders = spark.read.table("silver.orders")
customers = spark.read.table("silver.customers")
items = spark.read.table("silver.order_items")

# Left joins keep every order; an order with several items yields one row per item.
enriched = orders.join(customers, "customer_id", "left") \
                 .join(items, "order_id", "left")

# silver.enriched_orders is also written earlier in this notebook from orders +
# customers only. This version adds the item columns, and a plain Delta
# overwrite rejects a DataFrame carrying columns the existing table lacks, so
# the schema is replaced together with the data.
enriched.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("silver.enriched_orders")

print("âœ… All tables are now in Silver!")

In [0]:
# Rename tables in the Silver schema that still carry the bronze_ prefix
# (bronze_<x> -> silver_<x>).
# Temporary views are skipped: listTables() reports them, but they are not
# tables of the silver schema and cannot be renamed with ALTER TABLE.
silver_tables = [t for t in spark.catalog.listTables("silver") if not t.isTemporary]
taken_names = {t.name for t in silver_tables}
for t in silver_tables:
    if not t.name.startswith("bronze_"):
        continue
    new_name = t.name.replace("bronze_", "silver_", 1)
    # A second run of the notebook finds silver_<x> already in place, and
    # ALTER TABLE ... RENAME would then fail. Report and skip instead.
    if new_name in taken_names:
        print(f"Skipping silver.{t.name}: silver.{new_name} already exists")
        continue
    spark.sql(f"ALTER TABLE silver.{t.name} RENAME TO silver.{new_name}")
    taken_names.add(new_name)


In [0]:
# Give every table in the Silver schema the silver_ prefix, unless a table with
# the prefixed name already exists.
# The catalog is listed once so the name set and the loop see the same
# snapshot. Temporary views are excluded because listTables() reports them but
# they cannot be renamed with ALTER TABLE silver.<name>.
permanent_tables = [t for t in spark.catalog.listTables("silver") if not t.isTemporary]
existing_tables = {t.name for t in permanent_tables}
for t in permanent_tables:
    if t.name.startswith("silver_"):
        continue
    new_name = f"silver_{t.name}"
    if new_name not in existing_tables:
        spark.sql(f"ALTER TABLE silver.{t.name} RENAME TO silver.{new_name}")