In [0]:
%run /Workspace/Users/kianhow2000@gmail.com/Databricks/01_Data_Engineer_Learning_Plan/Lab-Setup/common-utils

In [0]:
%python
# Databricks notebook source
from pyspark.sql import functions as F

# -----------------------------
# Target paths
# -----------------------------
# Extra value hashed together with each key so every run generates the same data.
SEED = 42

# One output file per dataset; each dataset has its own folder under the same volume path.
# Base datasets:
ORDERS_FILE = "/Volumes/workspace/data_engineering_labs_00/v01/orders/00.json"
STATUS_FILE = "/Volumes/workspace/data_engineering_labs_00/v01/status/00.json"
CUSTOMERS_FILE = "/Volumes/workspace/data_engineering_labs_00/v01/customers/00.json"
# Change feed (UPDATE / DELETE / NEW rows) applied on top of the customers dataset:
CUSTOMERS_NEW_FILE = "/Volumes/workspace/data_engineering_labs_00/v01/customers_new/01.json"

# -----------------------------
# Helpers
# -----------------------------
def banner(title):
    """Print *title* framed by two 60-character "=" rules, preceded by a blank line."""
    rule = "=" * 60
    # One print call with a newline separator emits the same three lines.
    print("\n" + rule, title, rule, sep="\n")

def ensure_parent_dir(file_path: str):
    """Call ``dbutils.fs.mkdirs`` on the parent of *file_path* and return that parent.

    The parent is everything before the last "/". A path without any "/" is
    passed through unchanged.
    """
    last_slash = file_path.rfind("/")
    parent = file_path[:last_slash] if last_slash != -1 else file_path
    dbutils.fs.mkdirs(parent)
    return parent

def _dbfs_path_exists(path: str) -> bool:
    """Return True when ``dbutils.fs.ls(path)`` succeeds, False when the path is missing."""
    try:
        dbutils.fs.ls(path)
    except Exception as err:
        # NOTE(review): assumes a missing path surfaces as a wrapped
        # java.io.FileNotFoundException -- confirm on the target runtime.
        if "java.io.FileNotFoundException" in str(err):
            return False
        raise
    return True


def write_single_json_file(df, final_file_path: str, overwrite: bool = True):
    """
    Write ``df`` as one JSON file located exactly at ``final_file_path``.

    Spark JSON writer writes directories. This helper:
      1) writes df as a single part file to a temp directory
      2) moves the part-*.json to the desired final filename
      3) removes temp directory (also when step 1 or 2 fails)

    Args:
        df: DataFrame to write; it is coalesced to a single partition first.
        final_file_path: Full path, including the file name, of the result.
        overwrite: When True (default) an existing file at ``final_file_path``
            is replaced. When False an existing file is left untouched and
            ``FileExistsError`` is raised before anything is written.

    Raises:
        FileExistsError: ``overwrite`` is False and the target already exists.
        RuntimeError: the temp directory does not hold exactly one ``.json`` file.
    """
    ensure_parent_dir(final_file_path)

    # `overwrite` governs the FINAL file. Check it up front so that
    # overwrite=False can never delete or replace existing data.
    if not overwrite and _dbfs_path_exists(final_file_path):
        raise FileExistsError(
            f"{final_file_path} already exists and overwrite=False"
        )

    tmp_dir = final_file_path + ".__tmp__"

    # Best effort: drop leftovers from an earlier interrupted run. The write
    # below uses overwrite mode on this scratch directory anyway.
    try:
        dbutils.fs.rm(tmp_dir, recurse=True)
    except Exception:
        pass

    try:
        # The temp directory is private scratch space, so it is always safe to
        # overwrite it regardless of the `overwrite` flag.
        (df.coalesce(1)
           .write
           .mode("overwrite")
           .json(tmp_dir))

        # Find the single part file (the directory also holds _SUCCESS etc.)
        part_files = [f.path for f in dbutils.fs.ls(tmp_dir) if f.path.endswith(".json")]
        if len(part_files) != 1:
            raise RuntimeError(
                f"Expected exactly 1 part json in {tmp_dir}, found {len(part_files)}: {part_files}"
            )
        part_path = part_files[0]

        if overwrite:
            # Best effort: the target may simply not exist yet. A genuine
            # failure to remove it will surface in the mv below.
            try:
                dbutils.fs.rm(final_file_path)
            except Exception:
                pass

        dbutils.fs.mv(part_path, final_file_path)
    except Exception:
        # Do not leak the scratch directory on failure; keep the original
        # error as the one that propagates.
        try:
            dbutils.fs.rm(tmp_dir, recurse=True)
        except Exception as cleanup_err:
            print(f"WARNING: could not remove temp dir {tmp_dir}: {cleanup_err}")
        raise

    # Remove temp dir (contains _SUCCESS and maybe crc files)
    dbutils.fs.rm(tmp_dir, recurse=True)

