Vector Embedding needs to respect payload size #26943
public void bulkIndex(List<Map<String, Object>> documents, String targetIndex) {
  if (documents == null || documents.isEmpty()) {
    return;
  }

  try {
    List<BulkOperation> operations = new ArrayList<>();
    for (int i = 0; i < documents.size(); i++) {
      Map<String, Object> doc = documents.get(i);
      String parentId = (String) doc.get("parent_id");
      int chunkIndex = doc.containsKey("chunk_index") ? (int) doc.get("chunk_index") : i;
      String docId = parentId + "-" + chunkIndex;

      operations.add(
          BulkOperation.of(
              op -> op.index(idx -> idx.index(targetIndex).id(docId).document(doc))));
    }
    VectorBulkProcessor processor = getOrCreateBulkProcessor(targetIndex);
    for (int i = 0; i < documents.size(); i++) {
      Map<String, Object> doc = documents.get(i);
      String parentId = (String) doc.get("parent_id");
      int chunkIndex = doc.containsKey("chunk_index") ? (int) doc.get("chunk_index") : i;
      String docId = parentId + "-" + chunkIndex;
      processor.addChunk(docId, doc);
    }
⚠️ Bug: Race condition: bulkIndex uses processor after it may be closed
bulkIndex() is not synchronized. It obtains a VectorBulkProcessor reference from getOrCreateBulkProcessor() (which IS synchronized), then iterates documents calling processor.addChunk() outside the lock. If another thread concurrently calls bulkIndex() with a different targetIndex, getOrCreateBulkProcessor will close the old processor and create a new one. The first thread then calls addChunk() on the closed processor — whose scheduler is already shut down. Because addChunk() never checks closed, chunks are added to the dead processor's buffer. These will only be flushed if shouldFlush triggers a size-based flush; otherwise they are silently lost.
The same race exists with flushBulkProcessor(): it closes and nullifies centralBulkProcessor while bulkIndex may still hold a stale reference and be mid-iteration.
Suggested fix:
Either synchronize the entire `bulkIndex` method on the same lock as `getOrCreateBulkProcessor`/`flushBulkProcessor`, or hold the lock for the full duration of the add loop:
public void bulkIndex(List<Map<String, Object>> documents, String targetIndex) {
    if (documents == null || documents.isEmpty()) return;
    synchronized (this) {
        VectorBulkProcessor processor = getOrCreateBulkProcessor(targetIndex);
        for (int i = 0; i < documents.size(); i++) {
            // ... addChunk ...
        }
    }
}
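The comment also points out that addChunk() never checks closed. As a complement to holding the lock, a minimal sketch of a closed check is shown here, so that a stale reference fails loudly instead of buffering chunks that may never be flushed. GuardedProcessor and its members are hypothetical stand-ins, not the PR's VectorBulkProcessor, and throwing IllegalStateException is only one possible policy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for VectorBulkProcessor; only the closed check is the point.
class GuardedProcessor implements AutoCloseable {
    private final List<String> buffer = new ArrayList<>();
    private boolean closed = false;

    // Rejects chunks once close() has run, so a caller holding a stale
    // reference sees an exception instead of silently losing data.
    synchronized void addChunk(String docId, Map<String, Object> doc) {
        if (closed) {
            throw new IllegalStateException("processor closed; chunk " + docId + " rejected");
        }
        buffer.add(docId);
    }

    // Number of chunks accepted so far (this sketch never flushes them).
    synchronized int buffered() {
        return buffer.size();
    }

    @Override
    public synchronized void close() {
        closed = true;
    }
}
```

With a check like this, the race described above would surface as an exception in the calling thread, which the caller could handle by re-fetching the current processor.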
this.scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(
    this::flushIfNeeded, FLUSH_INTERVAL_MS, FLUSH_INTERVAL_MS, TimeUnit.MILLISECONDS);
💡 Bug: Scheduler thread pool is non-daemon, may delay JVM shutdown
Executors.newScheduledThreadPool(1) creates a thread pool with non-daemon threads. If close() is not called (e.g., due to an exception in the calling code), the scheduler thread will prevent JVM shutdown. Use a custom ThreadFactory with daemon threads.
Suggested fix:
ThreadFactory daemonFactory = r -> {
    Thread t = new Thread(r, "vector-bulk-flush");
    t.setDaemon(true);
    return t;
};
this.scheduler = Executors.newScheduledThreadPool(1, daemonFactory);
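A daemon thread can be stopped partway through a flush when the JVM exits, so close() probably still needs to shut the scheduler down in an orderly way. The following is a rough sketch of how the daemon factory and such a close() might fit together. DaemonFlushScheduler, its constructor parameters, and the 5-second wait are assumptions made for illustration, not code from the PR.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper; the PR's VectorBulkProcessor owns its scheduler directly.
class DaemonFlushScheduler implements AutoCloseable {
    private final ScheduledExecutorService scheduler;

    DaemonFlushScheduler(Runnable flushTask, long intervalMs) {
        // Daemon thread: does not keep the JVM alive if close() is never called.
        ThreadFactory daemonFactory = r -> {
            Thread t = new Thread(r, "vector-bulk-flush");
            t.setDaemon(true);
            return t;
        };
        this.scheduler = Executors.newSingleThreadScheduledExecutor(daemonFactory);
        scheduler.scheduleAtFixedRate(flushTask, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    boolean isShutdown() {
        return scheduler.isShutdown();
    }

    @Override
    public void close() {
        // Stop scheduling new runs, then give an in-flight flush time to finish.
        scheduler.shutdown();
        try {
            if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                scheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            scheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
```

The daemon flag only covers the case where close() is skipped; it does not replace a final flush of whatever is still buffered.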
Describe your changes:
Fixes
I worked on ... because ...
Type of change:
Checklist:
Fixes <issue-number>: <short explanation>
Summary by Gitar
- VectorBulkProcessor with scheduled periodic flushes
- OpenSearchVectorService to respect max payload limits
- bulkIndex method to use VectorBulkProcessor instead of direct bulk operations
- VectorBulkProcessor field and import from OpenSearchBulkSink
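To illustrate what respecting a max payload limit can look like, here is a minimal sketch of a size-bounded buffer that flushes before a batch would exceed a byte budget. This is not the PR's VectorBulkProcessor: SizeBoundedBuffer, the sink callback, and measuring size as the UTF-8 length of an already serialized document are all assumptions made for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical buffer; the sink stands in for the actual bulk request.
class SizeBoundedBuffer {
    private final long maxPayloadBytes;
    private final Consumer<List<String>> sink;
    private final List<String> pending = new ArrayList<>();
    private long pendingBytes = 0;

    SizeBoundedBuffer(long maxPayloadBytes, Consumer<List<String>> sink) {
        this.maxPayloadBytes = maxPayloadBytes;
        this.sink = sink;
    }

    synchronized void add(String serializedDoc) {
        long size = serializedDoc.getBytes(StandardCharsets.UTF_8).length;
        // Flush first if this document would push the batch over the limit.
        // A single document larger than the limit is still sent, alone.
        if (!pending.isEmpty() && pendingBytes + size > maxPayloadBytes) {
            flush();
        }
        pending.add(serializedDoc);
        pendingBytes += size;
    }

    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        // Hand the sink a copy so clearing the buffer does not affect it.
        sink.accept(new ArrayList<>(pending));
        pending.clear();
        pendingBytes = 0;
    }
}
```

A periodic flush, as in the scheduled flushes listed above, would simply call flush() on a timer so that a partially filled batch does not wait indefinitely.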