# Synthetic data generation

This notebook fabricates a miniature dataset that emulates the signals Monumental Labs could capture from its 7-axis KUKA carving cells. The goal is to create three related tables: job-level summaries, CAM toolpath parameters, and high-frequency telemetry sampled during each cut.

In [None]:
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path
from typing import List

import numpy as np
import pandas as pd

# Fix both the legacy global NumPy RNG and the dedicated Generator used by the
# synthesize_* functions so that repeated runs emit the same tables.
np.random.seed(42)
rng = np.random.default_rng(seed=42)

# CSV exports land one level above the notebook; create the folder if missing.
OUTPUT_DIR = Path("..") / "data"
OUTPUT_DIR.mkdir(exist_ok=True, parents=True)

In [None]:
# Per-material lookup table read by synthesize_jobs: a baseline finishing
# time, a unit price, and the weight applied to geometry complexity.
materials = pd.DataFrame(
    data=[
        ("limestone", 150, 0.16, 120),
        ("marble", 210, 0.25, 150),
        ("granite", 260, 0.32, 180),
    ],
    columns=["material", "base_finish", "price_per_cm3", "complexity_factor"],
)

def synthesize_jobs(n_jobs: int = 16) -> pd.DataFrame:
    """Build the job-level summary table.

    Each job gets a material sampled (with replacement) from the module-level
    ``materials`` table, a geometry complexity score, a removed volume, a
    finishing time, a quoted price and a scheduled start timestamp.

    Args:
        n_jobs: Number of job rows to generate.

    Returns:
        A DataFrame with columns ``job_id``, ``material``,
        ``geometry_complexity``, ``volume_removed_cm3``, ``finish_minutes``,
        ``quoted_price_usd`` and ``scheduled_start`` (ISO-8601 string).
    """
    # The material draw has its own fixed seed, independent of the shared rng.
    picked = materials.sample(n=n_jobs, replace=True, random_state=7).reset_index(drop=True)
    base_finish = picked["base_finish"].values
    complexity_weight = picked["complexity_factor"].values
    unit_price = picked["price_per_cm3"].values

    # NOTE: the sequence of rng draws below determines the generated values;
    # reordering them changes the dataset.
    geometry = np.round(rng.uniform(0.25, 0.85, size=n_jobs), 2)
    raw_volume = rng.normal(30000, 6000, size=n_jobs)
    removed = np.clip(raw_volume * (0.6 + geometry), 12000, 52000)

    # Finishing time = material baseline + complexity contribution + noise.
    noise = rng.normal(0, 18, size=n_jobs)
    minutes = np.round(base_finish + geometry * complexity_weight + noise, 1)

    price = np.round(removed * unit_price * (0.8 + geometry), 2)

    # One job every six hours from the first slot, jittered by up to an hour.
    first_slot = datetime(2025, 10, 1, 8)
    start_stamps = []
    for slot in range(n_jobs):
        jitter_hours = rng.uniform(-1, 1)
        starts_at = first_slot + timedelta(hours=slot * 6 + jitter_hours)
        start_stamps.append(starts_at.isoformat())

    job_ids = [f"J{number:03d}" for number in range(1, n_jobs + 1)]

    return pd.DataFrame(
        {
            "job_id": job_ids,
            "material": picked["material"],
            "geometry_complexity": geometry,
            "volume_removed_cm3": removed.astype(int),
            "finish_minutes": minutes,
            "quoted_price_usd": price,
            "scheduled_start": start_stamps,
        }
    )


def synthesize_toolpaths(jobs: pd.DataFrame) -> pd.DataFrame:
    """Generate CAM toolpath parameter rows for every job.

    Each job is split into two or three toolpaths whose contact times are
    random shares of the job's ``finish_minutes``. Feed, rpm and spindle
    current are drawn around means that shift with the job's
    ``geometry_complexity``.

    Args:
        jobs: Job table providing ``job_id``, ``geometry_complexity`` and
            ``finish_minutes`` columns.

    Returns:
        A DataFrame with columns ``toolpath_id``, ``job_id``, ``feed_mm_min``,
        ``rpm``, ``spindle_current_a``, ``contact_time_s`` and ``tool_id``.
        The columns are present even when ``jobs`` is empty, so downstream
        CSV export keeps its header.
    """
    columns = [
        "toolpath_id",
        "job_id",
        "feed_mm_min",
        "rpm",
        "spindle_current_a",
        "contact_time_s",
        "tool_id",
    ]
    records: List[dict] = []
    tool_ids = np.array(["Tool_A", "Tool_B", "Tool_C", "Tool_Rough", "Tool_Finish"])

    for idx, job in jobs.iterrows():
        complexity = job["geometry_complexity"]
        # The upper bound of Generator.integers is exclusive: 2 or 3 paths.
        n_paths = int(rng.integers(2, 4))
        # Normalise so the shares of one job sum to 1.
        shares = rng.uniform(0.18, 0.45, size=n_paths)
        shares /= shares.sum()
        for seq, share in enumerate(shares):
            # NOTE: the dict values are evaluated top to bottom, which fixes
            # the order of the rng draws and therefore the generated data.
            records.append(
                {
                    # A stride of 4 per job index keeps ids unique for up to
                    # four paths per job.
                    "toolpath_id": f"T{idx * 4 + seq:03d}",
                    "job_id": job["job_id"],
                    # Higher complexity lowers the mean feed; floor at 400.
                    "feed_mm_min": max(int(rng.normal(1100 - 180 * complexity, 110)), 400),
                    # Higher complexity lowers the mean rpm; floor at 3200.
                    "rpm": max(int(rng.normal(5200 + 1200 * (1 - complexity), 380)), 3200),
                    "spindle_current_a": round(rng.normal(15 + 6 * complexity, 1.4), 2),
                    # Share of the job's finish time in seconds, scaled by a
                    # random factor between 0.95 and 1.08.
                    "contact_time_s": round(job["finish_minutes"] * 60 * share * rng.uniform(0.95, 1.08), 1),
                    "tool_id": rng.choice(tool_ids),
                }
            )

    # Explicit columns keep the schema stable when there are no records.
    return pd.DataFrame.from_records(records, columns=columns)


