In [0]:
# GOLD LAYER - Business Metrics & Customer Analytics
# Purpose: Create aggregated business-ready tables for Power BI

from pyspark.sql import functions as F
from pyspark.sql.window import Window
from datetime import datetime, timedelta

# ============================================
# 1. CUSTOMER METRICS & RFM ANALYSIS
# ============================================

# Order-level silver table; the customer, geographic and daily aggregates in
# this script are all computed from it.
master = spark.table("silver_orders_master")

# Recency is measured against the newest purchase timestamp found in the data
# (not the wall-clock date of the run).
_latest_purchase_row = master.select(
    F.max("order_purchase_timestamp").alias("latest_purchase")
).first()
max_date = _latest_purchase_row["latest_purchase"]
print(f"Reference date for RFM: {max_date}")

# Customer-level aggregations.
# The expressions are collected in a list first; their order here is the
# column order of the resulting table.
# NOTE(review): the grain is (customer_id, customer_unique_id). If customer_id
# is issued per order rather than per person, "frequency" will always be 1 --
# confirm against the silver schema.
_customer_aggregates = [
    # Recency: whole days from the customer's latest purchase to the reference date.
    F.datediff(F.lit(max_date), F.max("order_purchase_timestamp")).alias("recency_days"),
    # Frequency: number of non-null order ids.
    F.count("order_id").alias("frequency"),
    # Monetary: total order value.
    F.sum("order_total_value").alias("monetary_value"),
    # Supporting averages and totals.
    F.avg("order_total_value").alias("avg_order_value"),
    F.avg("delivery_days").alias("avg_delivery_days"),
    F.avg("delivery_delay_days").alias("avg_delivery_delay"),
    F.avg("avg_review_score").alias("avg_review_score"),
    F.min("order_purchase_timestamp").alias("first_order_date"),
    F.max("order_purchase_timestamp").alias("last_order_date"),
    F.sum("items_count").alias("total_items_purchased"),
    # NOTE(review): F.first() without an ordering returns the value from an
    # arbitrary row of the group; fine only if location is constant per customer.
    F.first("customer_city").alias("customer_city"),
    F.first("customer_state").alias("customer_state"),
]

customer_metrics = master.groupBy("customer_id", "customer_unique_id").agg(
    *_customer_aggregates
)

# Customer lifetime: whole days between the first and the last order
# (0 when both fall on the same calendar date).
_lifetime_days = F.datediff(F.col("last_order_date"), F.col("first_order_date"))
customer_metrics = customer_metrics.withColumn("customer_lifetime_days", _lifetime_days)

# Calculate RFM scores (quintiles: 1-5, where 5 is best).
#
# Each metric is bucketed with ntile(5) over the whole customer table:
#   * frequency / monetary_value are ordered ascending, so the largest values
#     land in bucket 5;
#   * recency_days is ordered DESCENDING, so the customers with the fewest days
#     since their last purchase land in bucket 5.
# All three "<metric>_score" columns therefore already follow "5 is best".
#
# The customer keys are appended as tie-breakers: ntile() splits tied rows
# across buckets in whatever order they arrive, so without a total ordering
# the scores of tied customers could change from run to run.
#
# NOTE: a window without partitionBy pulls every row into a single partition;
# that is inherent to computing global quintiles this way.
for metric in ["recency_days", "frequency", "monetary_value"]:
    primary_order = (
        F.col(metric).desc() if metric == "recency_days" else F.col(metric).asc()
    )
    customer_metrics = customer_metrics.withColumn(
        f"{metric}_score",
        F.ntile(5).over(
            Window.orderBy(
                primary_order,
                F.col("customer_unique_id").asc(),
                F.col("customer_id").asc(),
            )
        ),
    )

# recency_days_score is already oriented so that 5 = most recent (see the
# descending order above). The previous "6 - recency_days_score" inverted it a
# second time, giving the most recent customers a score of 1 and corrupting
# rfm_score and the segments derived from it. The column is kept under its
# original name so the table schema is unchanged.
customer_metrics = customer_metrics.withColumn(
    "recency_score",
    F.col("recency_days_score")
)

