
feat: workflow trace via OTel spans + analyzer#3

Merged
timzsu merged 37 commits into main from zsu/lineage-redis-jsonl
May 2, 2026

Conversation

@timzsu
Collaborator

@timzsu timzsu commented Apr 29, 2026

Purpose

The trace feature was previously implemented as a standalone service that FlowMesh connects to. To avoid that complexity, this PR integrates the feature into the execution workflow. Specifically, we record events in the OpenTelemetry (OTel) format as JSONL files under each task's logs/ directory. Using the OTel format keeps the door open to a real OTel exporter (Jaeger / Honeycomb / Phoenix) later without changing the on-disk format.

Changes

  • Worker — executors emit OTel-shaped spans plus asset / lineage rows as JSONL under out_dir/logs/. The data.py mixin handles data prep and inherits the new governance.py mixin which owns span emission (_task_span / _span / _log_event) and asset / lineage row writes. Each task's run() is wrapped in a root span; heavy phases (model load, generation, postprocessing, dump to storage) become typed child spans tagged with SpanType (compute / network / marker). trace_id is pinned from the workflow id so a workflow's spans share one trace. Owner identity flows from WorkerTaskMessage.owner_id through _task_span onto each asset row, dropping the redundant spec.governance field. A separate maybe_upload_traces ships out_dir/logs/ to the server without bundling them into flowmesh result fetch.
  • Server — routers/v1/traces.py owns the full trace surface: POST /traces/tasks/{task_id}/{spans|assets|lineage} (per-task upload from the worker, fixed filename per type — no client-supplied paths), GET /traces/workflows/analyze/{wf_id} (returns ProfileSummary), and GET /traces/workflows/{wf_id}/{spans|assets|lineage} (streams the JSONL artifact across the workflow's tasks). The artifact endpoint and _resolve_artifact_path are no longer overloaded with a logs/ prefix shortcut. The Pydantic Span models + analyzer live under src/server/governance/. No changes to dispatcher / task runtime / monitoring.
  • Shared — shared/schemas/governance.py holds the wire-contract SpanType enum and span-name constants used by both worker producer and server analyzer. shared/utils/json.py absorbs the JSONL helpers. Dead data_key / workflow_data_pattern / AssetRow / LineageRow are dropped.
  • SDK / CLI — SDK owns its own models/traces.py (ProfileSummary and friends); no shared imports. client.traces.fetch(wf_id, trace_type) and client.traces.analyze(wf_id). flowmesh trace fetch <type> <wf_id> and flowmesh trace analyze <wf_id> [-f rich|critical-path(cp)|end-to-end(e2e)|queuing|lineage|json].

Design

Per-task lineage as OTel spans. Each row is self-describing — explicit start_time / end_time, flowmesh.type (compute / network / marker) on the producer side, parent / span ids for trace structure. The wire format is whatever ReadableSpan.to_json() emits today; we don't transform it. Adding a real OTLP exporter later is additive: drop OTLPSpanExporter on the TracerProvider alongside our JSONL exporter and the two run independently — nothing about the on-disk format has to change. trace_id is pinned from the workflow id (wfl- prefix stripped, padded to 32 hex) via a custom IdGenerator, so all spans for a workflow share one trace.
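The trace-id pinning reduces to a prefix-strip plus hex padding; in the real code this value would back the custom OpenTelemetry IdGenerator's generate_trace_id, but the derivation itself needs only the stdlib (the function name and the right-pad choice are assumptions):

```python
def trace_id_from_workflow(workflow_id: str) -> int:
    """Derive a deterministic 128-bit trace id from a workflow id:
    strip the wfl- prefix, drop UUID dashes, pad to 32 hex chars."""
    hex_part = workflow_id.removeprefix("wfl-").replace("-", "")
    padded = hex_part.ljust(32, "0")[:32]  # pad direction is an assumption
    return int(padded, 16)
```

Because the generator returns this constant for every span the workflow's tasks create, all spans — across tasks and processes — land in one trace.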

Analyzer. e2e_breakdown (hardware_summary + network_summary, both EventSummary) and critical_path summarize the spans. per_data_id.queuing_delay = task.start - max(parent.dump_to_storage.end) — useful for spotting imbalance off the critical path. Only the consumer knows when it actually started running, so queuing delay is computed at analysis time rather than emitted by the producer. The CP-restricted breakdown follows the merged-execution batch_id, so shared work (model load, generation) attributed to a merge parent's data_id is still counted when a merged-child branch lands on the path.
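The queuing-delay formula above can be sketched as follows (PhaseSpan and the clamp-at-zero behavior are illustrative assumptions, not the analyzer's actual types):

```python
from dataclasses import dataclass

@dataclass
class PhaseSpan:
    data_id: str
    name: str      # e.g. "dump to storage" — the READY marker span
    start: float
    end: float

def queuing_delay(task_start: float, parent_ready_spans: list[PhaseSpan]) -> float:
    """Gap between the moment the last upstream output became durable
    (max end of the parents' dump-to-storage spans) and the moment the
    consumer task actually started running."""
    if not parent_ready_spans:
        return 0.0  # root tasks have no upstream to wait on
    last_ready = max(s.end for s in parent_ready_spans)
    return max(0.0, task_start - last_ready)
```

A large delay on a data_id that is not on the critical path is exactly the imbalance signal the queuing view is meant to surface.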

Test Plan

uv run flowmesh stack build  # FLOWMESH_VERSION=lineage-test
uv run flowmesh stack up
uv run flowmesh stack worker up gpu -t 0
uv run flowmesh workflow submit templates/dag_inference_example.yaml
uv run flowmesh trace fetch spans <wf_id>                # raw rows
uv run flowmesh trace analyze <wf_id>                    # full rich view
uv run flowmesh trace analyze <wf_id> -f cp              # Critical path
uv run flowmesh trace analyze <wf_id> -f e2e             # End-to-end
uv run flowmesh trace analyze <wf_id> -f queuing         # Per-data_id queuing delay
uv run flowmesh trace analyze <wf_id> -f lineage         # Lineage DAG

Test Result

Sample rendered output — flowmesh trace analyze <wf_id> --format critical-path
Sample rendered output — flowmesh trace analyze <wf_id> --format queuing
Sample rendered output — flowmesh trace analyze <wf_id> --format lineage

Pre-submission Checklist
  • I have read the contribution guidelines.
  • I have run pre-commit run --all-files and fixed any issues.
  • I have added or updated tests covering my changes.
  • I have verified that uv run pytest tests/ passes locally.
  • If I changed shared schemas or proto definitions, I have checked downstream compatibility across Server and Worker.
  • If I changed the SDK or CLI, I have verified the affected packages work (uv sync --all-extras --frozen).
  • If this is a breaking change, I have prefixed the PR title with [BREAKING] and described migration steps above.
  • I have updated documentation or config examples if user-facing behavior changed.

🤖 Generated with Claude Code

@timzsu timzsu marked this pull request as draft April 30, 2026 04:56
@timzsu timzsu force-pushed the zsu/lineage-redis-jsonl branch 2 times, most recently from 3c4058a to 054e6a0 Compare April 30, 2026 13:18
@timzsu timzsu marked this pull request as ready for review April 30, 2026 13:35
@timzsu timzsu changed the title feat: workflow lineage via per-task JSONL + Redis transport feat: workflow trace spans + queuing-delay analyzer Apr 30, 2026
@timzsu timzsu force-pushed the zsu/lineage-redis-jsonl branch from 8022bd7 to 9b1297c Compare April 30, 2026 14:15
@timzsu timzsu changed the title feat: workflow trace spans + queuing-delay analyzer feat: workflow trace via OTel spans + analyzer May 1, 2026
@timzsu timzsu requested a review from kaiitunnz May 1, 2026 08:22
timzsu added 21 commits May 1, 2026 11:44
The standalone governance-relay service goes away. Worker tasks now emit
events / assets / lineage rows as JSONL files under
out_dir/artifacts/logs/ that upload alongside other task artifacts and
land at results/{task_id}/logs/ on the server. Cross-task data payloads
move to Redis (flowmesh:data:{data_id}) with a per-worker on-disk cache
that survives across tasks.

GovernanceMixin is folded into DataMixin; spec.governance keeps user_id
context only. New /api/v1/workflows/{id}/logs/{events|assets|lineage}
and /profile endpoints concat per-task JSONLs and run a pure analyzer
(src/shared/profile/) that lumilake can also import. SDK + CLI gain
client.logs / client.profile and `flowmesh logs|profile fetch` commands.

Tests: 535 pass (no --ignore). E2E scaffold runs against
templates/dag_inference_example.yaml on a GPU worker, gated by
FLOWMESH_E2E=1.

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
Caught and fixed during the e2e dry-run against dag_inference_example:

- Lineage transport now always-on. _dump_to_governance no longer gated
  behind spec.governance; user_id stays optional context.
- Dispatcher injects upstream metadata only ({task_id}); the worker fetches
  upstream payloads from Redis and writes them back into spec.upstreamResults
  before the graph_template builder runs.
- Supervisor passes REDIS_CONTROL_URL + WORKER_DATA_TTL_SEC into the worker
  container env so DataMixin can connect.
- Worker DataMixin honors REDIS_ACL_ENABLED + REDIS_TLS_CA_FILE for parity
  with the server's Redis client.
- Server logs router reads from artifacts/logs/* (where uploads land via
  the artifact-path resolver), not the literal logs/ dir.
- Profile analyzer aggregates per-phase wall time across data_ids: each
  consecutive event-pair contributes a phase named after the closing
  event. ProfileSummary gains workflow_wall_sec and phase_timings (count,
  total, avg, min, max, p50, p95). New `--format phases` CLI option;
  `--format table` appends the phase breakdown.

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
… format

Replaced the ad-hoc PhaseTiming output with the same shape lumilake's
TraceAnalyzer produces:

- e2e_breakdown: hardware_summary (sum of per-event-type elapsed across
  data_ids, with batch-aware totals) + network_summary (merged-interval
  active time across overlapping reads/writes); workflow_duration_seconds
  + total_network_seconds.
- critical_path: walks the lineage DAG from latest-finishing sink back to
  the root via dep_map, picks latest-finishing parent at each step.
  Reports per-node active vs wait, plus the hardware/network breakdown
  restricted to the critical path.

`dump to storage` is now emitted by the worker's _write_data right after
the Redis SET succeeds — it's the READY marker the analyzer keys on.
`read response transfer` and `write request transfer` are the network
event types that contribute to merged-interval active time.

CLI: `flowmesh profile fetch --format {e2e,critical-path,mermaid,json}`.
Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
Server returns raw JSON; rendering moves out of `shared.profile.render` so
callers pick a representation that fits their context.

CLI (`flowmesh profile fetch`) defaults to a Rich-rendered view: header
panel with workflow_id + wall/network totals, critical-path tree with
per-node active/wait, hardware/network tables (color-coded by relative
total), and a lineage DAG rendered as a Rich tree with critical-path
nodes marked. `--format json` pretty-prints raw JSON via rich.JSON.

SDK exposes `client.profile.fetch_summary()` for a parsed `ProfileSummary`
and `flowmesh.profile_views.{hardware,network,critical_path}_dataframe`
that produce sorted pandas DataFrames (lazy import). `to_mermaid` stays in
shared as a text-format helper any consumer can call.

`shared.profile` keeps the analyzer and a single text renderer
(`to_mermaid`); the Rich-based renderers live next to the CLI command and
add `rich` to the CLI dep set.

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
…pute"

`flowmesh profile fetch -f <view>` now accepts:
- `rich` (default): full report — header, critical-path block, e2e block,
  lineage DAG.
- `critical-path` / `cp`: header + critical-path tree + per-CP compute and
  network tables.
- `e2e` / `compute` / `summary`: header + end-to-end compute and network
  tables only.
- `dag` / `lineage` / `graph`: header + lineage DAG tree.
- `json`: raw payload, syntax-highlighted via rich.JSON.

Section title is "Compute time" instead of "Hardware time" — the schema
field stays `hardware_summary` for lumilake compatibility, but "Compute"
reads better as user-facing UI.

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
Redis is no longer load-bearing for cross-task data flow. The producing
task's `results.json` upload to the server is the durable copy; Redis is
a hot-path cache with a short TTL.

`_fetch_data` resolves in order: on-disk worker cache → Redis →
`GET /api/v1/results/{task_id}` from the server. A missing Redis key is
recoverable; only a cache miss + Redis miss + server miss is fatal.
`read response transfer` events now record `source=redis` or
`source=server` so the analyzer can attribute network time correctly.

`_write_data` publishes to Redis best-effort. Connection failures or
RedisError are logged and treated as a cache miss — the write proceeds
because the server upload (via `maybe_upload_artifacts` and the runner's
result POST) is the source of truth. `write request transfer` events
record `redis_cache=hit|miss` for the same reason.

`WORKER_DATA_TTL_SEC` default drops to 600s (10 min) — the cache only
needs to outlive the immediate dispatch handoff, not the full workflow.

Tests:
- Server-fallback path: Redis miss → fetch from `_fetch_from_server` →
  events log `source=server`.
- Both-miss path: still raises `ExecutionError`.
- Tolerant write path: Redis raising `RedisError` does not break the
  write — assets / lineage rows still emit and the call returns.

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
Per AGENTS.md "top-level imports only; inline imports only to break
circular imports":

- `flowmesh.resources.profile` lifts the `from shared.profile import
  ProfileSummary` to module top.
- `flowmesh.profile_views` switches the per-call `import pandas as pd`
  to a top-level `try/except ImportError` sentinel; helpers raise
  `ImportError` with a hint to install the new `flowmesh-sdk[dataframes]`
  extra (defined in `sdk/pyproject.toml`).

Per AGENTS.md "no `del arg` to silence unused-param lint" (extended to
`# noqa: ARG002`):

- `_fetch_data` drops the unused `governance_spec` parameter (also
  removed from `_fetch_upstream_results_from_storage` callers).
- `_write_data` drops the unused `events` parameter; `governance_spec`
  stays — it carries `user_id` for the asset row.

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
… delay

Worker emits one span per phase via the OpenTelemetry SDK (compute /
network / marker), serialized by ReadableSpan.to_json() to spans.jsonl.
trace_id is pinned from workflow_id via a custom IdGenerator. The
analyzer drops its NETWORK_EVENT_TYPES whitelist (kind now lives on the
span) and surfaces queuing delay for every data_id, not just the
critical path.

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
…upload

Two bugs surfaced by e2e validation:

- Sub-spans created inside `io_executor.submit(...)` ran under a fresh
  Context, so OTel's parent ContextVar and our `_workflow_id_var` were
  invisible — sub-spans orphaned with random trace ids. New
  `_submit_in_context` helper captures the caller's Context and dispatches
  via `Context.run`. All three submit sites switched over.
- `maybe_upload_artifacts` was called inside `_task_span`, so the root
  span was written to disk *after* the upload ran. Uploaded `spans.jsonl`
  was missing its root. Moved the upload after the with-block in vllm,
  transformers, diffusers, data_retrieval.

Also: switch `FlowMeshSpanKind` to `StrEnum` (UP042).

E2E (GPU 3, dag_inference_example): 48 spans, 1 trace_id matching the
workflow, 3 task root spans, all sub-spans correctly parented; wall
55.5s (was 0.0s pre-fix), synthesis 26ms queuing delay blocked on the
slower branch.

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
A running stack always uses the image baked when the container was
created; neither `stack up` nor `worker up` rebuilds. Skipping the
rebuild step lets stale code masquerade as the branch under test —
caught this the hard way in the spans e2e run.

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
…rals

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
The supervisor already owns the node-local Redis for control plane and
telemetry. Adding a second Redis client to the worker duplicated
credentials and gave each worker process its own driver / TLS config to
manage. Workers now ask the supervisor over gRPC instead.

- Proto: add `FetchData` and `PublishData` unary RPCs to
  `supervisor.v1.Supervisor`. Recompile stubs.
- Supervisor: handlers in `grpc_server.py` go through the existing
  `SyncRedisClient` at `flowmesh:data:{data_id}`; cache failures return
  `ok=False` / `found=False` (the producer's HTTP upload is still the
  source of truth).
- Worker: new `supervisor_data_client.py` — thin gRPC stub the
  `DataMixin` uses. `_fetch_from_cache` / `_publish_to_cache` replace
  the old direct-Redis paths. `redis>=7.0.1` is dropped from
  `worker-core`. Worker container no longer imports `redis`.
- Worker config: `worker_cache_dir` defaults to `RESULTS_DIR / .cache`
  instead of `/tmp/flowmesh_worker_cache`. `redis_control_url` removed
  from `WorkerConfig` (worker no longer needs it). Supervisor adapter
  drops the `REDIS_CONTROL_URL` env passthrough to worker containers.
- Tests: `_FakeSupervisorData` stand-in replaces `fakeredis`. Span
  attribute renames: `read.source: "redis" -> "supervisor"`,
  `write.redis_cache -> write.cache`.

E2E (GPU 3): single trace_id matching the workflow, 3 task root spans,
all 5 publishes succeeded via supervisor (`cache=hit`), Redis on the
supervisor side carries the `flowmesh:data:tsk-...` keys with the
worker-supplied TTL.

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
The previous design split each phase boundary into a span (for the work)
and an instant marker (for the checkpoint). For checkpoints whose info
is already encoded by the surrounding span, the marker is noise.

- `cache hit` becomes attributes on the read span (`source`, `cache_hit`).
  Caller no longer emits a separate marker.
- `dump to storage` becomes a network-kind span wrapping the entire
  `_write_data` body — supervisor publish, disk cache, asset/lineage
  rows. Its `end_time` is still the data-ready timestamp the analyzer
  picks up via `READY_SPAN_NAME`. The inner `write` span is gone.
- `output postprocessing` and `JSONL export` become spans wrapping the
  per-task remap loop and the `_maybe_export_jsonl` call respectively,
  so their durations show up in the compute breakdown.
- `prompt synchronization` is dropped — the parallel `_collect` futures
  already record their own spans.
- `_instant` renamed to `_log_event` (clearer name for the rare
  moment-in-time-only case; only `queuing for execution` for merged
  children still uses it). `_instant_for_each` shim removed.

E2E (GPU 0): 21 spans, single trace_id, 3 `dump to storage` network
spans with cache_hit/payload_bytes attributes (~23ms total network),
`output postprocessing` and `JSONL export` show up as compute spans,
zero stray `cache hit` or `prompt synchronization` markers. Profile
returns wall 57.7s; synthesis queuing delay 531ms blocked on slower
branch.

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
The supervisor's gRPC `FetchData` / `PublishData` cache and the
matching worker stub are an optional optimization on top of the
governance / lineage pipeline. Move them out of this PR; the work is
parked at zsu/redis-result-cache for a follow-up PR.

What stays here is the data path the lineage feature actually needs:
worker reads upstream payloads from a per-worker on-disk cache and
falls back to GET /api/v1/results/{task_id} on the server. Producers
write the on-disk cache; durability stays with the server's HTTP
results upload.

- Drop `proto/supervisor/v1` `FetchData` / `PublishData` RPCs and
  regenerate stubs.
- Drop supervisor-side handlers in `services/grpc_server.py` and the
  `WORKER_DATA_TTL_SEC` env passthrough in `adapters/base.py`.
- Drop `set_value(ttl_sec=...)` overload on `SyncRedisClient` and the
  `cli/stack` env-schema entries for `WORKER_CACHE_DIR` /
  `WORKER_DATA_TTL_SEC`. (Worker still honors `WORKER_CACHE_DIR` env
  for the on-disk cache, defaulting to `tempfile.gettempdir()`.)
- Drop `WorkerConfig.worker_cache_dir` / `data_ttl_sec` fields.
- Delete `src/worker/supervisor_data_client.py`.
- Drop `_get_supervisor_data` / `_fetch_from_cache` / `_publish_to_cache`
  methods on `DataMixin`. `_fetch_data` becomes on-disk cache → HTTP
  fallback; `_write_data` writes the on-disk cache + asset/lineage
  rows.
- Tests: replace `_FakeSupervisorData` with the on-disk-cache flow.

E2E (GPU 0): 21 spans, single trace_id, 3 `dump to storage` network
spans, both `read` spans hit the on-disk cache. Profile shows wall
56.3s; synthesis 528 ms queuing delay blocked on slower branch.

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
- `flowmesh profile fetch -f` now binds to a `_ProfileView(StrEnum)`;
  Typer rejects anything outside `{rich, critical-path, e2e, queuing,
  dag, json}` case-sensitively. The previous alias map (`cp`, `compute`,
  `summary`, `wait`, `lineage`, `graph`, `default`, …) is gone — one
  canonical name per view.
- Remove three redundant `list(task_ids)` copies in `vllm_executor.py`
  where the local already has type `list[str]`.
- Hoist `_raise_for_stream_status_async` to the top of
  `sdk/.../resources/logs.py` and drop the inline `from fastapi import
  HTTPException` calls in `tests/server/test_logs_router.py`.

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
- `_resolve_task_ids` returns `workflow.task_ids` directly (already a
  Pydantic-validated `list[str]`).
- The `child_results` post-process loop reassigns existing keys only,
  so the `list(...)` snapshot of `dict.items()` is unnecessary.

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
`list(x)` / `dict(x)` is ambiguous — could be a typecast or a defensive
copy. The two remaining defensive copies in PR-added code now use
`.copy()` so the intent is unambiguous: typecasts stay as `list(...)` /
`dict(...)`, defensive copies as `.copy()`.

- `Span.parse_otel_json`: lift the OTel attributes lookup into a local
  and call `.copy()` on it.
- `_per_data_id_timings`: `parent_data_ids=parents.copy()`.

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
timzsu and others added 6 commits May 1, 2026 11:44
Only `stack {up,down,pull,pullall}` accept `--image-tag`; `stack build` and `stack worker up` read `FLOWMESH_VERSION` from the env file. The earlier rule incorrectly said `--image-tag` was valid on every command.

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
CLI / SDK / server now expose a single `trace` namespace with `fetch`
(raw rows) and `analyze` (run the analyzer, parsed `ProfileSummary`).
The split between `flowmesh logs ...` and `flowmesh profile ...` is
gone; all surfaces match.

- CLI: `flowmesh trace fetch <kind> <wf_id>` and `flowmesh trace analyze
  <wf_id>` replace the `logs` and `profile` command groups. `kind` is a
  strict `TraceKind(StrEnum)` — Typer rejects anything outside `spans`,
  `assets`, `lineage`. `--format` on `analyze` rejects unknown views
  (no aliases). The one-line `_to_summary` helper is gone — `client
  .trace.analyze` returns a parsed `ProfileSummary` directly.
- SDK: `client.trace.fetch(workflow_id, kind)` and
  `client.trace.analyze(workflow_id) -> ProfileSummary`. Old
  `client.logs.fetch_*` and `client.profile.fetch_*` removed. The CLI
  call site no longer needs a `# type: ignore[arg-type]` cast.
- Server: `routers/v1/logs.py` → `trace.py` exposing
  `/workflows/{id}/trace/{kind}` and `/workflows/{id}/trace/analyze`.
- Tests: renamed `tests/server/test_logs_router.py` →
  `test_trace_router.py`; e2e doc-comments updated.

Move `src/shared/profile/` into `src/shared/governance/` — the trace
analyzer schemas belong with the rest of the governance surface, not
in a sibling module. `from shared.profile import …` becomes
`from shared.governance import …`.

`shared/governance/spans.py::_parse_iso` was duplicated against
`server/utils/time.py::parse_iso_ts`. Moved the parse-with-Z-suffix
core to `shared/utils/time.py::parse_iso_datetime`; both call sites
now use it. The OTel JSON ingest stays as manual parsing (the SDK's
`ReadableSpan.to_json()` is internal, not OTLP/JSON proto, so no
upstream Pydantic schema applies).

Drop the data-cache layer the cache PR owns:

- Revert `dispatcher/base.py::_collect_upstream_results` to main's
  inline behavior — `results[name] = data` directly. The
  metadata-only `{"task_id": ...}` injection was a cache-feature
  concern.
- Strip `_fetch_data`, `_fetch_from_server`, `_read_cache`,
  `_write_cache`, `_cache_path_for`, `_data_cache_dir`,
  `_cache_dir_lock`, `_upstream_deps_cache`, `_DEFAULT_WORKER_CACHE_DIR`,
  and `WORKER_CACHE_DIR` env handling from `DataMixin`. With the
  dispatcher inlining, the worker reads `spec.upstreamResults`
  directly — no fetch, no on-disk cache, no `upstream fetch` span.
- `_fetch_upstream_results_from_storage` is gone; the `graph_template`
  / `dataframe` paths in `_collect_prompts_for_spec` use
  `_spec_upstream_results(spec)` directly.
- Drop the matching cache tests in `test_data_mixin_lineage.py`.

The cache work continues at branch `zsu/redis-result-cache`.

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
Owner identity now flows from WorkerTaskMessage.owner_id through _task_span,
removing the redundant spec.governance dict and its plumbing. Deletes the
unused data_key/workflow_data_pattern helpers extracted with the cache PR.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
…data

Compute and network breakdowns now share a single EventSummary type with
strict (non-None) total_seconds. _write_data is renamed to _record_output
since the function only emits asset+lineage rows; serialization is solely
for the dump-to-storage span's payload_bytes attribute.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
… models

Mirrors the OTel JSON layout 1:1 instead of flattening into top-level fields.
Hex-prefix stripping moves to a reusable Annotated[str, BeforeValidator]
helper, removing all .get() lookups from parsing. SpanAttributes pins
data_id, batch_id, and flowmesh_kind as str|None / FlowMeshSpanKind|None;
arbitrary extras still pass through.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
@timzsu timzsu force-pushed the zsu/lineage-redis-jsonl branch from 27cec65 to 87b8ee2 Compare May 1, 2026 11:47
DataMixin grew too large by absorbing span emission + asset/lineage row
writes alongside the existing data prep helpers. Move OTel span emission
(_task_span / _span / _log_event), per-task state (_task_id /
_current_batch_id / _task_out_dir / _task_user_id / _event_lock /
io_executor), thread-context propagation (_submit_in_context), and
asset/lineage row writers (_record_asset / _record_lineage /
_record_output / _dump_to_governance / _extract_source_data_ids) into a
restored GovernanceMixin. DataMixin keeps only data prep — image / S3 /
param helpers, spec accessors, prompt building. Inheritance chain
(DataMixin -> GovernanceMixin, InferenceMixin -> DataMixin) preserved
from main, so executors are unchanged. Sync uv.lock with pyproject.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
Collaborator

@kaiitunnz kaiitunnz left a comment


Several comments. PTAL.

Comment thread AGENTS.md Outdated
Comment thread AGENTS.md Outdated
Comment thread src/worker/executors/mixins/_otel.py Outdated
Comment thread src/worker/executors/mixins/governance.py Outdated
Comment thread src/worker/executors/mixins/governance.py Outdated
Comment thread src/server/routers/v1/trace.py Outdated
Comment thread src/server/routers/v1/trace.py Outdated
Comment thread sdk/src/flowmesh/resources/traces.py
Comment thread sdk/src/flowmesh/resources/trace.py Outdated
Comment thread pyproject.toml Outdated
timzsu and others added 4 commits May 2, 2026 07:47
…CLI fixes

Resolves all 30 review comments on PR #3:

Architecture / placement
- Move src/shared/governance/{analyzer.py, pydantic Span models} to
  src/server/governance/. shared/governance keeps only the wire-contract
  enum (FlowMeshSpanKind) + span-name constants (TASK_SPAN_NAME,
  READY_SPAN_NAME) used by both worker producer and server analyzer.
- shared/governance/render.py inlined into sdk/src/flowmesh/profile_views.py
  (only consumer); shared/governance/schemas.py deleted (dead code).
- SDK no longer imports from shared. Local copies live at
  sdk/src/flowmesh/models/trace.py and sdk/src/flowmesh/_jsonl.py.
- shared/utils/jsonl.py merged into shared/utils/json.py.

Trace endpoint
- Standalone /trace router (src/server/routers/v1/traces.py) at
  prefix /trace; routes /trace/{wf}/{kind} and /trace/{wf}/analyze.
- Trace JSONL artifacts move from out_dir/artifacts/logs/ to out_dir/logs/
  so a default `flowmesh result fetch` no longer ships them.
  maybe_upload_traces() uploads logs/ separately; server's
  _resolve_artifact_path now accepts logs/ prefix.

Worker mixin polish
- _otel.py: PREFIX_WORKFLOW + removeprefix; new task_trace_context
  context manager replaces explicit set/reset pair.
- governance.py: _task_user_id -> _task_owner_id; quoted return type
  dropped; dedup_json removed from payload-size calc;
  _extract_source_data_ids reads spec.upstreamResults directly with
  single-pass dedup; _lineage_dir reused in _task_span; docstrings.

Executors
- _run_body -> _run_inner (matches diffusers); drop unused task param
  (transformers, diffusers); drop redundant task_id (vllm, derive from
  task.task_id).

Analyzer
- Concise per-helper docstrings.
- CP breakdown expands through merged-execution batch_ids so shared work
  (model load, generation) emitted under the merge parent's data_id is
  attributed when a merged-child branch is on the path.

CLI
- Top-level `flowmesh trace ...`. Format: dag -> lineage; cp aliases
  critical-path; e2e aliases end-to-end.
- Header panel surfaces explicit `e2e=` and `cp=` summary lines.
- Queuing-delay table shortens data_id displays so duration_sec /
  wait_sec / blocked_by / cp columns stay visible at default widths.

AGENTS.md trim
- Drop the verbose E2E rebuild paragraph and the .env-writing directive.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
…ighten mixins

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
…plural for trace/traces

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
@timzsu timzsu requested a review from kaiitunnz May 2, 2026 09:39
timzsu added 2 commits May 2, 2026 13:44
Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
…onstant

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
Collaborator

@kaiitunnz kaiitunnz left a comment


Minor comments remain. Also, please add unit tests for the new SDK trace models in tests/sdk/test_models.py.

Comment thread src/server/routers/v1/traces.py Outdated
Comment thread src/server/routers/v1/traces.py Outdated
Comment thread src/server/routers/v1/traces.py Outdated
Comment thread src/shared/utils/json.py Outdated
Comment thread src/worker/executors/utils/checkpoints.py Outdated
Comment thread sdk/src/flowmesh/resources/traces.py Outdated
Comment thread cli/src/flowmesh_cli/commands/trace.py Outdated
…address review nits

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
@timzsu timzsu requested a review from kaiitunnz May 2, 2026 15:35
@kaiitunnz
Collaborator

@timzsu Still need to add unit tests for the new SDK models.

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
@timzsu
Collaborator Author

timzsu commented May 2, 2026

@timzsu Still need to add unit tests for the new SDK models.

Sorry, missed that one. Added.

Collaborator

@kaiitunnz kaiitunnz left a comment


LGTM. Thanks for the work.

Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
@kaiitunnz kaiitunnz force-pushed the zsu/lineage-redis-jsonl branch from 9dfedfa to f6a0040 Compare May 2, 2026 16:08
@timzsu timzsu merged commit 37e0051 into main May 2, 2026
10 checks passed
@timzsu timzsu deleted the zsu/lineage-redis-jsonl branch May 2, 2026 16:19