# Importing necessary packages


In [0]:
from pyspark.sql import functions as F, types as T, Window
from datetime import datetime, timedelta
import uuid, random, json
import random as _r

# Preparing dataspace in UC ( unity catalog)

In [0]:
%skip
%sql
--create catalog if not exists workspace2;
create schema  if not exists workspace.files comment 'test schema';
create volume  if not exists  workspace.files.testbed COMMENT 'Testbed for sample data';

# Setting up path
Before filling it up create volume in catalog and copy base path from there. 

In [0]:

BRONZE_ROOT = f"/Volumes/workspace/files/testbed/hybrid_retail/bronze"
LANDING_ROOT = f"/Volumes/workspace/files/testbed/hybrid_retail/landing"

PATHS = {
    "customers": f"{BRONZE_ROOT}/customers",
    "products":  f"{BRONZE_ROOT}/products",
    "stores":    f"{BRONZE_ROOT}/stores",
    "employees": f"{BRONZE_ROOT}/employees",
    "inventory": f"{BRONZE_ROOT}/inventory",
    "sales":     f"{BRONZE_ROOT}/sales",
    "api_products_updates": f"{LANDING_ROOT}/api_products_updates_json",
    "sftp_inventory":       f"{LANDING_ROOT}/sftp_inventory_csv",
}

# Create Required Directories

In [0]:
for p in PATHS.values():
    dbutils.fs.mkdirs(p)

# Adding Guiding Elements 

In [0]:
ODD_NAMES = ["Müller","José","Zoë","François","IKEA – Malmö","Café Au Lait","Pokémon","Naïve","Smörgås"]
CURRENCIES = ["SEK","EUR","USD","GBP"]
COUNTRIES = ["SE","DE","FR","NL","GB","US"]
REGIONS = ["Nordics","DACH","Benelux","UKI","North America"]
CATEGORIES = ["Electronics","Home Office","Audio","Accessories","Gaming","Appliances"]
BRANDS = ["Contoso","Fabrikam","Litware","Northwind","AdventureWorks","Tailspin"]


SEED = 42
random.seed(SEED)

# Creating required functions

In [0]:

def new_uuid():
    return str(uuid.uuid4())

def random_ts_days_ago(max_days=365):
    return (datetime.utcnow() - timedelta(days=random.randint(0, max_days)))


@F.udf("string")
def udf_random_currency():
    import random
    return random.choice(CURRENCIES)

@F.udf("double")
def udf_price_noise(x):
    import random
    def monetary_noise(amount: float):
        r = random.random()
        if r < 0.01:
            return round(amount * random.uniform(10, 40), 2)
        if r < 0.03:
            return round(amount * random.uniform(0.01, 0.25), 2)
        return round(amount, 2)
    return monetary_noise(x if x else 0.0)


def inject_encoding(s):
    return _r.choice(ODD_NAMES) if _r.random() < 0.05 else s

def maybe_null(v, p=0.03):
    return None if _r.random() < p else v

def maybe_empty(v, p=0.02):
    return "" if _r.random() < p else v

def maybe_inconsistent_date(ts, p=0.05):
    if _r.random() > p:
        return ts.strftime("%Y-%m-%d %H:%M:%S")
    choices = [ts.strftime("%d/%m/%Y"), ts.strftime("%m-%d-%Y"), ts.isoformat(timespec="seconds"), "2025-13-40"]
    return _r.choice(choices)

# ---- Dimensions

