##DAY 3: GOLD LAYER & ML READY FEATURES

In [0]:
print("🎯 part 3: GOLD LAYER & ML FEATURE ENGINEERING")
print("="*70)

🎯 part 3: GOLD LAYER & ML FEATURE ENGINEERING


### SET CATALOG & CHECKPOINT

In [0]:
print("\n0️⃣ SETTING UP GOLD LAYER")

spark.sql("USE CATALOG brazil_project")
spark.sql("CREATE SCHEMA IF NOT EXISTS gold")
spark.sql("CREATE SCHEMA IF NOT EXISTS ml")

print("✅ Gold and ML schemas ready")


0️⃣ SETTING UP GOLD LAYER
✅ Gold and ML schemas ready


###1: ML-READY CUSTOMER FEATURES (GOLD)

In [0]:
print("\n1️⃣ CREATING ML-READY CUSTOMER FEATURES (GOLD)")

# Create comprehensive ML features
ml_customer_features = """
WITH customer_base AS (
    SELECT 
        c.customer_id,
        c.customer_city,
        c.customer_state,
        
        -- Basic metrics
        c.total_orders,
        c.total_spent,
        c.avg_order_value,
        c.recency_score,
        c.frequency_score, 
        c.monetary_score,
        c.churned_90d,
        
        -- Time-based features
        c.first_order_date,
        c.last_order_date,
        DATEDIFF(c.last_order_date, c.first_order_date) as customer_tenure_days,
        DATEDIFF(CURRENT_DATE(), c.last_order_date) as days_since_last_purchase,
        
        -- RFM composite score
        (c.recency_score + c.frequency_score + c.monetary_score) as rfm_total_score
        
    FROM silver.customer_360_simple c
    WHERE c.total_orders > 0  -- Only customers with at least one order
),
customer_behavior AS (
    SELECT 
        o.customer_id,
        
        -- Product diversity
        COUNT(DISTINCT p.product_category_name) as unique_categories_purchased,
        
        -- Payment behavior
        COUNT(DISTINCT op.payment_type) as unique_payment_methods,
        AVG(op.payment_installments) as avg_installments,
        
        -- Review behavior
        COUNT(DISTINCT rev.review_id) as total_reviews,
        AVG(rev.review_score) as avg_review_score,
        
        -- Delivery experience
        AVG(DATEDIFF(o.order_delivered_customer_date, o.order_purchase_timestamp)) as avg_delivery_days,
        
        -- Order status analysis
        SUM(CASE WHEN o.order_status = 'delivered' THEN 1 ELSE 0 END) as delivered_orders,
        SUM(CASE WHEN o.order_status = 'canceled' THEN 1 ELSE 0 END) as canceled_orders,
        SUM(CASE WHEN o.order_status = 'unavailable' THEN 1 ELSE 0 END) as unavailable_orders
        
    FROM bronze.orders o
    LEFT JOIN bronze.order_items oi ON o.order_id = oi.order_id
    LEFT JOIN bronze.products p ON oi.product_id = p.product_id
    LEFT JOIN bronze.order_payments op ON o.order_id = op.order_id
    LEFT JOIN bronze.order_reviews rev ON o.order_id = rev.order_id
    GROUP BY o.customer_id
),
customer_geography AS (
    SELECT 
        customer_state,
        COUNT(*) as state_total_customers,
        AVG(total_spent) as state_avg_spent
    FROM silver.customer_360_simple
    WHERE total_spent > 0
    GROUP BY customer_state
)

SELECT 
    cb.*,
    
    -- Behavioral features
    COALESCE(cbh.unique_categories_purchased, 0) as unique_categories_purchased,
    COALESCE(cbh.unique_payment_methods, 0) as unique_payment_methods,
    COALESCE(cbh.avg_installments, 0) as avg_installments,
    COALESCE(cbh.total_reviews, 0) as total_reviews,
    COALESCE(cbh.avg_review_score, 0) as avg_review_score,
    COALESCE(cbh.avg_delivery_days, 0) as avg_delivery_days,
    COALESCE(cbh.delivered_orders, 0) as delivered_orders,
    COALESCE(cbh.canceled_orders, 0) as canceled_orders,
    COALESCE(cbh.unavailable_orders, 0) as unavailable_orders,
    
    -- Geographic context
    COALESCE(cg.state_total_customers, 0) as state_total_customers,
    COALESCE(cg.state_avg_spent, 0) as state_avg_spent,
    
    -- Derived features
    CASE 
        WHEN cb.customer_tenure_days > 180 THEN 'Established'
        WHEN cb.customer_tenure_days > 90 THEN 'Growing'
        WHEN cb.customer_tenure_days > 30 THEN 'New'
        ELSE 'Very New'
    END as customer_segment,
    
    CASE 
        WHEN cb.avg_order_value > 200 THEN 'High Value'
        WHEN cb.avg_order_value > 100 THEN 'Medium Value'
        WHEN cb.avg_order_value > 50 THEN 'Low Value'
        ELSE 'Very Low Value'
    END as value_segment,
    
    -- Interaction features
    cb.total_orders * cb.avg_order_value as estimated_lifetime_value,
    
    -- Target variable for ML (next 30-day churn)
    CASE 
        WHEN cb.days_since_last_purchase > 60 AND cb.days_since_last_purchase <= 90 THEN 1
        ELSE 0
    END as churn_next_30d  -- ML TARGET
    
FROM customer_base cb
LEFT JOIN customer_behavior cbh ON cb.customer_id = cbh.customer_id
LEFT JOIN customer_geography cg ON cb.customer_state = cg.customer_state
"""

