**Data Generation & Data Model Setup for Customer Health & Subscriptions Insights Dashboard**

In [0]:
## 1. Setup & Configuration



import numpy as np
import pandas as pd
from datetime import datetime, timedelta
import random
import hashlib
import uuid
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# Set random seed for reproducibility
np.random.seed(42)
random.seed(42)

# Configuration
START_DATE = datetime(2024, 12, 1)
END_DATE = datetime(2026, 1, 31)
NUM_CUSTOMERS = 2800  # Enough for meaningful cohorts and segments

print("✅ Setup complete")
print(f"📅 Date range: {START_DATE.strftime('%Y-%m-%d')} to {END_DATE.strftime('%Y-%m-%d')}")
print(f"👥 Target customers: {NUM_CUSTOMERS}")

# COMMAND ----------

# MAGIC %md
# MAGIC ## 2. Product Catalog (dim_products)
# MAGIC Based on actual IM8 Health product lineup from im8health.com

# COMMAND ----------

products_data = [
    {
        "product_id": "PROD_001",
        "product_name": "IM8 Daily Ultimate Essentials",
        "product_category": "Core Nutrition",
        "subscription_price_usd": 84.00,  # mid-range of $79-$89
        "onetime_price_usd": 105.50,  # mid-range of $99-$112
        "subscription_price_low": 79.00,
        "subscription_price_high": 89.00,
        "onetime_price_low": 99.00,
        "onetime_price_high": 112.00,
        "product_description": "All-in-one nutrient drink with 92 ingredients replacing 16 supplements",
        "servings_per_unit": 30,
        "cost_per_serving_sub": 2.80,
        "launch_date": "2024-11-01",
        "is_active": True
    },
    {
        "product_id": "PROD_002",
        "product_name": "IM8 Daily Ultimate Longevity",
        "product_category": "Longevity",
        "subscription_price_usd": 119.00,
        "onetime_price_usd": 149.00,
        "subscription_price_low": 119.00,
        "subscription_price_high": 119.00,
        "onetime_price_low": 149.00,
        "onetime_price_high": 149.00,
        "product_description": "Supports 12 hallmarks of aging with NMN and Resveratrol",
        "servings_per_unit": 30,
        "cost_per_serving_sub": 3.97,
        "launch_date": "2024-11-01",
        "is_active": True
    },
    {
        "product_id": "PROD_003",
        "product_name": "The Beckham Stack (Essentials + Longevity)",
        "product_category": "Bundle",
        "subscription_price_usd": 182.67,  # mid of $169.33-$196
        "onetime_price_usd": 248.00,  # mid of $235-$261
        "subscription_price_low": 169.33,
        "subscription_price_high": 196.00,
        "onetime_price_low": 235.00,
        "onetime_price_high": 261.00,
        "product_description": "Complete daily nutrition bundle for 90-day transformation",
        "servings_per_unit": 30,
        "cost_per_serving_sub": 6.09,
        "launch_date": "2024-11-01",
        "is_active": True
    },
    {
        "product_id": "ACC_001",
        "product_name": "IM8 Limited Edition Vegan Leather Travel Pouch",
        "product_category": "Accessory",
        "subscription_price_usd": 0.0,
        "onetime_price_usd": 28.00,
        "subscription_price_low": 0.0,
        "subscription_price_high": 0.0,
        "onetime_price_low": 28.00,
        "onetime_price_high": 28.00,
        "product_description": "Premium vegan leather travel pouch",
        "servings_per_unit": 0,
        "cost_per_serving_sub": 0.0,
        "launch_date": "2024-11-01",
        "is_active": True
    },
    {
        "product_id": "ACC_002",
        "product_name": "Signature Red Cup (Mixer)",
        "product_category": "Accessory",
        "subscription_price_usd": 0.0,
        "onetime_price_usd": 18.00,
        "subscription_price_low": 0.0,
        "subscription_price_high": 0.0,
        "onetime_price_low": 18.00,
        "onetime_price_high": 18.00,
        "product_description": "Signature mixing cup – often free with first subscription month",
        "servings_per_unit": 0,
        "cost_per_serving_sub": 0.0,
        "launch_date": "2024-11-01",
        "is_active": True
    }
]

df_products = spark.createDataFrame(pd.DataFrame(products_data))
df_products.createOrReplaceTempView("dim_products")
df_products.display()
print(f"✅ Product catalog: {df_products.count()} products loaded")

# COMMAND ----------

# MAGIC %md
# MAGIC ## 3. Customer Generation (dim_customers)
# MAGIC 
# MAGIC **Assumptions documented:**
# MAGIC - IM8 launched Nov 2024; first customers Dec 2024
# MAGIC - Markets: Hong Kong, broader Asia (Singapore, Japan, South Korea, Australia), US
# MAGIC - Demographics skew 28-55 (health-conscious, premium product buyers)
# MAGIC - Acquisition channels reflect DTC health brand typical mix
# MAGIC - Growth pattern: slow start Dec 2024, accelerating through 2025

# COMMAND ----------