def gen_customers(n):
    rows = []
    for _ in range(n):
        cid = new_uuid()
        created_at = random_ts_days_ago(900)
        last_updated = created_at
        is_active = 1 if _r.random() > 0.05 else 0
        first = _r.choice(["Alice","Bob","Charlie","Diana","Erik","Fatima","Göran","Hanna","Ivan","Julia","Karl"])
        last  = _r.choice(["Andersson","Berg","Carlsson","Dahl","Ekström","Fischer","Ghosh","Hernández","Ilyas","Johansson"])
        full  = inject_encoding(f"{first} {last}")
        email = maybe_empty(maybe_null(f"{first.lower()}.{last.lower()}@example.com"))
        phone = maybe_null(f"+46{_r.randint(700000000, 799999999)}")
        birth = random_ts_days_ago(365*60)
        country = _r.choice(COUNTRIES)
        currency = _r.choice(CURRENCIES)
        rows.append((cid, full, email, phone, maybe_inconsistent_date(birth), country, currency, is_active, maybe_inconsistent_date(last_updated), maybe_inconsistent_date(created_at)))
    schema = T.StructType([
        T.StructField("customer_id", T.StringType(), False),
        T.StructField("full_name", T.StringType(), True),
        T.StructField("email", T.StringType(), True),
        T.StructField("phone", T.StringType(), True),
        T.StructField("birth_date", T.StringType(), True),
        T.StructField("country_code", T.StringType(), True),
        T.StructField("preferred_currency", T.StringType(), True),
        T.StructField("is_active", T.IntegerType(), True),
        T.StructField("last_updated", T.StringType(), True),
        T.StructField("created_at", T.StringType(), True),
    ])
    return spark.createDataFrame(rows, schema)

def gen_stores(n):
    rows = []
    for i in range(n):
        sid = new_uuid()
        created_at = random_ts_days_ago(1200)
        last_updated = created_at
        is_active = 1 if _r.random() > 0.08 else 0
        name = inject_encoding(f"Store-{i+1:03d}")
        region = _r.choice(REGIONS)
        city = _r.choice(["Malmö","Stockholm","Göteborg","Lund","Helsingborg","Uppsala","Copenhagen","Hamburg","Amsterdam","London","Seattle"])
        rows.append((sid, name, region, city, is_active, maybe_inconsistent_date(last_updated), maybe_inconsistent_date(created_at)))
    schema = T.StructType([
        T.StructField("store_id", T.StringType(), False),
        T.StructField("store_name", T.StringType(), True),
        T.StructField("region", T.StringType(), True),
        T.StructField("city", T.StringType(), True),
        T.StructField("is_active", T.IntegerType(), True),
        T.StructField("last_updated", T.StringType(), True),
        T.StructField("created_at", T.StringType(), True),
    ])
    return spark.createDataFrame(rows, schema)

def gen_products(n):
    rows = []
    for i in range(n):
        pid = new_uuid()
        created_at = random_ts_days_ago(1500)
        last_updated = created_at
        is_active = 1 if _r.random() > 0.03 else 0
        name = inject_encoding(_r.choice(["Laptop","Smartphone","Headphones","Monitor","Keyboard","Mouse","Printer","Webcam","Speaker","VR Headset","SSD","HDD"])) + f" {_r.randint(100,999)}"
        brand = _r.choice(BRANDS)
        category = _r.choice(CATEGORIES)
        list_price = round(_r.uniform(9.99, 3499.0), 2)
        cost = round(list_price * _r.uniform(0.4, 0.85), 2)
        currency = _r.choice(CURRENCIES)
        rows.append((pid, name, brand, category, cost, list_price, currency, is_active, maybe_inconsistent_date(last_updated), maybe_inconsistent_date(created_at)))
    schema = T.StructType([
        T.StructField("product_id", T.StringType(), False),
        T.StructField("product_name", T.StringType(), True),
        T.StructField("brand", T.StringType(), True),
        T.StructField("category", T.StringType(), True),
        T.StructField("unit_cost", T.DoubleType(), True),
        T.StructField("list_price", T.DoubleType(), True),
        T.StructField("currency", T.StringType(), True),
        T.StructField("is_active", T.IntegerType(), True),
        T.StructField("last_updated", T.StringType(), True),
        T.StructField("created_at", T.StringType(), True),
    ])
    return spark.createDataFrame(rows, schema)

