In [0]:
pip install --upgrade dbldatagen

In [0]:

# ---------------------------------------------
# CONFIG
# ---------------------------------------------
from pyspark.sql import functions as F, Window
from datetime import date

spark.sql("USE CATALOG dev")
db_name = "dev.bronze"
table_name = "dim_product"
full_table_name = f"{db_name}.{table_name}"

# Batch date (YYYY-MM-DD) for deterministic daily generation.
# The string is the single source of truth: it feeds the random seed, the
# synthetic model names and the idempotency DELETE. The Spark column is built
# from that same string (instead of F.current_date()) so that the
# effective_from written to the table can never disagree with it, e.g. when the
# Spark session time zone differs from the driver's or the run crosses midnight.
as_of_date_str = date.today().isoformat()
as_of_date_col = F.to_date(F.lit(as_of_date_str))

# Synthetic generation controls
MODELS_PER_BRAND_ITEM_BASE = 3       # core models per (brand,item)
NEW_MODELS_PER_BRAND_ITEM = 3        # new models introduced daily per (brand,item)
DAILY_PRICE_CHANGE_RATE = 0.30       # 30% of models get price changes daily
MODEL_PRICE_SPREAD_PCT = 0.15        # +/-15% spread around brand+item baseline
DAILY_DRIFT_PCT = 0.05               # +/-5% daily drift for changing models

GST_RATE = 0.18
FUTURE_DATE = "2099-12-31"           # open-ended effective_to for current rows

# ---------------------------------------------
# IMPORTS
# ---------------------------------------------
import dbldatagen as dg
import random
import hashlib

# Seed random deterministically by batch date so that re-running the notebook
# on the same day reproduces the same price drift.
# as_of_date_str is always a non-empty ISO date string; no fallback is needed
# (stringifying a Spark Column would yield a constant, date-independent seed).
seed_val = int(hashlib.sha256(as_of_date_str.encode("utf-8")).hexdigest(), 16) % (10**8)
random.seed(seed_val)

# ---------------------------------------------
# DOMAIN: categories, items, brands
# ---------------------------------------------
# One row per category: (category, items sold in it, brands active in it).
_CATEGORY_CATALOG = (
    ("Electronics", ["Mobiles", "Laptops"], ["Apple", "Samsung", "Dell", "Lenovo", "ASUS"]),
    ("Appliances", ["Refrigerators", "Washing Machines"], ["LG", "Samsung", "Whirlpool"]),
    ("Clothing", ["Jeans", "Shoes"], ["Levi's", "Nike", "Adidas", "Puma"]),
)

# category -> {"items": [...]}
name_to_category = {
    category_name: {"items": list(item_names)}
    for category_name, item_names, _ in _CATEGORY_CATALOG
}

# category -> [brand, ...]
brands_by_category = {
    category_name: list(brand_names)
    for category_name, _, brand_names in _CATEGORY_CATALOG
}

# Baseline price (INR) grouped by item, then brand; flattened below into the
# (brand, item) -> price lookup used by the generator.
_BASELINE_INR_BY_ITEM = {
    # Electronics
    "Mobiles": {"Apple": 90000, "Samsung": 45000},
    "Laptops": {"Lenovo": 55000, "ASUS": 60000, "Dell": 70000},
    # Appliances
    "Refrigerators": {"LG": 60000, "Samsung": 65000, "Whirlpool": 40000},
    "Washing Machines": {"LG": 32000, "Samsung": 35000},
    # Clothing
    "Jeans": {"Levi's": 3800},
    "Shoes": {"Nike": 9000, "Adidas": 8000, "Puma": 5000},
}

price_by_brand_item = {
    (brand_name, item_name): baseline_inr
    for item_name, inr_by_brand in _BASELINE_INR_BY_ITEM.items()
    for brand_name, baseline_inr in inr_by_brand.items()
}

