# Fast E-Commerce Data Generator using Spark SQL

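This notebook generates about 12 million synthetic records (11,901,500 with the default `config`) using Spark SQL. Because rows are computed on the cluster instead of in a row-by-row Python loop, generation is **much faster** at this scale.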

## Key Performance Improvements
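- **Parallel Generation**: Rows are derived from `range()` ids, so Spark can generate them across the cluster
- **Vectorized Operations**: No Python loops; every table is produced by a single SQL query
- **Delta Format**: `saveAsTable` writes Delta tables, the default table format on Databricks
- **Memory Efficient**: Rows are produced and written partition by partition on the executors; nothing is collected to the driver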
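
In these queries every deterministic column is computed from the row's id alone (the exceptions are `rand()`, `uuid()` and `current_timestamp()`), so in principle the id range can be split into slices and each slice generated independently. The plain-Python sketch below illustrates that property; `supplier_row` is a hypothetical helper that mirrors three columns of the suppliers query, and the chunking only stands in for how Spark splits `range()` across partitions.

```python
def supplier_row(supplier_id: int) -> dict:
    """Hypothetical mirror of three columns from the suppliers query."""
    # CASE supplier_id % 5: 0 -> CA, 1 -> NY, 2 -> TX, 3 -> FL, else IL
    state = ["CA", "NY", "TX", "FL", "IL"][supplier_id % 5]
    return {
        "supplier_id": supplier_id,
        "email": f"supplier{supplier_id}@company.com",
        "state": state,
    }


def generate(ids) -> list:
    """Build one row per id."""
    return [supplier_row(i) for i in ids]


def generate_partitioned(n: int, num_partitions: int) -> list:
    """Split ids 1..n into contiguous slices and generate each slice separately."""
    bounds = [1 + (n * p) // num_partitions for p in range(num_partitions + 1)]
    rows = []
    for lo, hi in zip(bounds, bounds[1:]):
        # Each slice needs nothing from any other slice
        rows.extend(generate(range(lo, hi)))
    return rows


whole = generate(range(1, 1001))
sliced = generate_partitioned(1000, num_partitions=8)
print(whole == sliced)  # True
```

Columns built from `rand()` or `uuid()` do not have this property: they differ on every evaluation, which is why they are not reproducible across runs.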

## Overview
- **Target**: Generate synthetic e-commerce data for lakehouse demonstration
- **Architecture**: Medallion (Bronze/Silver/Gold)
- **Data Volume**: ~11.9 million records across 10 tables
- **Performance**: Minutes rather than hours on a typical cluster (actual time depends on cluster size)


## Setup and Configuration

First, let's set up our Spark session and define our data generation configuration.


In [0]:
from pyspark.sql import SparkSession

# Initialize Spark session (already available as `spark` in Databricks)
# spark = SparkSession.builder.appName("FastDataGenerator").getOrCreate()

# No `from pyspark.sql.functions import *`: it would shadow Python builtins
# such as sum(), which the total below needs. All generation is done in SQL.

# Configuration for data volumes (rows per table)
config = {
    'customers': 500_000,
    'products': 100_000,
    'categories': 500,
    'suppliers': 1_000,
    'orders': 2_000_000,
    'order_items': 6_000_000,
    'inventory': 200_000,
    'reviews': 800_000,
    'web_events': 300_000,
    'shipping': 2_000_000
}

# Output database and path
database_name = "ecommerce_hol"
bronze_path = "/tmp/bronze_tables"  # not used by the cells below

# Create database
# Note: the %sql cell below switches to catalog apjtechup, schema bronze,
# and that is where the *_raw tables are written.
spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")
spark.sql(f"USE {database_name}")

print(f"Database: {database_name}")
print(f"Total records to generate: {sum(config.values()):,}")
print("\nData volume configuration:")
for table, count in config.items():
    print(f"  {table}: {count:,} records")


## Generate Categories Table

Spark SQL generates a two-level category hierarchy: 10 main categories with 15 subcategories each, plus filler categories that bring the total to `config['categories']`.


In [0]:
%sql
-- All *_raw tables below are written to apjtechup.bronze (this overrides USE ecommerce_hol)
use catalog apjtechup;
use schema bronze;

In [0]:
# Generate categories using pure SQL
categories_sql = f"""
WITH main_categories AS (
  SELECT 
    row_number() OVER (ORDER BY category) as category_id,
    category as category_name,
    CAST(NULL AS INT) as parent_category_id,
    CONCAT('Main category for ', LOWER(category)) as description,
    date_sub(current_date(), CAST(rand() * 730 AS INT)) as created_at,
    'active' as status
  FROM VALUES 
    ('Electronics'), ('Clothing & Fashion'), ('Home & Garden'), ('Sports & Outdoors'),
    ('Health & Beauty'), ('Books & Media'), ('Toys & Games'), ('Automotive'),
    ('Food & Beverages'), ('Office Supplies')
  AS t(category)
),
subcategories AS (
  SELECT
    (mc.category_id * 100 + sub.sub_id) as category_id,
    CONCAT(mc.category_name, ' - Sub', sub.sub_id) as category_name,
    mc.category_id as parent_category_id,
    CONCAT('Subcategory of ', mc.category_name) as description,
    date_add(mc.created_at, CAST(rand() * 100 AS INT)) as created_at,
    CASE WHEN rand() > 0.1 THEN 'active' ELSE 'inactive' END as status
  FROM main_categories mc
  CROSS JOIN (
    SELECT explode(sequence(1, 15)) as sub_id
  ) sub
),
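additional_categories AS (
  -- Filler rows so the table totals config['categories']: 10 main + 150 subcategories = 160 above.
  -- IDs start at 2001 so they cannot collide with subcategory IDs (101 to 1015).
  SELECT
    (2000 + row_number() OVER (ORDER BY rand())) as category_id,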
    CONCAT('Category ', CAST(rand() * 1000 AS INT)) as category_name,
    CASE WHEN rand() > 0.5 THEN CAST(rand() * 10 + 1 AS INT) ELSE NULL END as parent_category_id,
    'Additional category for variety' as description,
    date_sub(current_date(), CAST(rand() * 365 AS INT)) as created_at,
    CASE WHEN rand() > 0.2 THEN 'active' ELSE 'inactive' END as status
  FROM range({config['categories'] - 160})
)
SELECT * FROM main_categories
UNION ALL
SELECT * FROM subcategories
UNION ALL  
SELECT * FROM additional_categories
ORDER BY category_id
"""

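categories_df = spark.sql(categories_sql)
categories_df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("categories_raw")

# Read back the saved table: re-running categories_df would re-evaluate rand()
# and show values that differ from what was written.
saved_categories = spark.table("categories_raw")
print(f"Generated {saved_categories.count():,} categories")
saved_categories.show(5)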
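
The category ids follow fixed arithmetic: main categories are 1 to 10, subcategory ids are `parent_id * 100 + sub_id` (101 to 1015), and the filler block starts just above a fixed offset. The plain-Python sketch below, using hypothetical helper names, re-derives those ranges and lists the ids a given filler offset would share with the other two blocks. With an offset of 1000 the filler ids 1001 to 1015 clash with the subcategories of category 10; any offset of 1015 or more keeps all 500 ids distinct.

```python
def category_ids(filler_offset: int, num_main: int = 10, subs_per_main: int = 15,
                 filler_count: int = 340):
    """Re-derive the category_id ranges of the categories query (ids only)."""
    main = set(range(1, num_main + 1))
    # subcategory id = parent_id * 100 + sub_id, with sub_id in 1..subs_per_main
    subs = {m * 100 + s for m in main for s in range(1, subs_per_main + 1)}
    # filler id = filler_offset + row_number(), with row_number in 1..filler_count
    filler = set(range(filler_offset + 1, filler_offset + filler_count + 1))
    return main, subs, filler


def overlaps(filler_offset: int) -> list:
    """Ids that the filler block shares with the main or subcategory blocks."""
    main, subs, filler = category_ids(filler_offset)
    return sorted((main | subs) & filler)


clash = overlaps(1000)
print(len(clash), clash[0], clash[-1])  # 15 1001 1015
print(overlaps(2000))                   # []
```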


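## Generate the Remaining Tables

Run the next three cells in order to generate the other nine tables with Spark SQL: suppliers, customers and products first, then orders and order items, then inventory, reviews, web events and shipping.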


In [0]:
%%time
# Generate all tables efficiently using Spark SQL
print("🚀 Starting optimized data generation with Spark SQL...")

# 1. Generate Suppliers
print("Generating suppliers...")
suppliers_sql = f"""
SELECT 
  s.supplier_id,
  CONCAT('Supplier ', s.supplier_id, ' Corp') as supplier_name,
  CONCAT('Contact ', s.supplier_id) as contact_person,
  CONCAT('supplier', s.supplier_id, '@company.com') as email,
  CONCAT('(555) ', LPAD(CAST(s.supplier_id AS STRING), 3, '0'), '-', LPAD(CAST(s.supplier_id * 13 % 10000 AS STRING), 4, '0')) as phone,
  CONCAT(CAST(s.supplier_id * 123 % 9999 + 1 AS STRING), ' Main St') as address_line1,
  CASE WHEN s.supplier_id % 3 = 0 THEN CONCAT('Suite ', CAST(s.supplier_id % 999 + 1 AS STRING)) ELSE NULL END as address_line2,
  CASE WHEN s.supplier_id % 20 = 0 THEN 'New York'
       WHEN s.supplier_id % 20 = 1 THEN 'Los Angeles'
       WHEN s.supplier_id % 20 = 2 THEN 'Chicago'
       ELSE CONCAT('City', s.supplier_id % 100) END as city,
  CASE WHEN s.supplier_id % 5 = 0 THEN 'CA'
       WHEN s.supplier_id % 5 = 1 THEN 'NY'
       WHEN s.supplier_id % 5 = 2 THEN 'TX'
       WHEN s.supplier_id % 5 = 3 THEN 'FL'
       ELSE 'IL' END as state,
  'USA' as country,
  CAST(s.supplier_id * 12345 % 90000 + 10000 AS STRING) as postal_code,
  date_sub(current_date(), CAST(s.supplier_id % 1095 + 365 AS INT)) as created_at,
  -- Test % 20 first: every multiple of 20 is also a multiple of 10
  CASE WHEN s.supplier_id % 20 = 0 THEN 'suspended'
       WHEN s.supplier_id % 10 = 0 THEN 'inactive'
       ELSE 'active' END as status
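-- Ids 1..N come straight from range(); a window without PARTITION BY would pull every row into one partition
FROM (SELECT CAST(id AS INT) as supplier_id FROM range(1, {config['suppliers'] + 1})) s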
"""
spark.sql(suppliers_sql).write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("suppliers_raw")

# 2. Generate Customers  
print("Generating customers...")
customers_sql = f"""
SELECT 
  c.customer_id,
  CASE WHEN c.customer_id % 10 = 0 THEN 'James'
       WHEN c.customer_id % 10 = 1 THEN 'Mary'
       WHEN c.customer_id % 10 = 2 THEN 'John'
       WHEN c.customer_id % 10 = 3 THEN 'Patricia'
       WHEN c.customer_id % 10 = 4 THEN 'Robert'
       WHEN c.customer_id % 10 = 5 THEN 'Jennifer'
       WHEN c.customer_id % 10 = 6 THEN 'Michael'
       WHEN c.customer_id % 10 = 7 THEN 'Linda'
       WHEN c.customer_id % 10 = 8 THEN 'William'
       ELSE 'Elizabeth' END as first_name,
  CASE WHEN c.customer_id % 15 = 0 THEN 'Smith'
       WHEN c.customer_id % 15 = 1 THEN 'Johnson'
       WHEN c.customer_id % 15 = 2 THEN 'Williams'
       WHEN c.customer_id % 15 = 3 THEN 'Brown'
       WHEN c.customer_id % 15 = 4 THEN 'Jones'
       ELSE CONCAT('LastName', c.customer_id % 100) END as last_name,
  CONCAT('customer', c.customer_id, '@email.com') as email,
  CONCAT('(555) ', LPAD(CAST(c.customer_id % 1000 AS STRING), 3, '0'), '-', LPAD(CAST(c.customer_id * 17 % 10000 AS STRING), 4, '0')) as phone,
  date_sub(current_date(), CAST(c.customer_id % 730 AS INT)) as registration_date,
  date_sub(date('1990-01-01'), CAST(c.customer_id % 10950 AS INT)) as birth_date,
  CASE WHEN c.customer_id % 2 = 0 THEN 'M' ELSE 'F' END as gender,
  CONCAT(CAST(c.customer_id * 456 % 9999 + 1 AS STRING), ' Main St') as address_line1,
  CASE WHEN c.customer_id % 4 = 0 THEN CONCAT('Apt ', CAST(c.customer_id % 999 + 1 AS STRING)) ELSE NULL END as address_line2,
  CASE WHEN c.customer_id % 25 = 0 THEN 'New York'
       WHEN c.customer_id % 25 = 1 THEN 'Los Angeles'
       ELSE CONCAT('City', c.customer_id % 200) END as city,
  CASE WHEN c.customer_id % 5 = 0 THEN 'CA'
       WHEN c.customer_id % 5 = 1 THEN 'NY'
       WHEN c.customer_id % 5 = 2 THEN 'TX'
       WHEN c.customer_id % 5 = 3 THEN 'FL'
       ELSE 'IL' END as state,
  'USA' as country,
  CAST(c.customer_id * 54321 % 90000 + 10000 AS STRING) as postal_code,
  current_timestamp() as created_at,
  CASE WHEN c.customer_id % 7 = 0 THEN 'mobile'
       WHEN c.customer_id % 7 = 1 THEN 'phone'
       WHEN c.customer_id % 7 = 2 THEN 'store'
       ELSE 'web' END as source_system
FROM (SELECT CAST(id AS INT) as customer_id FROM range(1, {config['customers'] + 1})) c
"""
spark.sql(customers_sql).write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("customers_raw")

# 3. Generate Products
print("Generating products...")
products_sql = f"""
SELECT 
  p.product_id,
  CONCAT('Product ', p.product_id, ' - Premium Edition') as product_name,
  CONCAT('High-quality product #', p.product_id, ' with premium features') as description,
  CASE WHEN p.product_id % 50 = 0 THEN 1 ELSE (p.product_id % 10) + 1 END as category_id,
  CASE WHEN p.product_id % 100 = 0 THEN 1 ELSE (p.product_id % 50) + 1 END as supplier_id,
  CONCAT('SKU', LPAD(CAST(p.product_id AS STRING), 8, '0')) as sku,
  ROUND(5.99 + (p.product_id % 1000), 2) as price,
  ROUND((5.99 + (p.product_id % 1000)) * 0.6, 2) as cost,
  ROUND(0.1 + (p.product_id % 50), 2) as weight,
  CONCAT(CAST((p.product_id % 50) + 1 AS STRING), 'x',
         CAST((p.product_id % 30) + 1 AS STRING), 'x', 
         CAST((p.product_id % 20) + 1 AS STRING)) as dimensions,
  CASE WHEN p.product_id % 5 = 0 THEN 'Red'
       WHEN p.product_id % 5 = 1 THEN 'Blue'
       WHEN p.product_id % 5 = 2 THEN 'Black'
       WHEN p.product_id % 5 = 3 THEN 'White'
       ELSE NULL END as color,
  CASE WHEN p.product_id % 7 = 0 THEN 'XL'
       WHEN p.product_id % 7 = 1 THEN 'L'
       WHEN p.product_id % 7 = 2 THEN 'M'
       WHEN p.product_id % 7 = 3 THEN 'S'
       ELSE NULL END as size,
  CONCAT('Brand', (p.product_id % 20) + 1) as brand,
  date_sub(current_date(), CAST(p.product_id % 730 AS INT)) as created_at,
  date_sub(current_date(), CAST(p.product_id % 365 AS INT)) as updated_at,
  CASE WHEN p.product_id % 20 = 0 THEN 'inactive'
       WHEN p.product_id % 30 = 0 THEN 'discontinued'
       ELSE 'active' END as status
FROM (SELECT CAST(id AS INT) as product_id FROM range(1, {config['products'] + 1})) p
"""
spark.sql(products_sql).write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("products_raw")

print("✅ Reference tables generated!")
print("📊 Counts:", spark.sql("SELECT count(*) FROM categories_raw").collect()[0][0], "categories,", 
      spark.sql("SELECT count(*) FROM suppliers_raw").collect()[0][0], "suppliers,",
      spark.sql("SELECT count(*) FROM customers_raw").collect()[0][0], "customers,",
      spark.sql("SELECT count(*) FROM products_raw").collect()[0][0], "products")
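
Spark evaluates `CASE WHEN` branches top to bottom and returns the first match, so a branch whose condition implies an earlier one never fires: every multiple of 20 is also a multiple of 10, and the same goes for `% 100` listed after `% 20`. The plain-Python sketch below is a stand-in for that first-match rule (the helper names are hypothetical) and counts the labels produced over a block of supplier ids with the two status branches in either order.

```python
def first_match(value: int, rules: list, default: str) -> str:
    """Mimic SQL CASE WHEN: return the label of the first rule whose predicate holds."""
    for predicate, label in rules:
        if predicate(value):
            return label
    return default


def label_counts(ids, rules: list, default: str) -> dict:
    """Count how often each label is produced over a range of ids."""
    counts = {}
    for i in ids:
        label = first_match(i, rules, default)
        counts[label] = counts.get(label, 0) + 1
    return counts


# Supplier status branches with % 10 tested before % 20
loose_first = [(lambda i: i % 10 == 0, "inactive"), (lambda i: i % 20 == 0, "suspended")]
# The same branches with the stricter condition tested first
strict_first = [(lambda i: i % 20 == 0, "suspended"), (lambda i: i % 10 == 0, "inactive")]

ids = range(1, 1001)
print(label_counts(ids, loose_first, "active"))   # {'active': 900, 'inactive': 100}
print(label_counts(ids, strict_first, "active"))  # {'active': 900, 'inactive': 50, 'suspended': 50}
```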


In [0]:
# 4. Generate Orders
print("Generating orders...")
orders_sql = f"""
SELECT 
  o.order_id,
  CAST(1 + (o.order_id % {config['customers']}) AS INT) as customer_id,
  date_sub(current_date(), CAST(o.order_id % 365 AS INT)) as order_date,
  CASE WHEN o.order_id % 10 = 0 THEN 'pending'
       WHEN o.order_id % 10 = 1 THEN 'processing'
       WHEN o.order_id % 10 IN (2,3,4) THEN 'shipped'
       WHEN o.order_id % 10 IN (5,6,7) THEN 'delivered'
       WHEN o.order_id % 10 = 8 THEN 'cancelled'
       ELSE 'returned' END as order_status,
  CASE WHEN o.order_id % 5 = 0 THEN 'credit_card'
       WHEN o.order_id % 5 = 1 THEN 'debit_card'
       WHEN o.order_id % 5 = 2 THEN 'paypal'
       WHEN o.order_id % 5 = 3 THEN 'apple_pay'
       ELSE 'google_pay' END as payment_method,
  -- Most specific modulus first, otherwise 'refunded' (a subset of % 20) is unreachable
  CASE WHEN o.order_id % 100 = 0 THEN 'refunded'
       WHEN o.order_id % 50 = 0 THEN 'failed'
       WHEN o.order_id % 20 = 0 THEN 'pending'
       ELSE 'completed' END as payment_status,
  CONCAT(CAST((o.order_id * 123) % 9999 + 1 AS STRING), ' Shipping St, City, State 12345') as shipping_address,
  CONCAT(CAST((o.order_id * 456) % 9999 + 1 AS STRING), ' Billing Ave, Town, State 67890') as billing_address,
  cast(0.0 as decimal(38,2)) as total_amount,  -- Will be updated after order items
  cast(0.0 as decimal(38,2)) as tax_amount,
  ROUND((o.order_id % 26), 2) as shipping_cost,
  CASE WHEN o.order_id % 3 = 0 THEN ROUND((o.order_id % 50), 2) ELSE 0.0 END as discount_amount,
  current_timestamp() as created_at,
  current_timestamp() as updated_at
FROM (SELECT CAST(id AS INT) as order_id FROM range(1, {config['orders'] + 1})) o
"""
spark.sql(orders_sql).write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("orders_raw")

# 5. Generate Order Items
print("Generating order items...")
order_items_sql = f"""
SELECT 
  oi.order_item_id,
  CAST(1 + (oi.order_item_id % {config['orders']}) AS INT) as order_id,
  CAST(1 + (oi.order_item_id % {config['products']}) AS INT) as product_id,
  CAST(1 + (oi.order_item_id % 4) AS INT) as quantity,
  ROUND(5.99 + (oi.order_item_id % 500), 2) as unit_price,
  CASE WHEN oi.order_item_id % 5 = 0 THEN ROUND((5.99 + (oi.order_item_id % 500)) * 0.1, 2) ELSE 0.0 END as discount_amount,
  ROUND((5.99 + (oi.order_item_id % 500)) * (1 + (oi.order_item_id % 4)) * 
        (1 - CASE WHEN oi.order_item_id % 5 = 0 THEN 0.1 ELSE 0.0 END), 2) as total_amount,
  current_timestamp() as created_at
FROM (SELECT CAST(id AS INT) as order_item_id FROM range(1, {config['order_items'] + 1})) oi
"""
spark.sql(order_items_sql).write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("order_items_raw")

# Now update orders with calculated totals from order items.
# MERGE aggregates order_items_raw once and joins on order_id, instead of
# evaluating two correlated subqueries for every order row.
print("Updating order totals...")
update_orders_sql = """
MERGE INTO orders_raw AS o
USING (
  SELECT order_id, SUM(total_amount) AS items_total
  FROM order_items_raw
  GROUP BY order_id
) AS t
ON o.order_id = t.order_id
WHEN MATCHED THEN UPDATE SET
  total_amount = t.items_total,
  tax_amount = ROUND(t.items_total * 0.085, 2)
"""
spark.sql(update_orders_sql)

print("✅ Transactional tables generated!")
print("📊 Counts:", spark.sql("SELECT count(*) FROM orders_raw").collect()[0][0], "orders,", 
      spark.sql("SELECT count(*) FROM order_items_raw").collect()[0][0], "order items")

# Verify the update worked
print("\n🔍 Sample order totals verification:")
spark.sql("""
SELECT order_id, total_amount, tax_amount, order_status
FROM orders_raw 
WHERE total_amount > 0 
ORDER BY total_amount DESC 
LIMIT 10
""").show()
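
Because order items get their `order_id` as `1 + (order_item_id % orders)` and there are exactly three times as many items as orders, every order ends up with exactly three items. The plain-Python sketch below recomputes one order's total and tax from the order-item formulas above, assuming the default `config` counts and exact decimal arithmetic with half-up rounding (how Spark's `ROUND` behaves); the helper names are hypothetical.

```python
from decimal import Decimal, ROUND_HALF_UP

ORDERS = 2_000_000       # config['orders']
ORDER_ITEMS = 6_000_000  # config['order_items']
TWO_PLACES = Decimal("0.01")


def item_total(item_id: int) -> Decimal:
    """Line total as written by the order-items query."""
    unit_price = Decimal("5.99") + (item_id % 500)
    quantity = 1 + (item_id % 4)
    discount = Decimal("0.1") if item_id % 5 == 0 else Decimal("0.0")
    return (unit_price * quantity * (1 - discount)).quantize(TWO_PLACES, rounding=ROUND_HALF_UP)


def items_for_order(order_id: int) -> list:
    """Item ids for which 1 + (item_id % ORDERS) equals order_id."""
    first = order_id - 1 if order_id > 1 else ORDERS
    return list(range(first, ORDER_ITEMS + 1, ORDERS))


def order_totals(order_id: int) -> tuple:
    """(total_amount, tax_amount) for one order, as the totals update computes them."""
    total = sum(item_total(i) for i in items_for_order(order_id))
    tax = (total * Decimal("0.085")).quantize(TWO_PLACES, rounding=ROUND_HALF_UP)
    return total, tax


print(items_for_order(2))  # [1, 2000001, 4000001]
print(order_totals(2))     # (Decimal('41.94'), Decimal('3.56'))
```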


In [0]:
# 6. Generate Inventory
print("Generating inventory...")
inventory_sql = f"""
SELECT 
  i.inventory_id,
  CAST(1 + (i.inventory_id % {config['products']}) AS INT) as product_id,
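  -- Shift by inventory_id DIV products so repeated rows for one product land in different warehouses
  CAST(1 + ((i.inventory_id + (i.inventory_id DIV {config['products']})) % 20) AS INT) as warehouse_id,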
  CAST(i.inventory_id % 1000 AS INT) as quantity_on_hand,
  CAST(i.inventory_id % 100 AS INT) as quantity_reserved,
  CAST((i.inventory_id % 1000) - (i.inventory_id % 100) AS INT) as quantity_available,
  CAST(10 + (i.inventory_id % 90) AS INT) as reorder_level,
  date_sub(current_date(), CAST(i.inventory_id % 30 AS INT)) as last_updated,
  date_sub(current_date(), CAST(i.inventory_id % 365 + 30 AS INT)) as created_at
FROM (SELECT CAST(id AS INT) as inventory_id FROM range(1, {config['inventory'] + 1})) i
"""
spark.sql(inventory_sql).write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("inventory_raw")

# 7. Generate Reviews
print("Generating reviews...")
reviews_sql = f"""
SELECT 
  r.review_id,
  CAST(1 + (r.review_id % {config['products']}) AS INT) as product_id,
  CAST(1 + (r.review_id % {config['customers']}) AS INT) as customer_id,
  -- 5 stars 60%, 4 stars 20%, 3 stars 10%, 2 stars 5%, 1 star 5%
  CASE WHEN r.review_id % 20 < 12 THEN 5
       WHEN r.review_id % 20 < 16 THEN 4
       WHEN r.review_id % 20 < 18 THEN 3
       WHEN r.review_id % 20 = 18 THEN 2
       ELSE 1 END as rating,
  CASE WHEN r.review_id % 5 != 0 THEN 'Great product! Highly recommend.' ELSE NULL END as review_text,
  date_sub(current_date(), CAST(r.review_id % 365 AS INT)) as review_date,
  CASE WHEN r.review_id % 3 != 0 THEN true ELSE false END as verified_purchase,
  CAST(r.review_id % 50 AS INT) as helpful_votes,
  current_timestamp() as created_at
FROM (SELECT CAST(id AS INT) as review_id FROM range(1, {config['reviews'] + 1})) r
"""
spark.sql(reviews_sql).write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("reviews_raw")

# 8. Generate Web Events
print("Generating web events...")
web_events_sql = f"""
SELECT 
  w.event_id,
  uuid() as session_id,
  CASE WHEN w.event_id % 3 != 0 THEN CAST(1 + (w.event_id % {config['customers']}) AS INT) ELSE NULL END as customer_id,
  CASE WHEN w.event_id % 8 = 0 THEN 'page_view'
       WHEN w.event_id % 8 = 1 THEN 'product_view'
       WHEN w.event_id % 8 = 2 THEN 'add_to_cart'
       WHEN w.event_id % 8 = 3 THEN 'search'
       WHEN w.event_id % 8 = 4 THEN 'checkout_start'
       ELSE 'purchase' END as event_type,
  CASE WHEN w.event_id % 10 = 0 THEN '/home'
       WHEN w.event_id % 10 = 1 THEN '/products'
       WHEN w.event_id % 10 = 2 THEN '/cart'
       WHEN w.event_id % 10 = 3 THEN '/checkout'
       ELSE '/search' END as page_url,
  CASE WHEN w.event_id % 5 != 0 THEN CAST(1 + (w.event_id % {config['products']}) AS INT) ELSE NULL END as product_id,
  timestamp(date_sub(current_date(), CAST(w.event_id % 180 AS INT))) as timestamp,
  CONCAT(CAST((w.event_id % 255) AS STRING), '.', 
         CAST(((w.event_id * 2) % 255) AS STRING), '.', 
         CAST(((w.event_id * 3) % 255) AS STRING), '.', 
         CAST(((w.event_id * 4) % 255) AS STRING)) as ip_address,
  'Mozilla/5.0 (compatible; Browser/1.0)' as user_agent,
  CASE WHEN w.event_id % 4 = 0 THEN 'https://google.com' ELSE NULL END as referrer,
  current_timestamp() as created_at
FROM (SELECT CAST(id AS INT) as event_id FROM range(1, {config['web_events'] + 1})) w
"""
spark.sql(web_events_sql).write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("web_events_raw")

# 9. Generate Shipping
print("Generating shipping...")
shipping_sql = f"""
SELECT 
  s.shipping_id,
  CAST(1 + (s.shipping_id % {config['orders']}) AS INT) as order_id,
  CASE WHEN s.shipping_id % 5 = 0 THEN 'FedEx'
       WHEN s.shipping_id % 5 = 1 THEN 'UPS'
       WHEN s.shipping_id % 5 = 2 THEN 'USPS'
       WHEN s.shipping_id % 5 = 3 THEN 'DHL'
       ELSE 'Amazon Logistics' END as carrier,
  CONCAT('TRK', LPAD(CAST(s.shipping_id AS STRING), 10, '0')) as tracking_number,
  CASE WHEN s.shipping_id % 4 = 0 THEN 'Standard'
       WHEN s.shipping_id % 4 = 1 THEN 'Express'
       WHEN s.shipping_id % 4 = 2 THEN 'Next Day'
       ELSE 'Ground' END as shipping_method,
  date_sub(current_date(), CAST(s.shipping_id % 365 AS INT)) as shipped_date,
  date_add(date_sub(current_date(), CAST(s.shipping_id % 365 AS INT)), CAST(1 + (s.shipping_id % 6) AS INT)) as estimated_delivery,
  -- Actual delivery date only for shipments whose status below is 'delivered'
  CASE WHEN s.shipping_id % 5 = 0 THEN
    date_add(date_sub(current_date(), CAST(s.shipping_id % 365 AS INT)), CAST(1 + (s.shipping_id % 8) AS INT))
  ELSE NULL END as actual_delivery,
  CASE WHEN s.shipping_id % 5 = 0 THEN 'delivered'
       WHEN s.shipping_id % 5 = 1 THEN 'in_transit'
       WHEN s.shipping_id % 5 = 2 THEN 'pending'
       ELSE 'exception' END as shipping_status,
  current_timestamp() as created_at,
  current_timestamp() as updated_at
FROM (SELECT CAST(id AS INT) as shipping_id FROM range(1, {config['shipping'] + 1})) s
"""
spark.sql(shipping_sql).write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("shipping_raw")

print("✅ Supporting tables generated!")
print("📊 Counts:", spark.sql("SELECT count(*) FROM inventory_raw").collect()[0][0], "inventory,", 
      spark.sql("SELECT count(*) FROM reviews_raw").collect()[0][0], "reviews,",
      spark.sql("SELECT count(*) FROM web_events_raw").collect()[0][0], "web events,",
      spark.sql("SELECT count(*) FROM shipping_raw").collect()[0][0], "shipping records")
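
The non-null foreign keys in the cell above are all derived as `1 + (child_id % parent_count)`. That expression always lands in `1..parent_count`, so no key dangles, and because consecutive child ids cycle through the parents, the number of rows per referenced parent differs by at most one. The plain-Python sketch below checks both properties on scaled-down, hypothetical counts that keep the default ratios; the helper names are illustrative.

```python
def fk_values(child_count: int, parent_count: int) -> list:
    """Keys from CAST(1 + (child_id % parent_count) AS INT) for child ids 1..child_count."""
    return [1 + (child_id % parent_count) for child_id in range(1, child_count + 1)]


def fk_summary(child_count: int, parent_count: int) -> tuple:
    """(all keys valid?, distinct parents referenced, min rows per parent, max rows per parent)."""
    keys = fk_values(child_count, parent_count)
    valid = all(1 <= k <= parent_count for k in keys)
    per_parent = {}
    for k in keys:
        per_parent[k] = per_parent.get(k, 0) + 1
    return valid, len(per_parent), min(per_parent.values()), max(per_parent.values())


# Scaled-down counts with the default ratios
print(fk_summary(800, 100))  # reviews -> products, 8:1   -> (True, 100, 8, 8)
print(fk_summary(800, 500))  # reviews -> customers, 8:5  -> (True, 500, 1, 2)
print(fk_summary(100, 100))  # shipping -> orders, 1:1    -> (True, 100, 1, 1)
```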


## Performance Summary and Validation

Let's verify the generated data by checking the row count of every table. Generation time for the first generation cell is reported by its `%%time` magic.


In [0]:
# Summary of generated data
tables = ['categories_raw', 'suppliers_raw', 'customers_raw', 'products_raw', 
          'orders_raw', 'order_items_raw', 'inventory_raw', 'reviews_raw', 
          'web_events_raw', 'shipping_raw']

print("📊 Data Generation Summary")
print("=" * 50)

total_records = 0
for table in tables:
    count = spark.table(table).count()
    total_records += count
    print(f"{table:20} {count:>10,} records")

print("=" * 50)
print(f"{'TOTAL':20} {total_records:>10,} records")

print("\n🚀 Performance Benefits:")
print("• No row-by-row Python loops: each table is a single Spark SQL query")
print("• Parallel processing across Spark cluster")
print("• Direct writes to Delta tables")
print("• Rows are written by the executors; nothing is collected to the driver")
print("• Leverages Spark SQL optimizations")

# Sample data validation
print("\n🔍 Sample Data Validation:")
spark.sql("""
SELECT 'customers' as table_name, count(*) as record_count FROM customers_raw
UNION ALL
SELECT 'products' as table_name, count(*) as record_count FROM products_raw
UNION ALL  
SELECT 'orders' as table_name, count(*) as record_count FROM orders_raw
ORDER BY record_count DESC
""").show()

print("\n✅ Ready for Bronze → Silver → Gold transformations!")
print("\nNext steps:")
print("1. Use the SQL scripts in ../bronze/ddl/ to create bronze tables")
print("2. Apply silver transformations in ../silver/elt/")  
print("3. Build gold aggregations in ../gold/elt/")
print("4. Run analytics in ../queries/")