def gen_employees(n, store_df):
    sids = [r.store_id for r in store_df.select("store_id").collect()]
    rows = []
    for _ in range(n):
        eid = new_uuid()
        store = _r.choice(sids)
        hire = random_ts_days_ago(2000)
        last_updated = hire
        term_flag = 1 if _r.random() < 0.07 else 0
        term_date = random_ts_days_ago(300) if term_flag else None
        is_active = 0 if term_flag else 1
        first = _r.choice(["Asha","Bengt","Chen","Deepa","Eva","Farid","Greta","Hiro","Isha","Jonas","Kiran","Lars"])
        last  = _r.choice(["Lind","Nguyen","Olofsson","Persson","Quinn","Rahman","Svensson","Tanaka","Ulrich","Vega","Wang","Xu"])
        name = inject_encoding(f"{first} {last}")
        role = _r.choice(["Cashier","Manager","Sales Associate","Stock Clerk","HR","Security","Cleaner","Barista","Technician","Supervisor"])
        salary = round(_r.uniform(24000, 95000), 2)
        rows.append((eid, name, store, role, salary, is_active, maybe_inconsistent_date(last_updated), maybe_inconsistent_date(hire), maybe_inconsistent_date(term_date) if term_date else None))
    schema = T.StructType([
        T.StructField("employee_id", T.StringType(), False),
        T.StructField("employee_name", T.StringType(), True),
        T.StructField("store_id", T.StringType(), True),
        T.StructField("role", T.StringType(), True),
        T.StructField("monthly_salary", T.DoubleType(), True),
        T.StructField("is_active", T.IntegerType(), True),
        T.StructField("last_updated", T.StringType(), True),
        T.StructField("hire_date", T.StringType(), True),
        T.StructField("termination_date", T.StringType(), True),
    ])
    return spark.createDataFrame(rows, schema)

# Generating data

In [0]:
num_customers, num_stores, num_products, num_employees = 5000, 40, 1500, 600
customers_df = gen_customers(num_customers)
stores_df    = gen_stores(num_stores)
products_df  = gen_products(num_products)
employees_df = gen_employees(num_employees, stores_df)

  return (datetime.utcnow() - timedelta(days=random.randint(0, max_days)))


# Saving data to partioned file

In [0]:
load_dt = datetime.utcnow().strftime("%Y-%m-%d")
for name, df in [("customers", customers_df),("stores", stores_df),("products", products_df),("employees", employees_df)]:
    (df.withColumn("ingestion_date", F.lit(load_dt)).write.mode("overwrite").format("delta").partitionBy("ingestion_date").save(PATHS[name]))


  load_dt = datetime.utcnow().strftime("%Y-%m-%d")


# Introducing Updates 
## Product updates
### Suffles rows randomly and select top 1000

In [0]:
# Suffles rows randomly and give 1000 random records 
num_updates = 200
pids = [r.product_id for r in spark.read.format("delta").load(PATHS["products"]).orderBy(F.rand()).select("product_id").limit(1000).collect()]


### Prepare json with induced updates

In [0]:

updates = []
for _ in range(num_updates):
    now_s = datetime.utcnow().isoformat(timespec="seconds")
    r = random.random()
    if r < 0.70 and pids:
        pid = random.choice(pids)
        updates.append({"event_type":"UPDATE","product_id":pid,"list_price":round(random.uniform(5,3999),2),"currency":random.choice(CURRENCIES),"is_active":1,"last_updated":now_s})
    elif r < 0.85:
        pid = new_uuid()
        updates.append({"event_type":"CREATE","product_id":pid,"product_name":inject_encoding(random.choice(["Router","Dock","Microphone","Camera"]))+f" {random.randint(100,999)}","brand":random.choice(BRANDS),"category":random.choice(CATEGORIES),"unit_cost":round(random.uniform(3,1200),2),"list_price":round(random.uniform(9,3999),2),"currency":random.choice(CURRENCIES),"is_active":1,"created_at":now_s,"last_updated":now_s})
    else:
        pid = random.choice(pids)
        updates.append({"event_type":"SOFT_DELETE","product_id":pid,"is_active":0,"last_updated":now_s})

upd_df = spark.createDataFrame(updates, "event_type string, product_id string, product_name string, brand string, category string, unit_cost double, list_price double, currency string, is_active int, created_at string, last_updated string")
# .coalesce : used to write all data in single file.
upd_df.coalesce(1).write.mode("append").json(PATHS["api_products_updates"])


  now_s = datetime.utcnow().isoformat(timespec="seconds")


