From 63731048bca47dd4748475c04c75ef94ba17acbb Mon Sep 17 00:00:00 2001 From: Robert Sedlacek Date: Thu, 22 Mar 2018 18:48:21 +0100 Subject: [PATCH 01/23] PERL-791 Change stream support --- lib/MongoDB/ChangeStream.pm | 182 +++++++++++++++++++++++++++ lib/MongoDB/Collection.pm | 80 ++++++++++++ lib/MongoDB/Error.pm | 5 + lib/MongoDB/Op/_Aggregate.pm | 18 +++ lib/MongoDB/Role/_CommandCursorOp.pm | 4 + t/collection.t | 90 +++++++++++++ 6 files changed, 379 insertions(+) create mode 100644 lib/MongoDB/ChangeStream.pm diff --git a/lib/MongoDB/ChangeStream.pm b/lib/MongoDB/ChangeStream.pm new file mode 100644 index 00000000..81e4d5f1 --- /dev/null +++ b/lib/MongoDB/ChangeStream.pm @@ -0,0 +1,182 @@ +# +# Copyright 2009-2018 MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +use strict; +use warnings; +package MongoDB::ChangeStream; + +# ABSTRACT: A stream providing update information for collections. + +use Moo; +use Try::Tiny; +use MongoDB::Cursor; +use MongoDB::Op::_Aggregate; +use MongoDB::Error; + +use namespace::clean -except => 'meta'; + +has _cursor => ( + is => 'rw', + lazy => 1, + builder => '_build_cursor', + clearer => '_clear_cursor', +); + +has collection => ( + is => 'ro', + required => 1, +); + +has options => ( + is => 'ro', +); + +has pipeline => ( + is => 'ro', + required => 1, +); + +has full_document => ( + is => 'ro', + predicate => '_has_full_document', +); + +has _resume_token => ( + is => 'rw', + init_arg => 'resume_after', + predicate => '_has_resume_token', + lazy => 1, + builder => '_build_resume_token', +); + +sub BUILD { + my ($self) = @_; + + # starting point is construction time instead of first next call + $self->_cursor; +} + +sub _build_cursor { + my ($self) = @_; + + my $pipeline = $self->pipeline; + + my @pipeline = @{ $self->pipeline || [] }; + @pipeline = ( + {'$changeStream' => { + ($self->_has_full_document + ? (fullDocument => $self->full_document) + : () + ), + ($self->_has_resume_token + ? (resumeAfter => $self->_resume_token) + : () + ), + }}, + @pipeline, + ); + + return $self->collection->aggregate( + \@pipeline, + { + %{ $self->options || {} }, + cursorType => 'tailable_await', + }, + ); +} + +=head1 STREAM METHODS + +=cut + +=head2 next + + $change_stream = $collection->watch(...); + $change = $change_stream->next; + +Waits for the next change in the collection and returns it. + +B: This method will wait for the amount of milliseconds passed +as C o L or the default. It +will not wait indefinitely. + +=cut + +sub next { + my ($self) = @_; + + my $change; + while (1) { + my $success = try { + $change = $self->_cursor->next; + 1 # successfully fetched result + } + catch { + my $error = $_; + if ( + $error->isa('MongoDB::ConnectionError') + or + $error->isa('MongoDB::CursorNotFoundError') + ) { + $self->_cursor($self->_build_cursor); + } + else { + die $error; + } + 0 # failed, cursor was rebuilt + }; + last if $success; + } + + # this differs from drivers that block indefinitely. we have to + # deal with the situation where no results are available. + if (not defined $change) { + return undef; + } + + if (exists $change->{_id}) { + my $resume_token = $change->{_id}; + $self->_resume_token($resume_token); + return $change; + } + else { + MongoDB::InvalidOperationError->throw( + "Cannot provide resume functionality when the ". + "resume token is missing"); + } +} + +1; + +=head1 SYNOPSIS + + $stream = $collection->watch( $pipeline, $options ); + while (my $change = $stream->next) { + ... + } + +=head1 DESCRIPTION + +This class models change stream results as returned by the +L method. + +=head1 SEE ALSO + +The L. + +The L. + +=cut diff --git a/lib/MongoDB/Collection.pm b/lib/MongoDB/Collection.pm index 31ddf51d..076d3e3c 100644 --- a/lib/MongoDB/Collection.pm +++ b/lib/MongoDB/Collection.pm @@ -24,6 +24,7 @@ package MongoDB::Collection; use version; our $VERSION = 'v1.999.0'; +use MongoDB::ChangeStream; use MongoDB::Error; use MongoDB::IndexView; use MongoDB::InsertManyResult; @@ -1000,6 +1001,79 @@ sub find_one_and_update { return $self->_find_one_and_update_or_replace($filter, $update, $options); } +=method watch + +Watches for changes on this collection- + +Perform an aggregation with an implicit initial C<$changeStream> stage +and returns a L result which can be used to +iterate over the changes in the collection. This functionality is +available since MongoDB 3.6. + + my $stream = $collection->watch(); + my $stream = $collection->watch( \@pipeline ); + my $stream = $collection->watch( \@pipeline, \%options ); + + while (my $change = $stream->next) { + # process $change + } + +The returned stream will not block forever waiting for changes. If you +want to respond to changes over a longer time use C and +regularly call C in a loop. + +B: Using this helper method is preferred to manually aggregating +with a C<$changeStream> stage, since it will automatically resume when +the connection was terminated. + +The optional first argument must be an array-ref of +L +documents. Each pipeline document must be a hash reference. Not all +pipeline stages are supported after C<$changeStream>. + +The optional second argument is a hash reference with options: + +=for :list +* C - The fullDocument to pass as an option to the + C<$changeStream> stage. Allowed values: C, C. + Defaults to C. When set to C, the change + notification for partial updates will include both a delta describing the + changes to the document, as well as a copy of the entire document that + was changed from some time after the change occurred. +* C - The logical starting point for this change stream. +* C - The maximum number of milliseconds for the server + to wait before responding. + +See L for more available options. + +See the L +for general usage information on change streams. + +See the L +for details on change streams. + +=cut + +sub watch { + my ( $self, $pipeline, $options ) = @_; + + $pipeline ||= []; + $options ||= {}; + + return MongoDB::ChangeStream->new( + exists($options->{fullDocument}) + ? (full_document => delete $options->{fullDocument}) + : (full_document => 'default'), + exists($options->{resumeAfter}) + ? (resume_after => delete $options->{resumeAfter}) + : (), + collection => $self, + client => $self->client, + pipeline => $pipeline, + options => $options, + ); +} + =method aggregate @pipeline = ( @@ -1082,6 +1156,12 @@ sub aggregate { options => $options, read_concern => $self->read_concern, has_out => $last_op eq '$out', + exists($options->{cursorType}) + ? (cursorType => delete $options->{cursorType}) + : (cursorType => 'non_tailable'), + exists($options->{maxAwaitTimeMS}) + ? (maxAwaitTimeMS => delete $options->{maxAwaitTimeMS}) + : (), %{ $self->_op_args }, ); diff --git a/lib/MongoDB/Error.pm b/lib/MongoDB/Error.pm index 06ba6615..ddea542f 100644 --- a/lib/MongoDB/Error.pm +++ b/lib/MongoDB/Error.pm @@ -277,6 +277,11 @@ use Moo; use namespace::clean; extends 'MongoDB::Error'; +package MongoDB::InvalidOperationError; +use Moo; +use namespace::clean; +extends 'MongoDB::Error'; + #--------------------------------------------------------------------------# # Private error classes #--------------------------------------------------------------------------# diff --git a/lib/MongoDB/Op/_Aggregate.pm b/lib/MongoDB/Op/_Aggregate.pm index 8b93ba0b..36c9a92a 100644 --- a/lib/MongoDB/Op/_Aggregate.pm +++ b/lib/MongoDB/Op/_Aggregate.pm @@ -28,11 +28,13 @@ use Moo; use MongoDB::Op::_Command; use MongoDB::_Types qw( ArrayOfHashRef + CursorType ); use Types::Standard qw( Bool HashRef InstanceOf + Num ); use namespace::clean; @@ -61,6 +63,17 @@ has has_out => ( isa => Bool, ); +has maxAwaitTimeMS => ( + is => 'rw', + isa => Num, +); + +has cursorType => ( + is => 'rw', + isa => CursorType, + required => 1, +); + with $_ for qw( MongoDB::Role::_PrivateConstructor MongoDB::Role::_CollectionOp @@ -75,6 +88,11 @@ sub execute { my $options = $self->options; my $is_2_6 = $link->does_write_commands; + my $query_flags = { + tailable => ( $self->cursorType =~ /^tailable/ ? 1 : 0 ), + await_data => $self->cursorType eq 'tailable_await', + }; + # maxTimeMS isn't available until 2.6 and the aggregate command # will reject it as unrecognized delete $options->{maxTimeMS} unless $is_2_6; diff --git a/lib/MongoDB/Role/_CommandCursorOp.pm b/lib/MongoDB/Role/_CommandCursorOp.pm index 21b26f85..52b50ccd 100644 --- a/lib/MongoDB/Role/_CommandCursorOp.pm +++ b/lib/MongoDB/Role/_CommandCursorOp.pm @@ -46,6 +46,10 @@ sub _build_result_from_cursor { $self->cursorType eq 'tailable_await') { $max_time_ms = $self->maxAwaitTimeMS if $self->maxAwaitTimeMS; } + elsif ($self->isa('MongoDB::Op::_Aggregate') && + $self->cursorType eq 'tailable_await') { + $max_time_ms = $self->maxAwaitTimeMS if $self->maxAwaitTimeMS; + } my $batch = $c->{firstBatch}; my $qr = MongoDB::QueryResult->_new( diff --git a/t/collection.t b/t/collection.t index 6ecae76c..f82cc197 100644 --- a/t/collection.t +++ b/t/collection.t @@ -945,6 +945,96 @@ subtest "querying w/ collation" => sub { } }; +subtest 'change streams' => sub { + plan skip_all => 'MongoDB version 3.6 or higher required' + unless $server_version >= version->parse('v3.6.0'); + plan skip_all => 'MongoDB replica set required' + unless $server_type eq 'RSPrimary'; + + $coll->drop; + $coll->insert_one({ value => 1 }); + $coll->insert_one({ value => 2 }); + + my $change_stream = $coll->watch(); + is $change_stream->next, undef, 'next without changes'; + + for my $index (1..10) { + $coll->insert_one({ value => 100 + $index }); + } + my %changed; + while (my $change = $change_stream->next) { + is $changed{ $change->{fullDocument}{value} }++, 0, + 'first seen '.$change->{fullDocument}{value}; + } + is scalar(keys %changed), 10, 'seen all changes'; +}; + +subtest 'change streams w/ maxAwaitTimeMS' => sub { + plan skip_all => 'MongoDB version 3.6 or higher required' + unless $server_version >= version->parse('v3.6.0'); + plan skip_all => 'MongoDB replica set required' + unless $server_type eq 'RSPrimary'; + + $coll->drop; + my $change_stream = $coll->watch([], { maxAwaitTimeMS => 3000 }); + my $start = time; + is $change_stream->next, undef, 'next without changes'; + my $elapsed = time - $start; + my $min_elapsed = 2; + ok $elapsed > $min_elapsed, "waited for at least $min_elapsed secs"; +}; + +subtest 'change streams w/ fullDocument' => sub { + plan skip_all => 'MongoDB version 3.6 or higher required' + unless $server_version >= version->parse('v3.6.0'); + plan skip_all => 'MongoDB replica set required' + unless $server_type eq 'RSPrimary'; + + $coll->drop; + $coll->insert_one({ value => 1 }); + my $change_stream = $coll->watch( + [], + { fullDocument => 'updateLookup' }, + ); + $coll->update_one( + { value => 1 }, + { '$set' => { updated => 3 }}, + ); + my $change = $change_stream->next; + is $change->{operationType}, 'update', 'change is an update'; + ok exists($change->{fullDocument}), 'delta contains full document'; +}; + +subtest 'change streams w/ resumeAfter' => sub { + plan skip_all => 'MongoDB version 3.6 or higher required' + unless $server_version >= version->parse('v3.6.0'); + plan skip_all => 'MongoDB replica set required' + unless $server_type eq 'RSPrimary'; + + $coll->drop; + my $id = do { + my $change_stream = $coll->watch(); + $coll->insert_one({ value => 200 }); + $coll->insert_one({ value => 201 }); + my $change = $change_stream->next; + ok $change, 'change exists'; + is $change->{fullDocument}{value}, 200, + 'correct change'; + $change->{_id} + }; + do { + my $change_stream = $coll->watch( + [], + { resumeAfter => $id }, + ); + my $change = $change_stream->next; + ok $change, 'change exists after resume'; + is $change->{fullDocument}{value}, 201, + 'correct change after resume'; + is $change_stream->next, undef, 'no more changes'; + }; +}; + my $js_str = 'function() { return this.a > this.b }'; my $js_obj = MongoDB::Code->new( code => $js_str ); From e32fd12f98bdd4634d5a6c5b7be161f92fe867f2 Mon Sep 17 00:00:00 2001 From: Robert Sedlacek Date: Thu, 22 Mar 2018 19:32:17 +0100 Subject: [PATCH 02/23] adjusted to use Safe::Isa --- lib/MongoDB/ChangeStream.pm | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/MongoDB/ChangeStream.pm b/lib/MongoDB/ChangeStream.pm index 81e4d5f1..b5386762 100644 --- a/lib/MongoDB/ChangeStream.pm +++ b/lib/MongoDB/ChangeStream.pm @@ -25,6 +25,7 @@ use Try::Tiny; use MongoDB::Cursor; use MongoDB::Op::_Aggregate; use MongoDB::Error; +use Safe::Isa; use namespace::clean -except => 'meta'; @@ -127,9 +128,9 @@ sub next { catch { my $error = $_; if ( - $error->isa('MongoDB::ConnectionError') + $error->$_isa('MongoDB::ConnectionError') or - $error->isa('MongoDB::CursorNotFoundError') + $error->$_isa('MongoDB::CursorNotFoundError') ) { $self->_cursor($self->_build_cursor); } From 5b769fc1862ba2c632c3448bd9febe74f967dc12 Mon Sep 17 00:00:00 2001 From: Robert Sedlacek Date: Fri, 23 Mar 2018 03:23:08 +0100 Subject: [PATCH 03/23] Added to MongoDB::ChangeStream --- lib/MongoDB/ChangeStream.pm | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/MongoDB/ChangeStream.pm b/lib/MongoDB/ChangeStream.pm index b5386762..53617158 100644 --- a/lib/MongoDB/ChangeStream.pm +++ b/lib/MongoDB/ChangeStream.pm @@ -20,6 +20,9 @@ package MongoDB::ChangeStream; # ABSTRACT: A stream providing update information for collections. +use version; +our $VERSION = 'v1.999.0'; + use Moo; use Try::Tiny; use MongoDB::Cursor; From 9a323f06802a38a614bdcb2a8eb26cb3be6c9bce Mon Sep 17 00:00:00 2001 From: Robert Sedlacek Date: Fri, 23 Mar 2018 03:23:39 +0100 Subject: [PATCH 04/23] Don't return undef as per Perl::Critic --- lib/MongoDB/ChangeStream.pm | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/MongoDB/ChangeStream.pm b/lib/MongoDB/ChangeStream.pm index 53617158..838f1ec9 100644 --- a/lib/MongoDB/ChangeStream.pm +++ b/lib/MongoDB/ChangeStream.pm @@ -148,7 +148,7 @@ sub next { # this differs from drivers that block indefinitely. we have to # deal with the situation where no results are available. if (not defined $change) { - return undef; + return; } if (exists $change->{_id}) { From 681be3d62c720f4a681b4a2fe1232f8a8bf83a9f Mon Sep 17 00:00:00 2001 From: Robert Sedlacek Date: Fri, 23 Mar 2018 03:25:41 +0100 Subject: [PATCH 05/23] Enable raising of MongoDB::CursorNotFoundError instead of just MongoDB::DatabaseError --- lib/MongoDB/Error.pm | 4 +++- lib/MongoDB/Role/_DatabaseErrorThrower.pm | 3 +++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/MongoDB/Error.pm b/lib/MongoDB/Error.pm index ddea542f..3c12b846 100644 --- a/lib/MongoDB/Error.pm +++ b/lib/MongoDB/Error.pm @@ -45,6 +45,7 @@ BEGIN { UNKNOWN_ERROR => 8, NAMESPACE_NOT_FOUND => 26, INDEX_NOT_FOUND => 27, + CURSOR_NOT_FOUND => 43, EXCEEDED_TIME_LIMIT => 50, COMMAND_NOT_FOUND => 59, WRITE_CONCERN_ERROR => 64, @@ -250,7 +251,8 @@ extends 'MongoDB::Error'; package MongoDB::CursorNotFoundError; use Moo; use namespace::clean; -extends 'MongoDB::Error'; +extends 'MongoDB::DatabaseError'; +sub _build_code { return MongoDB::Error::CURSOR_NOT_FOUND() } package MongoDB::DecodingError; use Moo; diff --git a/lib/MongoDB/Role/_DatabaseErrorThrower.pm b/lib/MongoDB/Role/_DatabaseErrorThrower.pm index a6e07b01..29a7bb24 100644 --- a/lib/MongoDB/Role/_DatabaseErrorThrower.pm +++ b/lib/MongoDB/Role/_DatabaseErrorThrower.pm @@ -49,6 +49,9 @@ sub _throw_database_error { elsif ( grep { $code == $_ } @$ANY_DUP_KEY ) { $error_class = "MongoDB::DuplicateKeyError"; } + elsif ( $code == CURSOR_NOT_FOUND ) { + $error_class = "MongoDB::CursorNotFoundError"; + } elsif ( $self->last_wtimeout ) { $error_class = "MongoDB::WriteConcernError"; } From a7d14340d836dadd375f1fa83e0727274be1b256 Mon Sep 17 00:00:00 2001 From: Robert Sedlacek Date: Fri, 23 Mar 2018 03:26:53 +0100 Subject: [PATCH 06/23] Added test for MongoDB::ChangeStream reconnections on MongoDB::CursorNotFoundError exceptions --- t/collection.t | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/t/collection.t b/t/collection.t index f82cc197..6ce2ef05 100644 --- a/t/collection.t +++ b/t/collection.t @@ -1035,6 +1035,31 @@ subtest 'change streams w/ resumeAfter' => sub { }; }; +subtest 'change streams w/ CursorNotFound reconnection' => sub { + plan skip_all => 'MongoDB version 3.6 or higher required' + unless $server_version >= version->parse('v3.6.0'); + plan skip_all => 'MongoDB replica set required' + unless $server_type eq 'RSPrimary'; + + $coll->drop; + + my $change_stream = $coll->watch; + $coll->insert_one({ value => 301 }); + my $change = $change_stream->next; + ok $change, 'change received'; + is $change->{fullDocument}{value}, 301, 'correct change'; + + $testdb->run_command([ + killCursors => $coll->name, + cursors => [$change_stream->_cursor->_cursor_id], + ]); + + $coll->insert_one({ value => 302 }); + $change = $change_stream->next; + ok $change, 'change received after reconnect'; + is $change->{fullDocument}{value}, 302, 'correct change'; +}; + my $js_str = 'function() { return this.a > this.b }'; my $js_obj = MongoDB::Code->new( code => $js_str ); From 294585de70b8d10e73b41ef0bde7886cddd2b96b Mon Sep 17 00:00:00 2001 From: Robert Sedlacek Date: Fri, 23 Mar 2018 03:48:35 +0100 Subject: [PATCH 07/23] Adjust MongoDB::Error docs to include ::InvalidOperationError and move ::CursorNotFoundError below ::DatabaseError --- lib/MongoDB/Error.pm | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/MongoDB/Error.pm b/lib/MongoDB/Error.pm index 3c12b846..f61572f1 100644 --- a/lib/MongoDB/Error.pm +++ b/lib/MongoDB/Error.pm @@ -354,10 +354,10 @@ To retry failures automatically, consider using L. | | | |->MongoDB::NetworkError | - |->MongoDB::CursorNotFoundError - | |->MongoDB::DatabaseError | | + | |->MongoDB::CursorNotFoundError + | | | |->MongoDB::DuplicateKeyError | | | |->MongoDB::NotMasterError @@ -374,6 +374,8 @@ To retry failures automatically, consider using L. | |->MongoDB::InternalError | + |->MongoDB::InvalidOperationError + | |->MongoDB::ProtocolError | |->MongoDB::SelectionError From 3f28048d0b7d61821139c4069c0c1c9e29566f5c Mon Sep 17 00:00:00 2001 From: Robert Sedlacek Date: Sun, 25 Mar 2018 00:02:03 +0100 Subject: [PATCH 08/23] Add ->_is_resumable() flag to MongoDB::Errors indicating if an error has a possibly temporary cause --- lib/MongoDB/Error.pm | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/lib/MongoDB/Error.pm b/lib/MongoDB/Error.pm index f61572f1..cdc9ba5b 100644 --- a/lib/MongoDB/Error.pm +++ b/lib/MongoDB/Error.pm @@ -110,6 +110,10 @@ sub throw { die $throwable; } +# internal flag indicating if an operation should be retried when +# an error occurs. +sub _is_resumable { 1 } + #--------------------------------------------------------------------------# # Subclasses with attributes included inline below #--------------------------------------------------------------------------# @@ -135,6 +139,8 @@ has code => ( sub _build_code { return MongoDB::Error::UNKNOWN_ERROR() } +sub _is_resumable { 0 } + package MongoDB::DocumentError; use Moo; @@ -192,6 +198,8 @@ use Moo; use namespace::clean; extends 'MongoDB::Error'; +sub _is_resumable { 1 } + package MongoDB::HandshakeError; use Moo; use namespace::clean; @@ -230,6 +238,7 @@ use Moo; use namespace::clean; extends 'MongoDB::DatabaseError'; sub _build_code { return MongoDB::Error::NOT_MASTER() } +sub _is_resumable { 1 } package MongoDB::WriteError; use Moo; @@ -253,6 +262,7 @@ use Moo; use namespace::clean; extends 'MongoDB::DatabaseError'; sub _build_code { return MongoDB::Error::CURSOR_NOT_FOUND() } +sub _is_resumable { 1 } package MongoDB::DecodingError; use Moo; From 9b034d852f42709c9e72ed56f3509b6af9240563 Mon Sep 17 00:00:00 2001 From: Robert Sedlacek Date: Sun, 25 Mar 2018 00:06:03 +0100 Subject: [PATCH 09/23] Change MongoDB::ChangeStream to use MongoDB::Error->is_resumable() for resumability detection and limit to one retry --- lib/MongoDB/ChangeStream.pm | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/lib/MongoDB/ChangeStream.pm b/lib/MongoDB/ChangeStream.pm index 838f1ec9..1121cd92 100644 --- a/lib/MongoDB/ChangeStream.pm +++ b/lib/MongoDB/ChangeStream.pm @@ -123,18 +123,20 @@ sub next { my ($self) = @_; my $change; + my $retried; while (1) { - my $success = try { + last if try { $change = $self->_cursor->next; 1 # successfully fetched result } catch { my $error = $_; if ( - $error->$_isa('MongoDB::ConnectionError') - or - $error->$_isa('MongoDB::CursorNotFoundError') + not($retried) + and $error->$_isa('MongoDB::Error') + and $error->_is_resumable ) { + $retried = 1; $self->_cursor($self->_build_cursor); } else { @@ -142,7 +144,6 @@ sub next { } 0 # failed, cursor was rebuilt }; - last if $success; } # this differs from drivers that block indefinitely. we have to From 1e9d72c0d1a8bd2d1f4a439fd503fc3a46fbd5b3 Mon Sep 17 00:00:00 2001 From: Robert Sedlacek Date: Wed, 28 Mar 2018 17:09:43 +0200 Subject: [PATCH 10/23] Add types to attributes in MongoDB::ChangeStream --- lib/MongoDB/ChangeStream.pm | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/lib/MongoDB/ChangeStream.pm b/lib/MongoDB/ChangeStream.pm index 1121cd92..6491e650 100644 --- a/lib/MongoDB/ChangeStream.pm +++ b/lib/MongoDB/ChangeStream.pm @@ -29,11 +29,21 @@ use MongoDB::Cursor; use MongoDB::Op::_Aggregate; use MongoDB::Error; use Safe::Isa; +use MongoDB::_Types qw( + MongoDBCollection + ArrayOfHashRef +); +use Types::Standard qw( + InstanceOf + HashRef + Str +); use namespace::clean -except => 'meta'; has _cursor => ( is => 'rw', + isa => InstanceOf['MongoDB::QueryResult'], lazy => 1, builder => '_build_cursor', clearer => '_clear_cursor', @@ -41,20 +51,24 @@ has _cursor => ( has collection => ( is => 'ro', + isa => MongoDBCollection, required => 1, ); has options => ( is => 'ro', + isa => HashRef, ); has pipeline => ( is => 'ro', + isa => ArrayOfHashRef, required => 1, ); has full_document => ( is => 'ro', + isa => Str, predicate => '_has_full_document', ); From cfdbd3007530255542956d77aa04eedab4911939 Mon Sep 17 00:00:00 2001 From: Robert Sedlacek Date: Thu, 29 Mar 2018 04:24:38 +0200 Subject: [PATCH 11/23] Rename ChangeStream->_cursor to ->_result --- lib/MongoDB/ChangeStream.pm | 14 +++++++------- t/collection.t | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/lib/MongoDB/ChangeStream.pm b/lib/MongoDB/ChangeStream.pm index 6491e650..2eecd214 100644 --- a/lib/MongoDB/ChangeStream.pm +++ b/lib/MongoDB/ChangeStream.pm @@ -41,12 +41,12 @@ use Types::Standard qw( use namespace::clean -except => 'meta'; -has _cursor => ( +has _result => ( is => 'rw', isa => InstanceOf['MongoDB::QueryResult'], lazy => 1, - builder => '_build_cursor', - clearer => '_clear_cursor', + builder => '_build_result', + clearer => '_clear_result', ); has collection => ( @@ -84,10 +84,10 @@ sub BUILD { my ($self) = @_; # starting point is construction time instead of first next call - $self->_cursor; + $self->_result; } -sub _build_cursor { +sub _build_result { my ($self) = @_; my $pipeline = $self->pipeline; @@ -140,7 +140,7 @@ sub next { my $retried; while (1) { last if try { - $change = $self->_cursor->next; + $change = $self->_result->next; 1 # successfully fetched result } catch { @@ -151,7 +151,7 @@ sub next { and $error->_is_resumable ) { $retried = 1; - $self->_cursor($self->_build_cursor); + $self->_result($self->_build_result); } else { die $error; diff --git a/t/collection.t b/t/collection.t index 6ce2ef05..04a7fe90 100644 --- a/t/collection.t +++ b/t/collection.t @@ -1051,7 +1051,7 @@ subtest 'change streams w/ CursorNotFound reconnection' => sub { $testdb->run_command([ killCursors => $coll->name, - cursors => [$change_stream->_cursor->_cursor_id], + cursors => [$change_stream->_result->_cursor_id], ]); $coll->insert_one({ value => 302 }); From 3ba03dbaed3421b0a93066d143a714005ad100b3 Mon Sep 17 00:00:00 2001 From: Robert Sedlacek Date: Thu, 29 Mar 2018 04:26:10 +0200 Subject: [PATCH 12/23] Remove non-existant resume_token builder in ChangeStream --- lib/MongoDB/ChangeStream.pm | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/MongoDB/ChangeStream.pm b/lib/MongoDB/ChangeStream.pm index 2eecd214..9e8ed9c3 100644 --- a/lib/MongoDB/ChangeStream.pm +++ b/lib/MongoDB/ChangeStream.pm @@ -77,7 +77,6 @@ has _resume_token => ( init_arg => 'resume_after', predicate => '_has_resume_token', lazy => 1, - builder => '_build_resume_token', ); sub BUILD { From d97dffc03c23b0624b3dc145709cdefa023975e1 Mon Sep 17 00:00:00 2001 From: Robert Sedlacek Date: Thu, 29 Mar 2018 04:28:30 +0200 Subject: [PATCH 13/23] Rename ChangeStream->options to ->aggregation_options --- lib/MongoDB/ChangeStream.pm | 4 ++-- lib/MongoDB/Collection.pm | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/MongoDB/ChangeStream.pm b/lib/MongoDB/ChangeStream.pm index 9e8ed9c3..936d8675 100644 --- a/lib/MongoDB/ChangeStream.pm +++ b/lib/MongoDB/ChangeStream.pm @@ -55,7 +55,7 @@ has collection => ( required => 1, ); -has options => ( +has aggregation_options => ( is => 'ro', isa => HashRef, ); @@ -109,7 +109,7 @@ sub _build_result { return $self->collection->aggregate( \@pipeline, { - %{ $self->options || {} }, + %{ $self->aggregation_options || {} }, cursorType => 'tailable_await', }, ); diff --git a/lib/MongoDB/Collection.pm b/lib/MongoDB/Collection.pm index 076d3e3c..ca91eed7 100644 --- a/lib/MongoDB/Collection.pm +++ b/lib/MongoDB/Collection.pm @@ -1070,7 +1070,7 @@ sub watch { collection => $self, client => $self->client, pipeline => $pipeline, - options => $options, + aggregation_options => $options, ); } From 9b2a75fde2e568ff7b5f218dcdab1750894ad489 Mon Sep 17 00:00:00 2001 From: Robert Sedlacek Date: Thu, 29 Mar 2018 04:29:54 +0200 Subject: [PATCH 14/23] Removed leftover unused client argument to ChangeStream construction in Collection --- lib/MongoDB/Collection.pm | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/MongoDB/Collection.pm b/lib/MongoDB/Collection.pm index ca91eed7..733e927a 100644 --- a/lib/MongoDB/Collection.pm +++ b/lib/MongoDB/Collection.pm @@ -1068,7 +1068,6 @@ sub watch { ? (resume_after => delete $options->{resumeAfter}) : (), collection => $self, - client => $self->client, pipeline => $pipeline, aggregation_options => $options, ); From c147c70caa39318b88a77f1d4baf1f30a1f347b9 Mon Sep 17 00:00:00 2001 From: Robert Sedlacek Date: Thu, 29 Mar 2018 04:30:51 +0200 Subject: [PATCH 15/23] Removed unused variable --- lib/MongoDB/ChangeStream.pm | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/MongoDB/ChangeStream.pm b/lib/MongoDB/ChangeStream.pm index 936d8675..a4987bf7 100644 --- a/lib/MongoDB/ChangeStream.pm +++ b/lib/MongoDB/ChangeStream.pm @@ -89,8 +89,6 @@ sub BUILD { sub _build_result { my ($self) = @_; - my $pipeline = $self->pipeline; - my @pipeline = @{ $self->pipeline || [] }; @pipeline = ( {'$changeStream' => { From 8aa871edc638c432a6e5220f0b7bc82ec530c5ab Mon Sep 17 00:00:00 2001 From: Robert Sedlacek Date: Thu, 29 Mar 2018 04:32:44 +0200 Subject: [PATCH 16/23] Removed default empty array/hash ref constructs for pipeline/options since they are covered by type constraints and required --- lib/MongoDB/ChangeStream.pm | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/MongoDB/ChangeStream.pm b/lib/MongoDB/ChangeStream.pm index a4987bf7..11fb94c0 100644 --- a/lib/MongoDB/ChangeStream.pm +++ b/lib/MongoDB/ChangeStream.pm @@ -89,7 +89,7 @@ sub BUILD { sub _build_result { my ($self) = @_; - my @pipeline = @{ $self->pipeline || [] }; + my @pipeline = @{ $self->pipeline }; @pipeline = ( {'$changeStream' => { ($self->_has_full_document @@ -107,7 +107,7 @@ sub _build_result { return $self->collection->aggregate( \@pipeline, { - %{ $self->aggregation_options || {} }, + %{ $self->aggregation_options }, cursorType => 'tailable_await', }, ); From 4622361b7c95e996127bd2c5c2fcc10cc89dd4d7 Mon Sep 17 00:00:00 2001 From: Robert Sedlacek Date: Thu, 29 Mar 2018 04:36:56 +0200 Subject: [PATCH 17/23] Removed cursorType from Op::_Aggregate --- lib/MongoDB/ChangeStream.pm | 5 +---- lib/MongoDB/Op/_Aggregate.pm | 12 ------------ lib/MongoDB/Role/_CommandCursorOp.pm | 3 +-- 3 files changed, 2 insertions(+), 18 deletions(-) diff --git a/lib/MongoDB/ChangeStream.pm b/lib/MongoDB/ChangeStream.pm index 11fb94c0..2261bbe9 100644 --- a/lib/MongoDB/ChangeStream.pm +++ b/lib/MongoDB/ChangeStream.pm @@ -106,10 +106,7 @@ sub _build_result { return $self->collection->aggregate( \@pipeline, - { - %{ $self->aggregation_options }, - cursorType => 'tailable_await', - }, + $self->aggregation_options, ); } diff --git a/lib/MongoDB/Op/_Aggregate.pm b/lib/MongoDB/Op/_Aggregate.pm index 36c9a92a..e5f6d4b3 100644 --- a/lib/MongoDB/Op/_Aggregate.pm +++ b/lib/MongoDB/Op/_Aggregate.pm @@ -28,7 +28,6 @@ use Moo; use MongoDB::Op::_Command; use MongoDB::_Types qw( ArrayOfHashRef - CursorType ); use Types::Standard qw( Bool @@ -68,12 +67,6 @@ has maxAwaitTimeMS => ( isa => Num, ); -has cursorType => ( - is => 'rw', - isa => CursorType, - required => 1, -); - with $_ for qw( MongoDB::Role::_PrivateConstructor MongoDB::Role::_CollectionOp @@ -88,11 +81,6 @@ sub execute { my $options = $self->options; my $is_2_6 = $link->does_write_commands; - my $query_flags = { - tailable => ( $self->cursorType =~ /^tailable/ ? 1 : 0 ), - await_data => $self->cursorType eq 'tailable_await', - }; - # maxTimeMS isn't available until 2.6 and the aggregate command # will reject it as unrecognized delete $options->{maxTimeMS} unless $is_2_6; diff --git a/lib/MongoDB/Role/_CommandCursorOp.pm b/lib/MongoDB/Role/_CommandCursorOp.pm index 52b50ccd..ff2dc3f2 100644 --- a/lib/MongoDB/Role/_CommandCursorOp.pm +++ b/lib/MongoDB/Role/_CommandCursorOp.pm @@ -46,8 +46,7 @@ sub _build_result_from_cursor { $self->cursorType eq 'tailable_await') { $max_time_ms = $self->maxAwaitTimeMS if $self->maxAwaitTimeMS; } - elsif ($self->isa('MongoDB::Op::_Aggregate') && - $self->cursorType eq 'tailable_await') { + elsif ($self->isa('MongoDB::Op::_Aggregate')) { $max_time_ms = $self->maxAwaitTimeMS if $self->maxAwaitTimeMS; } From 0a581e6bb2c380a5766fb4d297f866b4d4c49375 Mon Sep 17 00:00:00 2001 From: Robert Sedlacek Date: Thu, 29 Mar 2018 04:37:59 +0200 Subject: [PATCH 18/23] Fixed ChangeStream->next documentation --- lib/MongoDB/ChangeStream.pm | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/MongoDB/ChangeStream.pm b/lib/MongoDB/ChangeStream.pm index 2261bbe9..2691fc01 100644 --- a/lib/MongoDB/ChangeStream.pm +++ b/lib/MongoDB/ChangeStream.pm @@ -122,8 +122,8 @@ sub _build_result { Waits for the next change in the collection and returns it. B: This method will wait for the amount of milliseconds passed -as C o L or the default. It -will not wait indefinitely. +as C to L or the server's +default wait-time. It will not wait indefinitely. =cut From 5fa52651172a2938abc1b0729b0c0a2237ec1fad Mon Sep 17 00:00:00 2001 From: Robert Sedlacek Date: Thu, 29 Mar 2018 04:39:45 +0200 Subject: [PATCH 19/23] Removed temporary for change _id --- lib/MongoDB/ChangeStream.pm | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/MongoDB/ChangeStream.pm b/lib/MongoDB/ChangeStream.pm index 2691fc01..4421a3d2 100644 --- a/lib/MongoDB/ChangeStream.pm +++ b/lib/MongoDB/ChangeStream.pm @@ -161,8 +161,7 @@ sub next { } if (exists $change->{_id}) { - my $resume_token = $change->{_id}; - $self->_resume_token($resume_token); + $self->_resume_token($change->{_id}); return $change; } else { From da8f5779e6e482620a436f8fb12d4226a32885f4 Mon Sep 17 00:00:00 2001 From: Robert Sedlacek Date: Thu, 29 Mar 2018 04:41:00 +0200 Subject: [PATCH 20/23] explicitly return undef instead of nothing when ChangeStream->next has no more changes --- lib/MongoDB/ChangeStream.pm | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/MongoDB/ChangeStream.pm b/lib/MongoDB/ChangeStream.pm index 4421a3d2..bb569797 100644 --- a/lib/MongoDB/ChangeStream.pm +++ b/lib/MongoDB/ChangeStream.pm @@ -157,7 +157,7 @@ sub next { # this differs from drivers that block indefinitely. we have to # deal with the situation where no results are available. if (not defined $change) { - return; + return undef; } if (exists $change->{_id}) { From c8dd4bb323c210247a1e9f32ec99500ad0ed0044 Mon Sep 17 00:00:00 2001 From: Robert Sedlacek Date: Thu, 29 Mar 2018 04:45:34 +0200 Subject: [PATCH 21/23] Fixed synopsis for ChangeStream --- lib/MongoDB/ChangeStream.pm | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/lib/MongoDB/ChangeStream.pm b/lib/MongoDB/ChangeStream.pm index bb569797..bf7b1f9a 100644 --- a/lib/MongoDB/ChangeStream.pm +++ b/lib/MongoDB/ChangeStream.pm @@ -176,8 +176,13 @@ sub next { =head1 SYNOPSIS $stream = $collection->watch( $pipeline, $options ); - while (my $change = $stream->next) { - ... + while(1) { + + # This inner loop will only iterate until there are no more + # changes available. + while (my $change = $stream->next) { + ... + } } =head1 DESCRIPTION From 27289bff20137e506120f2763a412c722868f51a Mon Sep 17 00:00:00 2001 From: Robert Sedlacek Date: Thu, 29 Mar 2018 04:47:11 +0200 Subject: [PATCH 22/23] Fixed usage example in ChangeStream->watch --- lib/MongoDB/Collection.pm | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/lib/MongoDB/Collection.pm b/lib/MongoDB/Collection.pm index 733e927a..b563ef37 100644 --- a/lib/MongoDB/Collection.pm +++ b/lib/MongoDB/Collection.pm @@ -1014,8 +1014,13 @@ available since MongoDB 3.6. my $stream = $collection->watch( \@pipeline ); my $stream = $collection->watch( \@pipeline, \%options ); - while (my $change = $stream->next) { - # process $change + while (1) { + + # This inner loop will only run until no more changes are + # available. + while (my $change = $stream->next) { + # process $change + } } The returned stream will not block forever waiting for changes. If you From fa7dde82727c95128dccee5b7185d6f0d077e935 Mon Sep 17 00:00:00 2001 From: Robert Sedlacek Date: Thu, 29 Mar 2018 04:49:48 +0200 Subject: [PATCH 23/23] Add note about where to obtain resumeAfter token to Collection->watch documentation --- lib/MongoDB/Collection.pm | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/MongoDB/Collection.pm b/lib/MongoDB/Collection.pm index b563ef37..b703e469 100644 --- a/lib/MongoDB/Collection.pm +++ b/lib/MongoDB/Collection.pm @@ -1046,6 +1046,8 @@ The optional second argument is a hash reference with options: changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred. * C - The logical starting point for this change stream. + This value can be obtained from the C<_id> field of a document returned + by L. * C - The maximum number of milliseconds for the server to wait before responding.