# 🎭 AI-Powered Themed Star Schema Generator

Generate synthetic analytical datasets with Azure OpenAI and Spark.

**Features:**
- 🤖 GPT-4 generates creative themed content via Azure OpenAI
- ⚡ Spark builds fact tables efficiently
- 🔐 Credentials are set directly in the configuration cell (Key Vault is not wired up in this version)
- 📦 Fallback to preloaded themes

---

## 🎯 Step 1: Configuration

**Set your demo parameters here!**

In [None]:
# ============================================================================
# CONFIGURATION
# ============================================================================

# Lakehouse Configuration
lakehouse_name = ""  # Leave blank for default

# Business Type: "Retail", "Restaurant", "Healthcare"
business_type = "Restaurant"

# Theme: "Gourmet Fantasy Food", "Space Colony", etc.
theme = "Roadkill"

# Generation Mode: "AI" (use Azure OpenAI) or "Preloaded" (no API needed)
generation_mode = "AI"

# Credential Source: only "direct" is implemented; any other value falls back to Preloaded mode
credential_source = "direct"

# Direct credentials. If you paste a real key here, regenerate it right after the demo!
direct_endpoint = "Your Azure OpenAI endpoint"
direct_key = "Your Azure OpenAI key"
direct_deployment = "gpt4o-demo"

# Data Settings
random_seed = 42
record_scale = "medium"  # "small" (10K), "medium" (25K), "large" (50K)

print(f"🎬 Generating {record_scale.upper()} {theme} {business_type} Dataset")
print(f"🗄️  Lakehouse: {lakehouse_name if lakehouse_name else 'Default'}")
print(f"🤖 Mode: {generation_mode}")
if generation_mode == "AI":
    print(f"🔑 Credentials: Direct (from configuration)")
print(f"🎲 Seed: {random_seed}")
print("\n" + "="*70)

🎬 Generating MEDIUM Roadkill Restaurant Dataset
🗄️  Lakehouse: Default
🤖 Mode: AI
🔑 Credentials: Direct (from configuration)
🎲 Seed: 42
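
The configuration values are free-form strings, so a typo such as `"restaurant"` or an unknown `record_scale` only surfaces later as a `KeyError`. A minimal sketch of an up-front check, assuming a hypothetical helper named `validate_config` (not part of the notebook) and option lists copied from the comments in the configuration cell:

```python
# Hypothetical helper: the allowed values mirror the comments in the configuration cell.
ALLOWED_BUSINESS_TYPES = {"Retail", "Restaurant", "Healthcare"}
ALLOWED_MODES = {"AI", "Preloaded"}
ALLOWED_SCALES = {"small", "medium", "large"}


def validate_config(business_type, generation_mode, record_scale):
    """Return a list of problems; an empty list means the values are usable."""
    problems = []
    if business_type not in ALLOWED_BUSINESS_TYPES:
        problems.append(f"business_type must be one of {sorted(ALLOWED_BUSINESS_TYPES)}, got {business_type!r}")
    if generation_mode not in ALLOWED_MODES:
        problems.append(f"generation_mode must be one of {sorted(ALLOWED_MODES)}, got {generation_mode!r}")
    if record_scale not in ALLOWED_SCALES:
        problems.append(f"record_scale must be one of {sorted(ALLOWED_SCALES)}, got {record_scale!r}")
    return problems


# Usage: check the values before running the rest of the notebook.
problems = validate_config("Restaurant", "AI", "medium")
print(problems or "Configuration looks valid")
```

Returning a list rather than raising on the first problem lets a single run report every misconfigured value.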



## 📚 Step 2: Import Libraries & Setup Spark

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import random
import json
import os

# Set random seeds
random.seed(random_seed)
np.random.seed(random_seed)

# Initialize Spark
spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", "8")

# Set lakehouse context
if lakehouse_name:
    print(f"🗄️  Using lakehouse: {lakehouse_name}")
    spark.sql(f"USE {lakehouse_name}")
else:
    print("🗄️  Using default lakehouse")

print(f"✅ Spark initialized (v{spark.version})")

🗄️  Using default lakehouse
✅ Spark initialized (v3.5.5.5.4.20251103.2)
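
Step 2 interpolates `lakehouse_name` directly into a `USE` statement. A small sketch of validating the name first, assuming a hypothetical helper `safe_identifier` and a deliberately conservative rule (letters, digits, underscores); the naming rules of a real lakehouse may differ:

```python
import re

# Assumption: a conservative identifier pattern, not the platform's official naming rule.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def safe_identifier(name):
    """Return the name unchanged if it is a plain identifier, otherwise raise ValueError."""
    if not _IDENTIFIER.fullmatch(name):
        raise ValueError(f"Unsafe identifier: {name!r}")
    return name


# Usage: build the statement only from a validated name ("sales_lakehouse" is a made-up example).
statement = f"USE {safe_identifier('sales_lakehouse')}"
print(statement)
```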


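## 🔐 Step 3: Load Credentials

**This cell copies the Azure OpenAI credentials from the Step 1 configuration. Key Vault is not configured here: any `credential_source` other than `"direct"` falls back to Preloaded mode.**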

In [4]:
azure_openai_endpoint = None
azure_openai_key = None
azure_openai_deployment = None

if generation_mode == "AI":
    if credential_source == "direct":
        print("🔑 Loading direct credentials\n")
        
        azure_openai_endpoint = direct_endpoint
        azure_openai_key = direct_key
        azure_openai_deployment = direct_deployment
        
        print("✅ Credentials loaded successfully!")
        print(f"   Endpoint: {azure_openai_endpoint}")
        print(f"   Deployment: {azure_openai_deployment}")
        print(f"   Key: {'*' * 20} (hidden)")
        
        # Debug info (this prints the first characters of the key; remove before sharing output)
        print(f"\n🔍 Debug Info:")
        print(f"   Key length: {len(azure_openai_key)}")
        print(f"   Key starts with: {azure_openai_key[:5]}...")
        
    else:
        print("❌ Key Vault not configured for this environment")
        print("📦 Falling back to Preloaded mode...")
        generation_mode = "Preloaded"
else:
    print("📦 Using Preloaded mode - no credentials needed")

print("\n" + "="*70)

🔑 Loading direct credentials

✅ Credentials loaded successfully!
   Endpoint: https://pmcai-openai.openai.azure.com/
   Deployment: gpt4o-demo
   Key: ******************** (hidden)

🔍 Debug Info:
   Key length: 84
   Key starts with: 2ilMR...
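
Hard-coding a key in a notebook cell makes it easy to leak through saved output. One alternative is to read the values from environment variables and never print any part of the key. A minimal sketch, where the variable names (`AZURE_OPENAI_ENDPOINT`, `AZURE_OPENAI_KEY`, `AZURE_OPENAI_DEPLOYMENT`) and both helper functions are assumptions for illustration, not something this notebook defines:

```python
import os

# Assumed environment variable names; adjust to whatever your environment provides.
ENV_NAMES = {
    "endpoint": "AZURE_OPENAI_ENDPOINT",
    "key": "AZURE_OPENAI_KEY",
    "deployment": "AZURE_OPENAI_DEPLOYMENT",
}


def load_credentials(env=os.environ):
    """Return (credentials, missing): credentials is None if any variable is unset or empty."""
    creds = {field: env.get(var) for field, var in ENV_NAMES.items()}
    missing = [ENV_NAMES[field] for field, value in creds.items() if not value]
    if missing:
        return None, missing
    return creds, []


def mask_secret(secret):
    """Describe a secret by length only, without revealing any of its characters."""
    return f"{'*' * 8} ({len(secret)} chars)"


# Usage with a fake environment mapping, so no real secret is needed to try it.
fake_env = {
    "AZURE_OPENAI_ENDPOINT": "https://example.invalid/",
    "AZURE_OPENAI_KEY": "not-a-real-key",
    "AZURE_OPENAI_DEPLOYMENT": "gpt4o-demo",
}
creds, missing = load_credentials(fake_env)
print(mask_secret(creds["key"]) if creds else f"Missing: {missing}")
```

If any variable is missing, the caller can switch `generation_mode` to `"Preloaded"`, matching the fallback the notebook already uses.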



## 🤖 Step 4: AI Theme Generation

**Azure OpenAI generates creative themed content, or we use preloaded themes.**

In [5]:
def generate_themed_content_with_azure_openai(business_type, theme):
    """Use Azure OpenAI to generate creative themed content."""
    print(f"🤖 Asking Azure OpenAI to generate {theme} content...\n")
    
    if not all([azure_openai_endpoint, azure_openai_key, azure_openai_deployment]):
        print("⚠️  Credentials not available")
        return None
    
    # Determine terminology
    if business_type == "Restaurant":
        product_term = "menu items"
        service_term = "dining services"
    elif business_type == "Healthcare":
        product_term = "medical procedures and treatments"
        service_term = "medical services"
    else:
        product_term = "products"
        service_term = "customer services"
    
    prompt = f"""Generate creative, themed content for a {business_type} business with a {theme} theme.

Create these lists with UNIQUE, CREATIVE names:
1. product_names: 20 {product_term}
2. categories: 8 categories
3. brands: 8 brand names
4. locations: 10 location names
5. services: 6 {service_term}
6. adjectives: 8 descriptive adjectives
7. first_names: 12 character first names
8. last_names: 10 character last names

Respond ONLY with valid JSON (no markdown, no explanation):
{{
  "product_names": ["name1", "name2", ...],
  "categories": [...],
  "brands": [...],
  "locations": [...],
  "services": [...],
  "adjectives": [...],
  "first_names": [...],
  "last_names": [...]
}}"""
    
    try:
        import requests
        
        # Try multiple API versions for compatibility
        api_versions = ["2024-08-01-preview", "2024-06-01", "2024-02-15-preview"]
        
        for api_version in api_versions:
            # Normalize the endpoint so the URL is valid with or without a trailing slash
            url = f"{azure_openai_endpoint.rstrip('/')}/openai/deployments/{azure_openai_deployment}/chat/completions?api-version={api_version}"
            
            headers = {
                "Content-Type": "application/json",
                "api-key": azure_openai_key
            }
            
            data = {
                "messages": [
                    {"role": "system", "content": "You generate creative themed content for data demos. Always respond with valid JSON only."},
                    {"role": "user", "content": prompt}
                ],
                "temperature": 0.8,
                "max_tokens": 2000
            }
            
            print(f"   Trying API version: {api_version}...")
            response = requests.post(url, headers=headers, json=data, timeout=30)
            
            if response.status_code == 200:
                content = response.json()['choices'][0]['message']['content']
                content = content.replace('```json', '').replace('```', '').strip()
                themed_content = json.loads(content)
                
                print("\n✅ Azure OpenAI generated creative content!")
                print(f"   Sample products: {themed_content['product_names'][:3]}")
                print(f"   Sample brands: {themed_content['brands'][:2]}")
                
                return themed_content
            elif response.status_code == 401:
                print(f"   ❌ 401 Authentication failed")
                print(f"   Message: {response.text[:200]}")
                break  # Don't try other API versions for auth errors
            else:
                print(f"   ⚠️  {response.status_code} - trying next version...")
        
        # Reached after an auth failure or after every API version returned a non-200 status
        print(f"\n❌ Azure OpenAI request failed (no API version succeeded)")
        return None
        
    except Exception as e:
        print(f"\n❌ Error: {str(e)}")
        return None


def get_preloaded_content(theme):
    """Fallback: Use pre-generated content."""
    preloaded_data = {
        "Gourmet Fantasy Food": {
            "product_names": ["Dragon's Breath Soup", "Moonlit Truffle Risotto", "Phoenix Flame Steak", 
                             "Elvish Honey Cake", "Wizard's Whiskey Glaze", "Unicorn Tears Sorbet",
                             "Mermaid's Pearl Pasta", "Griffin Wing Roast", "Enchanted Forest Salad",
                             "Fairy Dust Soufflé", "Troll's Mushroom Medley", "Celestial Berry Tart",
                             "Kraken Ink Risotto", "Starlight Nectar", "Moon-Baked Tart",
                             "Goblin's Gold Curry", "Siren Song Seafood", "Pegasus Pear Tart", "Basilisk Bite", "Chimera Chowder"],
            "categories": ["Mystical Appetizers", "Legendary Entrees", "Enchanted Desserts", 
                          "Magical Beverages", "Fantasy Sides", "Divine Soups", "Ethereal Salads", "Mythical Mains"],
            "brands": ["Ivory Tower Cuisine", "Mystic Pantry", "Enchanted Eats", "Arcane Flavors", 
                      "Celestial Kitchen", "Wizard's Table", "Dragon's Feast", "Elven Delights"],
            "locations": ["Castle Keep", "Mystic Grove", "Enchanted Garden", "Crystal Palace", 
                         "Dragon's Lair", "Fairy Circle", "Wizard's Tower", "Moonlight Pavilion", "Starfall Inn", "Phoenix Nest"],
            "services": ["Potion Pairing", "Spell-Infused Cooking", "Mystical Wine Selection", "Enchantment Experience", "Magic Tasting Menu", "Alchemical Desserts"],
            "adjectives": ["Enchanted", "Mystical", "Legendary", "Ethereal", "Bewitched", "Arcane", "Celestial", "Divine"],
            "first_names": ["Merlin", "Galadriel", "Aragorn", "Luna", "Oberon", "Titania", "Elric", "Morgana", "Theron", "Selene", "Orion", "Aurora"],
            "last_names": ["Starweaver", "Moonwhisper", "Dragonheart", "Spellbinder", "Frostborne", "Shadowmere", "Nightshade", "Stormcaller", "Brightflame", "Silverwind"]
        },
        "Space Colony": {
            "product_names": ["Nebula Nutrient Pack", "Asteroid Mining Gear", "Gravity Stabilizer", "Oxygen Recycler Pro",
                             "Plasma Shield Generator", "Hyperdrive Fuel Cell", "Zero-G Coffee Maker", "Mars Habitat Module",
                             "Stellar Navigation Kit", "Cosmic Radiation Suit", "Ion Propulsion Unit", "Terraform Toolkit",
                             "Quantum Communicator", "Solar Panel Array", "Cryosleep Pod", "Antimatter Reactor",
                             "Meteor Defense System", "Lunar Rover Kit", "Warp Core", "Space Station Hub"],
            "categories": ["Life Support", "Mining Equipment", "Habitation", "Transportation", "Communication", "Power Systems", "Safety Gear", "Colony Infrastructure"],
            "brands": ["StellarTech", "GalaxyCorp", "NebulaWorks", "CosmicSolutions", "OrbitTech", "VoidIndustries", "AstroSystems", "Quantum Dynamics"],
            "locations": ["Mars Station Alpha", "Lunar Base Prime", "Asteroid Belt Outpost", "Jupiter Transit Hub",
                         "Saturn Ring Station", "Titan Colony", "Orbital Platform 7", "Europa Research Base", "Io Mining Station", "Ganymede Port"],
            "services": ["Gravity Adjustment", "Atmosphere Calibration", "Radiation Shielding", "Hypersleep Monitoring", "Terraforming Consultation", "Space Walk Training"],
            "adjectives": ["Advanced", "Cosmic", "Interstellar", "Zero-Gravity", "Quantum", "Galactic", "Stellar", "Orbital"],
            "first_names": ["Nova", "Orion", "Stella", "Cosmo", "Astrid", "Apollo", "Luna", "Atlas", "Vega", "Sirius", "Andromeda", "Phoenix"],
            "last_names": ["Stardust", "Nebula", "Cosmos", "Skywalker", "Astral", "Galaxy", "Void", "Quasar", "Comet", "Pulsar"]
        }
    }
    
    return preloaded_data.get(theme, preloaded_data["Gourmet Fantasy Food"])


# Generate themed content
print("\n" + "="*70)
if generation_mode == "AI":
    theme_data = generate_themed_content_with_azure_openai(business_type, theme)
    if theme_data is None:
        print("\n📦 Falling back to preloaded content...")
        theme_data = get_preloaded_content(theme)
else:
    print("📦 Using preloaded themed content")
    theme_data = get_preloaded_content(theme)

theme_first_names = theme_data['first_names']
theme_last_names = theme_data['last_names']

print(f"\n✅ Themed content ready!")
print(f"   Products: {len(theme_data['product_names'])}")
print(f"   Categories: {len(theme_data['categories'])}")
print(f"   Locations: {len(theme_data['locations'])}")
print(f"   Brands: {len(theme_data['brands'])}")


🤖 Asking Azure OpenAI to generate Roadkill content...

   Trying API version: 2024-08-01-preview...

✅ Azure OpenAI generated creative content!
   Sample products: ['Armadillo Appetizer', 'Possum Pie', 'Raccoon Ribs']
   Sample brands: ['Treaded Treats', 'Pavement Platters']

✅ Themed content ready!
   Products: 20
   Categories: 8
   Locations: 10
   Brands: 8
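
The generation function trusts that the model's reply contains all eight lists; a missing key would only show up later as a `KeyError` in the dimension builders. A sketch of a stricter parser, assuming a hypothetical helper `parse_themed_content` (not part of the notebook) that returns `None` so the existing preloaded fallback can take over:

```python
import json

# The eight lists requested in the prompt.
REQUIRED_KEYS = ["product_names", "categories", "brands", "locations",
                 "services", "adjectives", "first_names", "last_names"]

# Markdown code fence marker, built from single backticks so this example stays fence-safe.
FENCE = "`" * 3


def parse_themed_content(raw):
    """Strip optional markdown fences, parse JSON, and verify every required list."""
    text = raw.replace(FENCE + "json", "").replace(FENCE, "").strip()
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        return None
    if not isinstance(data, dict):
        return None
    for key in REQUIRED_KEYS:
        values = data.get(key)
        # Each entry must be a non-empty list of non-blank strings.
        if not isinstance(values, list) or not values:
            return None
        if not all(isinstance(v, str) and v.strip() for v in values):
            return None
    return data


# Usage with a made-up reply wrapped in a markdown fence.
sample = {key: ["example"] for key in REQUIRED_KEYS}
reply = FENCE + "json\n" + json.dumps(sample) + "\n" + FENCE
print(parse_themed_content(reply) is not None)
```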


## 📅 Step 5: Generate Date Dimension

In [6]:
print("\n" + "="*70)
print("📅 Generating Date Dimension...\n")

end_date = datetime.now().date()
start_date = end_date - timedelta(days=364)
date_range = pd.date_range(start=start_date, end=end_date, freq='D')

dim_date_pd = pd.DataFrame({
    'date_key': range(1, len(date_range) + 1),
    'date': date_range,
    'year': date_range.year.astype('int32'),
    'quarter': date_range.quarter.astype('int32'),
    'month': date_range.month.astype('int32'),
    'month_name': date_range.strftime('%B'),
    'day': date_range.day.astype('int32'),
    'day_of_week': (date_range.dayofweek + 1).astype('int32'),
    'day_name': date_range.strftime('%A'),
    'week_of_year': date_range.isocalendar().week.astype('int32'),
    'is_weekend': (date_range.dayofweek >= 5).astype('int32'),
    'is_month_start': date_range.is_month_start.astype('int32'),
    'is_month_end': date_range.is_month_end.astype('int32'),
    'is_quarter_start': date_range.is_quarter_start.astype('int32'),
    'is_quarter_end': date_range.is_quarter_end.astype('int32')
})

dim_date = spark.createDataFrame(dim_date_pd)

print(f"✅ Date Dimension: {dim_date.count()} days")
print(f"   Range: {start_date} to {end_date}")


📅 Generating Date Dimension...

✅ Date Dimension: 365 days
   Range: 2024-11-16 to 2025-11-15
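
The date attributes follow simple conventions that are easy to check by hand: `day_of_week` runs from Monday=1 to Sunday=7, `is_weekend` flags Saturday and Sunday, and `week_of_year` is the ISO week. A standard-library sketch of the same rules for a single date (the function name `date_attributes` is hypothetical; the notebook computes these with pandas):

```python
from datetime import date, timedelta


def date_attributes(d):
    """Compute a few date-dimension attributes using the same conventions as Step 5."""
    return {
        "quarter": (d.month - 1) // 3 + 1,
        "day_of_week": d.isoweekday(),           # Monday=1 ... Sunday=7 (pandas dayofweek + 1)
        "week_of_year": d.isocalendar()[1],      # ISO 8601 week number
        "is_weekend": int(d.isoweekday() >= 6),  # Saturday or Sunday
    }


# Usage with the end date shown in the output above.
end_date = date(2025, 11, 15)
start_date = end_date - timedelta(days=364)
n_days = (end_date - start_date).days + 1  # both endpoints are included
print(start_date, n_days, date_attributes(end_date))
```

Subtracting 364 days and including both endpoints is what yields exactly 365 rows.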


## 🏢 Step 6: Generate Dimension Tables

In [7]:
print("\n" + "="*70)
print("🏢 Generating Dimension Tables...\n")

def generate_product_dimension(n_records=200):
    print(f"🏷️  Products ({n_records} records)...")
    product_pool = theme_data['product_names'] * (n_records // len(theme_data['product_names']) + 2)
    np.random.shuffle(product_pool)
    
    dim_product_pd = pd.DataFrame({
        'product_key': range(1, n_records + 1),
        'product_name': [f"{np.random.choice(theme_data['adjectives'])} {product_pool[i]}" 
                        for i in range(n_records)],
        'category': np.random.choice(theme_data['categories'], n_records),
        'brand': np.random.choice(theme_data['brands'], n_records),
        'unit_price': np.round(np.random.uniform(5, 500, n_records), 2),
        'unit_cost': np.round(np.random.uniform(2, 250, n_records), 2),
        'is_active': np.random.choice([1, 1, 1, 0], n_records)
    })
    dim_product_pd['unit_cost'] = np.minimum(dim_product_pd['unit_cost'], 
                                              dim_product_pd['unit_price'] * 0.7)
    return spark.createDataFrame(dim_product_pd)

def generate_location_dimension(n_records=100):
    loc_name = {"Retail": "Store", "Restaurant": "Restaurant", "Healthcare": "Facility"}[business_type]
    print(f"📍 Locations ({n_records} {loc_name}s)...")
    location_pool = theme_data['locations'] * (n_records // len(theme_data['locations']) + 2)
    np.random.shuffle(location_pool)
    
    dim_location_pd = pd.DataFrame({
        'location_key': range(1, n_records + 1),
        'location_name': [f"{location_pool[i]} {loc_name} #{i+1}" for i in range(n_records)],
        'city': np.random.choice(['New York', 'Los Angeles', 'Chicago', 'Houston', 'Phoenix',
                                 'Philadelphia', 'San Antonio', 'San Diego', 'Dallas', 'Austin'], n_records),
        'state': np.random.choice(['NY', 'CA', 'IL', 'TX', 'AZ', 'PA', 'FL'], n_records),
        'region': np.random.choice(['Northeast', 'Southeast', 'Midwest', 'Southwest', 'West'], n_records),
        'size_sqft': np.random.randint(1000, 10000, n_records),
        'opened_date': pd.to_datetime(
            np.random.choice(pd.date_range('2015-01-01', '2023-12-31', freq='D'), n_records)
        )
    })
    return spark.createDataFrame(dim_location_pd)

def generate_customer_dimension(n_records=300):
    entity = "Patient" if business_type == "Healthcare" else "Customer"
    print(f"👥 {entity}s ({n_records} records)...")
    
    dim_customer_pd = pd.DataFrame({
        'customer_key': range(1, n_records + 1),
        'first_name': np.random.choice(theme_first_names, n_records),
        'last_name': np.random.choice(theme_last_names, n_records),
        'email_domain': np.random.choice(['gmail.com', 'yahoo.com', 'outlook.com', 'company.com'], n_records),
        'age': np.random.randint(18, 80, n_records),
        'gender': np.random.choice(['M', 'F', 'O'], n_records),
        'loyalty_tier': np.random.choice(['Bronze', 'Silver', 'Gold', 'Platinum', None], n_records),
        'join_date': pd.to_datetime(
            np.random.choice(pd.date_range('2018-01-01', '2024-12-31', freq='D'), n_records)
        ),
        'lifetime_value': np.round(np.random.uniform(50, 5000, n_records), 2)
    })
    dim_customer_pd['email'] = (
        dim_customer_pd['first_name'].str.lower() + '.' + 
        dim_customer_pd['last_name'].str.lower() + '@' + 
        dim_customer_pd['email_domain']
    )
    dim_customer_pd = dim_customer_pd.drop(columns=['email_domain'])
    return spark.createDataFrame(dim_customer_pd)

def generate_employee_dimension(n_records=150):
    print(f"👔 Employees ({n_records} records)...")
    roles = {
        "Healthcare": ['Physician', 'Nurse', 'Technician', 'Specialist', 'Therapist', 'Administrator'],
        "Restaurant": ['Chef', 'Server', 'Bartender', 'Host', 'Manager', 'Cook'],
        "Retail": ['Sales Associate', 'Manager', 'Cashier', 'Stock Clerk', 'Supervisor']
    }[business_type]
    
    dim_employee_pd = pd.DataFrame({
        'employee_key': range(1, n_records + 1),
        'first_name': np.random.choice(theme_first_names, n_records),
        'last_name': np.random.choice(theme_last_names, n_records),
        'role': np.random.choice(roles, n_records),
        'department': np.random.choice(['Operations', 'Sales', 'Management', 'Support'], n_records),
        'hire_date': pd.to_datetime(
            np.random.choice(pd.date_range('2015-01-01', '2024-12-31', freq='D'), n_records)
        ),
        'hourly_rate': np.round(np.random.uniform(15, 75, n_records), 2),
        'is_full_time': np.random.choice([1, 1, 0], n_records)
    })
    return spark.createDataFrame(dim_employee_pd)

# Generate dimensions
dim_product = generate_product_dimension(700)
dim_location = generate_location_dimension(100)
dim_customer = generate_customer_dimension(800)
dim_employee = generate_employee_dimension(150)

print("\n✅ All dimensions generated!")


🏢 Generating Dimension Tables...

🏷️  Products (700 records)...
📍 Locations (100 Restaurants)...
👥 Customers (800 records)...
👔 Employees (150 records)...

✅ All dimensions generated!
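
Because `city`, `state`, and `region` are each drawn independently in `generate_location_dimension`, a row can pair, say, New York with TX. If geographic consistency matters for a demo, one option is to sample whole tuples. A sketch under stated assumptions: the lookup `GEO`, its region assignments, and the helper `sample_locations` are illustrative, and it uses Python's `random.Random` rather than the NumPy generator the notebook relies on:

```python
import random

# Illustrative lookup: each entry keeps city, state, and region together.
GEO = [
    ("New York", "NY", "Northeast"),
    ("Philadelphia", "PA", "Northeast"),
    ("Chicago", "IL", "Midwest"),
    ("Houston", "TX", "Southwest"),
    ("Phoenix", "AZ", "Southwest"),
    ("Los Angeles", "CA", "West"),
]


def sample_locations(n_records, seed=42):
    """Draw n_records consistent (city, state, region) rows, reproducibly for a given seed."""
    rng = random.Random(seed)
    fields = ("city", "state", "region")
    return [dict(zip(fields, rng.choice(GEO))) for _ in range(n_records)]


# Usage: every generated row is guaranteed to come from the lookup.
rows = sample_locations(5)
for row in rows:
    print(row)
```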


## 🔥 Step 7: Generate Fact Table

In [8]:
print("\n" + "="*70)
scale_sizes = {"small": 10000, "medium": 25000, "large": 50000}
fact_row_count = scale_sizes[record_scale]

print(f"🔥 Generating Fact Table ({fact_row_count:,} rows)...")
print(f"   This takes 15-30 seconds...\n")

n_products = dim_product.count()
n_locations = dim_location.count()
n_customers = dim_customer.count()
n_employees = dim_employee.count()
n_dates = dim_date.count()

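# Create base with foreign keys (pure Spark!)
# Keys are deterministic, not random: key = (transaction_id * multiplier + seed) % n + 1.
# A multiplier that shares a factor with n (here 7 and 700 products) only reaches
# n / gcd(multiplier, n) distinct keys.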
fact_base = spark.range(1, fact_row_count + 1).toDF("transaction_id")
fact_with_keys = fact_base \
    .withColumn("product_key", (F.col("transaction_id") * 7 + random_seed) % n_products + 1) \
    .withColumn("location_key", (F.col("transaction_id") * 11 + random_seed) % n_locations + 1) \
    .withColumn("customer_key", (F.col("transaction_id") * 13 + random_seed) % n_customers + 1) \
    .withColumn("employee_key", (F.col("transaction_id") * 17 + random_seed) % n_employees + 1) \
    .withColumn("date_key", (F.col("transaction_id") * 19 + random_seed) % n_dates + 1) \
    .withColumn("hour_of_day", (F.col("transaction_id") * 23 + random_seed) % 24) \
    .withColumn("_seed", (F.col("transaction_id") + random_seed))

# Add business-specific metrics
if business_type == "Retail":
    fact_table = fact_with_keys \
        .withColumn("quantity", (F.col("_seed") % 10) + 1) \
        .withColumn("discount_percent", F.when(F.col("_seed") % 3 == 0, (F.col("_seed") % 20) + 5).otherwise(0)) \
        .withColumn("is_online", (F.col("_seed") % 4 == 0).cast("int")) \
        .withColumn("is_return", (F.col("_seed") % 20 == 0).cast("int"))
    fact_name = "sales"
elif business_type == "Restaurant":
    fact_table = fact_with_keys \
        .withColumn("quantity", (F.col("_seed") % 5) + 1) \
        .withColumn("table_number", (F.col("_seed") % 30) + 1) \
        .withColumn("party_size", (F.col("_seed") % 8) + 1) \
        .withColumn("is_takeout", (F.col("_seed") % 5 == 0).cast("int")) \
        .withColumn("tip_percent", F.when(F.col("is_takeout") == 0, (F.col("_seed") % 10) + 15).otherwise(0))
    fact_name = "orders"
else:  # Healthcare
    fact_table = fact_with_keys \
        .withColumn("quantity", F.lit(1)) \
        .withColumn("visit_type", 
                   F.when(F.col("_seed") % 4 == 0, F.lit("Emergency"))
                   .when(F.col("_seed") % 4 == 1, F.lit("Routine"))
                   .when(F.col("_seed") % 4 == 2, F.lit("Follow-up"))
                   .otherwise(F.lit("Specialist"))) \
        .withColumn("visit_duration_minutes", (F.col("_seed") % 120) + 15) \
        .withColumn("is_insured", (F.col("_seed") % 10 != 0).cast("int")) \
        .withColumn("insurance_copay", F.when(F.col("is_insured") == 1, (F.col("_seed") % 50) + 10).otherwise(0))
    fact_name = "visits"

fact_table = fact_table.drop("_seed")

# Join with product for pricing
fact_table = fact_table.join(
    dim_product.select("product_key", "unit_price", "unit_cost"),
    on="product_key", how="left"
)

# Calculate amounts
fact_table = fact_table.withColumn("gross_amount", F.col("quantity") * F.col("unit_price"))

if business_type == "Retail":
    fact_table = fact_table \
        .withColumn("discount_amount", F.col("gross_amount") * (F.col("discount_percent") / 100)) \
        .withColumn("net_amount", F.col("gross_amount") - F.col("discount_amount"))
elif business_type == "Restaurant":
    fact_table = fact_table \
        .withColumn("tip_amount", F.col("gross_amount") * (F.col("tip_percent") / 100)) \
        .withColumn("net_amount", F.col("gross_amount") + F.col("tip_amount"))
else:
    fact_table = fact_table \
        .withColumn("insurance_covered", F.when(F.col("is_insured") == 1, F.col("gross_amount") * 0.8).otherwise(0)) \
        .withColumn("patient_responsibility", F.col("gross_amount") - F.col("insurance_covered") + F.col("insurance_copay")) \
        .withColumn("net_amount", F.col("gross_amount"))

fact_table = fact_table \
    .withColumn("cost_amount", F.col("quantity") * F.col("unit_cost")) \
    .withColumn("profit_amount", F.col("net_amount") - F.col("cost_amount"))

# Round monetary columns
money_cols = [c for c in fact_table.columns if "amount" in c or "price" in c or "cost" in c]
for col in money_cols:
    fact_table = fact_table.withColumn(col, F.round(F.col(col), 2))

print(f"✅ Fact table generated: {fact_table.count():,} rows")


🔥 Generating Fact Table (25,000 rows)...
   This takes 15-30 seconds...

✅ Fact table generated: 25,000 rows
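
The foreign keys in Step 7 come from `(transaction_id * multiplier + seed) % n + 1`. That formula only visits every key when the multiplier and `n` share no common factor. With this run's sizes, the product multiplier 7 divides 700, so only 100 of the 700 products ever appear in the fact table. A plain-Python sketch that checks coverage for each key (the helper names are hypothetical; the sizes and multipliers are the ones used in this run):

```python
from math import gcd


def reachable_keys(multiplier, n_keys):
    """Distinct values of (id * multiplier + seed) % n_keys + 1 over all integer ids."""
    return n_keys // gcd(multiplier, n_keys)


def observed_keys(multiplier, n_keys, n_rows, seed=42):
    """Brute-force count of distinct keys for ids 1..n_rows, to confirm the formula."""
    return len({(i * multiplier + seed) % n_keys + 1 for i in range(1, n_rows + 1)})


# (multiplier, dimension size) pairs from Step 7 with the dimension sizes of this run.
key_settings = {
    "product_key": (7, 700),
    "location_key": (11, 100),
    "customer_key": (13, 800),
    "employee_key": (17, 150),
    "date_key": (19, 365),
}
for name, (multiplier, n_keys) in key_settings.items():
    print(f"{name}: {reachable_keys(multiplier, n_keys)} of {n_keys} keys reachable")
```

Choosing a multiplier that shares no factor with the dimension size (or changing the dimension size) would let every product appear.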


In [9]:
# ============================================================================
# DROP EXISTING TABLES (Clean Slate)
# ============================================================================

print("🗑️  Dropping existing demo tables...")
print("="*70 + "\n")

# Get all existing tables
tables = spark.sql("SHOW TABLES").select("tableName").rdd.flatMap(lambda x: x).collect()

# Drop all demo tables and materialized views
tables_to_drop = [t for t in tables if t.startswith("demo_") or t.startswith("mv_")]

if tables_to_drop:
    for table in tables_to_drop:
        try:
            # Check if it's a materialized view or regular table
            if table.startswith("mv_"):
                spark.sql(f"DROP MATERIALIZED VIEW IF EXISTS {table}")
                print(f"   Dropped MV: {table}")
            else:
                spark.sql(f"DROP TABLE IF EXISTS {table}")
                print(f"   Dropped table: {table}")
        except Exception as e:
            print(f"   ⚠️  Couldn't drop {table}: {str(e)}")
    
    print(f"\n✅ Dropped {len(tables_to_drop)} objects")
else:
    print("   No existing tables found")

print("="*70 + "\n")

🗑️  Dropping existing demo tables...

   Dropped table: demo_dim_customer
   Dropped table: demo_dim_date
   Dropped table: demo_dim_employee
   Dropped table: demo_dim_location
   Dropped table: demo_dim_product
   Dropped table: demo_fact_orders
   Dropped table: demo_fact_visits

✅ Dropped 7 objects
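
The drop cell removes every table whose name starts with `demo_` or `mv_`, including objects this notebook did not create. Previewing the selection before dropping anything is a cheap safeguard. A sketch of the selection step on its own, with a hypothetical helper `select_tables_to_drop` and made-up table names:

```python
def select_tables_to_drop(tables, prefixes=("demo_", "mv_")):
    """Return the table names that start with one of the given prefixes, sorted."""
    return sorted(t for t in tables if t.startswith(tuple(prefixes)))


# Usage: list the candidates first, then decide whether to run the DROP statements.
existing = ["demo_dim_date", "demo_fact_orders", "sales_2024", "mv_daily_revenue"]
for name in select_tables_to_drop(existing):
    print(f"Would drop: {name}")
```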



## 💾 Step 8: Write to Lakehouse

In [10]:
# Theme-based prefix (e.g. "space_colony"); computed for reference but not used below
theme_clean = theme.lower().replace(" ", "_")[:20]  # Limit length

print("\n" + "="*70)
print("💾 Writing tables to Lakehouse...\n")

# All tables use the fixed "demo" prefix so the drop and refresh cells can find them
table_prefix = "demo"
tables_to_write = [
    (dim_date, f"{table_prefix}_dim_date"),
    (dim_product, f"{table_prefix}_dim_product"),
    (dim_location, f"{table_prefix}_dim_location"),
    (dim_customer, f"{table_prefix}_dim_customer"),
    (dim_employee, f"{table_prefix}_dim_employee"),
    (fact_table, f"{table_prefix}_fact_{fact_name}")
]

for df, table_name in tables_to_write:
    print(f"   Writing {table_name}...")
    df.write.format("delta").mode("overwrite").saveAsTable(table_name)
    print(f"      ✅ {df.count():,} rows\n")

print("="*70)
print("🎉 All tables written to Lakehouse!\n")
print("📋 Tables created:")
for _, table_name in tables_to_write:
    print(f"   - {table_name}")


💾 Writing tables to Lakehouse...

   Writing demo_dim_date...
      ✅ 365 rows

   Writing demo_dim_product...
      ✅ 700 rows

   Writing demo_dim_location...
      ✅ 100 rows

   Writing demo_dim_customer...
      ✅ 800 rows

   Writing demo_dim_employee...
      ✅ 150 rows

   Writing demo_fact_orders...
      ✅ 25,000 rows

🎉 All tables written to Lakehouse!

📋 Tables created:
   - demo_dim_date
   - demo_dim_product
   - demo_dim_location
   - demo_dim_customer
   - demo_dim_employee
   - demo_fact_orders
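
Every table here is written with the fixed prefix `demo`. If per-theme table names are wanted instead, the theme first has to become a valid identifier: replacing spaces alone would leave characters such as hyphens in place. A sketch with a hypothetical helper `theme_prefix`; note that the drop and refresh cells filter on `demo_`, so they would need the same prefix:

```python
import re


def theme_prefix(theme, max_len=20):
    """Turn a free-form theme into a lowercase, underscore-separated table prefix."""
    # Collapse every run of non-alphanumeric characters into a single underscore.
    slug = re.sub(r"[^a-z0-9]+", "_", theme.lower()).strip("_")
    # Truncate, drop any trailing underscore left by the cut, and fall back to "demo".
    return slug[:max_len].rstrip("_") or "demo"


# Usage: build table names from the derived prefix.
prefix = theme_prefix("Gourmet Fantasy Food")
print(f"{prefix}_dim_product", f"{prefix}_fact_orders")
```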


In [11]:
# ============================================================================
# 🔄 REFRESH ALL TABLES (Dynamic - works for any business type)
# ============================================================================

print("🔄 Refreshing all demo tables...")
print("="*70 + "\n")

# Get all tables in the current database
tables = spark.sql("SHOW TABLES").select("tableName").rdd.flatMap(lambda x: x).collect()

# Refresh all demo tables that exist
refreshed = 0
for table in tables:
    if table.startswith("demo_"):
        try:
            spark.sql(f"REFRESH TABLE {table}")
            print(f"   ✅ Refreshed: {table}")
            refreshed += 1
        except Exception as e:
            print(f"   ⚠️  Couldn't refresh {table}: {str(e)}")

print("\n" + "="*70)
print(f"✅ Refreshed {refreshed} tables - cache cleared!")

🔄 Refreshing all demo tables...

   ✅ Refreshed: demo_dim_customer
   ✅ Refreshed: demo_dim_date
   ✅ Refreshed: demo_dim_employee
   ✅ Refreshed: demo_dim_location
   ✅ Refreshed: demo_dim_product
   ✅ Refreshed: demo_fact_orders

✅ Refreshed 6 tables - cache cleared!


## 👀 Step 9: Preview the Data

In [12]:
print("\n" + "="*70)
print("📊 DATA PREVIEW\n")

print("🏷️  Themed Products:")
dim_product.select("product_name", "category", "brand", "unit_price").show(10, truncate=False)

print("\n📍 Themed Locations:")
dim_location.select("location_name", "city", "region").show(10, truncate=False)

print("\n👥 Themed Customers:")
dim_customer.select("first_name", "last_name", "email", "loyalty_tier").show(10, truncate=False)

print(f"\n🔥 Fact Table (fact_{fact_name}):")
fact_table.show(10, truncate=False)


📊 DATA PREVIEW

🏷️  Themed Products:
+-------------------------+-----------------+------------------+----------+
|product_name             |category         |brand             |unit_price|
+-------------------------+-----------------+------------------+----------+
|Tasty Armadillo Appetizer|Stealthy Sides   |Grill Thrills     |447.62    |
|Exotic Snake Sausage     |Roadkill Classics|Trackside Tastings|99.97     |
|Exotic Fox Fritters      |Stealthy Sides   |Street Eats       |165.07    |
|Hearty Toad Tacos        |Wild Wraps       |Furry Feasts      |117.19    |
|Wild Gopher Gumbo        |Savory Soups     |Treaded Treats    |180.72    |
|Savory Fox Fritters      |Sweet Road Trips |Street Eats       |39.36     |
|Tasty Snake Sausage      |Wild Wraps       |Street Eats       |261.93    |
|Wild Fox Fritters        |Savory Soups     |Furry Feasts      |38.47     |
|Stealthy Beaver Bisque   |Hearty Hashes    |Treaded Treats    |401.18    |
|Tasty Coyote Casserole   |Wild Wraps     

## 📈 Step 10: Sample Analytics

In [13]:
print("\n" + "="*70)
print("📈 SAMPLE ANALYTICS\n")

print("💰 Daily Revenue (Last 10 Days):")
daily_revenue = fact_table.join(dim_date, "date_key") \
    .groupBy("date") \
    .agg(
        F.sum("net_amount").alias("revenue"),
        F.sum("profit_amount").alias("profit"),
        F.count("*").alias("transactions")
    ).orderBy(F.desc("date")).limit(10)
daily_revenue.show(truncate=False)

print("\n🏆 Top 10 Products by Revenue:")
top_products = fact_table.join(dim_product, "product_key") \
    .groupBy("product_name", "category") \
    .agg(
        F.sum("net_amount").alias("revenue"),
        F.sum("quantity").alias("units")
    ).orderBy(F.desc("revenue")).limit(10)
top_products.show(truncate=False)

print("\n🏪 Top 10 Locations:")
location_perf = fact_table.join(dim_location, "location_key") \
    .groupBy("location_name", "city") \
    .agg(
        F.sum("net_amount").alias("revenue"),
        F.count("*").alias("transactions")
    ).orderBy(F.desc("revenue")).limit(10)
location_perf.show(truncate=False)

print("\n" + "="*70)
print("✅ Generation Complete!")
print(f"\n🎉 Your {theme} {business_type} dataset is ready!")
print(f"   - {fact_row_count:,} transactions generated")
print(f"   - 6 tables written to lakehouse")
print(f"   - Ready for analytics, ML, and BI!")


📈 SAMPLE ANALYTICS

💰 Daily Revenue (Last 10 Days):
+-------------------+------------------+------------------+------------+
|date               |revenue           |profit            |transactions|
+-------------------+------------------+------------------+------------+
|2025-11-15 00:00:00|18249.260000000002|12896.94          |69          |
|2025-11-14 00:00:00|47600.88999999999 |34137.32000000001 |68          |
|2025-11-13 00:00:00|47638.25          |37295.770000000004|68          |
|2025-11-12 00:00:00|92974.22          |62914.7           |69          |
|2025-11-11 00:00:00|107056.21         |73078.96          |69          |
|2025-11-10 00:00:00|17344.39          |12201.120000000003|68          |
|2025-11-09 00:00:00|45027.26          |32067.63          |68          |
|2025-11-08 00:00:00|49961.340000000004|37333.32          |69          |
|2025-11-07 00:00:00|90739.40999999999 |62385.08          |69          |
|2025-11-06 00:00:00|102048.59999999999|69817.13          |68    
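
Values such as `18249.260000000002` in the revenue column are binary floating-point artifacts: each `net_amount` is rounded to two decimals, but the sum of those floats is not. A small pure-Python illustration of the effect and of rounding after aggregation; in Spark the analogous step would be wrapping the aggregate in `F.round(..., 2)`, which the notebook does not currently do:

```python
from decimal import Decimal

# Three amounts that are exact in decimal but not in binary floating point.
amounts = [0.1, 0.2, 0.3]

raw_total = sum(amounts)                              # carries a tiny binary rounding error
rounded_total = round(raw_total, 2)                   # round once, after aggregating
exact_total = sum(Decimal(str(a)) for a in amounts)   # decimal arithmetic avoids the error

print(raw_total, rounded_total, exact_total)
```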

## 🎓 Next Steps

### What to do with your data:

**1. Build Dashboards**
- Connect Power BI to your lakehouse
- Create visualizations
- Share with stakeholders

**2. Run SQL Queries**
```sql
SELECT 
    p.category,
    SUM(f.net_amount) as revenue,
    COUNT(*) as transactions
FROM demo_fact_orders f
JOIN demo_dim_product p ON f.product_key = p.product_key
GROUP BY p.category
ORDER BY revenue DESC
```

**3. Train ML Models**
- Revenue forecasting
- Customer segmentation
- Product recommendations

**4. Generate More Datasets**
- Try different themes!
- Change business types
- Increase scale to "large"

---

**Happy analyzing! 🚀**