fix(server): batch upsert pipeline to prevent OOM on large datasets (v2) #1009
diecoscai wants to merge 8 commits into
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: c48057327e
pageContent: chunk.pageContent,
metadata: JSON.parse(chunk.metadata)
const UPSERT_BATCH_SIZE = 500
const isFullCleanup = recordManagerObj && data.recordManagerConfig && JSON.parse(data.recordManagerConfig)?.cleanup === 'full'
Parse recordManagerConfig without assuming JSON string
In _insertIntoVectorStoreWorkerThread, isFullCleanup does JSON.parse(data.recordManagerConfig), but this codepath receives recordManagerConfig as an object (for example from saveVectorStoreConfig and the upsertDocStore internal call). When a record-manager-enabled upsert runs, JSON.parse on an object throws before any upsert happens, so these requests fail outright instead of indexing documents. Read cleanup from the object directly (or parse conditionally only when the value is a string).
where: filterOptions,
skip,
take: UPSERT_BATCH_SIZE,
order: { chunkNo: 'ASC' }
Add deterministic ordering for batched chunk pagination
The new batched upsert query uses skip/take with order: { chunkNo: 'ASC' }, but chunkNo is not unique across a store when multiple docs are indexed together. In that common storeId-only case, offset pagination over a non-unique sort key can return overlapping or missing rows between batches, which silently drops or duplicates chunks in the vector store. Add a unique tie-breaker (for example id) or use keyset pagination to make batching stable.
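The tie-breaker suggestion above can be illustrated with a minimal in-memory sketch. The `ChunkRow` shape, `fetchPage`, and `readAllInBatches` are hypothetical names used only for illustration; they stand in for a real query ordered by `(chunkNo, id)` and are not the PR's code.

```typescript
// Hypothetical row shape: only chunkNo and id matter for this sketch.
interface ChunkRow {
    id: string
    chunkNo: number
}

type Cursor = { chunkNo: number; id: string } | null

// Total order on (chunkNo, id): id breaks ties between equal chunkNo values.
function compareRows(a: { chunkNo: number; id: string }, b: { chunkNo: number; id: string }): number {
    if (a.chunkNo !== b.chunkNo) return a.chunkNo - b.chunkNo
    return a.id < b.id ? -1 : a.id > b.id ? 1 : 0
}

// In-memory stand-in for a query of the form
//   WHERE (chunkNo, id) > (:chunkNo, :id) ORDER BY chunkNo, id LIMIT :take
function fetchPage(table: ChunkRow[], cursor: Cursor, take: number): ChunkRow[] {
    return table
        .filter((row) => cursor === null || compareRows(row, cursor) > 0)
        .sort(compareRows)
        .slice(0, take)
}

// Walk the table page by page, advancing the cursor to the last row seen.
function readAllInBatches(table: ChunkRow[], take: number): ChunkRow[] {
    const seen: ChunkRow[] = []
    let cursor: Cursor = null
    while (true) {
        const page = fetchPage(table, cursor, take)
        if (page.length === 0) break
        seen.push(...page)
        const last = page[page.length - 1]
        cursor = { chunkNo: last.chunkNo, id: last.id }
    }
    return seen
}

// Two documents indexed together, so chunkNo values repeat across rows.
const table: ChunkRow[] = [
    { id: 'b2', chunkNo: 2 },
    { id: 'a1', chunkNo: 1 },
    { id: 'a3', chunkNo: 3 },
    { id: 'b1', chunkNo: 1 },
    { id: 'a2', chunkNo: 2 }
]
const visited = readAllInBatches(table, 2)
```

Because the cursor compares the full `(chunkNo, id)` pair, a page boundary that falls between two rows with the same `chunkNo` still resumes at the correct row.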
PR Review: fix(server): batch upsert pipeline to prevent OOM on large datasets (v2)
Summary: This PR re-applies the OOM fix reverted by #1008, batching both the chunk-save step (
Critical Issues
None. No security, multi-tenancy, or authentication regressions were introduced. All existing
Major Concerns
1. When
Consider whether
2.
indexResult.totalKeys = batchResult.totalKeys ?? indexResult.totalKeys
indexResult.addedDocs = batchResult.addedDocs ?? [] // last batch only
Minor Issues and Suggestions
3.
const recordManagerConfig =
typeof data.recordManagerConfig === 'string' ? JSON.parse(data.recordManagerConfig) : data.recordManagerConfig
const isFullCleanup = recordManagerObj && recordManagerConfig?.cleanup === 'full'
const isFullCleanup =
recordManagerObj &&
(() => {
const cfg = typeof data.recordManagerConfig === 'string'
? JSON.parse(data.recordManagerConfig)
: data.recordManagerConfig
return cfg?.cleanup === 'full'
})()
Or simply move the parse inside the
4. Per-chunk
Each chunk in a batch is saved with a separate
5. In-place mutation of
for (const domain of data as any[]) {
    domain.tags = domain.domain_tags?.map((dt: any) => dt.tags).filter(Boolean) || []
}
The previous code mapped to a new array (
6. Telemetry
flowGraph: omit(indexResult['result'], ['totalKeys', 'addedDocs'])
Positive Observations
Next Steps
Overall this is a sound fix. The batch loop logic is correct and the regression fixes are well-targeted.
Reviewed by Claude (claude-sonnet-4-6)
Full-cleanup branch: replace unbounded find() with keyset-paginated while loop accumulating all chunks into a single docs[] array, then calling upsert() once. Preserves the single indexStartDt boundary required by record manager full-cleanup semantics: splitting upsert() calls causes later calls to delete earlier keys.
Non-full path: replace count()+skip offset pagination with MoreThan(lastId) keyset cursor on UUID PK, eliminating race condition where concurrent writes shift pages.
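The reasoning behind the single upsert() call can be made concrete with a toy model. `ToyRecordManager` below is entirely hypothetical and is not the real record manager API; it only assumes that "full" cleanup deletes every key last written before the current call's start time, which is the behaviour the commit message describes.

```typescript
// Toy model of a record manager with "full" cleanup. All names are
// hypothetical; this only illustrates the indexStartDt boundary.
class ToyRecordManager {
    private clock = 0
    private updatedAt: Record<string, number> = {}

    // One upsert call: stamp a start time, write the keys, then delete every
    // key whose last write happened before this call started.
    upsertWithFullCleanup(keys: string[]): void {
        const indexStartDt = ++this.clock
        for (const key of keys) {
            this.updatedAt[key] = ++this.clock
        }
        for (const key of Object.keys(this.updatedAt)) {
            if (this.updatedAt[key] < indexStartDt) delete this.updatedAt[key]
        }
    }

    keys(): string[] {
        return Object.keys(this.updatedAt).sort()
    }
}

// Split into two calls: the second call's cleanup removes the first batch.
const splitRun = new ToyRecordManager()
splitRun.upsertWithFullCleanup(['a', 'b'])
splitRun.upsertWithFullCleanup(['c', 'd'])

// Single call: all keys share one start boundary, so nothing is removed.
const singleRun = new ToyRecordManager()
singleRun.upsertWithFullCleanup(['a', 'b', 'c', 'd'])
```

Under this assumption the split run ends up holding only the last batch, which is why the full-cleanup branch accumulates everything before calling upsert() once, at the cost of holding all documents in memory.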
PR Review: fix(server): batch upsert pipeline to prevent OOM on large datasets (v2)

---

Summary

This PR re-applies the OOM fix from 1006 (reverted in 1008). It adds keyset-based pagination for DocumentStoreFileChunk reads, batches the chunk-save step, and fixes two regressions: _createVectorStoreObject called inside the per-batch loop, and null-dereference on telemetry when indexResult is undefined. Two files changed: AAIDomains.ts and documentstore/index.ts.

---

### CRITICAL: filterOptions is missing organizationId (multi-tenancy gap)

Location: packages/server/src/services/documentstore/index.ts lines 1572-1577.

filterOptions only contains storeId (and optionally docId). organizationId is absent, so every paginated find() reads chunks across ALL organizations for that store. DocumentStoreFileChunk has an indexed organizationId column and data.organizationId is available from entity.organizationId earlier in the same function. Per project conventions ALL DB queries must filter by organizationId. This must be fixed before merge.

---

### CRITICAL: UUID keyset pagination may skip rows under concurrent writes

Location: both while-true loops in _insertIntoVectorStoreWorkerThread.

The cursor uses order id ASC with MoreThan(lastId) on a UUID v4 PK. UUID v4 is randomly generated so concurrent inserts with UUIDs sorting before lastId are silently skipped. A composite (createdDate, id) cursor would be safer.

---

### MAJOR: addedDocs not accumulated across batches in non-full-cleanup path

numAdded/numDeleted/numUpdated/numSkipped are summed correctly but addedDocs is overwritten per batch. History records only show the last batch. Correctness bug if addedDocs is used for audit or display.

---

### MAJOR: JSON.parse(chunk.metadata) unguarded in both pagination loops

Malformed metadata on any single chunk throws and fails the entire job. A per-chunk try/catch with fallback to empty object would be more resilient.

---

### MAJOR: Full-cleanup path accumulates ALL docs in memory with no comment

The isFullCleanup branch correctly calls upsert() once (splitting breaks full-cleanup semantics since later calls delete keys from earlier ones) but there is NO comment explaining this. A future developer could split it and reintroduce the deletion bug. Please add an explanatory comment.

---

### Suggestions

- AAIDomains.ts tag transform mutates data in place vs original .map() approach. Minor style concern.
- SAVE_BATCH_SIZE and UPSERT_BATCH_SIZE are inline constants. Consider hoisting to module-level with a note on rationale.
- No tests added. Given this is the 2nd attempt after a regression caused a revert, tests for batching and result accumulation are strongly recommended.
- Pre-existing typo: chatlowId (missing f) in telemetry is carried forward unchanged.

---

### Positive Observations

- Hoisting _createVectorStoreObject outside the per-batch loop correctly fixes the primary regression from 1006.
- Guarding telemetry with if (indexResult) prevents the null-dereference crash.
- AAIDomains.ts metadata refactor replaces two near-identical map() branches with one for-of loop cleanly.
- userId and organizationId correctly set on every DocumentStoreFileChunk in _saveChunksToStorage.
- Well-formed commit messages and PR description accurately explains the changes and tradeoffs.

---

### Recommendation: Request Changes

Issue 1 (missing organizationId in filterOptions) is a multi-tenancy gap that must be resolved before merge. Issues 3 and 4 are correctness bugs to fix. Issue 5 needs a clarifying comment.
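The addedDocs accumulation concern raised in the review above can be sketched as a small fold over per-batch results. The `BatchResult` shape and the `mergeBatch` helper are assumptions made for illustration (field names follow the review text, and addedDocs is simplified to an array of strings); the real result type in the codebase may differ.

```typescript
// Hypothetical shape of one batch's upsert result; addedDocs is simplified
// to document identifiers for this sketch.
interface BatchResult {
    numAdded: number
    numDeleted: number
    numUpdated: number
    numSkipped: number
    addedDocs: string[]
}

function emptyResult(): BatchResult {
    return { numAdded: 0, numDeleted: 0, numUpdated: 0, numSkipped: 0, addedDocs: [] }
}

// Fold one batch into the running total: counters are summed and addedDocs
// is appended instead of overwritten.
function mergeBatch(total: BatchResult, batch: Partial<BatchResult>): BatchResult {
    return {
        numAdded: total.numAdded + (batch.numAdded ?? 0),
        numDeleted: total.numDeleted + (batch.numDeleted ?? 0),
        numUpdated: total.numUpdated + (batch.numUpdated ?? 0),
        numSkipped: total.numSkipped + (batch.numSkipped ?? 0),
        addedDocs: total.addedDocs.concat(batch.addedDocs ?? [])
    }
}

const batchResults: Partial<BatchResult>[] = [
    { numAdded: 2, addedDocs: ['doc-1', 'doc-2'] },
    { numAdded: 1, numSkipped: 1, addedDocs: ['doc-3'] }
]
const runTotal = batchResults.reduce(mergeBatch, emptyResult())
```

Appending keeps history complete, but addedDocs then grows with the dataset, so a real implementation might cap or summarize it to stay within the memory goals of the PR.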
PR Review: fix(server): batch upsert pipeline to prevent OOM on large datasets (v2)
Reviewed commit: 61a95f7 (latest as of review)
Summary: This PR re-applies the OOM fix reverted by #1008. It adds keyset-paginated reads of
Critical Issues
None remaining. The previously flagged multi-tenancy gap (
Major Concerns
1. Full-cleanup path accumulates ALL chunks in memory before calling upsert
The
Consider documenting a known limitation or opening a follow-up ticket to explore whether the vector store's record manager can accept a pre-set
2.
const recordManagerConfig =
typeof data.recordManagerConfig === 'string' ? JSON.parse(data.recordManagerConfig) : data.recordManagerConfig
const isFullCleanup = recordManagerObj && recordManagerConfig?.cleanup === 'full'
Minor Issues and Suggestions
3. Per-chunk individual
Each of the 500 chunks in a batch is saved with its own
4.
The first-batch case initializes
5. Telemetry
flowGraph: omit(indexResult['result'], ['totalKeys', 'addedDocs'])
6. No tests added
Positive Observations
Recommendation
Approve with suggestions. The two critical issues from prior review passes are resolved. The remaining items are either pre-existing issues or lower-priority improvements that can be deferred.
The only actionable blocker I would flag for discussion before merge is item 1 (full-cleanup path still builds an in-memory
Items 2 and 5 are the highest priority follow-ups. Item 6 (tests) is strongly recommended given the regression history.
Reviewed by Claude (claude-sonnet-4-6)
- Move delete-before-save to delete-after-save in both _saveChunksToStorage and syncAndRefreshChunks: old chunks are preserved until all new chunks are confirmed persisted
- Add per-batch error handling so partial failures set status STALE and preserve completed batches rather than throwing all-or-nothing
- Replace sequential one-by-one saves in syncAndRefreshChunks with SAVE_BATCH_SIZE-batched parallel saves matching the existing pattern
- Derive totalChunks/totalChars from actual persisted counts, not expected values from the chunk response
- Add durability contract comments to both save paths documenting the SYNCING -> SYNC/STALE state machine and safe-delete-timing mechanism
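The delete-after-save ordering and per-batch error handling described in this commit message can be sketched against an in-memory store. Everything here is hypothetical (`ChunkStore`, `makeStore`, `replaceChunks`), the saves are synchronous rather than batched parallel writes, and old rows are identified by ids captured before any write, which is an assumption about how the old set is tracked.

```typescript
type SyncStatus = 'SYNC' | 'STALE'

interface StoredChunk {
    id: string
    docId: string
}

// Hypothetical in-memory store; saveBatch may throw to simulate a failed write.
interface ChunkStore {
    rows: StoredChunk[]
    saveBatch(batch: StoredChunk[]): void
}

function makeStore(initial: StoredChunk[], failOnCall: number): ChunkStore {
    let calls = 0
    const store: ChunkStore = {
        rows: initial.slice(),
        saveBatch(batch) {
            calls += 1
            if (calls === failOnCall) throw new Error('simulated write failure')
            store.rows = store.rows.concat(batch)
        }
    }
    return store
}

function replaceChunks(
    store: ChunkStore,
    docId: string,
    newChunks: StoredChunk[],
    batchSize: number
): { status: SyncStatus; persisted: number } {
    // Remember which rows are old before writing anything.
    const oldIds = store.rows.filter((r) => r.docId === docId).map((r) => r.id)
    let persisted = 0
    for (let i = 0; i < newChunks.length; i += batchSize) {
        const batch = newChunks.slice(i, i + batchSize)
        try {
            store.saveBatch(batch)
            persisted += batch.length
        } catch {
            // Partial failure: old rows and completed batches stay in place.
            return { status: 'STALE', persisted }
        }
    }
    // Every new chunk is persisted; only now are the old rows removed.
    store.rows = store.rows.filter((r) => oldIds.indexOf(r.id) === -1)
    return { status: 'SYNC', persisted }
}

const oldRows: StoredChunk[] = [{ id: 'old-1', docId: 'doc' }, { id: 'old-2', docId: 'doc' }]
const freshRows: StoredChunk[] = [
    { id: 'new-1', docId: 'doc' },
    { id: 'new-2', docId: 'doc' },
    { id: 'new-3', docId: 'doc' }
]

const okStore = makeStore(oldRows, 0) // never fails
const okResult = replaceChunks(okStore, 'doc', freshRows, 2)

const failStore = makeStore(oldRows, 2) // second batch fails
const failResult = replaceChunks(failStore, 'doc', freshRows, 2)
```

In the failing run the persisted count reflects only the batches that were actually written, which matches the commit's point about deriving totals from persisted counts rather than expected ones.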
PR Review: fix(server): batch upsert pipeline to prevent OOM on large datasets (v2)
Summary
This PR re-applies the OOM fix from #1006 that was reverted in #1008. It batches both the chunk-save step (
The overall direction is correct and the changes are well-scoped. Below are the findings.
Critical Issues
None. No security regressions, no multi-tenancy gaps, and no authentication bypasses were introduced. All
Major Concerns
1.
Location:
The DB reads are now paginated (good), but all pages are accumulated into a single
2.
Location:
// step 8: delete old chunks only after all new chunks are confirmed saved
await appDataSource.getRepository(DocumentStoreFileChunk).delete({ docId: newLoaderId })
The corresponding delete in
await appDataSource.getRepository(DocumentStoreFileChunk).delete({
docId: newLoaderId,
userId: data.userId,
organizationId: data.organizationId
})
Minor Issues and Suggestions
3.
Location:
const recordManagerConfig =
typeof data.recordManagerConfig === 'string' ? JSON.parse(data.recordManagerConfig) : data.recordManagerConfig
const isFullCleanup = recordManagerObj && recordManagerConfig?.cleanup === 'full'
const isFullCleanup = (() => {
if (!recordManagerObj) return false
const cfg = typeof data.recordManagerConfig === 'string'
? JSON.parse(data.recordManagerConfig)
: data.recordManagerConfig
return cfg?.cleanup === 'full'
})()
4. Per-chunk
Location:
Each chunk is saved in a separate
const docChunks = batch.map((chunk, localIndex) => {
return repository.create({ ...fields, chunkNo: i + localIndex + 1 })
})
await repository.save(docChunks) // single multi-row INSERT
This would reduce connection pressure significantly for large syncs and is the more idiomatic TypeORM pattern.
5. IIFE for JSON.parse in document mapping adds noise with no error recovery
Location:
metadata: (() => { try { return chunk.metadata ? JSON.parse(chunk.metadata) : {} } catch { return {} } })()
Returning
metadata: (() => {
try { return chunk.metadata ? JSON.parse(chunk.metadata) : {} }
catch { logger.warn(`[documentstore] Failed to parse metadata for chunk ${chunk.id}`); return {} }
})()
6. Test file declares
Location:
The
7. Typo fix in telemetry
Location:
This is a good catch and worth noting explicitly. The original field name
Positive Observations
Checklist Assessment
Overall assessment: Request changes on the two major concerns before merging. The multi-tenancy gap in the delete call (#2) is a correctness issue worth fixing now. The full-cleanup memory concern (#1) needs at minimum a documented decision. The minor items can be addressed as follow-up.
Reviewed by Claude Sonnet 4.6
Summary
- AAIDomains.ts and batched save/upsert flow in documentstore/index.ts.
- _createVectorStoreObject outside the upsert batch loop and guarding telemetry when indexResult is undefined.
- numAdded, numDeleted, numUpdated, numSkipped) so history reflects full-run totals instead of last-batch-only stats.
Why
Verification
- pnpm --filter flowise-components build
- pnpm --filter flowise build
- npx prettier --check packages/components/nodes/documentloaders/AAIDomains/AAIDomains.ts packages/server/src/services/documentstore/index.ts