def generate_customers(n=2800):
    """Generate realistic customer data for IM8 Health"""
    
    customers = []
    
    # --- REGION & COUNTRY DISTRIBUTION ---
    # IM8 is HK-based, strong Asia presence, growing US market
    regions_config = {
        "Asia-Pacific": {
            "weight": 0.55,
            "countries": {
                "Hong Kong": 0.30,
                "Singapore": 0.20,
                "Japan": 0.15,
                "South Korea": 0.10,
                "Australia": 0.15,
                "Taiwan": 0.05,
                "Thailand": 0.05
            }
        },
        "North America": {
            "weight": 0.35,
            "countries": {
                "United States": 0.85,
                "Canada": 0.15
            }
        },
        "Europe": {
            "weight": 0.10,
            "countries": {
                "United Kingdom": 0.50,
                "Germany": 0.25,
                "France": 0.25
            }
        }
    }
    
    # --- US STATE DISTRIBUTION (for US customers) ---
    us_states = ["California", "New York", "Texas", "Florida", "Illinois", 
                 "Washington", "Colorado", "Massachusetts", "Arizona", "Oregon",
                 "New Jersey", "Georgia", "Virginia", "North Carolina", "Pennsylvania"]
    us_state_weights = [0.20, 0.15, 0.10, 0.08, 0.06,
                        0.06, 0.05, 0.05, 0.04, 0.04,
                        0.04, 0.04, 0.03, 0.03, 0.03]
    
    # --- ACQUISITION CHANNELS ---
    channels = {
        "Instagram Ads": 0.22,
        "Google Search": 0.15,
        "TikTok": 0.12,
        "Influencer/KOL": 0.14,
        "Referral": 0.10,
        "YouTube": 0.08,
        "Podcast": 0.06,
        "Organic/Direct": 0.08,
        "Facebook Ads": 0.05
    }
    
    # --- AGE DISTRIBUTION (health-conscious premium buyers) ---
    age_ranges = {
        "18-24": (18, 24, 0.08),
        "25-34": (25, 34, 0.30),
        "35-44": (35, 44, 0.28),
        "45-54": (45, 54, 0.20),
        "55-64": (55, 64, 0.10),
        "65+": (65, 75, 0.04)
    }
    
    # --- GENDER DISTRIBUTION ---
    genders = {"Male": 0.48, "Female": 0.45, "Non-binary": 0.04, "Prefer not to say": 0.03}
    
    # --- ACQUISITION DATE DISTRIBUTION (growth curve) ---
    # Simulate startup growth: slow start, accelerating
    month_weights = {
        "2024-12": 0.03,   # Launch month, small
        "2025-01": 0.04,
        "2025-02": 0.05,
        "2025-03": 0.06,
        "2025-04": 0.07,
        "2025-05": 0.08,
        "2025-06": 0.08,
        "2025-07": 0.09,
        "2025-08": 0.09,
        "2025-09": 0.10,
        "2025-10": 0.10,
        "2025-11": 0.11,
        "2025-12": 0.06,
        "2026-01": 0.04
    }
    
    # --- HEALTH GOALS ---
    health_goals = [
        "General Wellness", "Energy & Vitality", "Anti-Aging/Longevity",
        "Gut Health & Digestion", "Better Sleep", "Immune Support",
        "Athletic Performance", "Weight Management", "Cognitive Function",
        "Skin Health"
    ]
    health_goal_weights = [0.18, 0.16, 0.14, 0.12, 0.10, 0.08, 0.07, 0.06, 0.05, 0.04]
    
    for i in range(n):
        customer_id = f"CUST_{str(i+1).zfill(5)}"
        
        # Region & Country
        region = np.random.choice(list(regions_config.keys()), 
                                   p=[v["weight"] for v in regions_config.values()])
        country_config = regions_config[region]["countries"]
        country = np.random.choice(list(country_config.keys()), 
                                    p=list(country_config.values()))
        
        # State (US only)
        state = np.random.choice(us_states, p=us_state_weights) if country == "United States" else None
        
        # Demographics
        age_bucket = np.random.choice(list(age_ranges.keys()), 
                                       p=[v[2] for v in age_ranges.values()])
        age_min, age_max, _ = age_ranges[age_bucket]
        age = np.random.randint(age_min, age_max + 1)
        
        gender = np.random.choice(list(genders.keys()), p=list(genders.values()))
        
        # Acquisition
        channel = np.random.choice(list(channels.keys()), p=list(channels.values()))
        
        # Acquisition month
        acq_month = np.random.choice(list(month_weights.keys()), p=list(month_weights.values()))
        acq_year, acq_mo = map(int, acq_month.split("-"))
        acq_day = np.random.randint(1, 28)  # safe day range
        acquisition_date = datetime(acq_year, acq_mo, acq_day)
        
        # Health goal
        primary_health_goal = np.random.choice(health_goals, p=health_goal_weights)
        
        # Customer persona/segment (derived)
        if age <= 30 and channel in ["TikTok", "Instagram Ads"]:
            persona = "Young Digital Native"
        elif age >= 45 and primary_health_goal in ["Anti-Aging/Longevity", "Cognitive Function"]:
            persona = "Longevity Seeker"
        elif channel == "Referral":
            persona = "Word-of-Mouth Advocate"
        elif primary_health_goal in ["Athletic Performance", "Energy & Vitality"]:
            persona = "Performance Optimizer"
        elif primary_health_goal in ["General Wellness", "Immune Support"]:
            persona = "Wellness Maintainer"
        else:
            persona = "Health Explorer"
        
        # Email hash (privacy simulation)
        email_hash = hashlib.md5(f"customer_{i}@email.com".encode()).hexdigest()[:12]
        
        customers.append({
            "customer_id": customer_id,
            "email_hash": email_hash,
            "acquisition_date": acquisition_date.strftime("%Y-%m-%d"),
            "acquisition_month": acq_month,
            "acquisition_channel": channel,
            "region": region,
            "country": country,
            "state": state,
            "age": age,
            "age_group": age_bucket,
            "gender": gender,
            "primary_health_goal": primary_health_goal,
            "customer_persona": persona,
            "referred_by": f"CUST_{str(np.random.randint(1, max(i, 2))).zfill(5)}" if channel == "Referral" else None,
            "created_at": acquisition_date.strftime("%Y-%m-%d %H:%M:%S")
        })
    
    return pd.DataFrame(customers)

# Generate customers
customers_pdf = generate_customers(NUM_CUSTOMERS)
df_customers = spark.createDataFrame(customers_pdf)
df_customers.createOrReplaceTempView("dim_customers")

print(f"✅ Generated {df_customers.count()} customers")
print("\n📊 Distribution by Region:")
df_customers.groupBy("region").count().orderBy(F.desc("count")).display()

print("\n📊 Distribution by Acquisition Channel:")
df_customers.groupBy("acquisition_channel").count().orderBy(F.desc("count")).display()