In [0]:
# --- SFTP Simulation: CSV inventory snapshots ---
store_ids = [r.store_id for r in stores_df.select("store_id").collect()]
prod_ids  = [r.product_id for r in products_df.select("product_id").limit(600).collect()]
rows = []
now_iso = datetime.utcnow().isoformat(timespec="seconds")
for sid in store_ids:
    for pid in random.sample(prod_ids, k=min(120, len(prod_ids))):
        qty = max(0, int(random.gauss(40, 20)))
        if random.random() < 0.02:
            qty = int(qty * random.uniform(5, 20))
        unit_cost = round(random.uniform(1, 1200), 2)
        pid_val = pid if random.random() > 0.01 else ""
        sid_val = sid if random.random() > 0.01 else None
        rows.append((sid_val, pid_val, qty, unit_cost, now_iso))
inv_df = spark.createDataFrame(rows, "store_id string, product_id string, on_hand int, unit_cost double, snapshot_ts string")
inv_df.coalesce(1).write.mode("append").option("header","true").csv(PATHS["sftp_inventory"])


  now_iso = datetime.utcnow().isoformat(timespec="seconds")


In [0]:

# --- Streaming sales to Event Hubs + bronze archive ---
customers_b = customers_df.select("customer_id").distinct()
stores_b    = stores_df.select("store_id").distinct()
products_b  = products_df.select("product_id", "list_price")

cids = [r.customer_id for r in customers_b.limit(2000).collect()]
sids = [r.store_id for r in stores_b.collect()]
prows = [(r.product_id, float(r.list_price)) for r in products_b.limit(1500).collect()]

@F.udf("string")
def udf_rand_cust():
    import random
    return random.choice(cids) if cids else str(uuid.uuid4())

@F.udf("string")
def udf_rand_store():
    import random
    return random.choice(sids) if sids else str(uuid.uuid4())

@F.udf("struct<product_id:string, unit_price:double>")
def udf_rand_product():
    import random
    if prows:
        pid, price = random.choice(prows)
        price = float(price) if price else round(random.uniform(5, 3999),2)
        return {"product_id": pid, "unit_price": price}
    return {"product_id": str(uuid.uuid4()), "unit_price": round(random.uniform(5, 3999),2)}

order_stream = (spark.readStream.format("rate").option("rowsPerSecond", 20).load()
    .withColumn("event_ts", F.current_timestamp())
    .withColumn("order_id", F.expr("uuid()"))
    .withColumn("customer_id", udf_rand_cust())
    .withColumn("store_id", udf_rand_store())
    .withColumn("ps", udf_rand_product())
    .withColumn("product_id", F.col("ps.product_id"))
    .withColumn("unit_price", F.col("ps.unit_price"))
    .drop("ps")
    .withColumn("quantity", (F.rand()*5 + 1).cast("int"))
    .withColumn("currency", udf_random_currency())
    .withColumn("gross_amount", F.col("unit_price")*F.col("quantity"))
    .withColumn("event_time_str", F.when(F.rand()<0.12, F.date_format(F.col("event_ts"), "dd/MM/yyyy HH:mm:ss")).otherwise(F.date_format(F.col("event_ts"), "yyyy-MM-dd HH:mm:ss")))
)



In [0]:

bronze_query = (
    order_stream
    .withColumn("ingestion_date", F.to_date(F.current_timestamp()))
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", f"{PATHS['sales']}/_chk_bronze")
    .trigger(availableNow=True)
    .start(PATHS["sales"])
    )

In [0]:
spark.read.format('delta').load(PATHS["sales"] ).display()

