In [0]:
from pyspark.sql.types import *
from datetime import datetime, timedelta
import uuid, random

# Unity Catalog Volume directory used as the JSON "inbox" for raw
# maintenance events.
LANDING = "/Volumes/workspace/loadstar_bronze/loadstar_files/landing/maintenance/events_inbox"

# Value pools that the categorical columns are drawn from (random.choice
# picks uniformly from each list).
event_types = ["FAILURE", "REPAIR", "DOWNTIME_START", "DOWNTIME_END"]
failure_types = ["ENGINE", "BRAKES", "TIRES", "TRANSMISSION"]
vendors = ["V001", "V002", "V003"]
weather = ["CLEAR", "RAIN", "SNOW"]


def _random_event(base_time):
    """Build one synthetic maintenance-event tuple.

    The tuple is positional, in this column order: event_id, truck_id,
    event_type, event_timestamp, site_id, failure_type, vendor_id,
    weather_condition, notes, ingestion_timestamp.

    event_timestamp is ``base_time`` minus a random offset of 0-720 minutes
    (somewhere in the preceding 12 hours). ``base_time`` itself is stored as
    ingestion_timestamp.
    """
    new_id = str(uuid.uuid4())
    happened_at = base_time - timedelta(minutes=random.randint(0, 720))
    # The draws below are made in a fixed order on purpose, so a seeded
    # `random` produces the same sequence of values for every row.
    truck = f"TRUCK_{random.randint(1,5)}"
    kind = random.choice(event_types)
    site = f"SITE_{random.randint(1,3)}"
    failure = random.choice(failure_types)
    vendor = random.choice(vendors)
    conditions = random.choice(weather)
    return (
        new_id,
        truck,
        kind,
        happened_at,
        site,
        failure,
        vendor,
        conditions,
        "maintenance event",
        base_time,
    )


# One reference time for the whole batch: it is both every row's ingestion
# timestamp and the anchor for the randomized event timestamps.
now = datetime.now()

rows = [_random_event(now) for _ in range(100)]

# Re-append the first event verbatim so the batch contains exactly one
# duplicate event_id, simulating a replayed event.
rows += rows[:1]

# Explicit schema for the synthetic events. The field order matches the
# positional layout of each tuple in `rows`. event_id is the only column
# declared non-nullable.
schema = StructType([
    StructField(field_name, field_type, nullable)
    for field_name, field_type, nullable in (
        # (column name, Spark data type, nullable)
        ("event_id", StringType(), False),
        ("truck_id", StringType(), True),
        ("event_type", StringType(), True),
        ("event_timestamp", TimestampType(), True),
        ("site_id", StringType(), True),
        ("failure_type", StringType(), True),
        ("vendor_id", StringType(), True),
        ("weather_condition", StringType(), True),
        ("notes", StringType(), True),
        ("ingestion_timestamp", TimestampType(), True),
    )
])

# Build the DataFrame with the explicit schema, so the timestamp columns are
# typed as TimestampType rather than inferred.
# NOTE: `spark`, `dbutils` and `display` are not defined in this file;
# presumably they are injected by the Databricks notebook runtime.
df = spark.createDataFrame(rows, schema)

# Append JSON files to the landing inbox. coalesce(2) limits this batch to at
# most two partitions, which keeps the number of part files per run small.
# "append" mode leaves files from earlier runs in place.
df.coalesce(2).write.mode("append").json(LANDING)

# The status marker was previously mojibake ("âœ…", i.e. the UTF-8 bytes of
# U+2705 mis-decoded as cp1252). It is restored to the intended check mark.
print("\u2705 Wrote landing files to:", LANDING)
display(dbutils.fs.ls(LANDING))