diff --git a/lib/MongoDB/ChangeStream.pm b/lib/MongoDB/ChangeStream.pm new file mode 100644 index 00000000..bf7b1f9a --- /dev/null +++ b/lib/MongoDB/ChangeStream.pm @@ -0,0 +1,199 @@ +# +# Copyright 2009-2018 MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +use strict; +use warnings; +package MongoDB::ChangeStream; + +# ABSTRACT: A stream providing update information for collections. + +use version; +our $VERSION = 'v1.999.0'; + +use Moo; +use Try::Tiny; +use MongoDB::Cursor; +use MongoDB::Op::_Aggregate; +use MongoDB::Error; +use Safe::Isa; +use MongoDB::_Types qw( + MongoDBCollection + ArrayOfHashRef +); +use Types::Standard qw( + InstanceOf + HashRef + Str +); + +use namespace::clean -except => 'meta'; + +has _result => ( + is => 'rw', + isa => InstanceOf['MongoDB::QueryResult'], + lazy => 1, + builder => '_build_result', + clearer => '_clear_result', +); + +has collection => ( + is => 'ro', + isa => MongoDBCollection, + required => 1, +); + +has aggregation_options => ( + is => 'ro', + isa => HashRef, +); + +has pipeline => ( + is => 'ro', + isa => ArrayOfHashRef, + required => 1, +); + +has full_document => ( + is => 'ro', + isa => Str, + predicate => '_has_full_document', +); + +has _resume_token => ( + is => 'rw', + init_arg => 'resume_after', + predicate => '_has_resume_token', + lazy => 1, +); + +sub BUILD { + my ($self) = @_; + + # starting point is construction time instead of first next call + $self->_result; +} + +sub _build_result { + my ($self) = @_; + + my @pipeline = @{ $self->pipeline }; + @pipeline = ( + {'$changeStream' => { + ($self->_has_full_document + ? (fullDocument => $self->full_document) + : () + ), + ($self->_has_resume_token + ? (resumeAfter => $self->_resume_token) + : () + ), + }}, + @pipeline, + ); + + return $self->collection->aggregate( + \@pipeline, + $self->aggregation_options, + ); +} + +=head1 STREAM METHODS + +=cut + +=head2 next + + $change_stream = $collection->watch(...); + $change = $change_stream->next; + +Waits for the next change in the collection and returns it. + +B: This method will wait for the amount of milliseconds passed +as C to L or the server's +default wait-time. It will not wait indefinitely. + +=cut + +sub next { + my ($self) = @_; + + my $change; + my $retried; + while (1) { + last if try { + $change = $self->_result->next; + 1 # successfully fetched result + } + catch { + my $error = $_; + if ( + not($retried) + and $error->$_isa('MongoDB::Error') + and $error->_is_resumable + ) { + $retried = 1; + $self->_result($self->_build_result); + } + else { + die $error; + } + 0 # failed, cursor was rebuilt + }; + } + + # this differs from drivers that block indefinitely. we have to + # deal with the situation where no results are available. + if (not defined $change) { + return undef; + } + + if (exists $change->{_id}) { + $self->_resume_token($change->{_id}); + return $change; + } + else { + MongoDB::InvalidOperationError->throw( + "Cannot provide resume functionality when the ". + "resume token is missing"); + } +} + +1; + +=head1 SYNOPSIS + + $stream = $collection->watch( $pipeline, $options ); + while(1) { + + # This inner loop will only iterate until there are no more + # changes available. + while (my $change = $stream->next) { + ... + } + } + +=head1 DESCRIPTION + +This class models change stream results as returned by the +L method. + +=head1 SEE ALSO + +The L. + +The L. + +=cut diff --git a/lib/MongoDB/Collection.pm b/lib/MongoDB/Collection.pm index 31ddf51d..b703e469 100644 --- a/lib/MongoDB/Collection.pm +++ b/lib/MongoDB/Collection.pm @@ -24,6 +24,7 @@ package MongoDB::Collection; use version; our $VERSION = 'v1.999.0'; +use MongoDB::ChangeStream; use MongoDB::Error; use MongoDB::IndexView; use MongoDB::InsertManyResult; @@ -1000,6 +1001,85 @@ sub find_one_and_update { return $self->_find_one_and_update_or_replace($filter, $update, $options); } +=method watch + +Watches for changes on this collection- + +Perform an aggregation with an implicit initial C<$changeStream> stage +and returns a L result which can be used to +iterate over the changes in the collection. This functionality is +available since MongoDB 3.6. + + my $stream = $collection->watch(); + my $stream = $collection->watch( \@pipeline ); + my $stream = $collection->watch( \@pipeline, \%options ); + + while (1) { + + # This inner loop will only run until no more changes are + # available. + while (my $change = $stream->next) { + # process $change + } + } + +The returned stream will not block forever waiting for changes. If you +want to respond to changes over a longer time use C and +regularly call C in a loop. + +B: Using this helper method is preferred to manually aggregating +with a C<$changeStream> stage, since it will automatically resume when +the connection was terminated. + +The optional first argument must be an array-ref of +L +documents. Each pipeline document must be a hash reference. Not all +pipeline stages are supported after C<$changeStream>. + +The optional second argument is a hash reference with options: + +=for :list +* C - The fullDocument to pass as an option to the + C<$changeStream> stage. Allowed values: C, C. + Defaults to C. When set to C, the change + notification for partial updates will include both a delta describing the + changes to the document, as well as a copy of the entire document that + was changed from some time after the change occurred. +* C - The logical starting point for this change stream. + This value can be obtained from the C<_id> field of a document returned + by L. +* C - The maximum number of milliseconds for the server + to wait before responding. + +See L for more available options. + +See the L +for general usage information on change streams. + +See the L +for details on change streams. + +=cut + +sub watch { + my ( $self, $pipeline, $options ) = @_; + + $pipeline ||= []; + $options ||= {}; + + return MongoDB::ChangeStream->new( + exists($options->{fullDocument}) + ? (full_document => delete $options->{fullDocument}) + : (full_document => 'default'), + exists($options->{resumeAfter}) + ? (resume_after => delete $options->{resumeAfter}) + : (), + collection => $self, + pipeline => $pipeline, + aggregation_options => $options, + ); +} + =method aggregate @pipeline = ( @@ -1082,6 +1162,12 @@ sub aggregate { options => $options, read_concern => $self->read_concern, has_out => $last_op eq '$out', + exists($options->{cursorType}) + ? (cursorType => delete $options->{cursorType}) + : (cursorType => 'non_tailable'), + exists($options->{maxAwaitTimeMS}) + ? (maxAwaitTimeMS => delete $options->{maxAwaitTimeMS}) + : (), %{ $self->_op_args }, ); diff --git a/lib/MongoDB/Error.pm b/lib/MongoDB/Error.pm index 06ba6615..cdc9ba5b 100644 --- a/lib/MongoDB/Error.pm +++ b/lib/MongoDB/Error.pm @@ -45,6 +45,7 @@ BEGIN { UNKNOWN_ERROR => 8, NAMESPACE_NOT_FOUND => 26, INDEX_NOT_FOUND => 27, + CURSOR_NOT_FOUND => 43, EXCEEDED_TIME_LIMIT => 50, COMMAND_NOT_FOUND => 59, WRITE_CONCERN_ERROR => 64, @@ -109,6 +110,10 @@ sub throw { die $throwable; } +# internal flag indicating if an operation should be retried when +# an error occurs. +sub _is_resumable { 1 } + #--------------------------------------------------------------------------# # Subclasses with attributes included inline below #--------------------------------------------------------------------------# @@ -134,6 +139,8 @@ has code => ( sub _build_code { return MongoDB::Error::UNKNOWN_ERROR() } +sub _is_resumable { 0 } + package MongoDB::DocumentError; use Moo; @@ -191,6 +198,8 @@ use Moo; use namespace::clean; extends 'MongoDB::Error'; +sub _is_resumable { 1 } + package MongoDB::HandshakeError; use Moo; use namespace::clean; @@ -229,6 +238,7 @@ use Moo; use namespace::clean; extends 'MongoDB::DatabaseError'; sub _build_code { return MongoDB::Error::NOT_MASTER() } +sub _is_resumable { 1 } package MongoDB::WriteError; use Moo; @@ -250,7 +260,9 @@ extends 'MongoDB::Error'; package MongoDB::CursorNotFoundError; use Moo; use namespace::clean; -extends 'MongoDB::Error'; +extends 'MongoDB::DatabaseError'; +sub _build_code { return MongoDB::Error::CURSOR_NOT_FOUND() } +sub _is_resumable { 1 } package MongoDB::DecodingError; use Moo; @@ -277,6 +289,11 @@ use Moo; use namespace::clean; extends 'MongoDB::Error'; +package MongoDB::InvalidOperationError; +use Moo; +use namespace::clean; +extends 'MongoDB::Error'; + #--------------------------------------------------------------------------# # Private error classes #--------------------------------------------------------------------------# @@ -347,10 +364,10 @@ To retry failures automatically, consider using L. | | | |->MongoDB::NetworkError | - |->MongoDB::CursorNotFoundError - | |->MongoDB::DatabaseError | | + | |->MongoDB::CursorNotFoundError + | | | |->MongoDB::DuplicateKeyError | | | |->MongoDB::NotMasterError @@ -367,6 +384,8 @@ To retry failures automatically, consider using L. | |->MongoDB::InternalError | + |->MongoDB::InvalidOperationError + | |->MongoDB::ProtocolError | |->MongoDB::SelectionError diff --git a/lib/MongoDB/Op/_Aggregate.pm b/lib/MongoDB/Op/_Aggregate.pm index 8b93ba0b..e5f6d4b3 100644 --- a/lib/MongoDB/Op/_Aggregate.pm +++ b/lib/MongoDB/Op/_Aggregate.pm @@ -33,6 +33,7 @@ use Types::Standard qw( Bool HashRef InstanceOf + Num ); use namespace::clean; @@ -61,6 +62,11 @@ has has_out => ( isa => Bool, ); +has maxAwaitTimeMS => ( + is => 'rw', + isa => Num, +); + with $_ for qw( MongoDB::Role::_PrivateConstructor MongoDB::Role::_CollectionOp diff --git a/lib/MongoDB/Role/_CommandCursorOp.pm b/lib/MongoDB/Role/_CommandCursorOp.pm index 21b26f85..ff2dc3f2 100644 --- a/lib/MongoDB/Role/_CommandCursorOp.pm +++ b/lib/MongoDB/Role/_CommandCursorOp.pm @@ -46,6 +46,9 @@ sub _build_result_from_cursor { $self->cursorType eq 'tailable_await') { $max_time_ms = $self->maxAwaitTimeMS if $self->maxAwaitTimeMS; } + elsif ($self->isa('MongoDB::Op::_Aggregate')) { + $max_time_ms = $self->maxAwaitTimeMS if $self->maxAwaitTimeMS; + } my $batch = $c->{firstBatch}; my $qr = MongoDB::QueryResult->_new( diff --git a/lib/MongoDB/Role/_DatabaseErrorThrower.pm b/lib/MongoDB/Role/_DatabaseErrorThrower.pm index a6e07b01..29a7bb24 100644 --- a/lib/MongoDB/Role/_DatabaseErrorThrower.pm +++ b/lib/MongoDB/Role/_DatabaseErrorThrower.pm @@ -49,6 +49,9 @@ sub _throw_database_error { elsif ( grep { $code == $_ } @$ANY_DUP_KEY ) { $error_class = "MongoDB::DuplicateKeyError"; } + elsif ( $code == CURSOR_NOT_FOUND ) { + $error_class = "MongoDB::CursorNotFoundError"; + } elsif ( $self->last_wtimeout ) { $error_class = "MongoDB::WriteConcernError"; } diff --git a/t/collection.t b/t/collection.t index 6ecae76c..04a7fe90 100644 --- a/t/collection.t +++ b/t/collection.t @@ -945,6 +945,121 @@ subtest "querying w/ collation" => sub { } }; +subtest 'change streams' => sub { + plan skip_all => 'MongoDB version 3.6 or higher required' + unless $server_version >= version->parse('v3.6.0'); + plan skip_all => 'MongoDB replica set required' + unless $server_type eq 'RSPrimary'; + + $coll->drop; + $coll->insert_one({ value => 1 }); + $coll->insert_one({ value => 2 }); + + my $change_stream = $coll->watch(); + is $change_stream->next, undef, 'next without changes'; + + for my $index (1..10) { + $coll->insert_one({ value => 100 + $index }); + } + my %changed; + while (my $change = $change_stream->next) { + is $changed{ $change->{fullDocument}{value} }++, 0, + 'first seen '.$change->{fullDocument}{value}; + } + is scalar(keys %changed), 10, 'seen all changes'; +}; + +subtest 'change streams w/ maxAwaitTimeMS' => sub { + plan skip_all => 'MongoDB version 3.6 or higher required' + unless $server_version >= version->parse('v3.6.0'); + plan skip_all => 'MongoDB replica set required' + unless $server_type eq 'RSPrimary'; + + $coll->drop; + my $change_stream = $coll->watch([], { maxAwaitTimeMS => 3000 }); + my $start = time; + is $change_stream->next, undef, 'next without changes'; + my $elapsed = time - $start; + my $min_elapsed = 2; + ok $elapsed > $min_elapsed, "waited for at least $min_elapsed secs"; +}; + +subtest 'change streams w/ fullDocument' => sub { + plan skip_all => 'MongoDB version 3.6 or higher required' + unless $server_version >= version->parse('v3.6.0'); + plan skip_all => 'MongoDB replica set required' + unless $server_type eq 'RSPrimary'; + + $coll->drop; + $coll->insert_one({ value => 1 }); + my $change_stream = $coll->watch( + [], + { fullDocument => 'updateLookup' }, + ); + $coll->update_one( + { value => 1 }, + { '$set' => { updated => 3 }}, + ); + my $change = $change_stream->next; + is $change->{operationType}, 'update', 'change is an update'; + ok exists($change->{fullDocument}), 'delta contains full document'; +}; + +subtest 'change streams w/ resumeAfter' => sub { + plan skip_all => 'MongoDB version 3.6 or higher required' + unless $server_version >= version->parse('v3.6.0'); + plan skip_all => 'MongoDB replica set required' + unless $server_type eq 'RSPrimary'; + + $coll->drop; + my $id = do { + my $change_stream = $coll->watch(); + $coll->insert_one({ value => 200 }); + $coll->insert_one({ value => 201 }); + my $change = $change_stream->next; + ok $change, 'change exists'; + is $change->{fullDocument}{value}, 200, + 'correct change'; + $change->{_id} + }; + do { + my $change_stream = $coll->watch( + [], + { resumeAfter => $id }, + ); + my $change = $change_stream->next; + ok $change, 'change exists after resume'; + is $change->{fullDocument}{value}, 201, + 'correct change after resume'; + is $change_stream->next, undef, 'no more changes'; + }; +}; + +subtest 'change streams w/ CursorNotFound reconnection' => sub { + plan skip_all => 'MongoDB version 3.6 or higher required' + unless $server_version >= version->parse('v3.6.0'); + plan skip_all => 'MongoDB replica set required' + unless $server_type eq 'RSPrimary'; + + $coll->drop; + + my $change_stream = $coll->watch; + $coll->insert_one({ value => 301 }); + my $change = $change_stream->next; + ok $change, 'change received'; + is $change->{fullDocument}{value}, 301, 'correct change'; + + $testdb->run_command([ + killCursors => $coll->name, + cursors => [$change_stream->_result->_cursor_id], + ]); + + $coll->insert_one({ value => 302 }); + $change = $change_stream->next; + ok $change, 'change received after reconnect'; + is $change->{fullDocument}{value}, 302, 'correct change'; +}; + my $js_str = 'function() { return this.a > this.b }'; my $js_obj = MongoDB::Code->new( code => $js_str );