timestamp,value,event_ts,order_id,customer_id,store_id,product_id,unit_price,quantity,currency,gross_amount,event_time_str,ingestion_date
2025-11-01T05:12:32.430Z,1,2025-11-01T05:12:32.994Z,2936060c-761b-4e0e-8c8e-83257cedd47d,ae110c11-92f5-4003-b098-e209b1e34943,8834b80b-8915-4873-bb53-e1c901b6e05d,d40971b4-294e-4df2-a85b-b485745432f1,1342.18,1,SEK,1342.18,2025-11-01 05:12:32,2025-11-01
2025-11-01T05:12:32.830Z,9,2025-11-01T05:12:32.994Z,3dc84714-b073-4eaa-8688-a7a7c74f258c,48f6f91e-51bc-498e-b0a7-4535ef67720a,7690015a-9ddc-4b6f-856d-69d72fd5400a,777fff37-85e1-477f-896f-2b9093461580,162.74,2,SEK,325.48,2025-11-01 05:12:32,2025-11-01
2025-11-01T05:12:32.380Z,0,2025-11-01T05:12:32.994Z,692f63df-5980-4727-813d-04f0941e5d11,a2899654-7a5c-4b94-b78e-9cde52e256cd,7ad5ca33-71e5-4046-a227-39510503e352,4600910b-ff14-4b4a-a23f-b77895c66419,491.23,5,EUR,2456.15,2025-11-01 05:12:32,2025-11-01
2025-11-01T05:12:32.780Z,8,2025-11-01T05:12:32.994Z,2e325b99-41f2-4991-9779-9a4e32004a10,e930f402-ce4e-437c-98e3-572a2461b3b4,8834b80b-8915-4873-bb53-e1c901b6e05d,0e721437-fdfa-481b-874d-e94cc7da7735,100.24,3,GBP,300.72,2025-11-01 05:12:32,2025-11-01
2025-11-01T05:12:32.530Z,3,2025-11-01T05:12:32.994Z,f1b40658-b81e-4af6-92ae-98b0afe287aa,91a3522c-6b2b-4113-81bd-7458c390ec5c,0a884392-ed30-4af3-98c4-c1d915122b2d,a6c27ea5-f09f-4a1d-9b2c-95e45ac9c74d,1779.03,5,SEK,8895.15,2025-11-01 05:12:32,2025-11-01
2025-11-01T05:12:32.930Z,11,2025-11-01T05:12:32.994Z,47be7a98-016d-40a6-9ca6-a53577bf47af,fcc22e9d-9e13-4e5a-b349-9becf72e8c5d,30ac6943-ba56-4afb-83c2-1768cd1f62dd,f6c1376c-d41a-432e-b224-aa17d573e965,2788.63,4,GBP,11154.52,2025-11-01 05:12:32,2025-11-01
2025-11-01T05:12:32.480Z,2,2025-11-01T05:12:32.994Z,8269468e-e368-49df-ad04-63480f2771e0,f6aad6f3-be3a-4208-88d1-d014fcd75ffe,229aeedc-4f11-4fdd-9af1-076b9bec1f2a,11e02db4-5c7b-4510-aecc-a8cd36f7d19a,2525.19,2,GBP,5050.38,2025-11-01 05:12:32,2025-11-01
2025-11-01T05:12:32.880Z,10,2025-11-01T05:12:32.994Z,4681d145-7487-469e-b139-c5faaec30d69,4c55859f-604b-4940-b11f-25eab9fc3712,416762e0-3127-42be-aa5a-f75cdf2b1db7,15b96603-1b6c-4696-a01d-58dbe5a578a1,2394.6,3,EUR,7183.799999999999,2025-11-01 05:12:32,2025-11-01
2025-11-01T05:12:32.580Z,4,2025-11-01T05:12:32.994Z,898cc94f-94b2-48f3-8ca2-044e1b9363ab,6bb24b5b-527c-41ea-8847-ba9887742ad4,60d1b93a-53a4-4c16-ae69-d189e7f745e7,9b1a077b-fb7f-4432-86c8-3c262fbf5010,243.45,1,SEK,243.45,2025-11-01 05:12:32,2025-11-01
2025-11-01T05:12:32.630Z,5,2025-11-01T05:12:32.994Z,a2dffd20-f34b-40e5-8b86-3078a26fe1c9,c1fd1301-9be3-43db-bdea-258df247bf5a,0a884392-ed30-4af3-98c4-c1d915122b2d,e6cb927d-56a4-4f4a-9772-047c8eda7026,1011.81,5,SEK,5059.049999999999,2025-11-01 05:12:32,2025-11-01
