RUBY-1228 Change Streams #888
Conversation
Force-pushed from 5c7d0b0 to 637e63f
I looked over this, and unfortunately I don't really have any useful feedback to provide. Everything seems fine to me.

Ok, thank you @saghm!
Force-pushed from 496d72e to 709737d
```ruby
process(get_more_operation.execute(@server))
else
read_with_retry do
```
I don't think getMores should be retried with the same cursor. The spec says to create a new cursor on a retryable error: https://github.com/mongodb/specifications/blob/master/source/change-streams.rst#resume-process
Right, the cursor created by the `ChangeStream` has the option `{ disable_retry: true }`, so the first branch, which doesn't wrap the `get_more` in the `read_with_retry` block, will be executed.
Thanks for the explanation.
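For context, the branching described above can be sketched roughly like this. This is a minimal illustration, not the driver's actual code: the `disable_retry` option name comes from the discussion, while the class and method names here are invented.

```ruby
# Hypothetical sketch: a cursor option that bypasses the retry wrapper,
# so resume logic can live in the change stream instead of the cursor.
class SketchCursor
  attr_reader :retries

  def initialize(options = {})
    @options = options
    @retries = 0
  end

  def get_more
    if @options[:disable_retry]
      execute_get_more                 # no retry here: the change stream resumes
    else
      read_with_retry { execute_get_more }
    end
  end

  private

  def execute_get_more
    :documents                         # stand-in for the real getMore operation
  end

  def read_with_retry
    yield
  rescue IOError
    @retries += 1
    yield                              # one retry on a network error
  end
end
```

With `disable_retry: true`, a network error would propagate immediately to the change stream instead of being retried at the cursor level.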
```ruby
# @since 2.5.0
#
# @yieldparam [ BSON::Document ] Each change stream document.
def each
```
I have a couple of questions. First, the spec says:

> A driver SHOULD attempt to kill the cursor on the server on which the cursor is opened during the resume process, and MUST NOT attempt to kill the cursor on any other server.

Does `close` ensure that the kill cursor request goes to the original server?

Second, `create_cursor` retries once and this method retries as well. That means it will retry on consecutive network errors. The spec says to retry only on the first network error; any errors that happen during the retry should be fatal. I think instead of `(@cursor || create_cursor!)` it should be something like `(@cursor || create_cursor_no_retry!)`. Do you agree, or am I misunderstanding the code?
> Does `close` ensure that the kill cursor request goes to the original server?

Yes. A cursor has a reference to the server (the `@server` variable). The kill cursors method called on the cursor at line 105 sends the operation to that server.
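A minimal sketch of that server pinning, with illustrative names rather than the driver's real classes: the server chosen at cursor creation is remembered and reused when the cursor is closed, matching the spec's requirement that the kill go only to the originating server.

```ruby
# Hypothetical sketch: the cursor pins the server it was created against,
# so close always targets the original server and never re-selects one.
class PinnedCursor
  attr_reader :server

  def initialize(server)
    @server = server          # remembered at creation time
  end

  def close
    kill_cursors_on(@server)  # always the original server
  end

  private

  def kill_cursors_on(server)
    server                    # stand-in for sending the killCursors operation
  end
end
```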
> Second, `create_cursor` retries once and this method retries as well. That means it will retry on consecutive network errors. The spec says to retry only on the first network error; any errors that happen during the retry should be fatal. I think instead of `(@cursor || create_cursor!)` it should be something like `(@cursor || create_cursor_no_retry!)`. Do you agree, or am I misunderstanding the code?

Yes, you're right. I shouldn't be retrying in the `create_cursor` method. I altered the logic a few commits ago; I believe it was correct before, so I'll change it back.
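The single-retry rule being discussed can be sketched as follows. This is a hedged illustration with invented names; the real resume process also re-runs the aggregation with a resume token, which is elided here.

```ruby
# Hypothetical sketch of "resume once": the first network error during a
# getMore triggers one resume attempt with a fresh cursor; a second
# consecutive error propagates to the caller as fatal.
class NetworkError < StandardError; end

class ResumeOnceStream
  def initialize(&create_cursor)
    @create_cursor = create_cursor
    @cursor = @create_cursor.call
  end

  def next_document
    @cursor.call
  rescue NetworkError
    @cursor = @create_cursor.call  # resume: new cursor, no retry wrapper
    @cursor.call                   # an error here is not rescued again
  end
end
```

A cursor factory whose first cursor fails with `NetworkError` and whose second succeeds yields a document after exactly one resume; two failing cursors in a row would raise out of `next_document`.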
```ruby
@change_stream_filters = pipeline
@options = options.freeze
@resume_token = @options[:resume_after]
create_cursor!
```
I thought I remembered reading that the initial aggregation command should not be retried, but I don't see it in the spec. I'll open a spec question.
Waiting for an answer: https://jira.mongodb.org/browse/SPEC-947
Thanks for pointing this out. I chose to retry it because we historically retry read operations in the driver (here is where we retry finds and aggregations, and here is where we retry a count, for example), and I thought it would be inconsistent not to retry the initial aggregation for a change stream. I'll see how the discussion goes in the spec ticket; thanks for opening it.
Thanks, this makes a lot more sense now that I know Ruby retries read operations already.
```ruby
# @since 2.5.0
def initialize(view, pipeline, options = {})
  @view = view
  @change_stream_filters = pipeline
```
Do you need to make a deep copy of the pipeline to ensure the user does not modify it after calling `watch`?

```ruby
pipeline = []
change_stream = coll.watch(pipeline, {})
pipeline << {'$project': {_id: 0}}
# Will the next cursor be created with [{'$changeStream': ...}, {'$project': {_id: 0}}]?
```

Maybe I'm being too paranoid...
haha, that is possible, I guess. Good call though, it wouldn't hurt to copy it.
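The aliasing in question is easy to demonstrate in plain Ruby, with no driver code involved: a shallow `dup` of the array already decouples it from later `<<` calls on the caller's side.

```ruby
# Demonstrates the aliasing concern: without a copy, mutating the user's
# pipeline array after watch would leak into later cursor re-creation.
user_pipeline = []

aliased = user_pipeline        # what storing the argument directly keeps
copied  = user_pipeline.dup    # shallow copy of the array itself

user_pipeline << { '$project' => { '_id' => 0 } }

aliased.length  # sees the caller's mutation
copied.length   # unaffected by it
```

Note that `dup` is shallow, so the stage hashes themselves are still shared; a deep copy would be needed to also guard against mutation of an individual stage.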
```ruby
private

def cache_resume_token(doc)
  unless @resume_token = doc[:_id]
```
Similarly, do you need to duplicate the _id to prevent a user from modifying it?
No, because ObjectIds are immutable and we are caching the ObjectId itself, not the document. If we were caching the document, then we might have to duplicate it. Thanks for checking though.
The `_id` here is a change stream resume token, which is a document.
Oh right, I forgot. hm, I guess I should duplicate it then. Thanks!
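To illustrate why a document-valued token needs copying, here is a plain Ruby sketch; the `_data` field name is illustrative, not taken from this PR.

```ruby
# A resume token that is a Hash can be mutated through the user's reference;
# caching doc[:_id] directly would see that mutation, while a dup would not.
doc = { :_id => { '_data' => 'token-bytes' } }

cached_alias = doc[:_id]       # same Hash object the user still holds
cached_copy  = doc[:_id].dup   # independent copy of the token document

doc[:_id]['_data'] = 'mutated'

cached_alias['_data']  # the cached token changed underneath us
cached_copy['_data']   # still the original value, safe for a later resume
```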
ChangeStream and #watch LGTM!
Force-pushed from 74a4055 to c3fec3e
Force-pushed from c3fec3e to 5cec119