In [0]:
import random
from datetime import datetime, timedelta

from pyspark.sql.functions import rand, expr
from pyspark.sql.types import *

In [0]:
# Target catalog and schema: every table below is written as catalog.schema.<table>.
catalog, schema = 'zg', 'mfg_lakebase_demo'

In [0]:
# Ensure the target schema exists before any table is written into it.
schema_ddl = f"CREATE SCHEMA IF NOT EXISTS {catalog}.{schema}"
spark.sql(schema_ddl)

In [0]:
# Generate dummy machine_metrics: one row per (machine, date) pair with random KPIs.
machines = [f"machine_{i}" for i in range(1, 6)]
dates = [f"2025-08-{d:02d}" for d in range(1, 4)]

# Seeded local RNG so re-running the notebook regenerates the same metrics,
# without reseeding the global `random` module.
machine_rng = random.Random(42)

machine_rows = [
    (
        m,
        d,
        round(machine_rng.uniform(20.0, 45.0), 2),  # avg_runtime
        machine_rng.randint(0, 5),                  # errors_last_24h
        round(machine_rng.uniform(0.6, 1.0), 2),    # capacity_utilization
    )
    for m in machines
    for d in dates
]

machine_schema = StructType([
    StructField("machine_id", StringType(), True),
    StructField("date", StringType(), True),
    StructField("avg_runtime", DoubleType(), True),
    StructField("errors_last_24h", IntegerType(), True),
    StructField("capacity_utilization", DoubleType(), True)
])

machine_df = spark.createDataFrame(machine_rows, schema=machine_schema)
machine_df.write.mode("overwrite").saveAsTable(f"{catalog}.{schema}.machine_metrics")

In [0]:
# Generate dummy part_backlog with rich metadata
parts = [f"part_{i}" for i in range(1, 51)]

# Manufacturing metadata that shop floor workers would need
materials = ["Steel", "Aluminum", "Titanium", "Carbon Fiber", "Plastic", "Ceramic", "Copper", "Brass"]
part_types = ["Bracket", "Housing", "Shaft", "Gear", "Bearing", "Valve", "Connector", "Mount", "Cover", "Assembly"]
quality_levels = ["A", "B", "C", "Military", "Aerospace", "Medical"]
surface_finishes = ["Anodized", "Powder Coated", "Chrome Plated", "Raw", "Painted", "Polished"]
tolerances = ["±0.001", "±0.005", "±0.010", "±0.025", "±0.050"]

# Seeded local RNG: re-running this cell regenerates the same part attributes
# without reseeding the global `random` module.
part_rng = random.Random(7)

# Due dates are generated as day offsets from this date so that the
# overdue / due-soon / future buckets below hold whenever the notebook runs.
# Uses the driver's local date; pin it (e.g. datetime(2025, 8, 3).date())
# to regenerate an identical snapshot.
reference_date = datetime.now().date()
n_parts = len(parts)

backlog = []
for i, p in enumerate(parts):
    priority = part_rng.choice(["high", "medium", "low"])
    quantity = part_rng.randint(10, 100)

    # Realistic due date distribution, bucketed by the part's position in `parts`
    # (fractions rather than fixed indices, so it scales with the number of parts).
    position = i / n_parts
    if position < 0.2:    # first 20%: overdue by 1-15 days
        due_offset = -part_rng.randint(1, 15)
    elif position < 0.5:  # next 30%: due soon (today through the next 7 days)
        due_offset = part_rng.randint(0, 7)
    elif position < 0.8:  # next 30%: future (2-4 weeks out)
        due_offset = part_rng.randint(14, 28)
    else:                 # last 20%: long-term (roughly 2 months to 1 year out)
        due_offset = part_rng.randint(60, 365)
    # ISO "YYYY-MM-DD" string, matching the StringType due_date column.
    due_date = (reference_date + timedelta(days=due_offset)).isoformat()

    # Add rich manufacturing metadata
    material = part_rng.choice(materials)
    part_type = part_rng.choice(part_types)
    quality_level = part_rng.choice(quality_levels)
    surface_finish = part_rng.choice(surface_finishes)
    tolerance = part_rng.choice(tolerances)
    weight = round(part_rng.uniform(0.1, 50.0), 2)  # kg
    dimensions = f"{part_rng.randint(10, 500)}x{part_rng.randint(10, 500)}x{part_rng.randint(5, 200)}mm"
    drawing_number = f"DW-{part_rng.randint(1000, 9999)}-{part_rng.randint(1, 99)}"
    revision = f"Rev {part_rng.choice(['A', 'B', 'C', 'D'])}"
    estimated_hours = round(part_rng.uniform(0.5, 8.0), 1)

    backlog.append((
        p, priority, quantity, due_date, material, part_type,
        quality_level, surface_finish, tolerance, weight, dimensions,
        drawing_number, revision, estimated_hours
    ))

part_schema = StructType([
    StructField("part_id", StringType(), True),
    StructField("priority", StringType(), True),
    StructField("quantity_pending", IntegerType(), True),
    StructField("due_date", StringType(), True),
    StructField("material", StringType(), True),
    StructField("part_type", StringType(), True),
    StructField("quality_level", StringType(), True),
    StructField("surface_finish", StringType(), True),
    StructField("tolerance", StringType(), True),
    StructField("weight_kg", DoubleType(), True),
    StructField("dimensions", StringType(), True),
    StructField("drawing_number", StringType(), True),
    StructField("revision", StringType(), True),
    StructField("estimated_hours", DoubleType(), True)
])

part_df = spark.createDataFrame(backlog, schema=part_schema)
part_df.write.mode("overwrite").saveAsTable(f"{catalog}.{schema}.part_backlog")

In [0]:
# Assign each part a random recommended machine (machine_1 .. machine_N) and a
# route confidence in [0.7, 1.0).
#
# recommendations_df is a lazy plan: the display() cell and the saveAsTable()
# cell each re-evaluate it. Seeding rand() keeps both evaluations identical, so
# the persisted routes match the previewed ones. This holds as long as the input
# partitioning does not change between the two evaluations. The two columns use
# distinct seeds so they are not driven by the same random stream.
ROUTE_MACHINE_SEED = 42
ROUTE_CONFIDENCE_SEED = 43
n_machines = len(machines)  # keep the id range in sync with the machine_metrics cell

recommendations_df = (
    part_df
    .withColumn(
        "recommended_machine_id",
        # rand() is in [0, 1), so the cast yields an integer in 1..n_machines.
        expr(f"concat('machine_', CAST(rand({ROUTE_MACHINE_SEED}) * {n_machines} + 1 AS INT))"),
    )
    .withColumn("route_confidence", (rand(ROUTE_CONFIDENCE_SEED) * 0.3 + 0.7).cast("double"))
)

In [0]:
# Preview the generated routes. recommendations_df is a lazy plan containing
# rand() columns: this display() and the later saveAsTable() each evaluate it,
# so the rows shown here match the persisted rows only if those rand() calls
# are seeded deterministically.
display(recommendations_df)

In [0]:
# Persist the recommendations, replacing whatever a previous run wrote.
routes_table = f"{catalog}.{schema}.recommended_routes"
(
    recommendations_df.write
    .mode("overwrite")
    .saveAsTable(routes_table)
)

In [0]:
# Enable Delta Change Data Feed on recommended_routes so the Lakebase sync
# can pick up row-level changes incrementally instead of re-reading the table.
routes_table_name = f"{catalog}.{schema}.recommended_routes"
spark.sql(
    f"alter table {routes_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true);"
)