print("\n📊 Distribution by Acquisition Month:")
df_customers.groupBy("acquisition_month").count().orderBy("acquisition_month").display()

# COMMAND ----------

# MAGIC %md
# MAGIC ## 4. Subscriptions Generation (fact_subscriptions)
# MAGIC 
# MAGIC **Assumptions:**
# MAGIC - ~70% of customers start with subscription, 30% one-time purchase first
# MAGIC - Product mix: Essentials 50%, Longevity 20%, Beckham Stack 25%, One-time only 5%
# MAGIC - Churn rates vary by tenure: Month 1→2: 18%, Month 2→3: 12%, Month 3→4: 8%, then ~5%/month
# MAGIC - Churned customers have cancellation reasons
# MAGIC - Some customers upgrade (Essentials → Stack)
# MAGIC - Some pause and resume

# COMMAND ----------

def generate_subscriptions(customers_pdf):
    """Generate subscription records with realistic churn patterns"""
    
    subscriptions = []
    
    # Subscription product distribution
    sub_products = {
        "PROD_001": 0.45,  # Essentials
        "PROD_002": 0.20,  # Longevity
        "PROD_003": 0.25,  # Beckham Stack
        "NONE": 0.10       # One-time purchase only (no subscription)
    }
    
    product_prices = {
        "PROD_001": {"low": 79.00, "mid": 84.00, "high": 89.00},
        "PROD_002": {"low": 119.00, "mid": 119.00, "high": 119.00},
        "PROD_003": {"low": 169.33, "mid": 182.67, "high": 196.00}
    }
    
    # Plan types
    plan_types = {"monthly": 0.65, "quarterly": 0.25, "annual": 0.10}
    
    # Churn rate by month (cumulative survival logic)
    # These are monthly churn probabilities
    monthly_churn_rates = {
        1: 0.18,   # Month 1→2: highest churn
        2: 0.12,   # Month 2→3
        3: 0.08,   # Month 3→4
        4: 0.06,   # Stabilizing
        5: 0.05,
        6: 0.04,
        7: 0.04,
        8: 0.03,
        9: 0.03,
        10: 0.03,
        11: 0.02,
        12: 0.02,
        13: 0.02
    }
    
    # Cancellation reasons
    cancel_reasons = {
        "Too expensive": 0.25,
        "Didn't see results": 0.22,
        "Taste/experience issues": 0.12,
        "Switched to competitor": 0.10,
        "Delivery issues": 0.08,
        "Health concerns": 0.05,
        "No longer needed": 0.08,
        "Financial constraints": 0.10
    }
    
    # Pause reasons
    pause_reasons = ["Traveling", "Overstock", "Budget pause", "Trying competitor temporarily"]
    
    for _, customer in customers_pdf.iterrows():
        customer_id = customer["customer_id"]
        acq_date = datetime.strptime(customer["acquisition_date"], "%Y-%m-%d")
        persona = customer["customer_persona"]
        health_goal = customer["primary_health_goal"]
        
        # Determine initial product
        product_id = np.random.choice(list(sub_products.keys()), p=list(sub_products.values()))
        
        if product_id == "NONE":
            # This customer only does one-time purchases, no subscription
            continue
        
        # Plan type
        plan_type = np.random.choice(list(plan_types.keys()), p=list(plan_types.values()))
        
        # Price tier (based on region/plan)
        region = customer["region"]
        if region == "Asia-Pacific":
            price_tier = np.random.choice(["low", "mid", "high"], p=[0.3, 0.5, 0.2])
        else:
            price_tier = np.random.choice(["low", "mid", "high"], p=[0.2, 0.4, 0.4])
        
        mrr = product_prices[product_id][price_tier]
        
        # Apply plan discount
        if plan_type == "quarterly":
            mrr = mrr * 0.95  # 5% discount
        elif plan_type == "annual":
            mrr = mrr * 0.90  # 10% discount
        
        mrr = round(mrr, 2)
        
        # Subscription start date (same as or shortly after acquisition)
        sub_start = acq_date + timedelta(days=np.random.randint(0, 3))
        
        # Determine subscription lifecycle
        subscription_id = f"SUB_{customer_id}_{str(1).zfill(2)}"
        
        # Churn simulation - persona affects churn rate
        churn_modifier = 1.0
        if persona == "Longevity Seeker":
            churn_modifier = 0.7  # Less likely to churn
        elif persona == "Word-of-Mouth Advocate":
            churn_modifier = 0.75
        elif persona == "Young Digital Native":
            churn_modifier = 1.3  # More likely to churn
        elif persona == "Performance Optimizer":
            churn_modifier = 0.85
        
        # Simulate month by month
        current_date = sub_start
        status = "active"
        cancel_date = None
        cancel_reason = None
        pause_date = None
        resume_date = None
        months_active = 0
        upgraded = False
        
        while current_date <= END_DATE:
            months_active += 1
            next_month = current_date + timedelta(days=30)
            
            if next_month > END_DATE:
                break
            
            # Get churn rate for this month
            base_churn = monthly_churn_rates.get(months_active, 0.02)
            actual_churn = min(base_churn * churn_modifier, 0.35)
            
            # Pause simulation (5% chance per month for months 2-6)
            if months_active >= 2 and months_active <= 6 and np.random.random() < 0.04:
                if pause_date is None:  # Only one pause
                    pause_date = (current_date + timedelta(days=np.random.randint(5, 25))).strftime("%Y-%m-%d")
                    # 70% resume after 1-2 months
                    if np.random.random() < 0.70:
                        resume_gap = np.random.randint(20, 60)
                        resume_date = (datetime.strptime(pause_date, "%Y-%m-%d") + timedelta(days=resume_gap)).strftime("%Y-%m-%d")
                    else:
                        # Pause becomes churn
                        status = "cancelled"
                        cancel_date = pause_date
                        cancel_reason = np.random.choice(list(cancel_reasons.keys()), p=list(cancel_reasons.values()))
                        break
            
            # Upgrade simulation (Essentials → Stack, months 3-6, 8% chance)
            if product_id == "PROD_001" and months_active >= 3 and months_active <= 6 and not upgraded:
                if np.random.random() < 0.08:
                    upgraded = True
                    product_id = "PROD_003"
                    mrr = round(product_prices["PROD_003"][price_tier] * (0.95 if plan_type == "quarterly" else 0.90 if plan_type == "annual" else 1.0), 2)
            
            # Churn check
            if np.random.random() < actual_churn:
                status = "cancelled"
                cancel_date = (current_date + timedelta(days=np.random.randint(1, 28))).strftime("%Y-%m-%d")
                cancel_reason = np.random.choice(list(cancel_reasons.keys()), p=list(cancel_reasons.values()))
                break
            
            current_date = next_month
        
        if status != "cancelled":
            status = "active"
        
        subscriptions.append({
            "subscription_id": subscription_id,
            "customer_id": customer_id,
            "product_id": product_id,
            "plan_type": plan_type,
            "status": status,
            "mrr_usd": mrr,
            "subscription_start_date": sub_start.strftime("%Y-%m-%d"),
            "cancel_date": cancel_date,
            "cancel_reason": cancel_reason,
            "pause_date": pause_date,
            "resume_date": resume_date,
            "months_active": months_active,
            "upgraded_from": "PROD_001" if upgraded else None,
            "billing_cycle_day": np.random.randint(1, 28),
            "created_at": sub_start.strftime("%Y-%m-%d %H:%M:%S")
        })
    
    return pd.DataFrame(subscriptions)