# Execute and save to Gold
ml_customer_df = spark.sql(ml_customer_features)
ml_customer_df.write.mode("overwrite").saveAsTable("brazil_project.gold.ml_customer_features")

print(f"✅ ML Customer Features created: {ml_customer_df.count():,} customers")
print("   Features: 25+ engineered features for ML")



1️⃣ CREATING ML-READY CUSTOMER FEATURES (GOLD)
✅ ML Customer Features created: 98,666 customers
   Features: 25+ engineered features for ML


###2: PRODUCT ML FEATURES (GOLD)

In [0]:
print("\n2️⃣ CREATING PRODUCT ML FEATURES")

product_ml_features = """
WITH product_sales AS (
    SELECT 
        p.product_id,
        p.product_category_name,
        
        -- Sales metrics
        COUNT(DISTINCT oi.order_id) as total_orders,
        COUNT(oi.order_item_id) as total_items_sold,
        SUM(oi.price) as total_revenue,
        AVG(oi.price) as avg_price,
        STDDEV(oi.price) as price_stddev,
        
        -- Temporal metrics
        MIN(o.order_purchase_timestamp) as first_sale_date,
        MAX(o.order_purchase_timestamp) as last_sale_date,
        
        -- Customer engagement
        COUNT(DISTINCT o.customer_id) as unique_customers,
        COUNT(DISTINCT oi.seller_id) as unique_sellers
        
    FROM bronze.products p
    LEFT JOIN bronze.order_items oi ON p.product_id = oi.product_id
    LEFT JOIN bronze.orders o ON oi.order_id = o.order_id
    GROUP BY p.product_id, p.product_category_name
),
product_reviews_agg AS (
    SELECT 
        p.product_id,
        COUNT(rev.review_id) as total_reviews,
        AVG(rev.review_score) as avg_review_score,
        STDDEV(rev.review_score) as review_score_stddev,
        SUM(CASE WHEN rev.review_score = 5 THEN 1 ELSE 0 END) as five_star_reviews,
        SUM(CASE WHEN rev.review_score <= 2 THEN 1 ELSE 0 END) as poor_reviews
        
    FROM bronze.products p
    LEFT JOIN bronze.order_items oi ON p.product_id = oi.product_id
    LEFT JOIN bronze.order_reviews rev ON oi.order_id = rev.order_id
    GROUP BY p.product_id
),
product_category_stats AS (
    SELECT 
        product_category_name,
        COUNT(DISTINCT product_id) as category_total_products,
        AVG(total_items_sold) as category_avg_items_sold
    FROM product_sales
    WHERE product_category_name IS NOT NULL
    GROUP BY product_category_name
)

SELECT 
    ps.*,
    
    -- Review metrics
    COALESCE(pr.total_reviews, 0) as total_reviews,
    COALESCE(pr.avg_review_score, 0) as avg_review_score,
    COALESCE(pr.review_score_stddev, 0) as review_score_stddev,
    COALESCE(pr.five_star_reviews, 0) as five_star_reviews,
    COALESCE(pr.poor_reviews, 0) as poor_reviews,
    
    -- Category context
    COALESCE(pcs.category_total_products, 0) as category_total_products,
    COALESCE(pcs.category_avg_items_sold, 0) as category_avg_items_sold,
    
    -- Derived features
    CASE 
        WHEN ps.total_items_sold = 0 THEN 0
        ELSE ps.total_revenue / ps.total_items_sold
    END as revenue_per_item,
    
    CASE 
        WHEN ps.total_items_sold = 0 THEN 0
        ELSE ps.unique_customers * 1.0 / ps.total_items_sold
    END as customer_penetration_rate,
    
    DATEDIFF(CURRENT_DATE(), ps.last_sale_date) as days_since_last_sale,
    
    -- Sales velocity
    CASE 
        WHEN DATEDIFF(ps.last_sale_date, ps.first_sale_date) = 0 THEN ps.total_items_sold
        ELSE ps.total_items_sold * 1.0 / DATEDIFF(ps.last_sale_date, ps.first_sale_date)
    END as daily_sales_rate,
    
    -- Product success score (composite)
    (COALESCE(ps.total_items_sold, 0) * 0.3 +
     COALESCE(pr.avg_review_score, 0) * 0.4 +
     COALESCE(ps.unique_customers, 0) * 0.3) as product_success_score,
    
    -- Target for ML: High demand product
    CASE 
        WHEN ps.total_items_sold > 50 AND pr.avg_review_score > 4.0 THEN 1
        ELSE 0
    END as high_demand_product
    
FROM product_sales ps
LEFT JOIN product_reviews_agg pr ON ps.product_id = pr.product_id
LEFT JOIN product_category_stats pcs ON ps.product_category_name = pcs.product_category_name
"""
# Execute and save
product_ml_df = spark.sql(product_ml_features)
product_ml_df.write.mode("overwrite").saveAsTable("brazil_project.gold.ml_product_features")

