fix(consolidator): dedupe + ON CONFLICT for observation_sources INSERT #1340
Merged
nicoloboschi merged 1 commit into vectorize-io:main on Apr 30, 2026
Conversation
Both _execute_update_action and _execute_create_action insert into the
observation_sources junction table. Previously, both:
- Built INSERT batches without deduping the source_ids list
- Lacked ON CONFLICT handling
This caused UniqueViolationError on (observation_id, source_id) under
several scenarios:
1. Same source_id repeated within source_ids (a single batch can have
duplicates when several memories collapse to the same effective
source).
2. Concurrent consolidation of the same observation racing on the
DELETE-then-INSERT pattern in _execute_update_action.
3. Residual rows surviving the DELETE (rare but possible at transaction
boundaries).
Fix:
- dict.fromkeys() preserves insertion order while deduping the list.
- ON CONFLICT (observation_id, source_id) DO NOTHING absorbs any
surviving duplicates without aborting the entire batch.
Both layers are needed: dedupe avoids the round-trip on intra-batch
duplicates, ON CONFLICT handles cross-batch / concurrent races.
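The two layers described above can be sketched as follows. This is an illustrative reconstruction, not the project's actual code: `observation_id`, `source_ids`, and the SQL string are assumptions based on the table and columns named in this PR.

```python
# Hypothetical sketch of the fixed INSERT path into observation_sources.
observation_id = 42
source_ids = [7, 7, 9, 7, 3, 9]  # intra-batch duplicates are realistic

# Layer 1: dict.fromkeys deduplicates while preserving first-seen order
# (dict insertion order is guaranteed since Python 3.7).
deduped = list(dict.fromkeys(source_ids))
assert deduped == [7, 9, 3]

# Layer 2: ON CONFLICT DO NOTHING absorbs rows that survive the dedupe,
# e.g. concurrent writers or residual rows after the DELETE.
INSERT_SQL = """
    INSERT INTO observation_sources (observation_id, source_id)
    VALUES ($1, $2)
    ON CONFLICT (observation_id, source_id) DO NOTHING
"""

rows = [(observation_id, sid) for sid in deduped]
# In the real code this would be something like:
#     await conn.executemany(INSERT_SQL, rows)
print(rows)  # [(42, 7), (42, 9), (42, 3)]
```

Note that a plain `set(source_ids)` would also dedupe but would scramble the order; `dict.fromkeys` keeps the batch deterministic.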
nicoloboschi approved these changes on Apr 30, 2026
Summary
Both _execute_update_action and _execute_create_action in the consolidator insert rows into the observation_sources junction table. Both INSERT sites were missing two safeguards, causing UniqueViolationError on observation_sources_pkey under realistic load:
- source_ids (or source_memory_ids) can contain the same id more than once when several memories collapse to the same effective source. The unfiltered list was passed to executemany, which then hit the unique constraint within the same batch.
- Concurrent consolidation of the same observation can race on the DELETE-then-INSERT pattern in _execute_update_action.
Fix
- dict.fromkeys(source_ids) (and dict.fromkeys(source_memory_ids)) preserves insertion order while deduping in-batch.
- ON CONFLICT (observation_id, source_id) DO NOTHING on both INSERT sites absorbs any surviving duplicate without aborting the batch.
Both layers are needed: dedupe avoids the round-trip on common in-batch duplicates; ON CONFLICT handles cross-batch / concurrent races.
Reproduced in production on a workload that retained sessions with overlapping source memories. After this change, consolidation runs that were previously rolling back per-batch now complete cleanly.
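The "surviving duplicate" case in the Fix can be demonstrated in isolation. This minimal, runnable sketch uses an in-memory sqlite3 table as a stand-in for the Postgres junction table (both engines accept the same `ON CONFLICT (cols) DO NOTHING` clause; the table shape is inferred from the constraint named in this PR):

```python
import sqlite3

# Stand-in for the observation_sources junction table and its
# (observation_id, source_id) primary key.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE observation_sources ("
    "observation_id INTEGER, source_id INTEGER, "
    "PRIMARY KEY (observation_id, source_id))"
)

# A residual row that survived the DELETE (or was written concurrently).
conn.execute("INSERT INTO observation_sources VALUES (1, 10)")

# Without the clause this raises sqlite3.IntegrityError, the analogue
# of asyncpg's UniqueViolationError; with it, the batch proceeds.
conn.execute(
    "INSERT INTO observation_sources VALUES (1, 10) "
    "ON CONFLICT (observation_id, source_id) DO NOTHING"
)
count = conn.execute(
    "SELECT COUNT(*) FROM observation_sources").fetchone()[0]
print(count)  # 1 -- the duplicate was silently absorbed
```

The key point is that the conflict is resolved per-row, so one duplicate no longer rolls back the whole batch.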
Test plan
- Reproduced UniqueViolationError on observation_sources_pkey with the original code and overlapping source_ids; the patched code completes the same batch cleanly.
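A regression test along the lines of the test plan could look like the following hypothetical sketch. It again uses sqlite3 as a stand-in for Postgres (sqlite3.IntegrityError in place of asyncpg's UniqueViolationError); the table and data are illustrative:

```python
import sqlite3

def make_db():
    # Fresh stand-in for the observation_sources junction table.
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE observation_sources ("
        "observation_id INTEGER, source_id INTEGER, "
        "PRIMARY KEY (observation_id, source_id))"
    )
    return conn

rows = [(1, 10), (1, 10)]  # overlapping source_ids, as in the bug report

# Original, unguarded INSERT: the duplicate aborts the batch.
conn = make_db()
try:
    conn.executemany("INSERT INTO observation_sources VALUES (?, ?)", rows)
    unguarded_failed = False
except sqlite3.IntegrityError:
    unguarded_failed = True

# Patched INSERT: the duplicate is absorbed and the batch completes.
conn = make_db()
conn.executemany(
    "INSERT INTO observation_sources VALUES (?, ?) "
    "ON CONFLICT (observation_id, source_id) DO NOTHING",
    rows,
)
surviving = conn.execute(
    "SELECT COUNT(*) FROM observation_sources").fetchone()[0]

print(unguarded_failed, surviving)  # True 1
```

A real test against Postgres would additionally exercise the concurrent DELETE-then-INSERT race, which an in-process sqlite demo cannot reproduce.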