def assert_count(df, expected: int, name: str):
    """Check that ``df.count()`` equals *expected*.

    Prints a confirmation line labelled with *name* on success and raises
    ``Exception`` describing the mismatch otherwise.
    """
    actual = df.count()
    if actual == expected:
        print(f"✅ {name}: {actual} rows")
        return
    raise Exception(f"{name}: expected {expected} rows, got {actual}")

# -----------------------------
# Dataset 1: orders (174 rows)
# -----------------------------
banner("Creating Dataset 1: orders/00.json (174 rows)")

ORDERS_N = 174

# order_id = ORDER_ID_BASE + idx - 1 with idx in 1..ORDERS_N, i.e. 75000..75173,
# so the sample order 75123 exists (idx=124 -> 75000 + 123 = 75123).
ORDER_ID_BASE = 75000

# Make the customer_id range align with the customers dataset (see below):
# customers use customer_id = 23000 + idx - 1 with idx in 1..1000, i.e. 23000..23999,
# so the sample customer 23564 exists there (idx=565).
# Orders pick a hashed offset in 0..999, so every order points into that same range.
CUSTOMER_ID_BASE = 23000

# Use an epoch seconds base around the provided sample scale
ORDER_TS_BASE = 1640390000  # close to the sample 1640394862

orders_df = (
    # spark.range(1, N + 1) yields ids 1..N; rename to idx so "id" is not confused with order_id.
    spark.range(1, ORDERS_N + 1).withColumnRenamed("id", "idx")
      # Sequential order ids starting at ORDER_ID_BASE.
      .withColumn("order_id", (F.lit(ORDER_ID_BASE) + F.col("idx") - F.lit(1)).cast("long"))
      # Deterministic pseudo-random customer: seeded hash of idx reduced to 0..999.
      .withColumn(
          "customer_id",
          (F.lit(CUSTOMER_ID_BASE) +
           F.pmod(F.xxhash64(F.col("idx"), F.lit(SEED)), F.lit(1000))).cast("long")
      )
      # "Y" when the seeded hash of order_id is even, otherwise "N".
      .withColumn(
          "notifications",
          F.when(F.pmod(F.xxhash64(F.col("order_id"), F.lit(SEED)), F.lit(2)) == 0, F.lit("Y"))
           .otherwise(F.lit("N"))
      )
      # Timestamp = base + hashed offset within 30 days (in seconds).
      # NOTE(review): this reuses the exact hash used for "notifications". Because the
      # 30-day modulus and ORDER_TS_BASE are both even, order_timestamp is even exactly
      # when notifications == "Y". Harmless for the lab, but the columns are not independent.
      .withColumn(
          "order_timestamp",
          (F.lit(ORDER_TS_BASE) +
           F.pmod(F.xxhash64(F.col("order_id"), F.lit(SEED)), F.lit(30 * 24 * 60 * 60))).cast("long")
      )
      # Drop the helper idx column; this is the column order of the written JSON.
      .select("customer_id", "notifications", "order_id", "order_timestamp")
)