print(f"✅ ML Product Features created: {product_ml_df.count():,} products")
print("   Features: 20+ features for product recommendation ML")


2️⃣ CREATING PRODUCT ML FEATURES
✅ ML Product Features created: 32,951 products
   Features: 20+ features for product recommendation ML


##3: TIME-SERIES FEATURES (GOLD)

In [0]:
print("\n3️⃣ CREATING TIME-SERIES FEATURES")

time_series_features = """
SELECT 
    DATE(o.order_purchase_timestamp) as order_date,
    
    -- Daily metrics
    COUNT(DISTINCT o.order_id) as daily_orders,
    COUNT(DISTINCT o.customer_id) as daily_customers,
    COUNT(DISTINCT oi.seller_id) as daily_sellers,
    SUM(oi.price) as daily_revenue,
    AVG(oi.price) as avg_order_value,
    
    -- Product metrics
    COUNT(DISTINCT oi.product_id) as daily_unique_products,
    COUNT(DISTINCT p.product_category_name) as daily_categories,
    
    -- Payment metrics
    COUNT(DISTINCT op.payment_type) as payment_methods_used,
    AVG(op.payment_installments) as avg_installments,
    
    -- Review metrics
    COUNT(DISTINCT rev.review_id) as daily_reviews,
    AVG(rev.review_score) as avg_daily_review_score,
    
    -- Day of week features
    DAYOFWEEK(o.order_purchase_timestamp) as day_of_week,
    CASE 
        WHEN DAYOFWEEK(o.order_purchase_timestamp) IN (1, 7) THEN 1
        ELSE 0
    END as is_weekend,
    
    -- Month features
    MONTH(o.order_purchase_timestamp) as month,
    QUARTER(o.order_purchase_timestamp) as quarter,
    
    -- Rolling averages (for ML)
    AVG(COUNT(DISTINCT o.order_id)) OVER (
        ORDER BY DATE(o.order_purchase_timestamp) 
        ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ) as weekly_avg_orders,
    
    AVG(SUM(oi.price)) OVER (
        ORDER BY DATE(o.order_purchase_timestamp) 
        ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ) as weekly_avg_revenue
    
FROM bronze.orders o
JOIN bronze.order_items oi ON o.order_id = oi.order_id
LEFT JOIN bronze.products p ON oi.product_id = p.product_id
LEFT JOIN bronze.order_payments op ON o.order_id = op.order_id
LEFT JOIN bronze.order_reviews rev ON o.order_id = rev.order_id
WHERE o.order_purchase_timestamp IS NOT NULL
GROUP BY DATE(o.order_purchase_timestamp), 
         DAYOFWEEK(o.order_purchase_timestamp),
         MONTH(o.order_purchase_timestamp),
         QUARTER(o.order_purchase_timestamp)
ORDER BY order_date
"""

