-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Propagate the batch size in ChangeStreamPublisherImpl.withDocumentClass #664
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not clear
@Override | ||
AsyncReadOperation<AsyncBatchCursor<TDocument>> asAsyncReadOperation(final int initialBatchSize) { | ||
return createChangeStreamOperation(getMongoOperationPublisher().getCodecRegistry().get(clazz), initialBatchSize); | ||
} | ||
}; | ||
Integer batchSize = getBatchSize(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you write a test for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@Override | ||
AsyncReadOperation<AsyncBatchCursor<TDocument>> asAsyncReadOperation(final int initialBatchSize) { | ||
return createChangeStreamOperation(getMongoOperationPublisher().getCodecRegistry().get(clazz), initialBatchSize); | ||
} | ||
}; | ||
Integer batchSize = getBatchSize(); | ||
if (batchSize != null) { | ||
result.batchSize(batchSize); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not clear to me why batchSize is the one property that isn't propagated. Can you explain what's going on?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's actually not clear to me either. That's why I asked Ross directly in another comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll answer in a comment.
@rozza You are probably in the best position to tell whether or not this fixes a bug or introduces one. It appears that we must propagate the batch size if it was set, even for the reactive driver. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great catch, definitely could do with a regression test.
A BatchCursorPublisher has 3 properties: final clientSession
, final MongoOperationPublisher<T>
and the mutable Integer batchSize
.
This is the only case where a user can change a publisher type by using withDocumentClass
(which returns Publisher<T>
). All properties apart from the batchSize are set on the operation via getMongoOperationPublisher
.
Depending on how the ChangeStreamPublisher is used a explicitly declared batchSize maybe lost. If its set before withDocumentClass
is called its lost, if its set after its ok. If its lost it will be set the signalled demand value.
So to fix its probably cleaner to add a new constructor to BatchCursorPublisher taking all 3 params.
@@ -112,12 +112,18 @@ private ChangeStreamPublisherImpl( | |||
|
|||
@Override | |||
public <TDocument> Publisher<TDocument> withDocumentClass(final Class<TDocument> clazz) { | |||
return new BatchCursorPublisher<TDocument>(getClientSession(), getMongoOperationPublisher().withDocumentClass(clazz)) { | |||
BatchCursorPublisher<TDocument> result = new BatchCursorPublisher<TDocument>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add an constructor that includes the batchSize as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Depending on how the ChangeStreamPublisher is used a explicitly declared batchSize maybe lost. If its set before withDocumentClass is called its lost, if its set after its ok. If its lost it will be set the signalled demand value.
Was this meant to be the opposite?
- It seems to me that if
ChangeStreamPublisherImpl.batchSize
is called beforeChangeStreamPublisherImpl.withDocumentClass
, then with the changes in this PR, the specifiedbatchSize
is used by the newBatchCursorPublisher
, i.e., it is not lost. - If
ChangeStreamPublisherImpl.batchSize
is called afterChangeStreamPublisherImpl.withDocumentClass
, then theBatchCursorPublisher
created by the methodwithDocumentClass
cannot observe the newbatchSize
, and it is lost. I do not see how adding a new constructor toBatchCursorPublisher
would change this, am I missing something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just meant you can clean up the code by just adding a new constructor eg: rozza/mongo-java-driver@50b51d6
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was this meant to be the opposite?
I was describing why this fix was needed after Jeff asked "It's not clear to me why batchSize is the one property that isn't propagated. Can you explain what's going on?"
As withDocumentClass
returns a plan Publisher batchSize
cannot be set after conversion. Batchsize was only implicitly set via demand, even if a user explicitly set it by calling batchSize
before calling withDocumentClass
.
So this fix, fixes that niche case where a user wants to control the batchSizes from the cursor and use an alternative document class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, @rozza, I merged your proposed commit. I still do not completely understand your explanation, likely because I lack more broad understanding of how this class is used.
Update: we had a call, where Ross clarified things for me.
@Override | ||
AsyncReadOperation<AsyncBatchCursor<TDocument>> asAsyncReadOperation(final int initialBatchSize) { | ||
return createChangeStreamOperation(getMongoOperationPublisher().getCodecRegistry().get(clazz), initialBatchSize); | ||
} | ||
}; | ||
Integer batchSize = getBatchSize(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
@Override | ||
AsyncReadOperation<AsyncBatchCursor<TDocument>> asAsyncReadOperation(final int initialBatchSize) { | ||
return createChangeStreamOperation(getMongoOperationPublisher().getCodecRegistry().get(clazz), initialBatchSize); | ||
} | ||
}; | ||
Integer batchSize = getBatchSize(); | ||
if (batchSize != null) { | ||
result.batchSize(batchSize); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll answer in a comment.
I'm going to leave this one for @rozza to review (I'm not able to remove myself from the reviewers list, since I've already commented). |
I also tried and it's the same - cannot remove you. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Merging, because Jeff retracted himself from reviewing. |
JAVA-4010