# Ensure notifications only Y/N (sanity check on the generated column)
bad_notif = orders_df.filter(~F.col("notifications").isin("Y", "N")).count()
if bad_notif != 0:
    raise Exception(f"Found {bad_notif} invalid notifications values")

assert_count(orders_df, ORDERS_N, "orders_df")

# Persist as a single JSON file at the exact target path.
write_single_json_file(orders_df, ORDERS_FILE)
print("✅ Written:", ORDERS_FILE)

# Optional: show a couple of rows (including sample-like scale)
display(orders_df.orderBy("order_id").limit(5))


# -----------------------------
# Dataset 2: status (5000 rows)
# -----------------------------
banner("Creating Dataset 2: status/00.json (5000 rows)")

STATUS_N = 5000

status_values = [
    "on the way",
    "cancelled",
    "return cancelled",
    "report shipping error",
    "delivered",
    "return processed",
    "return picked up",
    "placed",
    "preparing",
    "return requested"
]

# Column expressions are named up front so the pipeline below reads as a list of steps.

# Cycle through the ORDERS_N order ids so that every status row joins to an order.
_status_order_id = (
    F.lit(ORDER_ID_BASE) + F.pmod(F.col("idx"), F.lit(ORDERS_N))
).cast("long")

# element_at is 1-based, hence the "+ 1" on the hashed position.
_status_lookup = F.array(*map(F.lit, status_values))
_status_position = (
    F.pmod(F.xxhash64(F.col("idx"), F.lit(SEED)), F.lit(len(status_values))) + F.lit(1)
).cast("int")

# Timestamps near sample 1640392092; hashed offset spread within ~60 days.
_status_ts_offset = F.pmod(
    F.xxhash64(F.col("order_id"), F.col("idx"), F.lit(SEED)),
    F.lit(60 * 24 * 60 * 60),
)
_status_ts = (F.lit(ORDER_TS_BASE) + _status_ts_offset).cast("long")

status_df = (
    spark.range(0, STATUS_N)
    .withColumnRenamed("id", "idx")
    .withColumn("order_id", _status_order_id)
    .withColumn("order_status", F.element_at(_status_lookup, _status_position))
    .withColumn("status_timestamp", _status_ts)
    .select("order_id", "order_status", "status_timestamp")
)

assert_count(status_df, STATUS_N, "status_df")

# Join coverage: a left-anti join keeps only status rows without a matching order.
_known_order_ids = orders_df.select("order_id").distinct()
status_unmatched = status_df.join(_known_order_ids, on="order_id", how="left_anti").count()
if status_unmatched != 0:
    raise Exception(f"Expected all status rows to match orders.order_id; unmatched={status_unmatched}")
print("✅ status_df: all rows joinable to orders on order_id")

write_single_json_file(status_df, STATUS_FILE)
print("✅ Written:", STATUS_FILE)

display(status_df.limit(5))


# -----------------------------
# Dataset 3: customers (1000 rows)
# -----------------------------
banner("Creating Dataset 3: customers/00.json (1000 rows)")

CUSTOMERS_N = 1000

cities = ["Singapore", "Jurong", "Tampines", "Woodlands", "Hougang", "Bedok", "Bukit Batok", "Ang Mo Kio"]

# Reusable column expressions; all of them resolve against the frame built below.
_cust_id = F.col("customer_id")
_cust_id_text = _cust_id.cast("string")

# element_at is 1-based, hence the "+ 1" on the hashed position.
_city_position = (
    F.pmod(F.xxhash64(_cust_id, F.lit(SEED)), F.lit(len(cities))) + F.lit(1)
).cast("int")
_city = F.element_at(F.array(*map(F.lit, cities)), _city_position)

