diff --git a/lib/MongoDB/BulkWrite.pm b/lib/MongoDB/BulkWrite.pm index d2a03222..5a924865 100644 --- a/lib/MongoDB/BulkWrite.pm +++ b/lib/MongoDB/BulkWrite.pm @@ -265,7 +265,7 @@ sub execute { $write_concern ||= $self->collection->write_concern; - my $session = $self->collection->_get_session_from_hashref( $options ); + my $session = $self->_client->_get_session_from_hashref( $options ); my $op = MongoDB::Op::_BulkWrite->_new( client => $self->_client, diff --git a/lib/MongoDB/ChangeStream.pm b/lib/MongoDB/ChangeStream.pm index 1bfc9d30..04792eca 100644 --- a/lib/MongoDB/ChangeStream.pm +++ b/lib/MongoDB/ChangeStream.pm @@ -24,17 +24,23 @@ our $VERSION = 'v1.999.1'; use Moo; use Try::Tiny; use MongoDB::Cursor; -use MongoDB::Op::_Aggregate; +use MongoDB::Op::_ChangeStream; use MongoDB::Error; use Safe::Isa; +use BSON::Timestamp; use MongoDB::_Types qw( MongoDBCollection ArrayOfHashRef + Boolish + BSONTimestamp + ClientSession ); use Types::Standard qw( InstanceOf HashRef + Maybe Str + Num ); use namespace::clean -except => 'meta'; @@ -42,70 +48,140 @@ use namespace::clean -except => 'meta'; has _result => ( is => 'rw', isa => InstanceOf['MongoDB::QueryResult'], - lazy => 1, - builder => '_build_result', - clearer => '_clear_result', + init_arg => undef, ); -has collection => ( +has _client => ( is => 'ro', - isa => MongoDBCollection, + isa => InstanceOf['MongoDB::MongoClient'], + init_arg => 'client', required => 1, ); -has aggregation_options => ( +has _op_args => ( is => 'ro', isa => HashRef, + init_arg => 'op_args', + required => 1, ); -has pipeline => ( +has _pipeline => ( is => 'ro', isa => ArrayOfHashRef, + init_arg => 'pipeline', required => 1, ); -has full_document => ( +has _full_document => ( is => 'ro', isa => Str, + init_arg => 'full_document', predicate => '_has_full_document', ); -has _resume_token => ( - is => 'rw', +has _resume_after => ( + is => 'ro', init_arg => 'resume_after', - predicate => '_has_resume_token', - lazy => 1, + predicate => '_has_resume_after', +); + +has _all_changes_for_cluster => ( + is => 'ro', + isa => Boolish, + init_arg => 'all_changes_for_cluster', + default => sub { 0 }, +); + +has _start_at_operation_time => ( + is => 'ro', + isa => BSONTimestamp, + init_arg => 'start_at_operation_time', + predicate => '_has_start_at_operation_time', + coerce => sub { + ref($_[0]) ? $_[0] : BSON::Timestamp->new(seconds => $_[0]) + }, +); + +has _session => ( + is => 'ro', + isa => Maybe[ClientSession], + init_arg => 'session', +); + +has _options => ( + is => 'ro', + isa => HashRef, + init_arg => 'options', + default => sub { {} }, +); + +has _max_await_time_ms => ( + is => 'ro', + isa => Num, + init_arg => 'max_await_time_ms', + predicate => '_has_max_await_time_ms', +); + +has _last_operation_time => ( + is => 'rw', + init_arg => undef, + predicate => '_has_last_operation_time', +); + +has _last_resume_token => ( + is => 'rw', + init_arg => undef, + predicate => '_has_last_resume_token', ); sub BUILD { my ($self) = @_; # starting point is construction time instead of first next call - $self->_result; + $self->_execute_query; } -sub _build_result { +sub _execute_query { my ($self) = @_; - my @pipeline = @{ $self->pipeline }; - @pipeline = ( - {'$changeStream' => { - ($self->_has_full_document - ? (fullDocument => $self->full_document) - : () - ), - ($self->_has_resume_token - ? 
(resumeAfter => $self->_resume_token) - : () - ), - }}, - @pipeline, - ); + my $resume_opt = {}; + + # seen prior results, continuing after last resume token + if ($self->_has_last_resume_token) { + $resume_opt->{resume_after} = $self->_last_resume_token; + } + # no results yet, but we have operation time from prior query + elsif ($self->_has_last_operation_time) { + $resume_opt->{start_at_operation_time} = $self->_last_operation_time; + } + # no results and no prior operation time, send specified options + else { + $resume_opt->{start_at_operation_time} = $self->_start_at_operation_time + if $self->_has_start_at_operation_time; + $resume_opt->{resume_after} = $self->_resume_after + if $self->_has_resume_after; + } - return $self->collection->aggregate( - \@pipeline, - $self->aggregation_options, + my $op = MongoDB::Op::_ChangeStream->new( + pipeline => $self->_pipeline, + all_changes_for_cluster => $self->_all_changes_for_cluster, + session => $self->_session, + options => $self->_options, + client => $self->_client, + $self->_has_full_document + ? (full_document => $self->_full_document) + : (), + $self->_has_max_await_time_ms + ? (maxAwaitTimeMS => $self->_max_await_time_ms) + : (), + %$resume_opt, + %{ $self->_op_args }, ); + + my $res = $self->_client->send_read_op($op); + $self->_result($res->{result}); + $self->_last_operation_time($res->{operationTime}) + if exists $res->{operationTime}; } =head1 STREAM METHODS @@ -143,7 +219,7 @@ sub next { and $error->_is_resumable ) { $retried = 1; - $self->_result($self->_build_result); + $self->_execute_query; } else { die $error; @@ -159,7 +235,7 @@ sub next { } if (exists $change->{_id}) { - $self->_resume_token($change->{_id}); + $self->_last_resume_token($change->{_id}); return $change; } else { diff --git a/lib/MongoDB/Collection.pm b/lib/MongoDB/Collection.pm index 0472a4fd..a6bceac4 100644 --- a/lib/MongoDB/Collection.pm +++ b/lib/MongoDB/Collection.pm @@ -372,7 +372,7 @@ sub insert_one { return $_[0]->client->send_retryable_write_op( MongoDB::Op::_InsertOne->_new( - session => $_[0]->_get_session_from_hashref( $_[2] ), + session => $_[0]->client->_get_session_from_hashref( $_[2] ), ( defined $_[2] ? (%{$_[2]}) : () ), document => $_[1], %{ $_[0]->_op_args }, @@ -426,7 +426,7 @@ sub insert_many { # default ordered => 1, # user overrides - session => $_[0]->_get_session_from_hashref( $_[2] ), + session => $_[0]->client->_get_session_from_hashref( $_[2] ), ( defined $_[2] ? ( %{ $_[2] } ) : () ), # un-overridable queue => [ map { [ insert => $_ ] } @{ $_[1] } ], @@ -478,7 +478,7 @@ sub delete_one { return $_[0]->client->send_retryable_write_op( MongoDB::Op::_Delete->_new( - session => $_[0]->_get_session_from_hashref( $_[2] ), + session => $_[0]->client->_get_session_from_hashref( $_[2] ), ( defined $_[2] ? (%{$_[2]}) : () ), filter => $_[1], just_one => 1, @@ -515,7 +515,7 @@ sub delete_many { return $_[0]->client->send_write_op( MongoDB::Op::_Delete->_new( - session => $_[0]->_get_session_from_hashref( $_[2] ), + session => $_[0]->client->_get_session_from_hashref( $_[2] ), ( defined $_[2] ? (%{$_[2]}) : () ), filter => $_[1], just_one => 0, @@ -560,7 +560,7 @@ sub replace_one { return $_[0]->client->send_retryable_write_op( MongoDB::Op::_Update->_new( - session => $_[0]->_get_session_from_hashref( $_[3] ), + session => $_[0]->client->_get_session_from_hashref( $_[3] ), ( defined $_[3] ? 
(%{$_[3]}) : () ),
             filter => $_[1],
             update => $_[2],
@@ -611,7 +611,7 @@ sub update_one {
 
     return $_[0]->client->send_retryable_write_op(
         MongoDB::Op::_Update->_new(
-            session => $_[0]->_get_session_from_hashref( $_[3] ),
+            session => $_[0]->client->_get_session_from_hashref( $_[3] ),
             ( defined $_[3] ? (%{$_[3]}) : () ),
             filter => $_[1],
             update => $_[2],
@@ -662,7 +662,7 @@ sub update_many {
 
     return $_[0]->client->send_write_op(
         MongoDB::Op::_Update->_new(
-            session => $_[0]->_get_session_from_hashref( $_[3] ),
+            session => $_[0]->client->_get_session_from_hashref( $_[3] ),
             ( defined $_[3] ? (%{$_[3]}) : () ),
             filter => $_[1],
             update => $_[2],
@@ -771,7 +771,7 @@ sub find {
         $options->{maxTimeMS} = $self->max_time_ms;
     }
 
-    my $session = $self->_get_session_from_hashref( $options );
+    my $session = $self->client->_get_session_from_hashref( $options );
 
     # coerce to IxHash
     __ixhash( $options, 'sort' );
@@ -836,7 +836,7 @@ sub find_one {
         $options->{maxTimeMS} = $self->max_time_ms;
     }
 
-    my $session = $self->_get_session_from_hashref( $options );
+    my $session = $self->client->_get_session_from_hashref( $options );
 
     # coerce to IxHash
     __ixhash( $options, 'sort' );
@@ -919,7 +919,7 @@ sub find_one_and_delete {
         $options->{maxTimeMS} = $self->max_time_ms;
     }
 
-    my $session = $self->_get_session_from_hashref( $options );
+    my $session = $self->client->_get_session_from_hashref( $options );
 
     # coerce to IxHash
     __ixhash($options, 'sort');
@@ -1081,9 +1081,16 @@ The optional second argument is a hash reference with options:
   was changed from some time after the change occurred.
 * C<resumeAfter> - The logical starting point for this change stream. This value
   can be obtained from the C<_id> field of a document returned
-  by L</next>.
+  by L</next>. Cannot be specified together with
+  C<startAtOperationTime>
 * C<maxAwaitTimeMS> - The maximum number of milliseconds for the server
   to wait before responding.
+* C<startAtOperationTime> - A L<BSON::Timestamp> specifying at what point
+  in time changes will start being watched. Cannot be specified together
+  with C<resumeAfter>. Plain values will be coerced to L<BSON::Timestamp>
+  objects.
+* C<session> - the session to use for these operations. If not supplied, will
+  use an implicit session. For more information see L<MongoDB::ClientSession>
 
 See L</aggregate> for more available options.
 
@@ -1101,16 +1108,26 @@ sub watch {
 
     $pipeline ||= [];
     $options ||= {};
 
+    my $session = $self->client->_get_session_from_hashref( $options );
+
     return MongoDB::ChangeStream->new(
+        exists($options->{startAtOperationTime})
+        ? (start_at_operation_time => delete $options->{startAtOperationTime})
+        : (),
         exists($options->{fullDocument})
         ? (full_document => delete $options->{fullDocument})
        : (full_document => 'default'),
         exists($options->{resumeAfter})
         ? (resume_after => delete $options->{resumeAfter})
         : (),
-        collection => $self,
+        exists($options->{maxAwaitTimeMS})
+        ? 
(max_await_time_ms => delete $options->{maxAwaitTimeMS}) + : (), + client => $self->client, pipeline => $pipeline, - aggregation_options => $options, + session => $session, + options => $options, + op_args => $self->_op_args, ); } @@ -1180,7 +1197,7 @@ sub aggregate { my ( $self, $pipeline, $options ) = @_; $options ||= {}; - my $session = $self->_get_session_from_hashref( $options ); + my $session = $self->client->_get_session_from_hashref( $options ); # boolify some options for my $k (qw/allowDiskUse explain/) { @@ -1357,7 +1374,7 @@ sub distinct { $options->{maxTimeMS} = $self->max_time_ms; } - my $session = $self->_get_session_from_hashref( $options ); + my $session = $self->client->_get_session_from_hashref( $options ); my $op = MongoDB::Op::_Distinct->_new( fieldname => $fieldname, @@ -1461,7 +1478,7 @@ collection. sub rename { my ( $self, $new_name, $options ) = @_; - my $session = $self->_get_session_from_hashref( $options ); + my $session = $self->client->_get_session_from_hashref( $options ); my $op = MongoDB::Op::_RenameCollection->_new( src_ns => $self->full_name, @@ -1487,7 +1504,7 @@ Deletes a collection as well as all of its indexes. sub drop { my ( $self, $options ) = @_; - my $session = $self->_get_session_from_hashref( $options ); + my $session = $self->client->_get_session_from_hashref( $options ); $self->client->send_write_op( MongoDB::Op::_DropCollection->_new( @@ -1629,7 +1646,7 @@ sub bulk_write { my $ordered = exists $options->{ordered} ? delete $options->{ordered} : 1; - my $session = $self->_get_session_from_hashref( $options ); + my $session = $self->client->_get_session_from_hashref( $options ); my $bulk = $ordered ? $self->ordered_bulk($options) : $self->unordered_bulk($options); @@ -1760,7 +1777,7 @@ sub _find_one_and_update_or_replace { # pass separately for MongoDB::Role::_BypassValidation my $bypass = delete $options->{bypassDocumentValidation}; - my $session = $self->_get_session_from_hashref( $options ); + my $session = $self->client->_get_session_from_hashref( $options ); my $op = MongoDB::Op::_FindAndUpdate->_new( filter => $filter, @@ -1776,24 +1793,6 @@ sub _find_one_and_update_or_replace { : $self->client->send_write_op( $op ); } -# Extracts a session from a provided hashref, or returns an implicit session -sub _get_session_from_hashref { - my ( $self, $hashref ) = @_; - - my $session = delete $hashref->{session}; - - if ( defined $session ) { - MongoDB::UsageError->throw( "Cannot use session from another client" ) - if ( $session->client->_id ne $self->client->_id ); - MongoDB::UsageError->throw( "Cannot use session which has ended" ) - if ! 
defined $session->session_id;
-    } else {
-        $session = $self->client->_maybe_get_implicit_session;
-    }
-
-    return $session;
-}
-
 #--------------------------------------------------------------------------#
 # Deprecated functions
 #--------------------------------------------------------------------------#
@@ -1811,7 +1810,7 @@ sub count {
         $options->{maxTimeMS} = $self->max_time_ms;
     }
 
-    my $session = $self->_get_session_from_hashref($options);
+    my $session = $self->client->_get_session_from_hashref($options);
 
     # string is OK so we check ref, not just exists
     __ixhash( $options, 'hint' ) if ref $options->{hint};
diff --git a/lib/MongoDB/Database.pm b/lib/MongoDB/Database.pm
index e9bcba52..06e24e2a 100644
--- a/lib/MongoDB/Database.pm
+++ b/lib/MongoDB/Database.pm
@@ -196,7 +196,7 @@ sub list_collections {
         $options->{maxTimeMS} = $self->max_time_ms;
     }
 
-    my $session = $self->_get_session_from_hashref( $options );
+    my $session = $self->_client->_get_session_from_hashref( $options );
 
     my $op = MongoDB::Op::_ListCollections->_new(
         db_name => $self->name,
@@ -333,7 +333,7 @@ Valid options include:
 sub drop {
     my ( $self, $options ) = @_;
 
-    my $session = $self->_get_session_from_hashref( $options );
+    my $session = $self->_client->_get_session_from_hashref( $options );
 
     return $self->_client->send_write_op(
         MongoDB::Op::_DropDatabase->_new(
@@ -404,7 +404,7 @@ sub run_command {
         ref($read_pref) ? $read_pref : ( mode => $read_pref ) )
       if $read_pref && ref($read_pref) ne 'MongoDB::ReadPreference';
 
-    my $session = $self->_get_session_from_hashref( $options );
+    my $session = $self->_client->_get_session_from_hashref( $options );
 
     my $op = MongoDB::Op::_Command->_new(
         client => $self->_client,
@@ -429,7 +429,7 @@ sub _aggregate {
     my ( $self, $pipeline, $options ) = @_;
     $options ||= {};
 
-    my $session = $self->_get_session_from_hashref( $options );
+    my $session = $self->_client->_get_session_from_hashref( $options );
 
     # boolify some options
     for my $k (qw/allowDiskUse explain/) {
@@ -463,24 +463,73 @@ sub _aggregate {
     return $self->_client->send_read_op($op);
 }
 
-# Extracts a session from a provided hashref, or returns an implicit session
-# Almost identical to same subroutine in Collection, however in Database the
-# client attribute is private.
-sub _get_session_from_hashref {
-    my ( $self, $hashref ) = @_;
-
-    my $session = delete $hashref->{session};
-
-    if ( defined $session ) {
-        MongoDB::UsageError->throw( "Cannot use session from another client" )
-          if ( $session->client->_id ne $self->_client->_id );
-        MongoDB::UsageError->throw( "Cannot use session which has ended" )
-          if ! defined $session->session_id;
-    } else {
-        $session = $self->_client->_maybe_get_implicit_session;
+=method watch
+
+Watches for changes on this database.
+
+Performs an aggregation with an implicit initial C<$changeStream> stage
+and returns a L<MongoDB::ChangeStream> result which can be used to
+iterate over the changes in the database. This functionality is
+available since MongoDB 4.0.
+
+    my $stream = $db->watch();
+    my $stream = $db->watch( \@pipeline );
+    my $stream = $db->watch( \@pipeline, \%options );
+
+    while (1) {
+
+        # This inner loop will only run until no more changes are
+        # available.
+        while (my $change = $stream->next) {
+            # process $change
+        }
     }
 
-    return $session;
+The returned stream will not block forever waiting for changes. If you
+want to respond to changes over a longer time use C<maxAwaitTimeMS> and
+regularly call C<next> in a loop.
+
+See L<MongoDB::ChangeStream> for details on usage and available
+options.
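+
+For example, here is a sketch of resuming a stream after an
+interruption, using a change's C<_id> as the C<resumeAfter> token
+(persisting C<$saved_token> between runs is left to the application):
+
+    my $saved_token;
+    my $stream = $db->watch();
+    while ( my $change = $stream->next ) {
+        # process $change, then remember the latest resume token
+        $saved_token = $change->{_id};
+    }
+
+    # later, continue from where the previous stream left off
+    my $resumed = $db->watch( [], { resumeAfter => $saved_token } );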
+
+=cut
+
+sub watch {
+    my ( $self, $pipeline, $options ) = @_;
+
+    $pipeline ||= [];
+    $options ||= {};
+
+    my $session = $self->_client->_get_session_from_hashref( $options );
+
+    return MongoDB::ChangeStream->new(
+        exists($options->{startAtOperationTime})
+        ? (start_at_operation_time => delete $options->{startAtOperationTime})
+        : (),
+        exists($options->{fullDocument})
+        ? (full_document => delete $options->{fullDocument})
+        : (full_document => 'default'),
+        exists($options->{resumeAfter})
+        ? (resume_after => delete $options->{resumeAfter})
+        : (),
+        exists($options->{maxAwaitTimeMS})
+        ? (max_await_time_ms => delete $options->{maxAwaitTimeMS})
+        : (),
+        client => $self->_client,
+        pipeline => $pipeline,
+        session => $session,
+        options => $options,
+        op_args => {
+            read_concern => $self->read_concern,
+            bson_codec => $self->bson_codec,
+            db_name => $self->name,
+            coll_name => 1, # Magic not-an-actual-collection number
+            full_name => $self->name . ".1",
+            read_preference => $self->read_preference,
+            write_concern => $self->write_concern,
+            monitoring_callback => $self->_client->monitoring_callback,
+        },
+    );
 }
 
 1;
diff --git a/lib/MongoDB/MongoClient.pm b/lib/MongoDB/MongoClient.pm
index 0919f6f2..1b08d9c3 100644
--- a/lib/MongoDB/MongoClient.pm
+++ b/lib/MongoDB/MongoClient.pm
@@ -1870,6 +1870,94 @@ sub fsync_unlock {
     return $self->send_primary_op($op);
 }
 
+sub _get_session_from_hashref {
+    my ( $self, $hashref ) = @_;
+
+    my $session = delete $hashref->{session};
+
+    if ( defined $session ) {
+        MongoDB::UsageError->throw( "Cannot use session from another client" )
+          if ( $session->client->_id ne $self->_id );
+        MongoDB::UsageError->throw( "Cannot use session which has ended" )
+          if ! defined $session->session_id;
+    } else {
+        $session = $self->_maybe_get_implicit_session;
+    }
+
+    return $session;
+}
+
+=method watch
+
+Watches for changes on the cluster.
+
+Performs an aggregation with an implicit initial C<$changeStream> stage
+and returns a L<MongoDB::ChangeStream> result which can be used to
+iterate over the changes in the cluster. This functionality is
+available since MongoDB 4.0.
+
+    my $stream = $client->watch();
+    my $stream = $client->watch( \@pipeline );
+    my $stream = $client->watch( \@pipeline, \%options );
+
+    while (1) {
+
+        # This inner loop will only run until no more changes are
+        # available.
+        while (my $change = $stream->next) {
+            # process $change
+        }
+    }
+
+The returned stream will not block forever waiting for changes. If you
+want to respond to changes over a longer time use C<maxAwaitTimeMS> and
+regularly call C<next> in a loop.
+
+See L<MongoDB::ChangeStream> for details on usage and available
+options.
+
+=cut
+
+sub watch {
+    my ( $self, $pipeline, $options ) = @_;
+
+    $pipeline ||= [];
+    $options ||= {};
+
+    my $session = $self->_get_session_from_hashref( $options );
+
+    return MongoDB::ChangeStream->new(
+        exists($options->{startAtOperationTime})
+        ? (start_at_operation_time => delete $options->{startAtOperationTime})
+        : (),
+        exists($options->{fullDocument})
+        ? (full_document => delete $options->{fullDocument})
+        : (full_document => 'default'),
+        exists($options->{resumeAfter})
+        ? (resume_after => delete $options->{resumeAfter})
+        : (),
+        exists($options->{maxAwaitTimeMS})
+        ? 
(max_await_time_ms => delete $options->{maxAwaitTimeMS})
+        : (),
+        client => $self,
+        all_changes_for_cluster => 1,
+        pipeline => $pipeline,
+        session => $session,
+        options => $options,
+        op_args => {
+            read_concern => $self->read_concern,
+            db_name => 'admin',
+            coll_name => 1,
+            full_name => 'admin.1',
+            bson_codec => $self->bson_codec,
+            write_concern => $self->write_concern,
+            read_preference => $self->read_preference,
+            monitoring_callback => $self->monitoring_callback,
+        },
+    );
+}
+
 1;
diff --git a/lib/MongoDB/Op/_ChangeStream.pm b/lib/MongoDB/Op/_ChangeStream.pm
new file mode 100644
index 00000000..dfb8ea44
--- /dev/null
+++ b/lib/MongoDB/Op/_ChangeStream.pm
@@ -0,0 +1,214 @@
+# Copyright 2018 - present MongoDB, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+use strict;
+use warnings;
+package MongoDB::Op::_ChangeStream;
+
+# Encapsulate changestream operation; return MongoDB::QueryResult
+# and operationTime if supported
+
+use version;
+our $VERSION = 'v1.999.1';
+
+use Moo;
+
+use boolean;
+use BSON::Timestamp;
+use MongoDB::Op::_Command;
+use MongoDB::_Types qw(
+    ArrayOfHashRef
+    Boolish
+    BSONTimestamp
+);
+use Types::Standard qw(
+    HashRef
+    InstanceOf
+    Num
+    Str
+    Maybe
+);
+
+use namespace::clean;
+
+has client => (
+    is => 'ro',
+    required => 1,
+    isa => InstanceOf ['MongoDB::MongoClient'],
+);
+
+has pipeline => (
+    is => 'ro',
+    required => 1,
+    isa => ArrayOfHashRef,
+);
+
+has options => (
+    is => 'ro',
+    required => 1,
+    isa => HashRef,
+);
+
+has maxAwaitTimeMS => (
+    is => 'rw',
+    isa => Num,
+);
+
+has full_document => (
+    is => 'ro',
+    isa => Str,
+    predicate => 'has_full_document',
+);
+
+has resume_after => (
+    is => 'ro',
+    predicate => 'has_resume_after',
+);
+
+has all_changes_for_cluster => (
+    is => 'ro',
+    isa => Boolish,
+    default => sub { 0 },
+);
+
+has start_at_operation_time => (
+    is => 'ro',
+    isa => BSONTimestamp,
+    predicate => 'has_start_at_operation_time',
+);
+
+with $_ for qw(
+    MongoDB::Role::_PrivateConstructor
+    MongoDB::Role::_CollectionOp
+    MongoDB::Role::_ReadOp
+    MongoDB::Role::_WriteOp
+    MongoDB::Role::_CommandCursorOp
+);
+
+sub execute {
+    my ( $self, $link, $topology ) = @_;
+
+    my $options = $self->options;
+    my $is_2_6 = $link->supports_write_commands;
+
+    # maxTimeMS isn't available until 2.6 and the aggregate command
+    # will reject it as unrecognized
+    delete $options->{maxTimeMS} unless $is_2_6;
+
+    # bypassDocumentValidation isn't available until 3.2 (wire version 4)
+    delete $options->{bypassDocumentValidation} unless $link->supports_document_validation;
+
+    if ( defined $options->{collation} and !$link->supports_collation ) {
+        MongoDB::UsageError->throw(
+            "MongoDB host '" . $link->address . "' doesn't support collation" );
+    }
+
+    # If 'cursor' is explicitly false, we disable using cursors, even
+    # for MongoDB 2.6+. This allows users operating with a 2.6+ mongos
+    # and pre-2.6 mongod in shards to avoid fatal errors. 
This + # workaround should be removed once MongoDB 2.4 is no longer supported. + my $use_cursor = $is_2_6 + && ( !exists( $options->{cursor} ) || $options->{cursor} ); + + # batchSize is not a command parameter itself like other options + my $batchSize = delete $options->{batchSize}; + + # If we're doing cursors, we first respect an explicit batchSize option; + # next we fallback to the legacy (deprecated) cursor option batchSize; finally we + # just give an empty document. Other than batchSize we ignore any other + # legacy cursor options. If we're not doing cursors, don't send any + # cursor option at all, as servers will choke on it. + if ($use_cursor) { + if ( defined $batchSize ) { + $options->{cursor} = { batchSize => $batchSize }; + } + elsif ( ref $options->{cursor} eq 'HASH' ) { + $batchSize = $options->{cursor}{batchSize}; + $options->{cursor} = defined($batchSize) ? { batchSize => $batchSize } : {}; + } + else { + $options->{cursor} = {}; + } + } + else { + delete $options->{cursor}; + } + + if ( $self->coll_name eq 1 && ! $link->supports_db_aggregation ) { + MongoDB::Error->throw( + "Calling aggregate with a collection name of '1' is not supported on Wire Version < 6" ); + } + + my @pipeline = ( + {'$changeStream' => { + ($self->has_start_at_operation_time + ? (startAtOperationTime => $self->start_at_operation_time) + : () + ), + ($self->all_changes_for_cluster + ? (allChangesForCluster => true) + : () + ), + ($self->has_full_document + ? (fullDocument => $self->full_document) + : () + ), + ($self->has_resume_after + ? (resumeAfter => $self->resume_after) + : () + ), + }}, + @{ $self->pipeline }, + ); + + my @command = ( + aggregate => $self->coll_name, + pipeline => \@pipeline, + %$options, + $link->supports_read_concern + ? @{ $self->read_concern->as_args( $self->session) } + : (), + ); + + my $op = MongoDB::Op::_Command->_new( + db_name => $self->db_name, + query => Tie::IxHash->new(@command), + query_flags => {}, + bson_codec => $self->bson_codec, + read_preference => $self->read_preference, + session => $self->session, + monitoring_callback => $self->monitoring_callback, + ); + + my $res = $op->execute( $link, $topology ); + + # Fake up a single-batch cursor if we didn't get a cursor response. + # We use the 'results' fields as the first (and only) batch + if ( !$res->output->{cursor} ) { + $res->output->{cursor} = { + ns => '', + id => 0, + firstBatch => ( delete $res->output->{result} ) || [], + }; + } + + return { + result => $self->_build_result_from_cursor($res), + $link->supports_4_0_changestreams + ? 
(operationTime => $res->output->{operationTime}) + : (), + }; +} + +1; diff --git a/lib/MongoDB/Role/_CommandCursorOp.pm b/lib/MongoDB/Role/_CommandCursorOp.pm index f62a1b62..fe70c0fd 100644 --- a/lib/MongoDB/Role/_CommandCursorOp.pm +++ b/lib/MongoDB/Role/_CommandCursorOp.pm @@ -44,7 +44,10 @@ sub _build_result_from_cursor { $self->options->{cursorType} eq 'tailable_await') { $max_time_ms = $self->options->{maxAwaitTimeMS} if $self->options->{maxAwaitTimeMS}; } - elsif ($self->isa('MongoDB::Op::_Aggregate')) { + elsif ( + $self->isa('MongoDB::Op::_Aggregate') || + $self->isa('MongoDB::Op::_ChangeStream') + ) { $max_time_ms = $self->maxAwaitTimeMS if $self->maxAwaitTimeMS; } diff --git a/lib/MongoDB/_Link.pm b/lib/MongoDB/_Link.pm index 7e4dc153..105d42ca 100644 --- a/lib/MongoDB/_Link.pm +++ b/lib/MongoDB/_Link.pm @@ -215,6 +215,12 @@ has supports_retryWrites => ( isa => Boolish, ); +has supports_4_0_changestreams => ( + is => 'rwp', + init_arg => undef, + isa => Boolish, +); + my @connection_state_fields = qw( fh connected rcvbuf last_used fdset is_ssl ); @@ -350,6 +356,9 @@ sub set_metadata { : 0 ); } + if ( $self->accepts_wire_version(7) ) { + $self->_set_supports_4_0_changestreams(1); + } return; } diff --git a/t/changestreams.t b/t/changestreams.t index 39da1ad6..7a2c7125 100644 --- a/t/changestreams.t +++ b/t/changestreams.t @@ -20,129 +20,191 @@ use Test::More 0.96; use MongoDB; use lib "t/lib"; -use MongoDBTest qw/skip_unless_mongod build_client get_test_db server_version server_type/; +use MongoDBTest qw/ + skip_unless_mongod + skip_unless_sessions + uuid_to_string + build_client + get_test_db + server_version + server_type +/; skip_unless_mongod(); -my $conn = build_client(); +my @events; + +my $conn = build_client(monitoring_callback => sub { + push @events, shift; +}); + my $testdb = get_test_db($conn); my $server_version = server_version($conn); my $server_type = server_type($conn); my $coll = $testdb->get_collection('test_collection'); -subtest 'change streams' => sub { - plan skip_all => 'MongoDB version 3.6 or higher required' - unless $server_version >= version->parse('v3.6.0'); - plan skip_all => 'MongoDB replica set required' - unless $server_type eq 'RSPrimary'; - - $coll->drop; - $coll->insert_one({ value => 1 }); - $coll->insert_one({ value => 2 }); - - my $change_stream = $coll->watch(); - is $change_stream->next, undef, 'next without changes'; - - for my $index (1..10) { - $coll->insert_one({ value => 100 + $index }); - } - my %changed; - while (my $change = $change_stream->next) { - is $changed{ $change->{fullDocument}{value} }++, 0, - 'first seen '.$change->{fullDocument}{value}; - } - is scalar(keys %changed), 10, 'seen all changes'; +plan skip_all => 'MongoDB replica set required' + unless $server_type eq 'RSPrimary'; + +subtest 'client' => sub { + plan skip_all => 'MongoDB version 4.0 or higher required' + unless $server_version >= version->parse('v4.0.0'); + run_tests_for($conn); }; -subtest 'change streams w/ maxAwaitTimeMS' => sub { - plan skip_all => 'MongoDB version 3.6 or higher required' - unless $server_version >= version->parse('v3.6.0'); - plan skip_all => 'MongoDB replica set required' - unless $server_type eq 'RSPrimary'; - - $coll->drop; - my $change_stream = $coll->watch([], { maxAwaitTimeMS => 3000 }); - my $start = time; - is $change_stream->next, undef, 'next without changes'; - my $elapsed = time - $start; - my $min_elapsed = 2; - ok $elapsed > $min_elapsed, "waited for at least $min_elapsed secs"; +subtest 'database' => sub { + plan 
skip_all => 'MongoDB version 4.0 or higher required' + unless $server_version >= version->parse('v4.0.0'); + run_tests_for($testdb); }; -subtest 'change streams w/ fullDocument' => sub { +subtest 'collection' => sub { plan skip_all => 'MongoDB version 3.6 or higher required' unless $server_version >= version->parse('v3.6.0'); - plan skip_all => 'MongoDB replica set required' - unless $server_type eq 'RSPrimary'; - - $coll->drop; - $coll->insert_one({ value => 1 }); - my $change_stream = $coll->watch( - [], - { fullDocument => 'updateLookup' }, - ); - $coll->update_one( - { value => 1 }, - { '$set' => { updated => 3 }}, - ); - my $change = $change_stream->next; - is $change->{operationType}, 'update', 'change is an update'; - ok exists($change->{fullDocument}), 'delta contains full document'; + run_tests_for($coll); }; -subtest 'change streams w/ resumeAfter' => sub { - plan skip_all => 'MongoDB version 3.6 or higher required' - unless $server_version >= version->parse('v3.6.0'); - plan skip_all => 'MongoDB replica set required' - unless $server_type eq 'RSPrimary'; - - $coll->drop; - my $id = do { - my $change_stream = $coll->watch(); - $coll->insert_one({ value => 200 }); - $coll->insert_one({ value => 201 }); - my $change = $change_stream->next; - ok $change, 'change exists'; - is $change->{fullDocument}{value}, 200, - 'correct change'; - $change->{_id} +done_testing; + +sub run_tests_for { + my ($watchable) = @_; + + subtest 'basic' => sub { + $coll->drop; + $coll->insert_one({ value => 1 }); + $coll->insert_one({ value => 2 }); + + my $change_stream = $watchable->watch(); + is $change_stream->next, undef, 'next without changes'; + + for my $index (1..10) { + $coll->insert_one({ value => 100 + $index }); + } + my %changed; + while (my $change = $change_stream->next) { + is $changed{ $change->{fullDocument}{value} }++, 0, + 'first seen '.$change->{fullDocument}{value}; + } + is scalar(keys %changed), 10, 'seen all changes'; + }; + + subtest 'change streams w/ maxAwaitTimeMS' => sub { + $coll->drop; + my $change_stream = $watchable->watch([], { maxAwaitTimeMS => 3000 }); + my $start = time; + is $change_stream->next, undef, 'next without changes'; + my $elapsed = time - $start; + my $min_elapsed = 2; + ok $elapsed > $min_elapsed, "waited for at least $min_elapsed secs"; }; - do { - my $change_stream = $coll->watch( + + subtest 'change streams w/ fullDocument' => sub { + $coll->drop; + $coll->insert_one({ value => 1 }); + my $change_stream = $watchable->watch( [], - { resumeAfter => $id }, + { fullDocument => 'updateLookup' }, + ); + $coll->update_one( + { value => 1 }, + { '$set' => { updated => 3 }}, ); my $change = $change_stream->next; - ok $change, 'change exists after resume'; - is $change->{fullDocument}{value}, 201, - 'correct change after resume'; - is $change_stream->next, undef, 'no more changes'; + is $change->{operationType}, 'update', 'change is an update'; + ok exists($change->{fullDocument}), 'delta contains full document'; }; -}; -subtest 'change streams w/ CursorNotFound reconnection' => sub { - plan skip_all => 'MongoDB version 3.6 or higher required' - unless $server_version >= version->parse('v3.6.0'); - plan skip_all => 'MongoDB replica set required' - unless $server_type eq 'RSPrimary'; - - $coll->drop; - - my $change_stream = $coll->watch; - $coll->insert_one({ value => 301 }); - my $change = $change_stream->next; - ok $change, 'change received'; - is $change->{fullDocument}{value}, 301, 'correct change'; - - $testdb->run_command([ - killCursors => $coll->name, - 
cursors => [$change_stream->_result->_cursor_id], - ]); - - $coll->insert_one({ value => 302 }); - $change = $change_stream->next; - ok $change, 'change received after reconnect'; - is $change->{fullDocument}{value}, 302, 'correct change'; -}; + subtest 'change streams w/ resumeAfter' => sub { + $coll->drop; + my $id = do { + my $change_stream = $watchable->watch(); + $coll->insert_one({ value => 200 }); + $coll->insert_one({ value => 201 }); + my $change = $change_stream->next; + ok $change, 'change exists'; + is $change->{fullDocument}{value}, 200, + 'correct change'; + $change->{_id} + }; + do { + my $change_stream = $watchable->watch( + [], + { resumeAfter => $id }, + ); + my $change = $change_stream->next; + ok $change, 'change exists after resume'; + is $change->{fullDocument}{value}, 201, + 'correct change after resume'; + is $change_stream->next, undef, 'no more changes'; + }; + }; -done_testing; + subtest 'change streams w/ CursorNotFound reconnection' => sub { + $coll->drop; + + my $change_stream = $watchable->watch; + $coll->insert_one({ value => 301 }); + my $change = $change_stream->next; + ok $change, 'change received'; + is $change->{fullDocument}{value}, 301, 'correct change'; + + $testdb->run_command([ + killCursors => $coll->name, + cursors => [$change_stream->_result->_cursor_id], + ]); + + $coll->insert_one({ value => 302 }); + $change = $change_stream->next; + ok $change, 'change received after reconnect'; + is $change->{fullDocument}{value}, 302, 'correct change'; + }; + + subtest 'startAtOperationTime' => sub { + plan skip_all => 'MongoDB version 4.0 or higher required' + unless $server_version >= version->parse('v4.0.0'); + + $coll->drop; + + my $change_stream = $watchable->watch([], { + startAtOperationTime => scalar(time + 3), + }); + $coll->insert_one({ value => 401 }); + sleep 4; + $coll->insert_one({ value => 402 }); + + my $change = $change_stream->next; + ok $change, 'change received'; + is $change->{fullDocument}{value}, 402, 'correct change'; + + ok !defined($change_stream->next), 'no more changes'; + }; + + subtest 'sessions' => sub { + skip_unless_sessions(); + @events = (); + + my $session = $conn->start_session; + + my $change_stream = $watchable->watch([], { + session => $session, + }); + $change_stream->next; + + my ($event) = grep { + $_->{commandName} eq 'aggregate' and + $_->{type} eq 'command_started' + } @events; + + ok(defined($event), 'found event') + or return; + + my $lsid = uuid_to_string( + $session->_server_session->session_id->{id}->data, + ); + + my $command = $event->{command}; + my $command_sid = uuid_to_string($command->{lsid}{id}->data); + + is $command_sid, $lsid, 'command has correct session id'; + }; +} diff --git a/t/changestreams_spec.t b/t/changestreams_spec.t new file mode 100644 index 00000000..e25fc941 --- /dev/null +++ b/t/changestreams_spec.t @@ -0,0 +1,276 @@ +# Copyright 2018 - present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
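+
+# Runs the machine-readable change streams spec tests from
+# t/data/change-streams: each test opens a change stream against the
+# requested target (collection, database, or client), replays the
+# listed operations, then checks the captured command-monitoring
+# events ("expectations") and the changes returned by the stream
+# ("result").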
+ +use strict; +use warnings; +use utf8; +use Test::More 0.96; +use Test::Deep; +use Safe::Isa; + +use MongoDB; + +use lib "t/lib"; +use MongoDBTest qw/skip_unless_mongod build_client get_test_db server_version server_type/; +use MongoDBSpecTest qw/foreach_spec_test/; + +skip_unless_mongod(); + +my $global_client = build_client(); +my $server_version = server_version($global_client); +my $server_type = server_type($global_client); +my $server_topology = + $server_type eq 'RSPrimary' ? 'replicaset' : + $server_type eq 'Standalone' ? 'single' : + 'unknown'; + +my ($db1, $db2); + +foreach_spec_test('t/data/change-streams', sub { + my ($test, $plan) = @_; + + plan skip_all => sprintf( + "Test only runs on (%s) topology", + join('|', @{ $test->{topology} || [] }), + ) unless grep { $_ eq $server_topology } @{ $test->{topology} || [] }; + + my $min_version = defined($test->{minServerVersion}) + ? version->parse('v'.$test->{minServerVersion}) + : undef; + plan skip_all => "Test requires version $min_version" + if defined($min_version) and $server_version < $min_version; + + $db1->drop if defined $db1; + $db2->drop if defined $db2; + + $db1 = get_test_db($global_client); + $db2 = get_test_db($global_client); + + my @events; + my $client = build_client(monitoring_callback => sub { + push @events, shift; + }); + $db1 = $client->get_database($db1->name); + $db2 = $client->get_database($db2->name); + + $db1->run_command([create => $plan->{database_name}]); + $db2->run_command([create => $plan->{database2_name}]); + + my $coll1 = $db1->get_collection($plan->{collection_name}); + my $coll2 = $db2->get_collection($plan->{collection2_name}); + + my $stream_target = + $test->{target} eq 'collection' ? $coll1 : + $test->{target} eq 'database' ? $db1 : + $test->{target} eq 'client' ? $client : + die "Unknown target: ".$test->{target}; + + my $resolve_db = sub { + $_[0] eq $plan->{database_name} ? $db1 : + $_[0] eq $plan->{database2_name} ? 
$db2 : + undef + }; + + my $stream; + eval { + $stream = $stream_target->watch( + $test->{changeStreamPipeline} || [], + $test->{changeStreamOptions} || {}, + ); + }; + my $stream_error = $@; + if ($stream_error) { + ok(defined($test->{result}{error}), 'expected error') + or diag("Stream Error: $stream_error"); + if (defined(my $code = $test->{result}{error}{code})) { + is $stream_error->code, $code, "error code $code"; + } + } + else { + ok(defined($stream), 'change stream') + or return; + } + + for my $operation (@{ $test->{operations} || [] }) { + my ($op_db, $op_coll, $op_name) + = @{ $operation }{qw( database collection name )}; + $op_db = $op_db->$resolve_db; + $op_coll = $op_db->get_collection($op_coll); + + my $op_sub = __PACKAGE__->can("operation_${op_name}"); + $op_sub->($op_db, $op_coll, $operation->{arguments}); + } + + if (my @expected_events = @{ $test->{expectations} || [] }) { + subtest 'events' => sub { + my $index = 0; + for my $expected (@expected_events) { + $index++; + my $found; + subtest "event (index $index)" => sub { + while (@events) { + my $current = shift @events; + my $data = event_matches($current, $expected); + if (defined $data) { + check_event( + $current, + prepare_spec($data, $resolve_db), + $resolve_db, + ); + return; + } + } + ok 0, 'missing expected event'; + }; + } + }; + } + + if (@{ $test->{result}{success} || [] }) { + subtest 'success' => sub { + my @changes; + while (defined(my $change = $stream->next)) { + push @changes, $change; + } + my @expected_changes = @{ $test->{result}{success} || [] }; + is scalar(@changes), scalar(@expected_changes), + 'expected number'; + if (@changes == @expected_changes) { + for my $index (0 .. $#changes) { + subtest "result (index $index)" => sub { + check_result( + $changes[$index], + prepare_spec( + $expected_changes[$index], + $resolve_db, + ), + $resolve_db + ); + }; + } + } + }; + } +}); + +sub event_matches { + my ($event, $expected) = @_; + + my $data; + if ($data = $expected->{command_started_event}) { + return undef + unless $event->{type} eq 'command_started'; + } + elsif ($data = $expected->{command_succeeded_event}) { + return undef + unless $event->{type} eq 'command_succeeded'; + } + else { + die "Unrecognized event"; + } + + return undef + unless $event->{commandName} eq $data->{command_name}; + + return $data; +} + +sub prepare_spec { + my ($data, $resolve_db) = @_; + if (not defined $data) { + return undef; + } + elsif ($data->$_isa('JSON::PP::Boolean')) { + my $value = !!$data; + return code(sub { + ($_[0] and $value) ? (1) : + (!$_[0] and !$value) ? (1) : + (0, "boolean mismatch") + }); + } + elsif ($data eq 42) { + return code(sub { + defined($_[0]) + ? 
(1)
+            : (0, 'value is not defined');
+        });
+    }
+    elsif (ref $data eq 'HASH') {
+        if (exists $data->{'$numberInt'}) {
+            return 0+$data->{'$numberInt'};
+        }
+        return +{map {
+            ($_, prepare_spec($data->{$_}, $resolve_db));
+        } keys %$data};
+    }
+    elsif (ref $data eq 'ARRAY') {
+        return [map {
+            prepare_spec($_, $resolve_db);
+        } @$data];
+    }
+    elsif (
+        not ref $data
+        and defined $data
+        and defined(my $real_db = $data->$resolve_db)
+    ) {
+        return $real_db->name;
+    }
+    else {
+        return $data;
+    }
+}
+
+sub check_event {
+    my ($event, $expected) = @_;
+
+    is $event->{databaseName}, $expected->{database_name},
+        'database name'
+        if exists $expected->{database_name};
+
+    if (my $command = $expected->{command}) {
+        for my $key (sort keys %$command) {
+            cmp_deeply(
+                ($event->{command} || $event->{reply})->{$key},
+                $command->{$key},
+                $key,
+            );
+        }
+    }
+}
+
+sub check_result {
+    my ($change, $expected, $resolve_db) = @_;
+
+    for my $key (sort keys %$expected) {
+        if ($key eq 'fullDocument') {
+            for my $doc_key (sort keys %{ $expected->{$key} }) {
+                cmp_deeply(
+                    $change->{$key}{$doc_key},
+                    $expected->{$key}{$doc_key},
+                    "$key/$doc_key",
+                );
+            }
+        }
+        else {
+            cmp_deeply($change->{$key}, $expected->{$key}, $key);
+        }
+    }
+}
+
+sub operation_insertOne {
+    my ($db, $coll, $args) = @_;
+    $coll->insert_one($args->{document});
+}
+
+done_testing;
diff --git a/t/data/change-streams/README.rst b/t/data/change-streams/README.rst
new file mode 100644
index 00000000..3e45104c
--- /dev/null
+++ b/t/data/change-streams/README.rst
@@ -0,0 +1,156 @@
+.. role:: javascript(code)
+  :language: javascript
+
+==============
+Change Streams
+==============
+
+.. contents::
+
+--------
+
+Introduction
+============
+
+The YAML and JSON files in this directory are platform-independent tests that
+drivers can use to prove their conformance to the Change Streams Spec.
+
+Several prose tests, which are not easily expressed in YAML, are also presented
+in this file. Those tests will need to be manually implemented by each driver.
+
+Spec Test Format
+================
+
+Each YAML file has the following keys:
+
+- ``database_name``: The default database
+- ``collection_name``: The default collection
+- ``database2_name``: Another database
+- ``collection2_name``: Another collection
+- ``tests``: An array of tests that are to be run independently of each other.
+  Each test will have some of the following fields:
+
+  - ``description``: The name of the test.
+  - ``minServerVersion``: The minimum server version to run this test against. If not present, assume there is no minimum server version.
+  - ``maxServerVersion``: Reserved for later use
+  - ``failPoint``: Reserved for later use
+  - ``target``: The entity on which to run the change stream. Valid values are:
+
+    - ``collection``: Watch changes on collection ``database_name.collection_name``
+    - ``database``: Watch changes on database ``database_name``
+    - ``client``: Watch changes on entire clusters
+  - ``topology``: An array of server topologies against which to run the test.
+    Valid topologies are ``single``, ``replicaset``, and ``sharded``.
+  - ``changeStreamPipeline``: An array of additional aggregation pipeline stages to add to the change stream
+  - ``changeStreamOptions``: Additional options to add to the changeStream
+  - ``operations``: Array of documents, each describing an operation. 
Each document has the following fields: + + - ``database``: Database against which to run the operation + - ``collection``: Collection against which to run the operation + - ``name``: Name of the command to run + - ``arguments``: Object of arguments for the command (ex: document to insert) + + - ``expectations``: Optional list of command-started events in Extended JSON format + - ``result``: Document with ONE of the following fields: + + - ``error``: Describes an error received during the test + - ``success``: An Extended JSON array of documents expected to be received from the changeStream + +Spec Test Match Function +======================== + +The definition of MATCH or MATCHES in the Spec Test Runner is as follows: + +- MATCH takes two values, ``expected`` and ``actual`` +- Notation is "Assert [actual] MATCHES [expected] +- Assertion passes if ``expected`` is a subset of ``actual``, with the values ``42`` and ``"42"`` acting as placeholders for "any value" + +Pseudocode implementation of ``actual`` MATCHES ``expected``: + +:: + + If expected is "42" or 42: + Assert that actual exists (is not null or undefined) + Else: + Assert that actual is of the same JSON type as expected + If expected is a JSON array: + For every idx/value in expected: + Assert that actual[idx] MATCHES value + Else if expected is a JSON object: + For every key/value in expected + Assert that actual[key] MATCHES value + Else: + Assert that expected equals actual + +The expected values for ``result.success`` and ``expectations`` are written in Extended JSON. Drivers may adopt any of the following approaches to comparisons, as long as they are consistent: + +- Convert ``actual`` to Extended JSON and compare to ``expected`` +- Convert ``expected`` and ``actual`` to BSON, and compare them +- Convert ``expected`` and ``actual`` to native equivalents of JSON, and compare them + +Spec Test Runner +================ + +Before running the tests + +- Create a MongoClient ``globalClient``, and connect to the server + +For each YAML file, for each element in ``tests``: + +- If ``topology`` does not include the topology of the server instance(s), skip this test. +- Use ``globalClient`` to + + - Drop the database ``database_name`` + - Drop the database ``database2_name`` + - Create the database ``database_name`` and the collection ``database_name.collection_name`` + - Create the database ``database2_name`` and the collection ``database2_name.collection2_name`` + +- Create a new MongoClient ``client`` +- Begin monitoring all APM events for ``client``. (If the driver uses global listeners, filter out all events that do not originate with ``client``). Filter out any "internal" commands (e.g. ``isMaster``) +- Using ``client``, create a changeStream ``changeStream`` against the specified ``target``. Use ``changeStreamPipeline`` and ``changeStreamOptions`` if they are non-empty +- Using ``globalClient``, run every operation in ``operations`` in serial against the server +- Wait until either: + + - An error occurs + - All operations have been successful AND the changeStream has received as many changes as there are in ``result.success`` + +- Close ``changeStream`` +- If there was an error: + + - Assert that an error was expected for the test. 
+ - Assert that the error MATCHES ``results.error`` + +- Else: + + - Assert that no error was expected for the test + - Assert that the changes received from ``changeStream`` MATCH the results in ``results.success`` + +- If there are any ``expectations`` + + - For each (``expected``, ``idx``) in ``expectations`` + + - Assert that ``actual[idx]`` MATCHES ``expected`` + +- Close the MongoClient ``client`` + +After running all tests + +- Close the MongoClient ``globalClient`` +- Drop database ``database_name`` +- Drop database ``database2_name`` + + +Prose Tests +=========== + +The following tests have not yet been automated, but MUST still be tested + +1. ``ChangeStream`` must continuously track the last seen ``resumeToken`` +2. ``ChangeStream`` will throw an exception if the server response is missing the resume token +3. ``ChangeStream`` will automatically resume one time on a resumable error (including `not master`) with the initial pipeline and options, except for the addition/update of a ``resumeToken``. +4. ``ChangeStream`` will not attempt to resume on a server error +5. ``ChangeStream`` will perform server selection before attempting to resume, using initial ``readPreference`` +6. Ensure that a cursor returned from an aggregate command with a cursor id and an initial empty batch is not closed on the driver side. +7. The ``killCursors`` command sent during the "Resume Process" must not be allowed to throw an exception. +8. ``$changeStream`` stage for ``ChangeStream`` against a server ``>=4.0`` that has not received any results yet MUST include a ``startAtOperationTime`` option when resuming a changestream. +9. ``ChangeStream`` will resume after a ``killCursors`` command is issued for its child cursor. diff --git a/t/data/change-streams/change-streams-errors.json b/t/data/change-streams/change-streams-errors.json new file mode 100644 index 00000000..053ac43e --- /dev/null +++ b/t/data/change-streams/change-streams-errors.json @@ -0,0 +1,78 @@ +{ + "collection_name": "test", + "database_name": "change-stream-tests", + "collection2_name": "test2", + "database2_name": "change-stream-tests-2", + "tests": [ + { + "description": "The watch helper must not throw a custom exception when executed against a single server topology, but instead depend on a server error", + "minServerVersion": "3.6.0", + "target": "collection", + "topology": [ + "single" + ], + "changeStreamPipeline": [], + "changeStreamOptions": {}, + "operations": [], + "expectations": [], + "result": { + "error": { + "code": 40573 + } + } + }, + { + "description": "Change Stream should error when an invalid aggregation stage is passed in", + "minServerVersion": "3.6.0", + "target": "collection", + "topology": [ + "replicaset" + ], + "changeStreamPipeline": [ + { + "$unsupported": "foo" + } + ], + "changeStreamOptions": {}, + "operations": [ + { + "database": "change-stream-tests", + "collection": "test", + "name": "insertOne", + "arguments": { + "document": { + "z": 3 + } + } + } + ], + "expectations": [ + { + "command_started_event": { + "command": { + "aggregate": "test", + "cursor": {}, + "pipeline": [ + { + "$changeStream": { + "fullDocument": "default" + } + }, + { + "$unsupported": "foo" + } + ] + }, + "command_name": "aggregate", + "database_name": "change-stream-tests" + } + } + ], + "result": { + "error": { + "code": 40324 + } + } + } + ] +} diff --git a/t/data/change-streams/change-streams-errors.yml b/t/data/change-streams/change-streams-errors.yml new file mode 100644 index 00000000..1286e865 --- /dev/null +++ 
b/t/data/change-streams/change-streams-errors.yml
@@ -0,0 +1,53 @@
+collection_name: &collection_name "test"
+database_name: &database_name "change-stream-tests"
+collection2_name: &collection2_name "test2"
+database2_name: &database2_name "change-stream-tests-2"
+tests:
+  -
+    description: The watch helper must not throw a custom exception when executed against a single server topology, but instead depend on a server error
+    minServerVersion: "3.6.0"
+    target: collection
+    topology:
+      - single
+    changeStreamPipeline: []
+    changeStreamOptions: {}
+    operations: []
+    expectations: []
+    result:
+      error:
+        code: 40573
+  -
+    description: Change Stream should error when an invalid aggregation stage is passed in
+    minServerVersion: "3.6.0"
+    target: collection
+    topology:
+      - replicaset
+    changeStreamPipeline:
+      -
+        $unsupported: foo
+    changeStreamOptions: {}
+    operations:
+      -
+        database: *database_name
+        collection: *collection_name
+        name: insertOne
+        arguments:
+          document:
+            z: 3
+    expectations:
+      -
+        command_started_event:
+          command:
+            aggregate: *collection_name
+            cursor: {}
+            pipeline:
+              -
+                $changeStream:
+                  fullDocument: default
+              -
+                $unsupported: foo
+          command_name: aggregate
+          database_name: *database_name
+    result:
+      error:
+        code: 40324
\ No newline at end of file
diff --git a/t/data/change-streams/change-streams.json b/t/data/change-streams/change-streams.json
new file mode 100644
index 00000000..5c28b627
--- /dev/null
+++ b/t/data/change-streams/change-streams.json
@@ -0,0 +1,445 @@
+{
+  "collection_name": "test",
+  "database_name": "change-stream-tests",
+  "collection2_name": "test2",
+  "database2_name": "change-stream-tests-2",
+  "tests": [
+    {
+      "description": "$changeStream must be the first stage in a change stream pipeline sent to the server",
+      "minServerVersion": "3.6.0",
+      "target": "collection",
+      "topology": [
+        "replicaset"
+      ],
+      "changeStreamPipeline": [],
+      "changeStreamOptions": {},
+      "operations": [
+        {
+          "database": "change-stream-tests",
+          "collection": "test",
+          "name": "insertOne",
+          "arguments": {
+            "document": {
+              "x": 1
+            }
+          }
+        }
+      ],
+      "expectations": [
+        {
+          "command_started_event": {
+            "command": {
+              "aggregate": "test",
+              "cursor": {},
+              "pipeline": [
+                {
+                  "$changeStream": {
+                    "fullDocument": "default"
+                  }
+                }
+              ]
+            },
+            "command_name": "aggregate",
+            "database_name": "change-stream-tests"
+          }
+        }
+      ],
+      "result": {
+        "success": []
+      }
+    },
+    {
+      "description": "The server returns change stream responses in the specified server response format",
+      "minServerVersion": "3.6.0",
+      "target": "collection",
+      "topology": [
+        "replicaset"
+      ],
+      "changeStreamPipeline": [],
+      "changeStreamOptions": {},
+      "operations": [
+        {
+          "database": "change-stream-tests",
+          "collection": "test",
+          "name": "insertOne",
+          "arguments": {
+            "document": {
+              "x": 1
+            }
+          }
+        }
+      ],
+      "expectations": [],
+      "result": {
+        "success": [
+          {
+            "_id": "42",
+            "documentKey": "42",
+            "operationType": "insert",
+            "ns": {
+              "db": "change-stream-tests",
+              "coll": "test"
+            },
+            "fullDocument": {
+              "x": {
+                "$numberInt": "1"
+              }
+            }
+          }
+        ]
+      }
+    },
+    {
+      "description": "Executing a watch helper on a Collection results in notifications for changes to the specified collection",
+      "minServerVersion": "3.6.0",
+      "target": "collection",
+      "topology": [
+        "replicaset"
+      ],
+      "changeStreamPipeline": [],
+      "changeStreamOptions": {},
+      "operations": [
+        {
+          "database": "change-stream-tests",
+          "collection": "test2",
+          "name": "insertOne",
+          "arguments": {
+            "document": {
+              "x": 1
+            }
+          }
+        },
+        {
+          "database": "change-stream-tests-2",
+          "collection": "test",
+          "name": "insertOne",
+          "arguments": {
+            "document": {
+              "y": 2
+            }
+          }
+        },
+        {
+          "database": "change-stream-tests",
+          "collection": "test",
+          "name": "insertOne",
+          "arguments": {
+            "document": {
+              "z": 3
+            }
+          }
+        }
+      ],
+      "expectations": [
+        {
+          "command_started_event": {
+            "command": {
+              "aggregate": "test",
+              "cursor": {},
+              "pipeline": [
+                {
+                  "$changeStream": {
+                    "fullDocument": "default"
+                  }
+                }
+              ]
+            },
+            "command_name": "aggregate",
+            "database_name": "change-stream-tests"
+          }
+        }
+      ],
+      "result": {
+        "success": [
+          {
+            "operationType": "insert",
+            "ns": {
+              "db": "change-stream-tests",
+              "coll": "test"
+            },
+            "fullDocument": {
+              "z": {
+                "$numberInt": "3"
+              }
+            }
+          }
+        ]
+      }
+    },
+    {
+      "description": "Change Stream should allow valid aggregate pipeline stages",
+      "minServerVersion": "3.6.0",
+      "target": "collection",
+      "topology": [
+        "replicaset"
+      ],
+      "changeStreamPipeline": [
+        {
+          "$match": {
+            "fullDocument.z": 3
+          }
+        }
+      ],
+      "changeStreamOptions": {},
+      "operations": [
+        {
+          "database": "change-stream-tests",
+          "collection": "test",
+          "name": "insertOne",
+          "arguments": {
+            "document": {
+              "y": 2
+            }
+          }
+        },
+        {
+          "database": "change-stream-tests",
+          "collection": "test",
+          "name": "insertOne",
+          "arguments": {
+            "document": {
+              "z": 3
+            }
+          }
+        }
+      ],
+      "expectations": [
+        {
+          "command_started_event": {
+            "command": {
+              "aggregate": "test",
+              "cursor": {},
+              "pipeline": [
+                {
+                  "$changeStream": {
+                    "fullDocument": "default"
+                  }
+                },
+                {
+                  "$match": {
+                    "fullDocument.z": {
+                      "$numberInt": "3"
+                    }
+                  }
+                }
+              ]
+            },
+            "command_name": "aggregate",
+            "database_name": "change-stream-tests"
+          }
+        }
+      ],
+      "result": {
+        "success": [
+          {
+            "operationType": "insert",
+            "ns": {
+              "db": "change-stream-tests",
+              "coll": "test"
+            },
+            "fullDocument": {
+              "z": {
+                "$numberInt": "3"
+              }
+            }
+          }
+        ]
+      }
+    },
+    {
+      "description": "Executing a watch helper on a Database results in notifications for changes to all collections in the specified database.",
+      "minServerVersion": "3.8.0",
+      "target": "database",
+      "topology": [
+        "replicaset"
+      ],
+      "changeStreamPipeline": [],
+      "changeStreamOptions": {},
+      "operations": [
+        {
+          "database": "change-stream-tests",
+          "collection": "test2",
+          "name": "insertOne",
+          "arguments": {
+            "document": {
+              "x": 1
+            }
+          }
+        },
+        {
+          "database": "change-stream-tests-2",
+          "collection": "test",
+          "name": "insertOne",
+          "arguments": {
+            "document": {
+              "y": 2
+            }
+          }
+        },
+        {
+          "database": "change-stream-tests",
+          "collection": "test",
+          "name": "insertOne",
+          "arguments": {
+            "document": {
+              "z": 3
+            }
+          }
+        }
+      ],
+      "expectations": [
+        {
+          "command_started_event": {
+            "command": {
+              "aggregate": {
+                "$numberInt": "1"
+              },
+              "cursor": {},
+              "pipeline": [
+                {
+                  "$changeStream": {
+                    "fullDocument": "default"
+                  }
+                }
+              ]
+            },
+            "command_name": "aggregate",
+            "database_name": "change-stream-tests"
+          }
+        }
+      ],
+      "result": {
+        "success": [
+          {
+            "operationType": "insert",
+            "ns": {
+              "db": "change-stream-tests",
+              "coll": "test2"
+            },
+            "fullDocument": {
+              "x": {
+                "$numberInt": "1"
+              }
+            }
+          },
+          {
+            "operationType": "insert",
+            "ns": {
+              "db": "change-stream-tests",
+              "coll": "test"
+            },
+            "fullDocument": {
+              "z": {
+                "$numberInt": "3"
+              }
+            }
+          }
+        ]
+      }
+    },
+    {
+      "description": "Executing a watch helper on a MongoClient results in notifications for changes to all collections in all databases in the cluster.",
+      "minServerVersion": "3.8.0",
+      "target": "client",
+      "topology": [
+        "replicaset"
+      ],
+      "changeStreamPipeline": [],
+      "changeStreamOptions": {},
+      "operations": [
+        {
+          "database": "change-stream-tests",
+          "collection": "test2",
+          "name": "insertOne",
+          "arguments": {
+            "document": {
+              "x": 1
+            }
+          }
+        },
+        {
+          "database": "change-stream-tests-2",
+          "collection": "test",
+          "name": "insertOne",
+          "arguments": {
+            "document": {
+              "y": 2
+            }
+          }
+        },
+        {
+          "database": "change-stream-tests",
+          "collection": "test",
+          "name": "insertOne",
+          "arguments": {
+            "document": {
+              "z": 3
+            }
+          }
+        }
+      ],
+      "expectations": [
+        {
+          "command_started_event": {
+            "command": {
+              "aggregate": {
+                "$numberInt": "1"
+              },
+              "cursor": {},
+              "pipeline": [
+                {
+                  "$changeStream": {
+                    "fullDocument": "default",
+                    "allChangesForCluster": true
+                  }
+                }
+              ]
+            },
+            "command_name": "aggregate",
+            "database_name": "admin"
+          }
+        }
+      ],
+      "result": {
+        "success": [
+          {
+            "operationType": "insert",
+            "ns": {
+              "db": "change-stream-tests",
+              "coll": "test2"
+            },
+            "fullDocument": {
+              "x": {
+                "$numberInt": "1"
+              }
+            }
+          },
+          {
+            "operationType": "insert",
+            "ns": {
+              "db": "change-stream-tests-2",
+              "coll": "test"
+            },
+            "fullDocument": {
+              "y": {
+                "$numberInt": "2"
+              }
+            }
+          },
+          {
+            "operationType": "insert",
+            "ns": {
+              "db": "change-stream-tests",
+              "coll": "test"
+            },
+            "fullDocument": {
+              "z": {
+                "$numberInt": "3"
+              }
+            }
+          }
+        ]
+      }
+    }
+  ]
+}
diff --git a/t/data/change-streams/change-streams.yml b/t/data/change-streams/change-streams.yml
new file mode 100644
index 00000000..720a22f9
--- /dev/null
+++ b/t/data/change-streams/change-streams.yml
@@ -0,0 +1,299 @@
+collection_name: &collection_name "test"
+database_name: &database_name "change-stream-tests"
+collection2_name: &collection2_name "test2"
+database2_name: &database2_name "change-stream-tests-2"
+tests:
+  -
+    description: "$changeStream must be the first stage in a change stream pipeline sent to the server"
+    minServerVersion: "3.6.0"
+    target: collection
+    topology:
+      - replicaset
+    changeStreamPipeline: []
+    changeStreamOptions: {}
+    operations:
+      -
+        database: *database_name
+        collection: *collection_name
+        name: insertOne
+        arguments:
+          document:
+            x: 1
+    expectations:
+      -
+        command_started_event:
+          command:
+            aggregate: *collection_name
+            cursor: {}
+            pipeline:
+              -
+                $changeStream:
+                  fullDocument: default
+          command_name: aggregate
+          database_name: *database_name
+    result:
+      success: []
+  -
+    description: The server returns change stream responses in the specified server response format
+    minServerVersion: "3.6.0"
+    target: collection
+    topology:
+      - replicaset
+    changeStreamPipeline: []
+    changeStreamOptions: {}
+    operations:
+      -
+        database: *database_name
+        collection: *collection_name
+        name: insertOne
+        arguments:
+          document:
+            x: 1
+    expectations: []
+    result:
+      success:
+        -
+          _id: "42"
+          documentKey: "42"
+          operationType: insert
+          ns:
+            db: *database_name
+            coll: *collection_name
+          fullDocument:
+            x:
+              $numberInt: "1"
+  -
+    description: Executing a watch helper on a Collection results in notifications for changes to the specified collection
+    minServerVersion: "3.6.0"
+    target: collection
+    topology:
+      - replicaset
+    changeStreamPipeline: []
+    changeStreamOptions: {}
+    operations:
+      -
+        database: *database_name
+        collection: *collection2_name
+        name: insertOne
+        arguments:
+          document:
+            x: 1
+      -
+        database: *database2_name
+        collection: *collection_name
+        name: insertOne
+        arguments:
+          document:
+            y: 2
+      -
+        database: *database_name
+        collection: *collection_name
+        name: insertOne
+        arguments:
+          document:
+            z: 3
+    expectations:
+      -
+        command_started_event:
+          command:
+            aggregate: *collection_name
+            cursor: {}
+            pipeline:
+              -
+                $changeStream:
+                  fullDocument: default
+          command_name: aggregate
+          database_name: *database_name
+    result:
+      success:
+        -
+          operationType: insert
+          ns:
+            db: *database_name
+            coll: *collection_name
+          fullDocument:
+            z:
+              $numberInt: "3"
+  -
+    description: Change Stream should allow valid aggregate pipeline stages
+    minServerVersion: "3.6.0"
+    target: collection
+    topology:
+      - replicaset
+    changeStreamPipeline:
+      -
+        $match:
+          "fullDocument.z": 3
+    changeStreamOptions: {}
+    operations:
+      -
+        database: *database_name
+        collection: *collection_name
+        name: insertOne
+        arguments:
+          document:
+            y: 2
+      -
+        database: *database_name
+        collection: *collection_name
+        name: insertOne
+        arguments:
+          document:
+            z: 3
+    expectations:
+      -
+        command_started_event:
+          command:
+            aggregate: *collection_name
+            cursor: {}
+            pipeline:
+              -
+                $changeStream:
+                  fullDocument: default
+              -
+                $match:
+                  "fullDocument.z":
+                    $numberInt: "3"
+          command_name: aggregate
+          database_name: *database_name
+    result:
+      success:
+        -
+          operationType: insert
+          ns:
+            db: *database_name
+            coll: *collection_name
+          fullDocument:
+            z:
+              $numberInt: "3"
+  -
+    description: Executing a watch helper on a Database results in notifications for changes to all collections in the specified database.
+    minServerVersion: "3.8.0"
+    target: database
+    topology:
+      - replicaset
+    changeStreamPipeline: []
+    changeStreamOptions: {}
+    operations:
+      -
+        database: *database_name
+        collection: *collection2_name
+        name: insertOne
+        arguments:
+          document:
+            x: 1
+      -
+        database: *database2_name
+        collection: *collection_name
+        name: insertOne
+        arguments:
+          document:
+            y: 2
+      -
+        database: *database_name
+        collection: *collection_name
+        name: insertOne
+        arguments:
+          document:
+            z: 3
+    expectations:
+      -
+        command_started_event:
+          command:
+            aggregate:
+              $numberInt: "1"
+            cursor: {}
+            pipeline:
+              -
+                $changeStream:
+                  fullDocument: default
+          command_name: aggregate
+          database_name: *database_name
+    result:
+      success:
+        -
+          operationType: insert
+          ns:
+            db: *database_name
+            coll: *collection2_name
+          fullDocument:
+            x:
+              $numberInt: "1"
+        -
+          operationType: insert
+          ns:
+            db: *database_name
+            coll: *collection_name
+          fullDocument:
+            z:
+              $numberInt: "3"
+  -
+    description: Executing a watch helper on a MongoClient results in notifications for changes to all collections in all databases in the cluster.
+    minServerVersion: "3.8.0"
+    target: client
+    topology:
+      - replicaset
+    changeStreamPipeline: []
+    changeStreamOptions: {}
+    operations:
+      -
+        database: *database_name
+        collection: *collection2_name
+        name: insertOne
+        arguments:
+          document:
+            x: 1
+      -
+        database: *database2_name
+        collection: *collection_name
+        name: insertOne
+        arguments:
+          document:
+            y: 2
+      -
+        database: *database_name
+        collection: *collection_name
+        name: insertOne
+        arguments:
+          document:
+            z: 3
+    expectations:
+      -
+        command_started_event:
+          command:
+            aggregate:
+              $numberInt: "1"
+            cursor: {}
+            pipeline:
+              -
+                $changeStream:
+                  fullDocument: default
+                  allChangesForCluster: true
+          command_name: aggregate
+          database_name: admin
+    result:
+      success:
+        -
+          operationType: insert
+          ns:
+            db: *database_name
+            coll: *collection2_name
+          fullDocument:
+            x:
+              $numberInt: "1"
+        -
+          operationType: insert
+          ns:
+            db: *database2_name
+            coll: *collection_name
+          fullDocument:
+            y:
+              $numberInt: "2"
+        -
+          operationType: insert
+          ns:
+            db: *database_name
+            coll: *collection_name
+          fullDocument:
+            z:
+              $numberInt: "3"
diff --git a/t/lib/MongoDBSpecTest.pm b/t/lib/MongoDBSpecTest.pm
new file mode 100644
index 00000000..3ae2966c
--- /dev/null
+++ b/t/lib/MongoDBSpecTest.pm
@@ -0,0 +1,60 @@
+# Copyright 2013 - present MongoDB, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+package MongoDBSpecTest;
+
+use strict;
+use warnings;
+
+use Exporter 'import';
+use Test::More;
+use Path::Tiny;
+use JSON::MaybeXS qw( is_bool decode_json );
+
+our @EXPORT_OK = qw(
+    foreach_spec_test
+);
+
+# Recursively find every .json spec file under $dir and invoke $callback
+# once per test case, wrapped in nested subtests: one per file (named
+# after the file), one per test description. The callback receives the
+# individual test case and the full decoded plan: $callback->($test, $plan).
+sub foreach_spec_test {
+    my ($dir, $callback) = @_;
+
+    $dir = path($dir);
+    my $iterator = $dir->iterator( { recurse => 1 } );
+
+    while ( my $path = $iterator->() ) {
+        next unless -f $path && $path =~ /\.json$/;
+
+        # Die on malformed fixtures rather than silently skipping them
+        my $plan = eval { decode_json( $path->slurp_utf8 ) };
+        if ($@) {
+            die "Error decoding $path: $@";
+        }
+
+        my $name = $path->relative($dir)->basename(".json");
+
+        subtest $name => sub {
+            for my $test ( @{ $plan->{tests} } ) {
+                subtest $test->{description} => sub {
+                    $callback->($test, $plan);
+                };
+            }
+        };
+    }
+}
+
+1;
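
A minimal sketch of how a test driver might consume this helper, assuming a hypothetical t/changestreams_spec.t; the callback body and assertions are illustrative only and are not part of this patch:

# Hypothetical consumer of foreach_spec_test (illustrative only)
use strict;
use warnings;
use Test::More;
use lib 't/lib';
use MongoDBSpecTest qw( foreach_spec_test );

foreach_spec_test( 't/data/change-streams', sub {
    my ( $test, $plan ) = @_;

    # $plan carries the file-level keys (database_name, collection_name,
    # ...); $test is one element of the file's "tests" array.
    note "target: $test->{target}";

    ok( defined $test->{minServerVersion}, 'test declares minServerVersion' );
    ok( @{ $test->{topology} },            'test declares a topology' );
} );

done_testing;

The nested subtests produced by foreach_spec_test keep one failing fixture or test case from aborting the rest of the suite.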