In [0]:
# 01_generate_data_small (writes landing_* tables)

import random
from datetime import datetime, timedelta
import pandas as pd
from pyspark.sql import functions as F 

# Fix the Python RNG so every run generates the same synthetic dataset, and
# make retailpulse_lite the current schema so unqualified table names below
# resolve there.
random.seed(42)
spark.sql("USE retailpulse_lite")

# Dataset sizes (row counts for the generated full loads).
n_customers, n_products, n_orders = 200, 50, 500

# Customers: one record per id with a random country, a signup date somewhere
# in January 2025 (always at midnight), and a roughly 2:1 active/inactive mix.
signup_base = datetime(2025, 1, 1)
customers = [
    {
        "customer_id": f"C{n:04d}",
        "email": f"user{n}@example.com",
        "country": random.choice(["US", "CA", "GB"]),
        "created_at": (signup_base + timedelta(days=random.randint(0, 30))).strftime("%Y-%m-%d %H:%M:%S"),
        "status": random.choice(["active", "active", "inactive"]),
    }
    for n in range(n_customers)
]
customers_df = spark.createDataFrame(pd.DataFrame(customers))

# Products: random category, a price between 5 and 200 rounded to 2 decimals,
# and an "active" flag drawn True about two times in three.
cats = ["books", "electronics", "home"]
products = [
    {
        "product_id": f"P{n:04d}",
        "category": random.choice(cats),
        "price": round(random.uniform(5, 200), 2),
        "active": random.choice([True, True, False]),
    }
    for n in range(n_products)
]
products_df = spark.createDataFrame(pd.DataFrame(products))

# Orders + Items: each order belongs to a random customer id, is stamped at a
# random minute within 10,000 minutes after 2025-02-01, and gets 1-3 line items
# pointing at random product ids.
orders, items = [], []
start = datetime(2025, 2, 1)
for order_no in range(n_orders):
    order_id = f"O{order_no:05d}"
    orders.append({
        "order_id": order_id,
        "customer_id": f"C{random.randint(0, n_customers - 1):04d}",
        "order_ts": (start + timedelta(minutes=random.randint(0, 10_000))).strftime("%Y-%m-%d %H:%M:%S"),
        "status": random.choice(["placed", "shipped", "delivered", "cancelled"]),
        "channel": random.choice(["web", "mobile"]),
    })
    # randint is inclusive, so the upper bound is 2..4 and range(1, bound)
    # yields 1 to 3 line numbers starting at 1.
    items.extend(
        {
            "order_id": order_id,
            "line_id": line_no,
            "product_id": f"P{random.randint(0, n_products - 1):04d}",
            "quantity": random.randint(1, 2),
        }
        for line_no in range(1, random.randint(2, 4))
    )

orders_df = spark.createDataFrame(pd.DataFrame(orders))
items_df = spark.createDataFrame(pd.DataFrame(items))

# Persist each generated DataFrame as a "full" landing table, replacing the
# contents left by any previous run.
full_landing_tables = {
    "landing_customers_full": customers_df,
    "landing_products_full": products_df,
    "landing_orders_full": orders_df,
    "landing_order_items_full": items_df,
}
for table_name, frame in full_landing_tables.items():
    frame.write.mode("overwrite").saveAsTable(table_name)

# Incremental batches
# Customer batch for 2025-03-01: a random subset of existing customers,
# re-emitted with status forced to "active".
# DataFrame.sample keeps each row independently with probability `fraction`,
# so the batch size is only approximately incr_customer_target.
incr_customer_target = 10
cust_incr = customers_df.sample(
    withReplacement=False,
    # Clamp to 1.0: Spark rejects fractions > 1 when sampling without
    # replacement, which would happen if n_customers were tuned below the target.
    fraction=min(1.0, incr_customer_target / n_customers),
    seed=1,
)
cust_incr = cust_incr.withColumn("status", F.lit("active"))
cust_incr.write.mode("overwrite").saveAsTable("landing_customers_incr_20250301")

# Order batch for 2025-03-01: a random subset of existing orders re-issued as
# new orders with new IDs, all stamped 2025-03-01 00:01:00.
# The batch size is approximate (see DataFrame.sample semantics).
# NOTE(review): no matching order_items rows are written for these orders —
# confirm downstream does not expect them.
incr_order_target = 20
new_orders = orders_df.sample(
    withReplacement=False,
    # Clamp to 1.0: Spark rejects fractions > 1 when sampling without replacement.
    fraction=min(1.0, incr_order_target / n_orders),
    seed=2,
)
incr_order_ts = F.date_format(
    F.to_timestamp(F.lit("2025-03-01 00:00:00")) + F.expr("INTERVAL 1 MINUTE"),
    "yyyy-MM-dd HH:mm:ss",
)
new_orders = (
    new_orders
    # Derive the new ID from the source ID (O00123 -> ON00123).
    # - It is unique, because source IDs are unique and the sample is without replacement.
    # - It is reproducible and zero-padded like the full-load IDs.
    # monotonically_increasing_id() depends on the partition layout and
    # yields non-consecutive values such as ON8589934592.
    .withColumn("order_id", F.regexp_replace("order_id", r"^O", "ON"))
    .withColumn("order_ts", incr_order_ts)
)
new_orders.write.mode("overwrite").saveAsTable("landing_orders_incr_20250301")

# Quick check of the writes above: list the tables in the current schema.
landing_tables = spark.sql("SHOW TABLES")
display(landing_tables)


In [0]:
%sql
-- Make retailpulse_lite the current schema for the SQL cells below.
USE SCHEMA retailpulse_lite;

In [0]:
%sql
-- List the tables in the project schema. The schema is named explicitly so
-- the cell gives the same result regardless of the session's current schema.
SHOW TABLES IN retailpulse_lite;

In [0]:
%sql
-- Sanity check on the full customer load: expected to equal n_customers from
-- the generator cell. The table is schema-qualified so the cell is self-contained.
SELECT count(*) AS customer_count
FROM retailpulse_lite.landing_customers_full;