# Execute and save
time_series_df = spark.sql(time_series_features)
time_series_df.write.mode("overwrite").saveAsTable("brazil_project.gold.time_series_features")

print(f"✅ Time-Series Features created: {time_series_df.count():,} days")
print("   Features: Daily metrics with rolling averages for forecasting")



3️⃣ CREATING TIME-SERIES FEATURES
✅ Time-Series Features created: 616 days
   Features: Daily metrics with rolling averages for forecasting


###4: CREATE ML DATASET FOR CHURN PREDICTION

In [0]:
print("\n4️⃣ CREATING ML DATASET FOR CHURN PREDICTION")

# Prepare final ML dataset
ml_dataset = spark.sql("""
SELECT 
    -- Customer demographics
    customer_state,
    customer_segment,
    value_segment,
    
    -- Behavioral features
    total_orders,
    total_spent,
    avg_order_value,
    unique_categories_purchased,
    unique_payment_methods,
    avg_installments,
    total_reviews,
    avg_review_score,
    avg_delivery_days,
    
    -- Time-based features
    customer_tenure_days,
    days_since_last_purchase,
    
    -- RFM features
    recency_score,
    frequency_score,
    monetary_score,
    rfm_total_score,
    
    -- Order status
    delivered_orders,
    canceled_orders,
    unavailable_orders,
    
    -- Geographic context
    state_total_customers,
    state_avg_spent,
    
    -- Derived features
    estimated_lifetime_value,
    
    -- Target variable
    churn_next_30d as label
    
FROM gold.ml_customer_features
WHERE customer_tenure_days IS NOT NULL
  AND total_orders > 0
""")

# Save to ML schema
ml_dataset.write.mode("overwrite").saveAsTable("brazil_project.ml.churn_prediction_dataset")

print(f"✅ ML Dataset created: {ml_dataset.count():,} samples")
print("   Features: 25+ features for churn prediction")
print("   Label: churn_next_30d (1=will churn, 0=won't churn)")


