-
Notifications
You must be signed in to change notification settings - Fork 100
PERL-791 Change stream support #151
PERL-791 Change stream support #151
Conversation
…NotFoundError exceptions
…e ::CursorNotFoundError below ::DatabaseError
…has a possibly temporary cause
…r resumability detection and limit to one retry
xdg
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't gotten to the tests, yet, but here are the comments on the code. All minor stuff; looks good!
lib/MongoDB/ChangeStream.pm
Outdated
|
|
||
| use namespace::clean -except => 'meta'; | ||
|
|
||
| has _cursor => ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd like to call this _result to clearly differentiate it from the legacy cursor type.
lib/MongoDB/ChangeStream.pm
Outdated
| init_arg => 'resume_after', | ||
| predicate => '_has_resume_token', | ||
| lazy => 1, | ||
| builder => '_build_resume_token', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a builder?
lib/MongoDB/ChangeStream.pm
Outdated
| required => 1, | ||
| ); | ||
|
|
||
| has options => ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please rename "aggregation_options" to make it clear these are not the exact same as the options passed to watch.
lib/MongoDB/ChangeStream.pm
Outdated
| sub _build_cursor { | ||
| my ($self) = @_; | ||
|
|
||
| my $pipeline = $self->pipeline; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this extraneous?
lib/MongoDB/ChangeStream.pm
Outdated
|
|
||
| my $pipeline = $self->pipeline; | ||
|
|
||
| my @pipeline = @{ $self->pipeline || [] }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the || [] isn't necessary anymore given required and the type constraint.
lib/MongoDB/ChangeStream.pm
Outdated
| =head1 SYNOPSIS | ||
| $stream = $collection->watch( $pipeline, $options ); | ||
| while (my $change = $stream->next) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please wrap this in an infinite loop with a comment about how the inner loop exits when there are no changes available.
lib/MongoDB/Collection.pm
Outdated
| my $stream = $collection->watch( \@pipeline ); | ||
| my $stream = $collection->watch( \@pipeline, \%options ); | ||
| while (my $change = $stream->next) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also wrap in an infinite loop here.
lib/MongoDB/Op/_Aggregate.pm
Outdated
| my $options = $self->options; | ||
| my $is_2_6 = $link->does_write_commands; | ||
|
|
||
| my $query_flags = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this unused?
lib/MongoDB/Op/_Aggregate.pm
Outdated
| isa => Num, | ||
| ); | ||
|
|
||
| has cursorType => ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is unnecessary. For aggregation, if there's maxAwaitTimeMS, then we can conclude that it's a tailable await.
lib/MongoDB/Role/_CommandCursorOp.pm
Outdated
| $max_time_ms = $self->maxAwaitTimeMS if $self->maxAwaitTimeMS; | ||
| } | ||
| elsif ($self->isa('MongoDB::Op::_Aggregate') && | ||
| $self->cursorType eq 'tailable_await') { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I said above, I think it's sufficient to just check if isa MongoDB::Op::_Aggregate (and that maxAwaitTimeMS is true) without an extra cursor_type.
xdg
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test don't fully cover the spec test cases but those are really complicated to test and I think what we have is good enough. I can verify from inspection that the rest of the code does what it's expected to do.
I have only one more minor request, I think.
| notification for partial updates will include both a delta describing the | ||
| changes to the document, as well as a copy of the entire document that | ||
| was changed from some time after the change occurred. | ||
| * C<resumeAfter> - The logical starting point for this change stream. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add to this that it must be the _id from a document returned from calling next on the ChangeStream object.
…since they are covered by type constraints and required
…s no more changes
|
@xdg Thanks for the review! All fixes for the issues should now be in the branch. |
|
LGTM. I'll squash and merge. Thanks! |
|
Squashed and merged as f7b8339 |
Implements PERL-791 "Change stream support"
Overview:
MongoDB::ChangeStreamas the reconnecting wrapper around change stream aggregations returned from$collection->watch.::_CommandCursorOpfor tailable::Op::_Aggregateinstances.::_Aggregateto deal withmaxAwaitTimeMS.watchto::Collection.MongoDB::InvalidOperationError.maxAwaitTimeMS,resumeAfter,fullDocument.