# Combined RFM score: plain sum of the three 1-5 scores (range 3-15).
customer_metrics = customer_metrics.withColumn(
    "rfm_score",
    F.col("recency_score") + F.col("frequency_score") + F.col("monetary_value_score")
)

# Create RFM segments.
# Rules are evaluated top to bottom and the first match wins; customers that
# match no rule are labelled "Lost".
#
# NOTE(review): rfm_score is the sum of three scores that each lie in 1-5, so
# two rules below can never fire where they are placed:
#   * "About To Sleep" is only reached when rfm_score <= 5, but
#     recency_score >= 4 forces rfm_score >= 6;
#   * "Can't Lose Them" is only reached when rfm_score == 3 (all scores 1),
#     which contradicts monetary_value_score >= 4.
# They are kept as-is to preserve behaviour; the intended thresholds need a
# business decision.
_rfm = F.col("rfm_score")
_recency = F.col("recency_score")
_monetary = F.col("monetary_value_score")

_segment_rules = [
    (_rfm >= 13, "Champions"),
    (_rfm >= 11, "Loyal Customers"),
    ((_rfm >= 9) & (_recency >= 4), "Potential Loyalists"),
    (_rfm >= 9, "Recent Customers"),
    ((_rfm >= 7) & (_recency >= 3), "Promising"),
    (_rfm >= 6, "Customers Needing Attention"),
    (_recency >= 4, "About To Sleep"),
    (_rfm >= 4, "At Risk"),
    (_monetary >= 4, "Can't Lose Them"),
]

# Fold the rule list into a single CASE WHEN ... expression.
_first_condition, _first_label = _segment_rules[0]
_segment_expr = F.when(_first_condition, _first_label)
for _condition, _label in _segment_rules[1:]:
    _segment_expr = _segment_expr.when(_condition, _label)

customer_metrics = customer_metrics.withColumn(
    "customer_segment",
    _segment_expr.otherwise("Lost"),
)

# Calculate Customer Lifetime Value (CLV) - simplified 12-month projection.
#   * Customers whose first and last order are on different dates: average
#     spend per day of observed lifetime, scaled to 365 days.
#   * Everyone else (lifetime of 0 days, or a null lifetime): total spend
#     multiplied by 12.
# NOTE(review): the fallback treats the observed spend as one month's worth,
# and very short lifetimes (e.g. 1 day) produce very large projections --
# confirm both assumptions with the business owners.
_spend_per_day = F.col("monetary_value") / F.col("customer_lifetime_days")
customer_metrics = customer_metrics.withColumn(
    "clv_12_months",
    F.when(F.col("customer_lifetime_days") > 0, _spend_per_day * 365)
    .otherwise(F.col("monetary_value") * 12)
)

# Persist as a Delta table, replacing any previous contents.
customer_metrics.write.format("delta").mode("overwrite").saveAsTable("gold_customer_metrics")
# The check mark had been stored as mojibake (UTF-8 bytes decoded as cp1252).
print("✓ Created gold_customer_metrics")

# ============================================
# 2. PRODUCT PERFORMANCE METRICS
# ============================================

# Item-level silver table; also the input of the category metrics.
order_items = spark.table("silver_order_items")

# One output row per (product, category).
product_metrics = order_items.groupBy(
    "product_id",
    "category_english"
).agg(
    # Distinct orders containing the product. A plain count("order_id") counts
    # item rows, so an order holding several rows of the same product would be
    # counted more than once under the name "total_orders".
    # NOTE(review): assumes silver_order_items can hold several rows per
    # (order_id, product_id); if it cannot, this equals the previous count.
    F.countDistinct("order_id").alias("total_orders"),
    F.sum("price").alias("total_revenue"),
    F.avg("price").alias("avg_price"),
    F.sum("freight_value").alias("total_freight"),
    F.avg("freight_value").alias("avg_freight")
)

# Rank products by revenue (1 = highest). The grouping keys are used as
# tie-breakers so products with equal revenue get the same rank on every run;
# row_number() would otherwise order ties arbitrarily.
product_metrics = product_metrics.withColumn(
    "revenue_rank",
    F.row_number().over(
        Window.orderBy(
            F.col("total_revenue").desc(),
            F.col("product_id").asc(),
            F.col("category_english").asc(),
        )
    )
)