subscriptions_pdf = generate_subscriptions(customers_pdf)
df_subscriptions = spark.createDataFrame(subscriptions_pdf)
df_subscriptions.createOrReplaceTempView("fact_subscriptions")

print(f"✅ Generated {df_subscriptions.count()} subscriptions")
print("\n📊 Status Distribution:")
df_subscriptions.groupBy("status").count().display()
print("\n📊 Product Distribution:")
df_subscriptions.groupBy("product_id").count().display()
print("\n📊 Plan Type Distribution:")
df_subscriptions.groupBy("plan_type").count().display()
print("\n📊 Cancel Reasons (for churned):")
df_subscriptions.filter(F.col("status") == "cancelled").groupBy("cancel_reason").count().orderBy(F.desc("count")).display()

# COMMAND ----------

# MAGIC %md
# MAGIC ## 5. Orders Generation (fact_orders)
# MAGIC 
# MAGIC **Assumptions:**
# MAGIC - Subscription customers get recurring orders each billing cycle
# MAGIC - One-time purchasers get 1-3 orders
# MAGIC - Some customers buy accessories alongside subscriptions
# MAGIC - Order values reflect actual product pricing
# MAGIC - Free Red Cup included with first subscription order

# COMMAND ----------

def generate_orders(customers_pdf, subscriptions_pdf):
    """Generate order history based on subscriptions and one-time purchases"""
    
    orders = []
    order_counter = 0
    
    # Get subscription customers
    sub_customer_ids = set(subscriptions_pdf["customer_id"].values)
    
    product_prices_onetime = {
        "PROD_001": (99.00, 112.00),
        "PROD_002": (149.00, 149.00),
        "PROD_003": (235.00, 261.00),
        "ACC_001": (28.00, 28.00),
        "ACC_002": (18.00, 18.00)
    }
    
    for _, customer in customers_pdf.iterrows():
        customer_id = customer["customer_id"]
        acq_date = datetime.strptime(customer["acquisition_date"], "%Y-%m-%d")
        
        if customer_id in sub_customer_ids:
            # Subscription customer - generate recurring orders
            sub_row = subscriptions_pdf[subscriptions_pdf["customer_id"] == customer_id].iloc[0]
            sub_start = datetime.strptime(sub_row["subscription_start_date"], "%Y-%m-%d")
            product_id = sub_row["product_id"]
            mrr = sub_row["mrr_usd"]
            months_active = sub_row["months_active"]
            status = sub_row["status"]
            cancel_date = sub_row["cancel_date"]
            pause_date = sub_row["pause_date"]
            resume_date = sub_row["resume_date"]
            
            # Generate monthly orders
            for month in range(months_active):
                order_date = sub_start + timedelta(days=30 * month + np.random.randint(0, 3))
                
                if order_date > END_DATE:
                    break
                
                # Skip orders during pause
                if pause_date and resume_date:
                    p_date = datetime.strptime(pause_date, "%Y-%m-%d")
                    r_date = datetime.strptime(resume_date, "%Y-%m-%d")
                    if p_date <= order_date <= r_date:
                        continue
                
                order_counter += 1
                order_id = f"ORD_{str(order_counter).zfill(6)}"
                
                # First order might include free Red Cup
                includes_free_cup = (month == 0 and np.random.random() < 0.80)
                
                # Occasional accessory add-on (10% chance)
                accessory_addon = np.random.random() < 0.10
                addon_amount = 0
                addon_product = None
                if accessory_addon:
                    addon_product = np.random.choice(["ACC_001", "ACC_002"])
                    addon_amount = product_prices_onetime[addon_product][0]
                
                order_total = mrr + addon_amount
                
                orders.append({
                    "order_id": order_id,
                    "customer_id": customer_id,
                    "product_id": product_id,
                    "order_date": order_date.strftime("%Y-%m-%d"),
                    "order_month": order_date.strftime("%Y-%m"),
                    "order_type": "subscription",
                    "order_total_usd": round(order_total, 2),
                    "product_amount_usd": mrr,
                    "addon_product_id": addon_product,
                    "addon_amount_usd": addon_amount,
                    "discount_usd": 0.0,
                    "shipping_usd": 0.0 if customer["region"] != "Europe" else round(np.random.choice([0, 5.99, 9.99], p=[0.5, 0.3, 0.2]), 2),
                    "order_status": "delivered",
                    "includes_free_cup": includes_free_cup,
                    "order_sequence": month + 1,
                    "created_at": order_date.strftime("%Y-%m-%d %H:%M:%S")
                })
            
        else:
            # Non-subscription customer: one-time purchases
            num_orders = np.random.choice([1, 2, 3], p=[0.60, 0.30, 0.10])
            
            for j in range(num_orders):
                order_date = acq_date + timedelta(days=30 * j + np.random.randint(0, 14))
                if order_date > END_DATE:
                    break
                
                order_counter += 1
                order_id = f"ORD_{str(order_counter).zfill(6)}"
                
                # One-time purchase product selection
                ot_product = np.random.choice(["PROD_001", "PROD_002", "PROD_003"], p=[0.50, 0.25, 0.25])
                price_range = product_prices_onetime[ot_product]
                price = round(np.random.uniform(price_range[0], price_range[1]), 2)
                
                orders.append({
                    "order_id": order_id,
                    "customer_id": customer_id,
                    "product_id": ot_product,
                    "order_date": order_date.strftime("%Y-%m-%d"),
                    "order_month": order_date.strftime("%Y-%m"),
                    "order_type": "one-time",
                    "order_total_usd": price,
                    "product_amount_usd": price,
                    "addon_product_id": None,
                    "addon_amount_usd": 0.0,
                    "discount_usd": round(price * 0.10, 2) if np.random.random() < 0.15 else 0.0,
                    "shipping_usd": round(np.random.choice([0, 5.99, 9.99], p=[0.4, 0.35, 0.25]), 2),
                    "order_status": "delivered",
                    "includes_free_cup": False,
                    "order_sequence": j + 1,
                    "created_at": order_date.strftime("%Y-%m-%d %H:%M:%S")
                })
    
    return pd.DataFrame(orders)

