-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Ensure insertedIds contain ids from all batches #850
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
The previous code was incorrect because it was comparing absolute write indexes with indexes that are relative to the current batch. This patch avoids that by using the insertedId map from SplittablePayload directly, which already contains absolute write indexes. JAVA-4436
@@ -282,21 +281,11 @@ private BulkWriteResult getBulkWriteResult(final BsonDocument result) { | |||
} | |||
|
|||
private List<BulkWriteInsert> getInsertedItems(final BsonDocument result) { | |||
if (payload.getPayloadType() == SplittablePayload.Type.INSERT) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed the conditional just to simplify the code. If the payload type is anything but INSERT, then payload.getInsertedIds
will be empty anyway.
if (payload.getPayloadType() == SplittablePayload.Type.INSERT) { | ||
|
||
Stream<WriteRequestWithIndex> writeRequests = payload.getWriteRequestWithIndexes().stream(); | ||
List<Integer> writeErrors = getWriteErrors(result).stream().map(BulkWriteError::getIndex).collect(Collectors.toList()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed this to a Set to make the contains
check more efficient.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack
|
||
Stream<WriteRequestWithIndex> writeRequests = payload.getWriteRequestWithIndexes().stream(); | ||
List<Integer> writeErrors = getWriteErrors(result).stream().map(BulkWriteError::getIndex).collect(Collectors.toList()); | ||
if (!writeErrors.isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed this conditional, as it seems an unnecessary optimization. The contains check will be fast enough on any empty set.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack
} | ||
return Collections.emptyList(); | ||
Set<Integer> writeErrors = getWriteErrors(result).stream().map(BulkWriteError::getIndex).collect(Collectors.toSet()); | ||
return payload.getInsertedIds().entrySet().stream() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's safe to use SplittablePayload`s insertedIds map directly, since it will only contain the ids from the previously inserted batch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So much neater and easier to read.
@@ -228,23 +229,71 @@ class BulkWriteBatchSpecification extends Specification { | |||
!bulkWriteBatch.hasAnotherBatch() | |||
} | |||
|
|||
def 'should only map inserts up to the payload position'() { | |||
def 'should map all inserted ids'() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I might have made this test more complicated that it needs to be, as I changed it first just to reproduce the bug, but before I settled on the simplified approach in BulkWriteBatch
new BulkWriteInsert(2, new BsonInt32(2))] | ||
} | ||
|
||
def 'should not map inserted id with a write error'() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There wasn't a test for this so added one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good stuff
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Much easier to read now.
I think you mentioned checking upsertedIds as well - does anything need happen with them?
} | ||
return Collections.emptyList(); | ||
Set<Integer> writeErrors = getWriteErrors(result).stream().map(BulkWriteError::getIndex).collect(Collectors.toSet()); | ||
return payload.getInsertedIds().entrySet().stream() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So much neater and easier to read.
if (payload.getPayloadType() == SplittablePayload.Type.INSERT) { | ||
|
||
Stream<WriteRequestWithIndex> writeRequests = payload.getWriteRequestWithIndexes().stream(); | ||
List<Integer> writeErrors = getWriteErrors(result).stream().map(BulkWriteError::getIndex).collect(Collectors.toList()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack
|
||
Stream<WriteRequestWithIndex> writeRequests = payload.getWriteRequestWithIndexes().stream(); | ||
List<Integer> writeErrors = getWriteErrors(result).stream().map(BulkWriteError::getIndex).collect(Collectors.toList()); | ||
if (!writeErrors.isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack
new BulkWriteInsert(2, new BsonInt32(2))] | ||
} | ||
|
||
def 'should not map inserted id with a write error'() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good stuff
No, I was wrong about that, as upsertedIds come directly from the server's response and wouldn't suffer from the same mapping issue. |
The previous code was incorrect because it was comparing absolute write indexes with indexes that are relative to the current batch. This patch avoids that by using the insertedId map from SplittablePayload directly, which already contains absolute write indexes. JAVA-4436
The previous code was incorrect because it was comparing absolute write indexes
with indexes that are relative to the current batch. This patch avoids that
by using the insertedId map from SplittablePayload directly, which already
contains absolute write indexes.
JAVA-4436