# Persist as a Delta table, replacing any previous contents.
product_metrics.write.format("delta").mode("overwrite").saveAsTable("gold_product_metrics")
# The check mark had been stored as mojibake (UTF-8 bytes decoded as cp1252).
print("✓ Created gold_product_metrics")

# ============================================
# 3. CATEGORY PERFORMANCE
# ============================================

# One output row per product category.
category_metrics = order_items.groupBy("category_english").agg(
    # Distinct orders touching the category. A plain count("order_id") counts
    # item rows, so multi-item orders would be counted once per row; this also
    # matches the distinct counts used for products and sellers below.
    # NOTE(review): assumes order_items can hold several rows per order within
    # a category; if it cannot, this equals the previous count.
    F.countDistinct("order_id").alias("total_orders"),
    F.sum("price").alias("total_revenue"),
    F.avg("price").alias("avg_item_price"),
    F.countDistinct("product_id").alias("unique_products"),
    F.countDistinct("seller_id").alias("unique_sellers")
)

# Rank categories by revenue (1 = highest); the category name breaks ties so
# the ranking is reproducible across runs.
category_metrics = category_metrics.withColumn(
    "revenue_rank",
    F.row_number().over(
        Window.orderBy(
            F.col("total_revenue").desc(),
            F.col("category_english").asc(),
        )
    )
)

# Persist as a Delta table, replacing any previous contents.
category_metrics.write.format("delta").mode("overwrite").saveAsTable("gold_category_metrics")
# The check mark had been stored as mojibake (UTF-8 bytes decoded as cp1252).
print("✓ Created gold_category_metrics")

# ============================================
# 4. GEOGRAPHIC PERFORMANCE
# ============================================

# One output row per (state, city) of the ordering customer.
geo_metrics = master.groupBy("customer_state", "customer_city").agg(
    F.count("order_id").alias("total_orders"),
    F.sum("order_total_value").alias("total_revenue"),
    F.avg("order_total_value").alias("avg_order_value"),
    F.countDistinct("customer_unique_id").alias("unique_customers"),
    F.avg("delivery_days").alias("avg_delivery_days"),
    F.avg("delivery_delay_days").alias("avg_delivery_delay"),
    # Mean of the per-row avg_review_score values (an average of averages).
    F.avg("avg_review_score").alias("avg_review_score")
)

# Persist as a Delta table, replacing any previous contents.
geo_metrics.write.format("delta").mode("overwrite").saveAsTable("gold_geographic_metrics")
# The check mark had been stored as mojibake (UTF-8 bytes decoded as cp1252).
print("✓ Created gold_geographic_metrics")

# ============================================
# 5. TIME-SERIES METRICS (DAILY)
# ============================================

# One output row per calendar date of purchase.
daily_metrics = master.withColumn(
    "order_date",
    # Truncate the purchase timestamp to its date part.
    F.to_date("order_purchase_timestamp")
).groupBy("order_date").agg(
    F.count("order_id").alias("orders_count"),
    F.sum("order_total_value").alias("revenue"),
    F.avg("order_total_value").alias("avg_order_value"),
    F.countDistinct("customer_unique_id").alias("unique_customers"),
    F.avg("delivery_days").alias("avg_delivery_days"),
    # Mean of the per-row avg_review_score values (an average of averages).
    F.avg("avg_review_score").alias("avg_review_score")
)

# Sort chronologically before writing. Readers that need a guaranteed order
# should still sort on order_date themselves.
daily_metrics = daily_metrics.orderBy("order_date")

# Persist as a Delta table, replacing any previous contents.
daily_metrics.write.format("delta").mode("overwrite").saveAsTable("gold_daily_metrics")
# The check mark and the party emoji below had been stored as mojibake
# (UTF-8 bytes decoded as cp1252).
print("✓ Created gold_daily_metrics")

print("\n🎉 Gold layer complete! Business metrics ready for Power BI.")