orders_pdf = generate_orders(customers_pdf, subscriptions_pdf)
df_orders = spark.createDataFrame(orders_pdf)
df_orders.createOrReplaceTempView("fact_orders")

print(f"✅ Generated {df_orders.count()} orders")
print("\n📊 Order Type Distribution:")
df_orders.groupBy("order_type").count().display()
print("\n📊 Orders by Month:")
df_orders.groupBy("order_month").count().orderBy("order_month").display()

# COMMAND ----------

# MAGIC %md
# MAGIC ## 6. Engagement Data (fact_engagement)
# MAGIC 
# MAGIC **Assumptions:**
# MAGIC - IM8 has a companion app for tracking health progress
# MAGIC - Engagement signals: app opens, surveys completed, referrals made
# MAGIC - Higher engagement correlates with lower churn (key insight)
# MAGIC - Engagement tends to decrease over time for churners
# MAGIC - Active users: 5-25 app opens/month; churners trail off

# COMMAND ----------

def generate_engagement(customers_pdf, subscriptions_pdf):
    """Generate monthly engagement records per customer"""
    
    engagement_records = []
    sub_lookup = subscriptions_pdf.set_index("customer_id").to_dict("index") if len(subscriptions_pdf) > 0 else {}
    
    for _, customer in customers_pdf.iterrows():
        customer_id = customer["customer_id"]
        acq_date = datetime.strptime(customer["acquisition_date"], "%Y-%m-%d")
        persona = customer["customer_persona"]
        
        # Check if subscriber
        is_subscriber = customer_id in sub_lookup
        sub_status = sub_lookup[customer_id]["status"] if is_subscriber else "none"
        months_active_sub = sub_lookup[customer_id]["months_active"] if is_subscriber else 0
        cancel_date_str = sub_lookup[customer_id].get("cancel_date") if is_subscriber else None
        
        # Base engagement level by persona
        engagement_base = {
            "Longevity Seeker": 0.85,
            "Word-of-Mouth Advocate": 0.80,
            "Performance Optimizer": 0.75,
            "Wellness Maintainer": 0.65,
            "Health Explorer": 0.55,
            "Young Digital Native": 0.60
        }.get(persona, 0.60)
        
        # Generate monthly engagement from acquisition to end date
        current_month_start = datetime(acq_date.year, acq_date.month, 1)
        month_counter = 0
        
        while current_month_start <= END_DATE:
            month_counter += 1
            month_str = current_month_start.strftime("%Y-%m")
            
            # Engagement decay for churners
            if sub_status == "cancelled" and cancel_date_str:
                cancel_date = datetime.strptime(cancel_date_str, "%Y-%m-%d")
                cancel_month = datetime(cancel_date.year, cancel_date.month, 1)
                if current_month_start > cancel_month:
                    # Post-churn: dramatic engagement drop
                    engagement_multiplier = max(0.05, 0.3 * (0.5 ** (month_counter - months_active_sub)))
                elif current_month_start == cancel_month:
                    engagement_multiplier = 0.4
                else:
                    # Pre-churn: gradual decline in last 2 months
                    months_to_cancel = (cancel_month - current_month_start).days // 30
                    if months_to_cancel <= 2:
                        engagement_multiplier = engagement_base * (0.6 + 0.2 * months_to_cancel)
                    else:
                        engagement_multiplier = engagement_base
            elif not is_subscriber:
                # Non-subscribers have lower engagement, trail off fast
                engagement_multiplier = max(0.1, engagement_base * 0.5 * (0.7 ** month_counter))
            else:
                # Active subscriber: slight natural decay, loyal customers stabilize
                engagement_multiplier = engagement_base * max(0.6, 1 - 0.03 * month_counter)
            
            # App opens per month
            base_opens = int(np.random.normal(15, 5) * engagement_multiplier)
            app_opens = max(0, min(45, base_opens))
            
            # Surveys completed (health check-ins, 0-4 per month)
            survey_base = 2.5 * engagement_multiplier
            surveys_completed = max(0, min(4, int(np.random.poisson(survey_base))))
            
            # Referrals (rare event, higher for advocates)
            referral_base = 0.15 if persona == "Word-of-Mouth Advocate" else 0.05
            referrals_made = 1 if np.random.random() < referral_base * engagement_multiplier else 0
            
            # Content interactions (blogs, recipes, etc.)
            content_views = max(0, int(np.random.normal(8, 3) * engagement_multiplier))
            
            # Support tickets (inverse correlation with satisfaction)
            support_tickets = 1 if np.random.random() < (0.15 - 0.1 * engagement_multiplier) else 0
            
            # NPS score (collected occasionally, ~20% of months)
            nps_score = None
            if np.random.random() < 0.20:
                if engagement_multiplier > 0.7:
                    nps_score = np.random.choice(range(8, 11), p=[0.2, 0.35, 0.45])
                elif engagement_multiplier > 0.4:
                    nps_score = np.random.choice(range(5, 10), p=[0.1, 0.15, 0.25, 0.30, 0.20])
                else:
                    nps_score = np.random.choice(range(1, 8), p=[0.10, 0.10, 0.15, 0.20, 0.20, 0.15, 0.10])
            
            engagement_records.append({
                "customer_id": customer_id,
                "month": month_str,
                "app_opens": app_opens,
                "surveys_completed": surveys_completed,
                "referrals_made": referrals_made,
                "content_views": content_views,
                "support_tickets": support_tickets,
                "nps_score": nps_score,
                "engagement_score": round(engagement_multiplier * 100, 1),  # 0-100 scale
                "created_at": current_month_start.strftime("%Y-%m-%d %H:%M:%S")
            })
            
            # Move to next month
            if current_month_start.month == 12:
                current_month_start = datetime(current_month_start.year + 1, 1, 1)
            else:
                current_month_start = datetime(current_month_start.year, current_month_start.month + 1, 1)
    
    return pd.DataFrame(engagement_records)