# Optional core series names (grouped by item, then brand) that make the
# generated models look realistic; flattened into (brand, item) -> [names].
_SERIES_BY_ITEM = {
    # Electronics
    "Mobiles": {
        "Apple": ["iPhone 15", "iPhone 15 Pro", "iPhone SE"],
        "Samsung": ["Galaxy S23", "Galaxy A54", "Galaxy M34"],
    },
    "Laptops": {
        "Dell": ["XPS 13", "Inspiron 15", "G15"],
        "Lenovo": ["ThinkPad X1", "IdeaPad Slim 5", "Legion 5"],
        "ASUS": ["ZenBook 14", "VivoBook 15", "ROG Strix G16"],
    },
    # Appliances
    "Refrigerators": {
        "LG": ["InstaView", "Side-by-Side 687L", "Frost Free 360L"],
        "Samsung": ["Family Hub", "Side-by-Side 700L", "Frost Free 345L"],
        "Whirlpool": ["IntelliFresh 340L", "Double Door 265L"],
    },
    "Washing Machines": {
        "LG": ["Front Load 8kg (AI DD)", "Top Load 7kg", "Washer Dryer 9/6kg"],
        "Samsung": ["EcoBubble 8kg", "Hygiene Steam 7.5kg", "AddWash 9kg"],
    },
    # Clothing
    "Jeans": {
        "Levi's": ["511 Slim", "512 Slim Taper", "501 Original", "541 Athletic"],
    },
    "Shoes": {
        "Nike": ["Air Force 1", "Air Max 270", "Revolution 6"],
        "Adidas": ["Stan Smith", "Ultraboost", "Duramo SL"],
        "Puma": ["Smash v2", "RS-X", "Carson Runner"],
    },
}

core_series = {
    (brand_name, item_name): series_names
    for item_name, series_by_brand in _SERIES_BY_ITEM.items()
    for brand_name, series_names in series_by_brand.items()
}

# ---------------------------------------------
# HELPERS: synthetic model & price generation
# ---------------------------------------------
def make_new_model_name(brand: str, item: str, idx: int, date_str: str) -> str:
    """Build a synthetic model name of the form '<brand> <item> <suffix>-<idx>'.

    The suffix is `date_str` with its dashes removed, so names differ from one
    batch date to the next; an empty/None `date_str` falls back to 'today'.
    """
    if not date_str:
        suffix = "today"
    else:
        suffix = "".join(date_str.split("-"))
    return f"{brand} {item} {suffix}-{idx}"

def model_price_from_baseline(baseline: float, model_name: str) -> float:
    """Return a per-model price that is a fixed function of the model name.

    The SHA-256 of `model_name` is reduced to a bucket in 0..9999, mapped to a
    fraction in roughly [-1, +1), and scaled by MODEL_PRICE_SPREAD_PCT to get a
    relative offset from `baseline`. The result is rounded to 2 decimals and
    floored at 1.0.
    """
    name_digest = hashlib.sha256(model_name.encode("utf-8")).digest()
    bucket = int.from_bytes(name_digest, "big") % 10000
    frac = (bucket / 10000.0 - 0.5) * 2.0
    spread = MODEL_PRICE_SPREAD_PCT * frac
    model_price = round(baseline * (1 + spread), 2)
    return max(1.0, model_price)

def maybe_apply_daily_drift(price: float) -> float:
    """Randomly nudge `price` and return it rounded to 2 decimals.

    With probability DAILY_PRICE_CHANGE_RATE the price is multiplied by
    (1 + d), where d is drawn uniformly from [-DAILY_DRIFT_PCT, +DAILY_DRIFT_PCT];
    otherwise the price is only rounded. Draws come from the module-level
    `random` generator, so results depend on its current seed/state.
    """
    # First draw decides whether this model changes at all today.
    if not (random.random() < DAILY_PRICE_CHANGE_RATE):
        return round(price, 2)

    # Second draw picks the signed drift magnitude.
    drift_frac = (random.random() * 2 - 1) * DAILY_DRIFT_PCT
    drifted = price * (1 + drift_frac)
    return round(drifted, 2)

# ---------------------------------------------
# SYNTHETIC DAILY DATA GENERATION
# ---------------------------------------------
records = []
date_str = as_of_date_str if as_of_date_str else "TODAY"