# "Blk <id mod 999>, Street <id mod 50>"
_address = F.concat(
    F.lit("Blk "),
    F.pmod(_cust_id, F.lit(999)).cast("string"),
    F.lit(", Street "),
    F.pmod(_cust_id, F.lit(50)).cast("string"),
)

customers_df = (
    spark.range(1, CUSTOMERS_N + 1)
    .withColumnRenamed("id", "idx")
    # Sequential ids: CUSTOMER_ID_BASE .. CUSTOMER_ID_BASE + CUSTOMERS_N - 1
    .withColumn("customer_id", (F.lit(CUSTOMER_ID_BASE) + F.col("idx") - F.lit(1)).cast("long"))
    .withColumn("name", F.concat(F.lit("Customer "), _cust_id_text))
    .withColumn("email", F.concat(F.lit("customer"), _cust_id_text, F.lit("@example.com")))
    .withColumn("city", _city)
    .withColumn("address", _address)
    .withColumn("operation", F.lit("NEW"))
    .select("address", "city", "customer_id", "email", "name", "operation")
)

assert_count(customers_df, CUSTOMERS_N, "customers_df")

# Every row of the base customers file must carry operation == "NEW".
bad_ops = customers_df.filter(F.col("operation") != "NEW").count()
if bad_ops != 0:
    raise Exception(f"Expected all customers.operation to be NEW; bad={bad_ops}")

write_single_json_file(customers_df, CUSTOMERS_FILE)
print("✅ Written:", CUSTOMERS_FILE)

display(customers_df.limit(5))


# -----------------------------
# Dataset 4: customers_new (23 rows)
# -----------------------------
banner("Creating Dataset 4: customers_new/01.json (23 rows)")

# Requirements:
# - 23 records
# - same columns as dataset 3
# - operation distribution: 12 UPDATE, 1 DELETE, 10 NEW
# - UPDATE: record matches dataset 3 exactly EXCEPT email changed
# - DELETE: customer_id found in dataset 3, all other values null
# - NEW: new generated customers

UPDATES_N = 12
DELETES_N = 1
NEWS_N = 10

# Pick deterministic customer_ids for update/delete from existing customers
# Use ordering by hash for repeatability
existing_ids_df = (
    customers_df
      .select("customer_id")
      .withColumn("h", F.xxhash64(F.col("customer_id"), F.lit(SEED)))
      .orderBy("h")
)

# First UPDATES_N ids in hash order are updated; the next id not among them is deleted.
update_ids = [r["customer_id"] for r in existing_ids_df.limit(UPDATES_N).collect()]
delete_id  = existing_ids_df.filter(~F.col("customer_id").isin(update_ids)).limit(1).collect()[0]["customer_id"]

# UPDATE rows: copy of the dataset 3 row with only email and operation replaced.
updates_df = (
    customers_df
      .filter(F.col("customer_id").isin(update_ids))
      .withColumn("email", F.concat(F.lit("updated_"), F.col("customer_id").cast("string"), F.lit("@example.com")))
      .withColumn("operation", F.lit("UPDATE"))
)

# DELETE row: only customer_id and operation are populated. The explicit string
# casts give the null columns the same types as the other frames for the union.
delete_df = (
    spark.range(1).select(
        F.lit(None).cast("string").alias("address"),
        F.lit(None).cast("string").alias("city"),
        F.lit(int(delete_id)).cast("long").alias("customer_id"),
        F.lit(None).cast("string").alias("email"),
        F.lit(None).cast("string").alias("name"),
        F.lit("DELETE").alias("operation"),
    )
)