engagement_pdf = generate_engagement(customers_pdf, subscriptions_pdf)
df_engagement = spark.createDataFrame(engagement_pdf)
df_engagement.createOrReplaceTempView("fact_engagement")

print(f"✅ Generated {df_engagement.count()} engagement records")
print("\n📊 Average Engagement by Month (sample):")
df_engagement.groupBy("month").agg(
    F.round(F.avg("app_opens"), 1).alias("avg_app_opens"),
    F.round(F.avg("engagement_score"), 1).alias("avg_engagement_score"),
    F.sum("referrals_made").alias("total_referrals")
).orderBy("month").display()

# COMMAND ----------

# MAGIC %md
# MAGIC ## 7. Health Outcomes Data (fact_health_outcomes)
# MAGIC 
# MAGIC **Assumptions:**
# MAGIC - Customers self-report energy, sleep quality, and digestion on 1-10 scale
# MAGIC - Reports collected via app surveys (monthly)
# MAGIC - Customers who experience improvement retain longer (key hypothesis to validate)
# MAGIC - Realistic improvement trajectory: gradual, not instant
# MAGIC - Some customers show no improvement (potential churners)
# MAGIC - Baseline varies by individual

# COMMAND ----------

def generate_health_outcomes(customers_pdf, subscriptions_pdf):
    """Generate self-reported health outcomes over time"""
    
    outcomes = []
    sub_lookup = subscriptions_pdf.set_index("customer_id").to_dict("index") if len(subscriptions_pdf) > 0 else {}
    
    for _, customer in customers_pdf.iterrows():
        customer_id = customer["customer_id"]
        acq_date = datetime.strptime(customer["acquisition_date"], "%Y-%m-%d")
        health_goal = customer["primary_health_goal"]
        persona = customer["customer_persona"]
        
        is_subscriber = customer_id in sub_lookup
        sub_status = sub_lookup[customer_id]["status"] if is_subscriber else "none"
        cancel_date_str = sub_lookup[customer_id].get("cancel_date") if is_subscriber else None
        
        # Determine if customer is a "responder" (sees improvement)
        # Active subscribers more likely to be responders (survivorship)
        if sub_status == "active":
            is_responder = np.random.random() < 0.75  # 75% of active subs improve
        elif sub_status == "cancelled":
            is_responder = np.random.random() < 0.35  # Only 35% of churners saw improvement
        else:
            is_responder = np.random.random() < 0.30
        
        # Baseline scores (before product effect)
        baseline_energy = np.random.randint(3, 7)
        baseline_sleep = np.random.randint(3, 7)
        baseline_digestion = np.random.randint(3, 7)
        
        # Improvement trajectory
        if is_responder:
            # Gradual improvement: biggest gains in months 2-4, plateau by month 6
            max_improvement = np.random.uniform(1.5, 3.5)
        else:
            # No improvement or slight decline
            max_improvement = np.random.uniform(-0.5, 0.5)
        
        # Goal-specific boost
        goal_boost = {
            "Energy & Vitality": ("energy", 0.5),
            "Better Sleep": ("sleep", 0.5),
            "Gut Health & Digestion": ("digestion", 0.5),
            "General Wellness": (None, 0.3),
            "Anti-Aging/Longevity": ("energy", 0.3),
            "Athletic Performance": ("energy", 0.4),
            "Immune Support": (None, 0.2),
            "Weight Management": ("digestion", 0.3),
            "Cognitive Function": ("energy", 0.3),
            "Skin Health": (None, 0.2)
        }
        
        boosted_metric, boost_amount = goal_boost.get(health_goal, (None, 0.2))
        
        # Generate monthly outcomes
        current_month_start = datetime(acq_date.year, acq_date.month, 1)
        month_counter = 0
        
        # Only ~70% of customers fill out health surveys
        if np.random.random() > 0.70:
            continue
        
        while current_month_start <= END_DATE:
            month_counter += 1
            month_str = current_month_start.strftime("%Y-%m")
            
            # Skip some months (not everyone fills surveys every month)
            if np.random.random() < 0.25:  # 25% skip rate
                if current_month_start.month == 12:
                    current_month_start = datetime(current_month_start.year + 1, 1, 1)
                else:
                    current_month_start = datetime(current_month_start.year, current_month_start.month + 1, 1)
                continue
            
            # Calculate improvement curve (logarithmic)
            if is_responder:
                improvement = max_improvement * min(1.0, np.log1p(month_counter) / np.log1p(6))
            else:
                improvement = max_improvement * month_counter / 12  # Very slight change
            
            # Post-churn: scores may decline
            if sub_status == "cancelled" and cancel_date_str:
                cancel_date = datetime.strptime(cancel_date_str, "%Y-%m-%d")
                if current_month_start > datetime(cancel_date.year, cancel_date.month, 1):
                    months_post_churn = month_counter - int((cancel_date - acq_date).days / 30)
                    improvement = max(-1, improvement - 0.3 * months_post_churn)
            
            # Calculate scores
            energy = min(10, max(1, round(baseline_energy + improvement + (boost_amount if boosted_metric == "energy" else 0) + np.random.normal(0, 0.5))))
            sleep = min(10, max(1, round(baseline_sleep + improvement + (boost_amount if boosted_metric == "sleep" else 0) + np.random.normal(0, 0.5))))
            digestion = min(10, max(1, round(baseline_digestion + improvement + (boost_amount if boosted_metric == "digestion" else 0) + np.random.normal(0, 0.5))))
            
            # Overall wellness score
            overall_wellness = round((energy + sleep + digestion) / 3, 1)
            
            # Improvement flag (compared to baseline)
            avg_current = (energy + sleep + digestion) / 3
            avg_baseline = (baseline_energy + baseline_sleep + baseline_digestion) / 3
            improvement_flag = "Improved" if avg_current > avg_baseline + 0.5 else "Stable" if avg_current >= avg_baseline - 0.5 else "Declined"
            
            outcomes.append({
                "customer_id": customer_id,
                "month": month_str,
                "energy_score": energy,
                "sleep_score": sleep,
                "digestion_score": digestion,
                "overall_wellness_score": overall_wellness,
                "improvement_vs_baseline": improvement_flag,
                "baseline_energy": baseline_energy,
                "baseline_sleep": baseline_sleep,
                "baseline_digestion": baseline_digestion,
                "survey_date": (current_month_start + timedelta(days=np.random.randint(1, 28))).strftime("%Y-%m-%d"),
                "created_at": current_month_start.strftime("%Y-%m-%d %H:%M:%S")
            })
            
            # Move to next month
            if current_month_start.month == 12:
                current_month_start = datetime(current_month_start.year + 1, 1, 1)
            else:
                current_month_start = datetime(current_month_start.year, current_month_start.month + 1, 1)
    
    return pd.DataFrame(outcomes)

