feat(worker-sync): dispatch sync_graph_task via registry (Phase 5 #51)#287
charlie83Gs merged 2 commits into main from
Conversation
First Phase-5 consumer wiring: `sync_graph_task` in services/worker-sync/workflows/sync.py now routes through the registry-resolved `SyncProvider` declared by the graph's composition.

Wiring pattern (mirrors the P4 fact-decomposition wiring on PR #44):

1. Task entry calls `kt_hatchet.composition.resolve_sync_provider(state, graph_id)`.
2. When a provider resolves, the task calls `provider.initialize(services)` + `provider.sync_cycle(SyncContext)` with session factories + shared singletons (embedding service, qdrant client) packed into options.
3. When no provider resolves (worker hasn't loaded the plugin, or the task runs without a graph context), the task falls back to constructing `SyncEngine` directly — keeps rolling deployments safe.

The `DefaultSyncProvider` (kt-plugin-be-sync) unpacks the options dict into a `SyncEngine` under the hood, so observable behaviour matches the legacy path exactly — the plugin boundary is a registration adapter in this PR, not a code move. The `resolve_sync_provider` helper lives in `kt_hatchet.composition` alongside `resolve_fact_decomposition_provider` so every worker applies the same fallback rules + fail-fast semantics (WARN on unresolved id; raise on transport / state errors).

Tests: 4 new composition helper tests (registry lookup round-trip, None-graph-id short-circuit, empty-composition short-circuit, WARN on unresolved id). 18 worker-sync unit tests still green.
Non-blocking review items on the sync wiring PR:

- #4 Replace `assert worker_state.services is not None` with an explicit `if ... raise RuntimeError` so the invariant holds under `python -O` (which strips asserts) and we never dispatch against a silently-None services container.
- #5 Dedup the 'no plugin contributes this id' WARN from `resolve_sync_provider` per `(graph_id, provider_id)` pair. Sync dispatch is a cron that runs once per graph every minute; without dedup a rolling deployment fills the log with N×60 duplicate lines per hour per missing plugin. The first occurrence still fires at WARNING; subsequent occurrences drop to DEBUG so operators can still see them under a verbose logger but don't get paged on spam. The set stays bounded by (graph_id, provider_id) cardinality (handful × handful) — no eviction needed; a worker restart re-warns once, which is the signal operators want.
- #6 Surface `SyncResult.failures` in the task log line + emitted event. The legacy path doesn't have a failures counter (the engine's dead-letter inserts are logged inside the engine), so the field is 0 on the legacy branch and populated from the provider result on the registry branch. No behaviour change on legacy; parity for the provider contract.
- #7 Move the `from kt_worker_sync.sync_engine import SyncEngine` import inside the legacy-path branch — the provider-driven path never needs it, so we skip the import on workers whose plugin is registered. Micro-optimisation; clearer at the call site too.

Skipped:

- #1 PR body overstates scope: will amend the body on GitHub instead.
- #2 ABC lifecycle mismatch (init once vs per-task): flagged for a Phase-5 follow-up. The cache-on-WorkerState design needs more thought than fits in a review pass.
- #3 options-dict workaround: real design tension — per-graph session factories don't fit the `initialize(services)` once-per-worker contract. Phase-5 follow-up.
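The WARN dedup asked for in item #5 (once at WARNING per `(graph_id, provider_id)` key, DEBUG thereafter, no eviction) can be sketched like this. The function name, the return value, and the message wording are illustrative assumptions; only the keying and level-demotion behaviour come from the review item.

```python
import logging

logger = logging.getLogger("kt_hatchet.composition")

# Process-lifetime set, bounded by (graph_id, provider_id) cardinality.
# A worker restart clears it, so operators get re-warned once per deploy,
# which is exactly the signal the review item asks to preserve.
_warned: set[tuple[str, str]] = set()

def warn_unresolved(graph_id: str, provider_id: str) -> int:
    """Log 'no plugin contributes this id' at WARNING once per key, DEBUG after.

    Returns the level used, purely to make the behaviour observable in a test.
    """
    key = (graph_id, provider_id)
    level = logging.DEBUG if key in _warned else logging.WARNING
    _warned.add(key)
    logger.log(level, "no plugin contributes provider %s for graph %s",
               provider_id, graph_id)
    return level
```

Repeated calls for the same key demote to DEBUG while a new key still warns at WARNING, which is the "first occurrence pages, repeats stay visible under a verbose logger" split described above.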
Tests: 15 composition helper tests (2 new dedup tests — one asserting repeated resolves produce a single WARN, one asserting distinct keys warn independently), 18 worker-sync unit tests.
Applied review feedback in commit f91a07f:

- #4 Replaced the `assert` with an explicit `if ... raise RuntimeError` check.
- #5 Deduped the 'no plugin contributes this id' WARN in `resolve_sync_provider` per `(graph_id, provider_id)` pair.
- #6 Surfaced `SyncResult.failures` in the task log line and emitted event.
- #7 Moved the `SyncEngine` import inside the legacy-path branch.

Scope notes (follow-ups, not blockers): #2 (ABC lifecycle mismatch) and #3 (options-dict workaround) remain deferred to Phase-5 follow-ups; the PR body will be amended for #1.
Tests: 73 kt-hatchet (2 new dedup tests), 18 worker-sync. Full suite green via pre-push hook.
Summary
`sync_graph_task` routes through the registry-resolved `SyncProvider` declared by the graph's composition.

What changes

- `sync_graph_task` (services/worker-sync/workflows/sync.py) resolves `SyncProvider` via `kt_hatchet.composition.resolve_sync_provider(state, graph_id)`.
- On resolve: `await provider.initialize(services)` + `await provider.sync_cycle(ctx)` with session factories + shared singletons packed into `SyncContext.options`.
- On `None` (rolling deployment, plugin not loaded): falls back to direct `SyncEngine` construction. Keeps sync draining.
- `kt_hatchet.composition.resolve_sync_provider` helper added alongside `resolve_fact_decomposition_provider` — same fail-fast posture, same WARN-on-unresolved-id tolerant path.
- `DefaultSyncProvider` (kt-plugin-be-sync) unpacks the options dict into `SyncEngine` under the hood — observable behaviour matches the legacy path exactly. The plugin boundary is a registration adapter, not a code move.

Scope note
This PR wires the sync provider only. The other P5 providers (definition, source-cache, source-contribution) still run through their legacy call sites. Follow-up PRs will thread the remaining providers (same helper pattern, separate consumer sites). Dimensions / relations have ABC-vs-factory-signature tension that needs design work before wiring.
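The "registration adapter, not a code move" shape of `DefaultSyncProvider` can be sketched as below. The engine's constructor arguments, the option key names, and the result dict are assumptions made up for illustration; only the idea that the provider unpacks `SyncContext.options` into the existing engine comes from the PR.

```python
import asyncio
from typing import Any

class SyncEngine:
    """Stand-in for the real engine in kt_worker_sync; signature is assumed."""
    def __init__(self, session_factory: Any, embedding_service: Any,
                 qdrant_client: Any) -> None:
        self.session_factory = session_factory
        self.embedding_service = embedding_service
        self.qdrant_client = qdrant_client

    async def run_cycle(self) -> dict[str, int]:
        # Real engine would sync the graph; the result shape here is illustrative.
        return {"synced": 0, "failures": 0}

class DefaultSyncProvider:
    """Registration adapter: forwards the options bag into the legacy engine,
    so the registry path and the legacy path behave identically."""

    async def initialize(self, services: dict[str, Any]) -> None:
        # Once-per-worker hook; per-graph state arrives via SyncContext.options.
        self._services = services

    async def sync_cycle(self, ctx: Any) -> dict[str, int]:
        engine = SyncEngine(
            session_factory=ctx.options["session_factory"],
            embedding_service=ctx.options["embedding_service"],
            qdrant_client=ctx.options["qdrant_client"],
        )
        return await engine.run_cycle()
```

Because the adapter only re-plumbs arguments, the observable behaviour on the registry path is whatever the engine already did, which is the parity claim the PR makes. It also makes the #3 tension visible: the per-graph session factory travels in `options`, not in `initialize(services)`.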
Test plan
- `uv run --project libs/kt-hatchet pytest -x -v` (71 green — 4 new tests for `resolve_sync_provider`: registry round-trip, None-graph-id short-circuit, empty-composition short-circuit, WARN on unresolved id)
- `uv run --project services/worker-sync pytest -x -v` (18 green)
- `uv run --frozen ruff format --check .`
- `ruff check .`

🤖 Generated with Claude Code