feat(worker-bottomup): graph_migration_wf Hatchet workflow (Phase 7 #57)#267

Merged
charlie83Gs merged 2 commits into main from feat/graph-migration-workflow
Apr 20, 2026

Conversation

@charlie83Gs
Contributor

Summary

Adds the durable Hatchet workflow that wraps the Phase 7 primitives
(`begin_migration` / `run_hop` / `commit_migration` / `fail_migration`)
with per-hop commits: a worker crash mid-plan leaves earlier audit
rows and flipped graph state persisted, and the next dispatch resumes
via `run_hop`'s idempotent-replay path.

The executor helper (`kt_db.migration_executor.execute_migration`) is a
single-session composition — no between-hop commits, so it can't give
crash durability. The workflow layer is exactly where that durability
lives. This PR re-composes the primitives around per-hop sessions.
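
The per-hop shape can be sketched like this — a minimal, self-contained illustration, not the real implementation: the primitive behavior mirrors what the PR describes, but the in-memory session/DB, `crash_on` knob, and plan format are stand-ins.

```python
class FakeSession:
    """Stand-in for a DB session: writes land only on commit(); anything
    still pending when the session closes is rolled back."""
    def __init__(self, db):
        self.db = db
        self.pending = []
    def add_audit(self, row):
        self.pending.append(row)
    def commit(self):
        self.db["audit"].extend(self.pending)
        self.pending = []
    def __enter__(self):
        return self
    def __exit__(self, *exc):
        self.pending = []  # uncommitted writes are lost on close
        return False

def run_plan(db, plan, crash_on=()):
    """One session per hop, committed before the next hop starts."""
    for hop in plan:
        with FakeSession(db) as s:
            # idempotent replay: a committed success row short-circuits
            if ("success", hop) in db["audit"]:
                continue
            if hop in crash_on:
                s.add_audit(("failed", hop))
                s.commit()  # persist the failure before raising
                raise RuntimeError(f"hop {hop} crashed")
            s.add_audit(("success", hop))
            s.commit()  # per-hop commit: earlier hops survive a crash
```

A crashed run leaves hop 1's success row committed; a re-run skips hop 1 via the replay check and picks up at the failed hop.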

Orchestration body lives in `run_graph_migration` (framework-agnostic)
so integration tests hit it directly against a real DB without Hatchet.

What's here

  • `GraphMigrationInput` / `GraphMigrationOutput` Pydantic models in
    `kt_hatchet.models`
  • `run_graph_migration` helper + `graph_migration_wf` Hatchet task in
    `services/worker-bottomup/src/kt_worker_bottomup/workflows/graph_migration.py`
  • Registered on the orchestrator worker and `worker-all`
  • 7 integration tests:
    • `test_single_hop_success_commits_per_hop` — fresh session after run proves persistence
    • `test_multi_hop_success_audit_rows_visible_between_hops` — hop2 peeks at hop1's committed row mid-plan
    • `test_failure_stamps_last_applied_and_error_state` — v2 not v3, status=error, read_only
    • `test_remigrate_resumes_and_skips_completed_hops` — first-run hop1 audit row makes second-run short-circuit
    • `test_unknown_plugin_fails_cleanly` — no graph state touched
    • `test_already_at_target_is_noop` — no read_only flip
    • `test_target_ahead_of_plugin_trims_plan` — dispatcher/plugin drift

Test plan

  • `uv run --project services/worker-bottomup pytest services/worker-bottomup/tests/integration/ -x` — 7/7
  • `uv run --project services/worker-bottomup pytest services/worker-bottomup/tests/test_bottom_up.py services/worker-bottomup/tests/test_wave_planner.py -x` — 34/34
  • CI green

What's NOT here (follow-ups)

🤖 Generated with Claude Code

The durable wrapper that composes the Phase 7 primitives with per-hop
commits — so a worker crash mid-plan leaves earlier audit rows and
flipped graph state persisted, and the next dispatch resumes from the
last applied ``to_version`` via the idempotent-replay path in
``run_hop`` (``success`` / ``skipped`` audit rows short-circuit).

The executor helper (``kt_db.migration_executor.execute_migration``) is
a single-session composition of the same primitives; this workflow
deliberately doesn't call it because single-session has no durability
between hops — exactly the property the workflow layer exists to add.

The orchestration body lives in a framework-agnostic helper
(``run_graph_migration``) so integration tests hit it against a real
session factory without Hatchet infrastructure. 7 tests cover:
single/multi-hop success (proving audit rows commit mid-plan via fresh
reads), mid-plan failure with correct ``last_applied`` stamping,
re-migrate resume via idempotent replay (completed hops NOT
re-invoked), unknown plugin failing cleanly without touching graph
state, already-at-target no-op, and target-ahead plan trimming.

Registered on the orchestrator worker and ``worker-all``.
Three bugs from PR review:

1. **mark_failed audit row never persisted on hop crash.** ``run_hop``
   flushes the failed row but doesn't commit — committing is the
   caller's job. The workflow's outer ``async with`` closed on the
   exception path before we committed, rolling back the flushed write,
   so the history API never surfaced failures from crashes. Fix: catch
   inside the async-with, commit, then re-raise into the outer handler
   that flips ``fail_migration`` on the graph row. New test
   ``test_failed_hop_persists_failed_audit_row`` reads the audit row
   from a fresh session to prove durability.

2. **Target-ahead-of-plugin silently stamped wrong version.** If the
   dispatcher asked for v3 but the plugin topped out at v2, the workflow
   trimmed the plan to v1→v2 yet still stamped
   ``graph_type_version=3`` at commit — sync worker would read v3 on
   v2 data. Fix: abort up-front when ``target_version >
   plugin.current_version``, leaving the graph untouched. Updated
   test ``test_target_ahead_of_plugin_aborts_before_any_hop`` asserts
   the abort path end-to-end (no hop invoked, no version bump, no
   read_only flip).

3. **Misleading "per-hop refresh" comment.** Removed. There was no
   ``ctx.refresh_timeout`` call to justify it. Per-hop timeout refresh
   is a future enhancement; it can be wired via a callback if/when we
   see real long-running hops.
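
The Bug 1 fix described above can be sketched as follows — a hedged, self-contained illustration, not the project's actual code: `FakeAsyncSession` and the flush/commit behavior are stand-ins modeling "flushed but uncommitted writes roll back when the session closes".

```python
import asyncio

class FakeAsyncSession:
    """Stand-in async session: flushed writes land only on commit();
    closing the session discards anything still pending."""
    def __init__(self, store):
        self.store = store
        self.flushed = []
    async def commit(self):
        self.store.extend(self.flushed)
        self.flushed = []
    async def __aenter__(self):
        return self
    async def __aexit__(self, *exc):
        self.flushed = []  # uncommitted (flushed-only) writes are lost
        return False

async def run_hop(session, hop):
    # models the PR's contract: run_hop flushes the failed audit row
    # but never commits — committing is the caller's job
    session.flushed.append(("failed", hop))
    raise RuntimeError(f"hop {hop} crashed")

async def run_one_hop(store, hop):
    async with FakeAsyncSession(store) as session:
        try:
            await run_hop(session, hop)
            await session.commit()
        except Exception:
            await session.commit()  # the fix: persist the flushed row
            raise  # outer handler then flips fail_migration on the graph
```

Without the `except` branch committing inside the `async with`, the session close would discard the flushed failed row, which is exactly the bug.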

Reviewer's minor items left for follow-ups:
- Advisory-lock gap between begin/commit/fail is noted for #58/#59
  integration (``find_in_flight_for_graph`` mitigates at dispatch time).
- ``repr(exc)`` sanitization on the SSE stream — fine as-is for the
  internal operator view; expose-sanitized version if/when we surface
  this on a user-facing stream.
@charlie83Gs
Contributor Author

Applied review fixes in 00df51b:

Bug 1 — failed audit row never persisted: Fixed in the workflow by catching inside the session's async-with, committing the flushed failed-row write, then re-raising into the outer handler that does fail_migration. New test test_failed_hop_persists_failed_audit_row reads the audit row from a fresh session to prove durability (exactly what the history API endpoint does).

Bug 2 — trimmed plan stamped wrong version: Fixed by adding an up-front abort when target_version > plugin.current_version. The graph stays untouched (no begin_migration, no audit rows, no read_only flip). Dispatcher is expected to re-dispatch at the real plugin.current_version. Updated test test_target_ahead_of_plugin_aborts_before_any_hop asserts the abort end-to-end: result.success=False, final_version=1, error mentions "tops out at v2", migration.calls == [], and the graph row is unchanged.

Bug 3 — misleading comment: Removed. Per-hop refresh_timeout wiring isn't needed for the no-op migrations we have today; it can be added via a callback param if we start seeing long-running hops.
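
The Bug 2 guard can be sketched like this — illustrative only; `plan_or_abort` and its arguments are stand-ins for the real plugin/plan API, but the error wording matches the test's assertion:

```python
def plan_or_abort(current_version, target_version, plugin_max_version):
    """Abort before begin_migration when the dispatcher asks for a
    version the plugin can't produce, leaving the graph row untouched
    (no audit rows, no read_only flip, no version stamp)."""
    if target_version > plugin_max_version:
        raise ValueError(
            f"plugin tops out at v{plugin_max_version}, "
            f"cannot migrate to v{target_version}"
        )
    # hop list current -> target, e.g. v1->v2 then v2->v3
    return [(v, v + 1) for v in range(current_version, target_version)]
```

Trimming the plan instead (the pre-fix behavior) would run v1→v2 but stamp v3 at commit, which is the mismatch the fix removes.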

Reviewer's minors:

  • Whole-workflow advisory-lock gap: noted — find_in_flight_for_graph mitigates at the dispatcher side (see #269, feat(kt-db): startup auto-dispatch for out-of-date graphs, Phase 7 #58).
  • repr(exc) SSE sanitization: leaving as-is for the internal operator stream.
  • PipelineContext reuse across hops: confirmed stateless — WorkerCoreServices opens its own sessions per graph_engine() / write_engine() call, nothing cached between hops.

All 8 tests in test_graph_migration_workflow.py pass.

@charlie83Gs charlie83Gs merged commit 0230401 into main Apr 20, 2026
28 checks passed
@charlie83Gs charlie83Gs deleted the feat/graph-migration-workflow branch April 20, 2026 21:32