health_pdf = generate_health_outcomes(customers_pdf, subscriptions_pdf)
df_health = spark.createDataFrame(health_pdf)
df_health.createOrReplaceTempView("fact_health_outcomes")

print(f"✅ Generated {df_health.count()} health outcome records")
print("\n📊 Improvement Distribution:")
df_health.groupBy("improvement_vs_baseline").count().display()
print("\n📊 Average Scores by Month:")
df_health.groupBy("month").agg(
    F.round(F.avg("energy_score"), 1).alias("avg_energy"),
    F.round(F.avg("sleep_score"), 1).alias("avg_sleep"),
    F.round(F.avg("digestion_score"), 1).alias("avg_digestion"),
    F.round(F.avg("overall_wellness_score"), 1).alias("avg_overall")
).orderBy("month").display()

# COMMAND ----------

# MAGIC %md
# MAGIC ## 8. Save All Tables to Delta Lake

# COMMAND ----------

# Save to Delta tables
database_name = "im8_health"
spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")

df_products.write.mode("overwrite").saveAsTable(f"{database_name}.dim_products")
df_customers.write.mode("overwrite").saveAsTable(f"{database_name}.dim_customers")
df_subscriptions.write.mode("overwrite").saveAsTable(f"{database_name}.fact_subscriptions")
df_orders.write.mode("overwrite").saveAsTable(f"{database_name}.fact_orders")
df_engagement.write.mode("overwrite").saveAsTable(f"{database_name}.fact_engagement")
df_health.write.mode("overwrite").saveAsTable(f"{database_name}.fact_health_outcomes")

print("✅ All tables saved to Delta Lake")
print(f"\n📦 Database: {database_name}")
for table in ["dim_products", "dim_customers", "fact_subscriptions", "fact_orders", "fact_engagement", "fact_health_outcomes"]:
    count = spark.table(f"{database_name}.{table}").count()
    print(f"  📋 {table}: {count:,} records")

# COMMAND ----------

# MAGIC %md
# MAGIC ## 9. Data Quality Validation

# COMMAND ----------

# Validate data quality and relationships
print("=" * 60)
print("DATA QUALITY VALIDATION REPORT")
print("=" * 60)

# 1. Customer count
total_customers = spark.table(f"{database_name}.dim_customers").count()
print(f"\n✅ Total Customers: {total_customers:,}")

# 2. Subscription coverage
sub_customers = spark.table(f"{database_name}.fact_subscriptions").select("customer_id").distinct().count()
print(f"✅ Customers with Subscriptions: {sub_customers:,} ({sub_customers/total_customers*100:.1f}%)")

# 3. Churn rate
total_subs = spark.table(f"{database_name}.fact_subscriptions").count()
churned = spark.table(f"{database_name}.fact_subscriptions").filter(F.col("status") == "cancelled").count()
print(f"✅ Overall Churn Rate: {churned/total_subs*100:.1f}%")

# 4. Revenue check
total_revenue = spark.table(f"{database_name}.fact_orders").agg(F.sum("order_total_usd")).collect()[0][0]
print(f"✅ Total Revenue: ${total_revenue:,.2f}")

# 5. Current MRR
current_mrr = spark.table(f"{database_name}.fact_subscriptions").filter(F.col("status") == "active").agg(F.sum("mrr_usd")).collect()[0][0]
print(f"✅ Current MRR: ${current_mrr:,.2f}")

# 6. Avg engagement
avg_engagement = spark.table(f"{database_name}.fact_engagement").agg(F.avg("engagement_score")).collect()[0][0]
print(f"✅ Avg Engagement Score: {avg_engagement:.1f}/100")