4️⃣ CREATING ML DATASET FOR CHURN PREDICTION
✅ ML Dataset created: 98,666 samples
   Features: 25+ features for churn prediction
   Label: churn_next_30d (1=will churn, 0=won't churn)


## 5: OPTIMIZE GOLD TABLES

In [0]:
print("\n5️⃣ OPTIMIZING GOLD TABLES")

# Optimize all gold tables
tables_to_optimize = [
    "gold.ml_customer_features",
    "gold.ml_product_features", 
    "gold.time_series_features",
    "ml.churn_prediction_dataset"
]

for table in tables_to_optimize:
    print(f"🔧 Optimizing {table}...")
    try:
        spark.sql(f"OPTIMIZE {table} ZORDER BY (customer_state, label)")
        print(f"   ✅ Optimized")
    except:
        print(f"   ⚠️ Could not optimize")



5️⃣ OPTIMIZING GOLD TABLES
🔧 Optimizing gold.ml_customer_features...
   ⚠️ Could not optimize
🔧 Optimizing gold.ml_product_features...
   ⚠️ Could not optimize
🔧 Optimizing gold.time_series_features...
   ⚠️ Could not optimize
🔧 Optimizing ml.churn_prediction_dataset...
   ✅ Optimized


###6: CREATE BUSINESS INSIGHTS (GOLD)

In [0]:
print("\n6️⃣ GENERATING BUSINESS INSIGHTS")

print("\n📊 KEY BUSINESS INSIGHTS FROM GOLD LAYER")
print("="*70)

# Insight 1: Churn Analysis
print("\n🔍 CHURN ANALYSIS:")
spark.sql("""
    SELECT 
        customer_segment,
        COUNT(*) as total_customers,
        SUM(CASE WHEN label = 1 THEN 1 ELSE 0 END) as churn_risk_customers,
        ROUND(SUM(CASE WHEN label = 1 THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as churn_rate_percent,
        ROUND(AVG(total_spent), 2) as avg_customer_value,
        ROUND(AVG(days_since_last_purchase), 1) as avg_days_since_purchase
    FROM ml.churn_prediction_dataset
    GROUP BY customer_segment
    ORDER BY churn_rate_percent DESC
""").show()

# Insight 2: High-Value Customer Segments
print("\n💰 HIGH-VALUE CUSTOMER SEGMENTS:")
spark.sql("""
    SELECT 
        value_segment,
        customer_state,
        COUNT(*) as customers,
        ROUND(SUM(total_spent), 2) as total_revenue,
        ROUND(AVG(total_spent), 2) as avg_spent_per_customer,
        ROUND(AVG(avg_review_score), 2) as avg_satisfaction
    FROM gold.ml_customer_features
    WHERE total_spent > 0
    GROUP BY value_segment, customer_state
    ORDER BY total_revenue DESC
    LIMIT 10
""").show()

# Insight 3: Product Performance
print("\n🏆 TOP PERFORMING PRODUCT CATEGORIES:")
spark.sql("""
    SELECT 
        product_category_name,
        COUNT(DISTINCT product_id) as products,
        SUM(total_items_sold) as total_sales,
        ROUND(SUM(total_revenue), 2) as total_revenue,
        ROUND(AVG(avg_review_score), 2) as avg_rating,
        ROUND(SUM(CASE WHEN high_demand_product = 1 THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as high_demand_percent
    FROM gold.ml_product_features
    WHERE product_category_name IS NOT NULL
    GROUP BY product_category_name
    ORDER BY total_revenue DESC
    LIMIT 10
""").show()



6️⃣ GENERATING BUSINESS INSIGHTS

📊 KEY BUSINESS INSIGHTS FROM GOLD LAYER

🔍 CHURN ANALYSIS:
+----------------+---------------+--------------------+------------------+------------------+-----------------------+
|customer_segment|total_customers|churn_risk_customers|churn_rate_percent|avg_customer_value|avg_days_since_purchase|
+----------------+---------------+--------------------+------------------+------------------+-----------------------+
|        Very New|          98666|                   0|              0.00|            137.75|                 2947.7|
+----------------+---------------+--------------------+------------------+------------------+-----------------------+


💰 HIGH-VALUE CUSTOMER SEGMENTS:
+--------------+--------------+---------+-------------+----------------------+----------------+
| value_segment|customer_state|customers|total_revenue|avg_spent_per_customer|avg_satisfaction|
+--------------+--------------+---------+-------------+----------------------+------------

In [0]:
print("🎯 ARCHITECTURE DIAGRAM")
print("="*60)
print("""
🏗️ MEDALLION ARCHITECTURE IMPLEMENTED:

    ┌─────────────────────────────────────┐
    │        BRONZE LAYER (Raw)           │
    │  ┌─────────────────────────────┐    │
    │  │ • 8 CSV files from Kaggle   │    │
    │  │ • customers (99K rows)      │    │
    │  │ • orders (99K rows)         │    │
    │  │ • order_items (112K rows)   │    │
    │  │ • products (32K rows)       │    │
    │  │ • + 4 more tables           │    │
    │  └─────────────────────────────┘    │
    │               ↓                     │
    │        Clean & Validate             │
    └─────────────────────────────────────┘
                     ↓
    ┌─────────────────────────────────────┐
    │        SILVER LAYER (Cleaned)       │
    │  ┌─────────────────────────────┐    │
    │  │ • customer_360_simple       │    │
    │  │ • RFM Features:             │    │
    │  │   - Recency Score (1-5)     │    │
    │  │   - Frequency Score (1-5)   │    │
    │  │   - Monetary Score (1-5)    │    │
    │  │ • Churn Flag: churned_90d   │    │
    │  └─────────────────────────────┘    │
    │               ↓                     │
    │        Feature Engineering          │
    └─────────────────────────────────────┘
                     ↓
    ┌─────────────────────────────────────┐
    │        GOLD LAYER (ML-ready)        │
    │  ┌─────────────────────────────┐    │
    │  │ • ml_customer_features      │    │
    │  │   - 25+ engineered features │    │
    │  │   - Customer segments       │    │
    │  │   - Value segments          │    │
    │  │ • ml_product_features       │    │
    │  │ • time_series_features      │    │
    │  │ • churn_prediction_dataset  │    │
    │  │   (98,666 samples)          │    │
    │  └─────────────────────────────┘    │
    │               ↓                     │
    │        Ready for ML Training        │
    └─────────────────────────────────────┘

🎯 NEXT: Day 4 → Machine Learning & Predictions
""")

print("\n📊 SCHEMA STRUCTURE:")
print("="*40)
print("brazil_project (Catalog)")
print("├── bronze/              # Raw ingested data")
print("│   ├── customers")
print("│   ├── orders")
print("│   ├── order_items")
print("│   └── ... (5 more)")
print("├── silver/              # Cleaned, joined")
print("│   └── customer_360_simple")
print("├── gold/                # Business & ML ready")
print("│   ├── ml_customer_features")
print("│   ├── ml_product_features")
print("│   └── time_series_features")
print("└── ml/                  # Machine Learning")
print("    ├── churn_prediction_dataset")
print("    └── customer_churn_predictions (Day 4 output)")

🎯 ARCHITECTURE DIAGRAM

🏗️ MEDALLION ARCHITECTURE IMPLEMENTED:

    ┌─────────────────────────────────────┐
    │        BRONZE LAYER (Raw)           │
    │  ┌─────────────────────────────┐    │
    │  │ • 8 CSV files from Kaggle   │    │
    │  │ • customers (99K rows)      │    │
    │  │ • orders (99K rows)         │    │
    │  │ • order_items (112K rows)   │    │
    │  │ • products (32K rows)       │    │
    │  │ • + 4 more tables           │    │
    │  └─────────────────────────────┘    │
    │               ↓                     │
    │        Clean & Validate             │
    └─────────────────────────────────────┘
                     ↓
    ┌─────────────────────────────────────┐
    │        SILVER LAYER (Cleaned)       │
    │  ┌─────────────────────────────┐    │
    │  │ • customer_360_simple       │    │
    │  │ • RFM Features:             │    │
    │  │   - Recency Score (1-5)     │    │
    │  │   - Frequency Score (1-5)   │    │
    │  │   - Monetary Score (1-5)

In [0]:
print("📁 ACTUAL DATABRICKS STRUCTURE")
print("="*50)

print("\n1️⃣ CATALOGS:")
catalogs = spark.sql("SHOW CATALOGS").collect()
for cat in catalogs:
    if cat.catalog == "brazil_project":
        print(f"   ✅ {cat.catalog} (Our project catalog)")

print("\n2️⃣ SCHEMAS in brazil_project:")
spark.sql("USE CATALOG brazil_project")
schemas = spark.sql("SHOW SCHEMAS").collect()
for schema in schemas:
    print(f"   • {schema.databaseName}")

print("\n3️⃣ TABLES in Each Schema:")
for schema in ['bronze', 'silver', 'gold', 'ml']:
    try:
        print(f"\n   📂 {schema.upper()}:")
        tables = spark.sql(f"SHOW TABLES IN {schema}").collect()
        for table in tables:
            print(f"      └ {table.tableName}")
    except:
        print(f"   ⚠️  Could not access {schema}")

📁 ACTUAL DATABRICKS STRUCTURE

1️⃣ CATALOGS:
   ✅ brazil_project (Our project catalog)

2️⃣ SCHEMAS in brazil_project:
   • bronze
   • data
   • default
   • gold
   • information_schema
   • ml
   • silver

3️⃣ TABLES in Each Schema:

   📂 BRONZE:
      └ customers
      └ geolocation
      └ order_items
      └ order_payments
      └ order_reviews
      └ orders
      └ products
      └ sellers

   📂 SILVER:
      └ customer_360_simple

   📂 GOLD:
      └ ml_customer_features
      └ ml_product_features
      └ time_series_features

   📂 ML:
      └ churn_prediction_dataset
      └ customer_churn_predictions
      └ customer_predictions