# Walk every (category, item, brand) combination; brands are taken per
# category, so each brand of a category is paired with each of its items.
for category, info in name_to_category.items():
    category_brands = brands_by_category.get(category, [])
    for item in info.get("items", []):
        for brand in category_brands:
            # Baseline price for brand+item (10000 when no explicit baseline exists)
            baseline = price_by_brand_item.get((brand, item), 10000)

            # 1) Core models: up to MODELS_PER_BRAND_ITEM_BASE known series names,
            #    topped up with synthetic "BASE" names when too few are defined.
            core_models = list(core_series.get((brand, item), []))[:MODELS_PER_BRAND_ITEM_BASE]
            for slot in range(len(core_models) + 1, MODELS_PER_BRAND_ITEM_BASE + 1):
                core_models.append(make_new_model_name(brand, item, slot, "BASE"))

            # 2) New models for the day (names carry the batch date)
            new_models = [
                make_new_model_name(brand, item, slot, date_str)
                for slot in range(1, NEW_MODELS_PER_BRAND_ITEM + 1)
            ]

            # 3) Price every model: name-derived base price, then optional daily drift.
            #    Models are processed in order because drift consumes the seeded RNG.
            for model in core_models + new_models:
                final_price = maybe_apply_daily_drift(model_price_from_baseline(baseline, model))
                records.append({
                    "category": category,
                    "name": item,
                    "brand": brand,
                    "model": model,
                    "Price": float(final_price)
                })

# Create daily DF
df_daily = spark.createDataFrame(records)

# ---------------------------------------------
# ENRICH: currency, GST, effective dates, is_current
# ---------------------------------------------
# Column expressions are named first, then attached in the same order as before.
gst_inclusive_price = F.round(F.col("Price") * (1 + F.col("gst_rate")), 2)
open_ended_date = F.to_date(F.lit(FUTURE_DATE))

df_src = df_daily.withColumn("currency", F.lit("INR"))
df_src = df_src.withColumn("gst_rate", F.lit(GST_RATE))
df_src = df_src.withColumn("price_incl_gst", gst_inclusive_price)
# Every generated row starts as the open-ended, current version as of the batch date.
df_src = df_src.withColumn("effective_from", as_of_date_col)
df_src = df_src.withColumn("effective_to", open_ended_date)
df_src = df_src.withColumn("is_current", F.lit(True))

# ---------------------------------------------
# IDs: stable product_id (hash of natural key), new product_sk per version
# ---------------------------------------------
natural_key_cols = ["category", "name", "brand", "model"]

# Stable business key: derived only from the natural key, so the same product
# gets the same product_id on every run and two different products never share
# one. (An id based on row position would be reused by whatever model lands in
# that position on another day.) 16 hex chars = first 64 bits of the SHA-256.
df_src = df_src.withColumn(
    "product_id",
    F.concat(
        F.lit("PID-"),
        F.substring(F.sha2(F.concat_ws("||", *natural_key_cols), 256), 1, 16),
    ),
)

# Read current max product_sk (0 when the table does not exist yet or is empty)
exists = spark.catalog.tableExists(full_table_name)
if exists:
    max_sk_val = (
        spark.table(full_table_name)
        .agg(F.max("product_sk").alias("max_sk"))
        .collect()[0]["max_sk"]
    )
    max_sk = int(max_sk_val) if max_sk_val is not None else 0
else:
    max_sk = 0

# Allocate a fresh surrogate key for each source row, continuing after max_sk
num_rows = df_src.count()
dg_spec = (
    dg.DataGenerator(spark, rows=num_rows, partitions=8)
    .withIdOutput()
    .withColumn("product_sk", "long", expr=f"id + {max_sk} + 1")
)
df_ids = dg_spec.build().select("id", "product_sk")

# Pair generated keys with source rows one-to-one via row_number.
# Which row receives which product_sk is arbitrary; only uniqueness matters.
w_ids = Window.orderBy(F.col("id"))
df_ids = df_ids.withColumn("rn", F.row_number().over(w_ids))
w_src = Window.orderBy(F.monotonically_increasing_id())
df_src = df_src.withColumn("rn", F.row_number().over(w_src))

df_src = df_src.join(df_ids, on="rn", how="inner").drop("rn", "id")

df_src.display()


In [0]:

# Target dimension table; created on first run only (no-op when it already exists).
create_dim_product_sql = f"""
CREATE TABLE IF NOT EXISTS {full_table_name} (
    product_sk BIGINT,
    product_id STRING,
    category STRING,
    name STRING,
    brand STRING,
    model STRING,
    Price DOUBLE,
    currency STRING,
    gst_rate DOUBLE,
    price_incl_gst DOUBLE,
    effective_from DATE,
    effective_to DATE,
    is_current BOOLEAN
)
USING DELTA
"""
spark.sql(create_dim_product_sql)

