In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
from datetime import date, timedelta
import random


In [0]:
# Declared schema of the synthetic retail_orders table, assembled field by field
# with StructType.add(name, data_type, nullable). Only order_id and order_date
# are declared non-nullable; every other column may hold nulls.
base_schema = (
    StructType()
    .add("order_id", StringType(), False)
    .add("order_date", DateType(), False)
    .add("customer_id", StringType(), True)
    .add("product_id", StringType(), True)
    .add("quantity", IntegerType(), True)
    .add("unit_price", DoubleType(), True)
    .add("order_status", StringType(), True)
    .add("ingestion_ts", TimestampType(), True)
)

In [0]:
# First simulated order date: eight calendar days before today.
start_date = date.today() - timedelta(8)

In [0]:
# Day offset (0-based, counted from start_date) -> failure to inject on that day.
# Offsets 0-2 are absent, so those days are generated clean; the listed
# failures are assigned to consecutive offsets starting at 3.
failure_plan = dict(enumerate(
    [
        "row_drop",
        "null_spike",
        "schema_drift",
        "cardinality_collapse",
        "multiple_failures",
    ],
    start=3,
))

In [0]:
def generate_orders(run_date, failure_type=None, base_rows=1000, rng=None):
    """Generate one day of synthetic retail orders, optionally corrupted.

    Args:
        run_date: ``datetime.date`` written to every row's ``order_date`` and
            embedded in ``order_id`` as ``O-YYYYMMDD-<row index>``.
        failure_type: Optional failure to inject. Handled here:
            ``"row_drop"`` keeps only 15% of ``base_rows`` rows;
            ``"null_spike"`` nulls ``customer_id`` with probability 0.7;
            ``"cardinality_collapse"`` forces every ``product_id`` to ``"P-1"``;
            ``"multiple_failures"`` nulls ``customer_id`` with probability 0.6
            and forces ``product_id="P-1"``, ``unit_price=0.0`` and
            ``order_status="UNKNOWN"``. Any other value (including
            ``"schema_drift"``, which the caller applies itself) produces a
            clean day.
        base_rows: Number of rows on a healthy day (default 1000, as before).
        rng: Source of randomness exposing ``randint``/``uniform``/``choice``/
            ``random`` (e.g. ``random.Random(42)``) so runs can be made
            reproducible. Defaults to the global ``random`` module, which keeps
            the original behaviour.

    Returns:
        A Spark DataFrame with the columns of ``base_schema``: the seven order
        columns typed as declared there, plus ``ingestion_ts`` set to
        ``F.current_timestamp()``.
    """
    rng = random if rng is None else rng

    # row_drop simulates a partial load: only 15% of the normal volume arrives.
    rows = int(base_rows * 0.15) if failure_type == "row_drop" else base_rows
    date_tag = run_date.strftime('%Y%m%d')  # loop-invariant, computed once

    data = []
    for i in range(rows):
        order_id = f"O-{date_tag}-{i}"

        customer_id = f"C-{rng.randint(1, 500)}"
        product_id = f"P-{rng.randint(1, 100)}"
        quantity = rng.randint(1, 5)

        unit_price = round(rng.uniform(10, 500), 2)
        order_status = rng.choice(["CREATED", "SHIPPED", "DELIVERED"])

        # Short-circuit `and`: a random draw happens only on null_spike days.
        if failure_type == "null_spike" and rng.random() < 0.7:
            customer_id = None

        if failure_type == "cardinality_collapse":
            product_id = "P-1"

        if failure_type == "multiple_failures":
            if rng.random() < 0.6:
                customer_id = None
            product_id = "P-1"
            unit_price = 0.0
            order_status = "UNKNOWN"

        data.append((
            order_id,
            run_date,
            customer_id,
            product_id,
            quantity,
            unit_price,
            order_status,
        ))

    # Use the declared schema (minus ingestion_ts, which is added below) instead
    # of letting Spark infer one. Inference types Python ints as bigint rather
    # than the declared IntegerType for `quantity`. It also cannot handle an
    # empty batch (e.g. base_rows=0), and may reject a column whose values are
    # all None. Column order matches the tuples built above.
    # NOTE: this changes `quantity` from bigint to int compared with earlier
    # runs; an existing Delta table written by the old code may need its
    # schema overwritten.
    row_schema = StructType(
        [field for field in base_schema.fields if field.name != "ingestion_ts"]
    )
    df = spark.createDataFrame(data, schema=row_schema)

    return df.withColumn("ingestion_ts", F.current_timestamp())

In [0]:
# Generate one DataFrame per simulated day (start_date + 0..7 days), injecting
# that day's planned failure, if any. Schema drift is simulated here by
# appending an extra `promo_code` column. Days are generated in order, so the
# sequence of random draws is unchanged.
daily_frames = []
for offset in range(8):
    day = start_date + timedelta(days=offset)
    planned_failure = failure_plan.get(offset)

    day_df = generate_orders(day, planned_failure)
    if planned_failure == "schema_drift":
        day_df = day_df.withColumn("promo_code", F.lit("NEWYEAR50"))

    daily_frames.append(day_df)

# Stack the days left to right. allowMissingColumns=True lets frames without
# `promo_code` union with the drifted day; the missing values become null.
final_df = daily_frames[0]
for day_df in daily_frames[1:]:
    final_df = final_df.unionByName(day_df, allowMissingColumns=True)

In [0]:
# Persist every simulated day to the Delta table `retail_orders`, partitioned
# by order_date. A plain overwrite replaces the data but keeps the existing
# table schema and partitioning, and rejects a DataFrame whose schema differs.
# overwriteSchema lets a re-run of this notebook replace the table even after
# the generated columns or types have changed since the last write.
(final_df
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .partitionBy("order_date")
    .saveAsTable("retail_orders")
)

In [0]:
%sql

-- Per-day data-quality profile of the synthetic orders table. Each metric
-- exposes one or more of the injected failures:
--   row_count          -> row_drop (150 rows instead of 1000)
--   non_null_customers -> null_spike / multiple_failures (customer_id nulled)
--   distinct_products  -> cardinality_collapse / multiple_failures (all "P-1")
--   zero_price_rows    -> multiple_failures (unit_price forced to 0.0)
--   distinct_statuses  -> multiple_failures (order_status forced to "UNKNOWN")
-- schema_drift only adds the promo_code column and is not visible in counts.
-- NOTE(review): the table is written unqualified as `retail_orders`, but this
-- reads workspace.default.retail_orders. That assumes the session's current
-- catalog/schema is workspace.default; confirm.
SELECT
  order_date,
  COUNT(*)                                        AS row_count,
  COUNT(customer_id)                              AS non_null_customers,
  COUNT(DISTINCT product_id)                      AS distinct_products,
  SUM(CASE WHEN unit_price = 0 THEN 1 ELSE 0 END) AS zero_price_rows,
  COUNT(DISTINCT order_status)                    AS distinct_statuses
FROM
  workspace.default.retail_orders
GROUP BY order_date
ORDER BY order_date