# New customer_ids outside dataset 3 range to avoid collisions
# customers in dataset 3 are 23000..23999
new_base = 50000
new_customers_df = (
    spark.range(1, NEWS_N + 1).withColumnRenamed("id", "idx")
      .withColumn("customer_id", (F.lit(new_base) + F.col("idx")).cast("long"))
      .withColumn("name", F.concat(F.lit("New Customer "), F.col("customer_id").cast("string")))
      .withColumn("email", F.concat(F.lit("newcustomer"), F.col("customer_id").cast("string"), F.lit("@example.com")))
      .withColumn(
          "city",
          # element_at is 1-based, hence the "+ 1" on the hashed position.
          F.element_at(
              F.array(*[F.lit(c) for c in cities]),
              (F.pmod(F.xxhash64(F.col("customer_id"), F.lit(SEED)), F.lit(len(cities))) + F.lit(1)).cast("int")
          )
      )
      .withColumn("address", F.concat(F.lit("Unit "), F.pmod(F.col("customer_id"), F.lit(200)).cast("string"),
                                      F.lit(", Avenue "), F.pmod(F.col("customer_id"), F.lit(30)).cast("string")))
      .withColumn("operation", F.lit("NEW"))
      .select("address", "city", "customer_id", "email", "name", "operation")
)

# unionByName matches columns by name, so column order of the inputs does not matter.
customers_new_df = updates_df.unionByName(delete_df).unionByName(new_customers_df)

# Validations
assert_count(customers_new_df, UPDATES_N + DELETES_N + NEWS_N, "customers_new_df")

ops_check = customers_new_df.groupBy("operation").count()
display(ops_check)

# Confirm 12 UPDATE, 1 DELETE, 10 NEW
op_counts = {r["operation"]: r["count"] for r in ops_check.collect()}
if op_counts.get("UPDATE", 0) != UPDATES_N or op_counts.get("DELETE", 0) != DELETES_N or op_counts.get("NEW", 0) != NEWS_N:
    raise Exception(f"Operation counts wrong: {op_counts}")

# Confirm UPDATE rows match dataset 3 except email.
# A LEFT join keeps UPDATE rows whose id is missing from dataset 3, and
# eqNullSafe always yields true/false (never NULL), so an unmatched or
# null-valued row is reported as bad instead of silently dropping out.
cmp = (
    customers_new_df.filter(F.col("operation") == "UPDATE").alias("n")
      .join(customers_df.alias("c"), on="customer_id", how="left")
      .select(
          "customer_id",
          F.col("n.address").eqNullSafe(F.col("c.address")).alias("address_same"),
          F.col("n.city").eqNullSafe(F.col("c.city")).alias("city_same"),
          F.col("n.name").eqNullSafe(F.col("c.name")).alias("name_same"),
          F.col("n.email").eqNullSafe(F.col("c.email")).alias("email_same"),
      )
)
# Bad = some non-email column differs, OR the email did not actually change.
bad_update = cmp.filter(~(F.col("address_same") & F.col("city_same") & F.col("name_same")) | (F.col("email_same"))).count()
if bad_update != 0:
    raise Exception(f"UPDATE validation failed: {bad_update} rows did not match rules")
print("✅ UPDATE validation passed (matches dataset 3 except email changed)")

# Confirm DELETE row has nulls except customer_id + operation
delete_rows = customers_new_df.filter(F.col("operation") == "DELETE")
del_bad = delete_rows \
    .filter(~(F.col("address").isNull() & F.col("city").isNull() & F.col("email").isNull() & F.col("name").isNull())).count()
if del_bad != 0:
    raise Exception("DELETE validation failed: non-null fields found in DELETE row")

# Confirm the DELETE customer_id exists in dataset 3 (left-anti keeps unknown ids only)
del_unknown = delete_rows.join(customers_df.select("customer_id"), on="customer_id", how="left_anti").count()
if del_unknown != 0:
    raise Exception(f"DELETE validation failed: {del_unknown} customer_id(s) not found in dataset 3")
print("✅ DELETE validation passed (nulls except customer_id)")

write_single_json_file(customers_new_df, CUSTOMERS_NEW_FILE)
print("✅ Written:", CUSTOMERS_NEW_FILE)

display(customers_new_df.orderBy("operation", "customer_id"))