# ---------------------------------------------
# OPTIONAL: idempotency cleanup for the same as_of_date
# ---------------------------------------------
# Remove versions that already start on this batch date so a re-run does not
# stack a second copy of them.
if as_of_date_str:
    cleanup_sql = f"DELETE FROM {full_table_name} WHERE effective_from = DATE('{as_of_date_str}')"
    spark.sql(cleanup_sql)


# Column order used when previewing the dimension table:
# keys, then natural key, then pricing attributes, then SCD2 bookkeeping.
_key_columns = ["product_sk", "product_id"]
_natural_key_columns = ["category", "name", "brand", "model"]
_pricing_columns = ["Price", "currency", "gst_rate", "price_incl_gst"]
_scd_columns = ["effective_from", "effective_to", "is_current"]

desired_order = _key_columns + _natural_key_columns + _pricing_columns + _scd_columns


# Register source view
df_src.createOrReplaceTempView("src_dim_product")

# ---------------------------------------------
# SCD TYPE 2 MERGE: close previous current row on change, insert new version
# ---------------------------------------------
# A single MERGE source row can trigger only one action. A changed product
# matches its current target row, so a plain natural-key merge would close the
# old version but never insert the new one. The source is therefore staged as
# the union of:
#   (a) every source row, keyed by its natural key
#       -> closes the current row when tracked attributes changed,
#          or inserts the row when the product has no current version;
#   (b) a second copy of each *changed* row with a NULL merge key
#       -> can never match, so it is inserted as the new current version.

# "Tracked attribute changed" test, written against the aliases tgt / src.
change_predicate = """(
       tgt.Price          <> src.Price
    OR tgt.currency       <> src.currency
    OR tgt.gst_rate       <> src.gst_rate
    OR tgt.price_incl_gst <> src.price_incl_gst
)"""

spark.sql(f"""
MERGE INTO {full_table_name} AS tgt
USING (
    -- (a) keyed rows
    SELECT
        src.category AS merge_category,
        src.name     AS merge_name,
        src.brand    AS merge_brand,
        src.model    AS merge_model,
        src.*
    FROM src_dim_product AS src

    UNION ALL

    -- (b) new versions of changed products (NULL key => always NOT MATCHED)
    SELECT
        CAST(NULL AS STRING) AS merge_category,
        CAST(NULL AS STRING) AS merge_name,
        CAST(NULL AS STRING) AS merge_brand,
        CAST(NULL AS STRING) AS merge_model,
        src.*
    FROM src_dim_product AS src
    JOIN {full_table_name} AS tgt
      ON  tgt.category = src.category
      AND tgt.name     = src.name
      AND tgt.brand    = src.brand
      AND tgt.model    = src.model
      AND tgt.is_current = TRUE
    WHERE {change_predicate}
) AS src
ON  tgt.category = src.merge_category
AND tgt.name     = src.merge_name
AND tgt.brand    = src.merge_brand
AND tgt.model    = src.merge_model
AND tgt.is_current = TRUE

WHEN MATCHED AND {change_predicate} THEN
  UPDATE SET
      tgt.effective_to = date_add(src.effective_from, -1),
      tgt.is_current   = FALSE

WHEN NOT MATCHED THEN
  INSERT (
      product_sk,
      product_id,
      category,
      name,
      brand,
      model,
      Price,
      currency,
      gst_rate,
      price_incl_gst,
      effective_from,
      effective_to,
      is_current
  )
  VALUES (
      src.product_sk,
      src.product_id,
      src.category,
      src.name,
      src.brand,
      src.model,
      src.Price,
      src.currency,
      src.gst_rate,
      src.price_incl_gst,
      src.effective_from,
      src.effective_to,
      src.is_current
  )
""")

# ---------------------------------------------
# PREVIEW
# ---------------------------------------------
# Show the full dimension table, versions of each product listed chronologically.
dim_product_preview = spark.table(full_table_name).select(*desired_order)
dim_product_preview = dim_product_preview.orderBy(
    "category", "name", "brand", "model", "effective_from"
)
display(dim_product_preview)


In [0]:
%sql
-- Sanity check: total number of rows (all versions) in the dimension table
SELECT COUNT(*) FROM dev.bronze.dim_product