fix(async-ops): atomically commit batch_retain parent and child rows#1343

Merged
nicoloboschi merged 4 commits into main from
fix/atomic-batch-retain-parent-and-children
Apr 30, 2026

Conversation

@cdbartholomew
Contributor

Summary

submit_async_batch_retain inserts a parent row (status='pending', task_payload=NULL — it's a status aggregator, not directly executable) and then loops to insert one child row per sub-batch. Today the parent INSERT and the child INSERTs are not transactionally coupled: the parent's INSERT runs in its own auto-committing connection, and each child goes through a separate _submit_async_operation() call that acquires its own connection.

Any failure between those steps — connection drop, asyncpg timeout, schema-cache invalidation under concurrent load, or any exception raised during child setup — leaves a parent row with zero children. From there:

  • Worker poller skips the parent forever (the claim query filters task_payload IS NOT NULL).
  • Status aggregator never fires for the parent (there are no children to drive it to a terminal state).
  • Such rows sit pending indefinitely, accumulating without bound and inflating the queue-depth metrics that operators rely on to size worker pools.

This is the same class of "crash window leaves an unclaimable row" bug that #1091 fixed at the child level (atomic INSERT-with-task_payload). The parent path was missed because the parent intentionally has task_payload=NULL, so the per-row atomicity fix doesn't apply — the cross-row atomicity has to come from a transaction.

Fix

Wrap parent INSERT + all child INSERTs in a single async with conn.transaction() block. The child INSERT SQL is inlined for the duration of the transaction (so all writes share the same connection). _submit_async_operation is left untouched — other callers are unaffected.

submit_task() is deferred until after the transaction commits. Rationale: SyncTaskBackend (used in tests) executes the task synchronously, which would not see the still-uncommitted child row. BrokerTaskBackend and WorkerTaskBackend are effectively no-ops once task_payload is populated, but we defer them all uniformly for clarity.

Single-sub-batch and multi-sub-batch paths flow through the same code; the transaction is essentially free for the common case (single sub-batch, two INSERTs).

Test plan

  • New regression test test_submit_async_batch_retain_rolls_back_parent_on_child_failure monkeypatches BatchRetainChildMetadata to raise on the second sub-batch and asserts zero async_operations rows remain (parent must roll back together with children). Mirrors the existing test_submit_async_operation_leaves_claimable_row_when_submit_task_fails test at the parent level.
  • Manual sanity: test_request_context_retry_count_propagated_to_validator and the new test both pass locally (the rest of the file's tests need an LLM API key configured and run in CI).
  • ruff check and ruff format --check clean.
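The raise-on-the-second-call patch at the heart of the regression test can be sketched generically (an illustrative helper, not the test's actual code; the real test uses pytest's monkeypatch to swap BatchRetainChildMetadata):

```python
class FlakyOnNth:
    """Wraps a callable so that its nth invocation raises instead of
    returning; later invocations succeed again."""

    def __init__(self, wrapped, fail_on):
        self.wrapped = wrapped
        self.fail_on = fail_on
        self.calls = 0

    def __call__(self, *args, **kwargs):
        self.calls += 1
        if self.calls == self.fail_on:
            raise RuntimeError(f"simulated failure on call {self.calls}")
        return self.wrapped(*args, **kwargs)
```

Patching the child-metadata constructor with a wrapper like this makes the second sub-batch blow up mid-transaction, which is exactly the window the fix closes.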

Out of scope

  • Cleaning up any historical orphan parent rows that may already exist — that's an operational task, not part of this fix.
  • Extending the claim reaper (which today only handles stale processing rows) to also auto-fail pending rows that have NULL task_payload and zero children. Useful as defense-in-depth; tracked separately.

submit_async_batch_retain inserts a parent row (status='pending',
task_payload=NULL — it's a status aggregator, not directly executable)
and then loops to insert one child row per sub-batch. The parent INSERT
and child INSERTs were not transactionally coupled: the parent's
INSERT ran in its own auto-committing connection, and each child went
through a separate _submit_async_operation call that acquired its own
connection.

Any failure between them (connection drop, asyncpg timeout, schema-
cache invalidation under concurrent load, or any other exception
raised during child setup) leaves a parent row with zero children.
The worker poller skips it forever because of the
"task_payload IS NOT NULL" filter, the status aggregator never fires
because there are no children to complete, and the row sits pending
indefinitely. It also pollutes queue-depth metrics that operators rely
on to size worker pools.

Fix: wrap parent INSERT and all child INSERTs in a single
async transaction so the create-batch operation is atomic — either
all rows become visible to workers or none are. Child INSERT SQL is
inlined for the duration of the transaction; _submit_async_operation
is left untouched so other callers are unaffected. submit_task() is
deferred to after the transaction commits because SyncTaskBackend
(used in tests) executes synchronously and would otherwise read the
not-yet-committed row.

Tests:
- New regression test
  test_submit_async_batch_retain_rolls_back_parent_on_child_failure
  monkeypatches BatchRetainChildMetadata to raise on the second
  sub-batch and asserts zero async_operations rows remain after the
  failure (parent must roll back together with children).
- Mirrors the existing
  test_submit_async_operation_leaves_claimable_row_when_submit_task_fails
  but at the parent level (the child-level case was already fixed).

submit_async_batch_retain now inserts children inline inside the
parent's transaction (rather than calling _submit_async_operation per
child) and notifies the task backend after commit. The pre-existing
test mocked _submit_async_operation and asserted on its call args;
that path no longer runs for children.

Replace those assertions with the new equivalent: count the INSERTs on
the connection, inspect the post-commit submit_task payload for
document_tags, and cross-check the JSON serialized into the child's
task_payload column. Same intent (document_tags propagates through to
the worker), aligned with the new code path.

handle_document_tracking calls delete_stale_observations_for_memories
with ops=ops, but ops is not a parameter of handle_document_tracking
itself (introduced in #1325 as part of the backend-aware observation
read split). Every retain that hits the document-tracking path raises
NameError before any actual work happens.

Add ops as a kwarg-only parameter on handle_document_tracking and
forward pool.ops from each of the three call sites in
_streaming_retain_batch. Behaviorally a no-op for the PG path
(uses_observation_sources_table is False, so the existing PG branch
runs) and for the Oracle path (junction table branch already runs
when ops.uses_observation_sources_table is True).
@cdbartholomew
Contributor Author

@nicoloboschi heads up — your PR #1325 added ops=ops to the call inside fact_storage.handle_document_tracking (line 312) but didn't add ops as a parameter to that function or thread it through the three call sites in orchestrator._streaming_retain_batch. Every retain hitting the document-tracking path raises NameError: name 'ops' is not defined on main right now — 16 tests failing across test_delta_retain, test_retain, test_async_batch_retain, test_document_tracking, test_observation_invalidation, test_retain_append_mode.

I needed CI green on this PR so I rolled the fix into commit 302331be:

  • Added ops as a kwarg-only parameter to handle_document_tracking
  • Forwarded pool.ops from each of the three callers in orchestrator.py

Behaviorally it's the same outcome you intended: PG branch (your new native-array path) when ops.uses_observation_sources_table is False, Oracle branch (junction table) otherwise. Just plumbing the parameter you'd already started using.

Pinging in case you want to land it as a standalone fix faster than waiting for this PR to merge — it's blocking everyone else's CI on main too.

The test calls handle_document_tracking directly (rather than going
through the retain orchestrator) and didn't pass ops. With the param
defaulting to None, the inner delete_stale_observations_for_memories
call falls through to the Oracle junction-table read path and queries
a non-existent public.observation_sources relation under PG.

The orchestrator's three call sites already pass pool.ops; this test
just needs to mirror that. Pass memory._backend.ops to keep the test
backend-agnostic.
Collaborator

@nicoloboschi nicoloboschi left a comment


LGTM, nice catch

@nicoloboschi nicoloboschi merged commit f4ca303 into main Apr 30, 2026
182 of 185 checks passed
@github-actions

✅ CI results

Status Count
✅ Passed 17
⏭️ Skipped 31