# 7. Health outcomes coverage
health_customers = spark.table(f"{database_name}.fact_health_outcomes").select("customer_id").distinct().count()
print(f"✅ Customers with Health Data: {health_customers:,} ({health_customers/total_customers*100:.1f}%)")

# 8. Date range validation
min_date = spark.table(f"{database_name}.fact_orders").agg(F.min("order_date")).collect()[0][0]
max_date = spark.table(f"{database_name}.fact_orders").agg(F.max("order_date")).collect()[0][0]
print(f"✅ Order Date Range: {min_date} to {max_date}")

print("\n" + "=" * 60)
print("ALL VALIDATIONS PASSED ✅")
print("=" * 60)

✅ Setup complete
📅 Date range: 2024-12-01 to 2026-01-31
👥 Target customers: 2800


product_id,product_name,product_category,subscription_price_usd,onetime_price_usd,subscription_price_low,subscription_price_high,onetime_price_low,onetime_price_high,product_description,servings_per_unit,cost_per_serving_sub,launch_date,is_active
PROD_001,IM8 Daily Ultimate Essentials,Core Nutrition,84.0,105.5,79.0,89.0,99.0,112.0,All-in-one nutrient drink with 92 ingredients replacing 16 supplements,30,2.8,2024-11-01,True
PROD_002,IM8 Daily Ultimate Longevity,Longevity,119.0,149.0,119.0,119.0,149.0,149.0,Supports 12 hallmarks of aging with NMN and Resveratrol,30,3.97,2024-11-01,True
PROD_003,The Beckham Stack (Essentials + Longevity),Bundle,182.67,248.0,169.33,196.0,235.0,261.0,Complete daily nutrition bundle for 90-day transformation,30,6.09,2024-11-01,True
ACC_001,IM8 Limited Edition Vegan Leather Travel Pouch,Accessory,0.0,28.0,0.0,0.0,28.0,28.0,Premium vegan leather travel pouch,0,0.0,2024-11-01,True
ACC_002,Signature Red Cup (Mixer),Accessory,0.0,18.0,0.0,0.0,18.0,18.0,Signature mixing cup – often free with first subscription month,0,0.0,2024-11-01,True


✅ Product catalog: 5 products loaded
✅ Generated 2800 customers

📊 Distribution by Region:


region,count
Asia-Pacific,1545
North America,968
Europe,287



📊 Distribution by Acquisition Channel:


acquisition_channel,count
Instagram Ads,573
Google Search,448
Influencer/KOL,362
TikTok,357
Referral,275
Organic/Direct,233
YouTube,223
Podcast,166
Facebook Ads,163



📊 Distribution by Acquisition Month:


acquisition_month,count
2024-12,74
2025-01,116
2025-02,115
2025-03,184
2025-04,191
2025-05,211
2025-06,238
2025-07,250
2025-08,253
2025-09,288


✅ Generated 2505 subscriptions

📊 Status Distribution:


status,count
cancelled,959
active,1546



📊 Product Distribution:


product_id,count
PROD_001,1114
PROD_003,824
PROD_002,567



📊 Plan Type Distribution:


plan_type,count
monthly,1636
annual,250
quarterly,619



📊 Cancel Reasons (for churned):


cancel_reason,count
Too expensive,257
Didn't see results,200
Switched to competitor,119
Taste/experience issues,102
Financial constraints,82
No longer needed,79
Delivery issues,77
Health concerns,43


✅ Generated 12223 orders

📊 Order Type Distribution:


order_type,count
one-time,431
subscription,11792



📊 Orders by Month:


order_month,count
2024-12,73
2025-01,176
2025-02,250
2025-03,387
2025-04,510
2025-05,654
2025-06,788
2025-07,943
2025-08,1076
2025-09,1216


✅ Generated 18890 engagement records

📊 Average Engagement by Month (sample):


month,avg_app_opens,avg_engagement_score,total_referrals
2024-12,7.2,54.7,0
2025-01,7.7,52.4,5
2025-02,6.6,47.7,7
2025-03,6.6,48.7,12
2025-04,6.4,46.8,21
2025-05,6.4,45.3,27
2025-06,6.3,44.2,25
2025-07,6.0,42.7,38
2025-08,5.6,40.8,46
2025-09,5.4,39.5,43


✅ Generated 9891 health outcome records

📊 Improvement Distribution:


improvement_vs_baseline,count
Stable,3145
Declined,1579
Improved,5167



📊 Average Scores by Month:


month,avg_energy,avg_sleep,avg_digestion,avg_overall
2024-12,4.8,5.4,5.2,5.1
2025-01,5.3,5.5,5.2,5.3
2025-02,5.0,5.1,5.1,5.1
2025-03,5.2,5.2,5.2,5.2
2025-04,5.2,5.3,5.2,5.2
2025-05,5.2,5.2,5.3,5.2
2025-06,5.3,5.2,5.3,5.3
2025-07,5.3,5.3,5.2,5.3
2025-08,5.3,5.3,5.3,5.3
2025-09,5.3,5.3,5.2,5.3


✅ All tables saved to Delta Lake

📦 Database: im8_health
  📋 dim_products: 5 records
  📋 dim_customers: 2,800 records
  📋 fact_subscriptions: 2,505 records
  📋 fact_orders: 12,223 records
  📋 fact_engagement: 18,890 records
  📋 fact_health_outcomes: 9,891 records
DATA QUALITY VALIDATION REPORT

✅ Total Customers: 2,800
✅ Customers with Subscriptions: 2,505 (89.5%)
✅ Overall Churn Rate: 38.3%
✅ Total Revenue: $1,584,221.10
✅ Current MRR: $192,620.23
✅ Avg Engagement Score: 39.4/100
✅ Customers with Health Data: 1,928 (68.9%)
✅ Order Date Range: 2024-12-02 to 2026-01-31

ALL VALIDATIONS PASSED ✅
