fix(retain): prevent duplicate memory units from delta upsert chunk issues #1178
Merged
nicoloboschi merged 1 commit into main on Apr 21, 2026
Conversation
…ng and concurrent upserts

Two bugs in the streaming retain pipeline caused duplicate/stale memory units when documents were upserted multiple times:

1. **Out-of-order chunk index assignment**: The producer-consumer pipeline extracted facts from chunks concurrently, but assigned chunk_index based on task completion order rather than the original document position. This caused chunks to be stored at scrambled indices, making delta retain unable to detect unchanged chunks on subsequent upserts (always falling back to expensive full re-processing).
2. **Concurrent upsert race condition**: The streaming path splits document tracking (cascade-delete) and chunk/unit creation into separate transactions with LLM extraction in between. Two concurrent retains for the same document could interleave, producing duplicates or stale data.

Fixes:

- Use the original `global_idx` (position in pre-chunked content) for chunk_index instead of an arrival-order-based offset
- Add a PostgreSQL advisory lock per (bank_id, document_id) to serialize concurrent retain operations on the same document
- Add stale-request detection: after acquiring the lock, skip if the document was already updated by a more recent retain (prevents older content from overwriting newer conversation state)
- Use pg_try_advisory_lock with a pool.acquire timeout to avoid deadlocks when the pool is near capacity (graceful degradation)
- Fix content hash mismatch in recovery detection (sanitize before hashing to match what handle_document_tracking stores)
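The `global_idx` fix can be sketched as a minimal producer-consumer pipeline. All names here are illustrative, not the project's actual API; the point is that the chunk's original position travels through the queue with the payload, so storage indices are deterministic even though extraction tasks complete out of order.

```python
import asyncio
import hashlib

async def producer(chunks, queue):
    for global_idx, chunk in enumerate(chunks):
        # Carry the original position through the queue with the payload.
        await queue.put((global_idx, chunk))
    await queue.put(None)  # sentinel: no more chunks

async def consumer(queue, store):
    while (item := await queue.get()) is not None:
        global_idx, chunk = item
        # Deterministic chunk id: identical content lands at the same index
        # on every upsert, so delta retain can compare hashes per position.
        chunk_id = f"bank_doc_{global_idx}"
        store[chunk_id] = hashlib.sha256(chunk.encode()).hexdigest()

async def run(chunks):
    queue = asyncio.Queue()
    store = {}
    await asyncio.gather(producer(chunks, queue), consumer(queue, store))
    return store
```

With arrival-order indexing, the same content could land at a different index on each upsert, which is exactly what broke the per-position hash comparison.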
Summary
Fixes two bugs in the streaming retain pipeline that caused duplicate and stale memory units when documents were upserted multiple times with the same or different content.
Bug 1: Out-of-order chunk index assignment (root cause of delta always falling back)
The streaming retain pipeline extracts facts from chunks concurrently via `asyncio.create_task`. Chunks arrive at the database consumer in task-completion order (non-deterministic), but `chunk_index` was assigned based on arrival order.

This caused `chunk_id = {bank}_{doc}_{chunk_index}` to be non-deterministic. On subsequent upserts, delta retain compared the new content (deterministically chunked) against DB chunks stored at scrambled positions. Every hash mismatched, so delta always fell back to full streaming re-processing, wasting LLM calls and risking duplicates through the recovery path.

Fix: Use the original `global_idx` (the chunk's position in the pre-chunked content array), which is passed through the queue from the producer.

Verified with tests:

- `test_repeated_upsert_chunks_not_scrambled` confirms chunks are stored at the correct indices.
- `test_delta_detects_unchanged_after_first_retain` confirms delta now correctly identifies all chunks as unchanged on the second upsert (previously it always fell back).

Bug 2: Concurrent upsert race condition
The streaming path splits into separate transactions:

1. `handle_document_tracking` (cascade-delete the old doc row + insert the new one)
2. `store_chunks_batch` + `insert_facts_batch`

Two concurrent retains for the same document could interleave between these transactions. Result: A's (stale) memory units and B's memory units both linked to the same chunks, producing duplicates and data corruption.
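This interleaving is what the per-document advisory lock prevents. A minimal sketch of deriving the lock key (the function name and hashing scheme are illustrative assumptions, not the project's actual code): PostgreSQL advisory locks take a bigint, so the `(bank_id, document_id)` pair is hashed and folded into the signed 64-bit range.

```python
import hashlib
import struct

def advisory_lock_key(bank_id: str, document_id: str) -> int:
    """Fold (bank_id, document_id) into a signed 64-bit advisory lock key.

    Illustrative sketch: hash the pair and read the first 8 bytes of the
    digest as a big-endian signed int64. Unrelated documents hash to
    different keys, so locking one does not block the others.
    """
    digest = hashlib.sha256(f"{bank_id}:{document_id}".encode()).digest()
    return struct.unpack(">q", digest[:8])[0]
```

With a driver such as asyncpg, the key would then be passed to `SELECT pg_try_advisory_lock($1)`, which returns immediately with true/false rather than blocking, allowing the graceful degradation the commit message describes.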
Fix: A PostgreSQL advisory lock per `(bank_id, document_id)` serializes concurrent retain operations on the same document. The lock key is `hash(bank_id:document_id)`, so unrelated documents are not blocked.

Bug 3: Content hash mismatch in recovery detection (minor)
The recovery path computed `new_content_hash` from the raw content, but the stored hash was computed after `_sanitize_text()`. If the content contained control characters, recovery wouldn't trigger, causing unnecessary re-processing.

Fix: Apply the same sanitization before hashing in the recovery check.
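A sketch of the sanitize-then-hash pattern; the sanitizer body here is an assumption standing in for the project's real `_sanitize_text`. The point is only that the recovery check must hash the same sanitized form that `handle_document_tracking` stored.

```python
import hashlib
import re

def sanitize_text(text: str) -> str:
    # Illustrative sanitizer: strip control characters but keep ordinary
    # whitespace (\t, \n, \r). The project's real rules may differ.
    return re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]", "", text)

def content_hash(text: str) -> str:
    # Hash the *sanitized* content so the recovery check compares against
    # the same value that was stored at document-tracking time.
    return hashlib.sha256(sanitize_text(text).encode()).hexdigest()
```

Hashing the raw content instead would make two logically identical documents (differing only in stripped control characters) appear changed, which is exactly the mismatch this fix removes.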
Test plan

- `test_repeated_upsert_chunks_not_scrambled`: chunks stored at deterministic indices
- `test_delta_detects_unchanged_after_first_retain`: delta correctly detects unchanged content on the 2nd/3rd upsert
- `test_delta_retain.py` tests pass
- `test_retain.py` tests pass
- `test_chunk_storage_upsert.py` idempotency tests pass