### #ETL PIPELINE IN BRONZE, SILVER, AND GOLD

In [0]:
from pyspark.sql.functions import col, trim, lower, current_timestamp, sum, avg, count

# ============================================
# BRONZE LAYER - Raw data ingestion
# ============================================
spark.sql("CREATE DATABASE IF NOT EXISTS bronze")
spark.sql("CREATE DATABASE IF NOT EXISTS silver")
spark.sql("CREATE DATABASE IF NOT EXISTS gold")

# Load raw data to bronze (already done)
print("✓ Bronze layer exists")

# ============================================
# SILVER LAYER - Data Cleaning & Validation
# ============================================
print("Creating Silver layer...")

# Read from Bronze
customer = spark.table("bronze.customer").drop("injection_date")
product = spark.table("bronze.product").drop("injection_date")
order = spark.table("bronze.order").drop("injection_date")
orders = spark.table("bronze.orders").drop("injection_date")

# Join
sales_joined = (
    order
    .join(orders, "order_id", "left")
    .join(customer, "customer_id", "left")
    .join(product, "product_id", "left")
)

# Clean in Silver
sales_clean = (sales_joined
    .withColumn("price", col("price").cast("double"))
    .withColumn("freight_value", col("freight_value").cast("double"))
    .dropDuplicates()
    .dropna(subset=["order_id", "customer_id"])
    .filter((col("price").isNull()) | (col("price") >= 0))
    .withColumn("order_status", trim(lower(col("order_status"))))
    .withColumn("processed_date", current_timestamp())
)

# Save Silver
# Add the overwriteSchema option
sales_clean.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("silver.sales_cleaned")

print("✓ Silver layer created with schema overwrite")
print(f"✓ Silver layer created: {sales_clean.count()} rows")

# ============================================
# GOLD LAYER - Business Aggregations
# ============================================
print("Creating Gold layer...")

silver_df = spark.table("silver.sales_cleaned")

# Business metric 1: Sales by category
(silver_df.groupBy("product_category_name")
    .agg(
        count("order_id").alias("total_orders"),
        sum("price").alias("total_revenue"),
        avg("price").alias("avg_order_value")
    )
    .write.format("delta").mode("overwrite")
    .saveAsTable("gold.sales_by_category"))

# Business metric 2: Customer lifetime value
(silver_df.groupBy("customer_id", "customer_unique_id", "customer_state")
    .agg(
        count("order_id").alias("total_orders"),
        sum("price").alias("lifetime_value")
    )
    .write.format("delta").mode("overwrite")
    .saveAsTable("gold.customer_lifetime_value"))

print("✓ Gold layer created")

# Verify the pipeline
print("\n=== Pipeline Summary ===")
print(f"Bronze.customer: {spark.table('bronze.customer').count()} rows")
print(f"Silver.sales_cleaned: {spark.table('silver.sales_cleaned').count()} rows")
print(f"Gold.sales_by_category: {spark.table('gold.sales_by_category').count()} rows")

✓ Bronze layer exists
Creating Silver layer...
✓ Silver layer created with schema overwrite
✓ Silver layer created: 112650 rows
Creating Gold layer...
✓ Gold layer created

=== Pipeline Summary ===
Bronze.customer: 99441 rows
Silver.sales_cleaned: 112650 rows
Gold.sales_by_category: 74 rows


In [0]:
from pyspark.sql.functions import count, sum, avg, countDistinct, month, year

silver_df = spark.table("silver.sales_cleaned")

# Gold: Monthly sales trend
print("Creating gold.monthly_sales...")
monthly_sales = silver_df.groupBy(
    year("order_purchase_timestamp").alias("year"),
    month("order_purchase_timestamp").alias("month")
).agg(
    count("order_id").alias("total_orders"),
    sum("price").alias("total_revenue"),
    avg("price").alias("avg_order_value"),
    countDistinct("customer_id").alias("unique_customers")
).orderBy("year", "month")

monthly_sales.write.format("delta").mode("overwrite").saveAsTable("gold.monthly_sales")
print(f"✓ gold.monthly_sales: {monthly_sales.count()} months")

# Gold: Top customers
print("\nCreating gold.top_customers...")
top_customers = silver_df.groupBy("customer_id", "customer_unique_id", "customer_state").agg(
    count("order_id").alias("total_orders"),
    sum("price").alias("total_spent"),
    avg("price").alias("avg_order_value")
).orderBy(col("total_spent").desc()).limit(1000)

top_customers.write.format("delta").mode("overwrite").saveAsTable("gold.top_customers")
print(f"✓ gold.top_customers: {top_customers.count()} customers")

print("\n✅ Additional Gold tables created!")

Creating gold.monthly_sales...
✓ gold.monthly_sales: 24 months

Creating gold.top_customers...
✓ gold.top_customers: 1000 customers

✅ Additional Gold tables created!


In [0]:
print("\n" + "="*60)
print("FINAL PIPELINE STATUS")
print("="*60)
print("\n📦 BRONZE (Raw):")
print(f"  bronze.customer: {spark.table('bronze.customer').count():,} rows")
print(f"  bronze.product: {spark.table('bronze.product').count():,} rows")
print(f"  bronze.order: {spark.table('bronze.order').count():,} rows")
print(f"  bronze.orders: {spark.table('bronze.orders').count():,} rows")

print("\n🔧 SILVER (Cleaned):")
print(f"  silver.sales_cleaned: {spark.table('silver.sales_cleaned').count():,} rows")

print("\n⭐ GOLD (Business Metrics):")
print(f"  gold.sales_by_category: {spark.table('gold.sales_by_category').count():,} categories")
print(f"  gold.customer_metrics: {spark.table('gold.customer_metrics').count():,} customers")
print("\n✅ Medallion Architecture Complete!")
print("="*60)


FINAL PIPELINE STATUS

📦 BRONZE (Raw):
  bronze.customer: 99,441 rows
  bronze.product: 32,951 rows
  bronze.order: 112,650 rows
  bronze.orders: 99,441 rows

🔧 SILVER (Cleaned):
  silver.sales_cleaned: 112,650 rows

⭐ GOLD (Business Metrics):
  gold.sales_by_category: 74 categories
  gold.customer_metrics: 98,666 customers

✅ Medallion Architecture Complete!