def synthesize_telemetry(toolpaths: pd.DataFrame, points_per_path: int = 12) -> pd.DataFrame:
    """Generate evenly spaced telemetry samples for every toolpath.

    Each toolpath starts within six minutes either side of a fixed base time
    (2025-10-30 12:00) and is sampled ``points_per_path`` times across its
    ``contact_time_s``; the first sample falls one step after the start.

    Args:
        toolpaths: Toolpath table providing ``toolpath_id``,
            ``spindle_current_a`` and ``contact_time_s`` columns.
        points_per_path: Number of samples per toolpath; must be at least 1.

    Returns:
        A DataFrame with columns ``timestamp`` (ISO-8601 string),
        ``toolpath_id``, ``spindle_current_a``, ``vibration_g`` and
        ``coolant_flow_lpm``. The columns are present even when ``toolpaths``
        is empty, so downstream CSV export keeps its header.

    Raises:
        ValueError: If ``points_per_path`` is smaller than 1.
    """
    if points_per_path < 1:
        raise ValueError(f"points_per_path must be >= 1, got {points_per_path}")

    columns = ["timestamp", "toolpath_id", "spindle_current_a", "vibration_g", "coolant_flow_lpm"]
    base_time = datetime(2025, 10, 30, 12)
    telemetry_rows = []
    for _, tp in toolpaths.iterrows():
        start_offset = rng.uniform(-6, 6)
        timestamp = base_time + timedelta(minutes=float(start_offset))
        # The sampling interval is constant per toolpath, so build it once.
        step = timedelta(seconds=float(tp["contact_time_s"] / points_per_path))
        for _ in range(points_per_path):
            timestamp += step
            # NOTE: the dict values are evaluated top to bottom, which fixes
            # the order of the rng draws and therefore the generated data.
            telemetry_rows.append(
                {
                    "timestamp": timestamp.isoformat(),
                    "toolpath_id": tp["toolpath_id"],
                    # Toolpath nominal current scaled by a factor in 0.92-1.08.
                    "spindle_current_a": round(tp["spindle_current_a"] * rng.uniform(0.92, 1.08), 2),
                    # Mean jittered within 0.08-0.12, result floored at 0.02.
                    "vibration_g": round(max(rng.normal(0.08 + 0.04 * rng.random(), 0.02), 0.02), 3),
                    # Drawn around 6.5 and floored at 5.2.
                    "coolant_flow_lpm": round(max(rng.normal(6.5, 0.5), 5.2), 2),
                }
            )

    # Explicit columns keep the schema stable when there are no rows.
    return pd.DataFrame(telemetry_rows, columns=columns)


# Build the three tables in dependency order: toolpaths are derived from the
# jobs table and telemetry from the toolpaths table. All three generators draw
# from the shared `rng`, so this call order also fixes the exact values.
jobs_df = synthesize_jobs()
toolpaths_df = synthesize_toolpaths(jobs_df)
telemetry_df = synthesize_telemetry(toolpaths_df)

# Preview the first rows of the job table.
jobs_df.head()

In [None]:
# Preview the first rows of the toolpath table.
toolpaths_df.head()

In [None]:
# Preview the first rows of the telemetry table.
telemetry_df.head()

In [None]:
# Persist each table under OUTPUT_DIR without the pandas index column.
exports = {
    "jobs.csv": jobs_df,
    "toolpaths.csv": toolpaths_df,
    "telemetry.csv": telemetry_df,
}
for filename, frame in exports.items():
    frame.to_csv(OUTPUT_DIR / filename, index=False)

# Row counts (jobs, toolpaths, telemetry) shown as the cell output.
tuple(len(frame) for frame in exports.values())

We now have reproducible CSVs ready for DuckDB modeling and dashboarding. If you prefer a scriptable form, run `python scripts/generate_data.py` to emit the same synthetic data.