From e56965a0482359c0d41cc567c240677e53068a62 Mon Sep 17 00:00:00 2001 From: Robert Sedlacek Date: Wed, 20 Jun 2018 05:31:36 +0200 Subject: [PATCH 1/8] Create Op::_ChangeStream, add ->watch() to databases and clients --- lib/MongoDB/ChangeStream.pm | 67 +++++-- lib/MongoDB/Collection.pm | 38 +++- lib/MongoDB/Database.pm | 84 ++++++++ lib/MongoDB/MongoClient.pm | 103 ++++++++++ lib/MongoDB/Op/_ChangeStream.pm | 278 +++++++++++++++++++++++++++ lib/MongoDB/Role/_CommandCursorOp.pm | 5 +- lib/MongoDB/_Link.pm | 9 + 7 files changed, 560 insertions(+), 24 deletions(-) create mode 100644 lib/MongoDB/Op/_ChangeStream.pm diff --git a/lib/MongoDB/ChangeStream.pm b/lib/MongoDB/ChangeStream.pm index 1bfc9d30..3bb32c19 100644 --- a/lib/MongoDB/ChangeStream.pm +++ b/lib/MongoDB/ChangeStream.pm @@ -24,7 +24,7 @@ our $VERSION = 'v1.999.1'; use Moo; use Try::Tiny; use MongoDB::Cursor; -use MongoDB::Op::_Aggregate; +use MongoDB::Op::_ChangeStream; use MongoDB::Error; use Safe::Isa; use MongoDB::_Types qw( @@ -32,8 +32,11 @@ use MongoDB::_Types qw( ArrayOfHashRef ); use Types::Standard qw( + Bool InstanceOf + Int HashRef + Maybe Str ); @@ -47,15 +50,22 @@ has _result => ( clearer => '_clear_result', ); -has collection => ( +has client => ( is => 'ro', - isa => MongoDBCollection, + isa => InstanceOf['MongoDB::MongoClient'], required => 1, ); -has aggregation_options => ( +has _op_args => ( is => 'ro', isa => HashRef, + default => sub { {} }, + init_arg => 'op_args', +); + +has collection => ( + is => 'ro', + isa => MongoDBCollection, ); has pipeline => ( @@ -77,6 +87,22 @@ has _resume_token => ( lazy => 1, ); +has all_changes_for_cluster => ( + is => 'ro', + isa => Bool, + default => sub { 0 }, +); + +has _changes_received => ( + is => 'rw', + default => sub { 0 }, +); + +has start_at_operation_time => ( + is => 'ro', + isa => Maybe[Int], +); + sub BUILD { my ($self) = @_; @@ -87,25 +113,23 @@ sub BUILD { sub _build_result { my ($self) = @_; - my @pipeline = @{ $self->pipeline }; - 
@pipeline = ( - {'$changeStream' => { - ($self->_has_full_document - ? (fullDocument => $self->full_document) - : () - ), - ($self->_has_resume_token - ? (resumeAfter => $self->_resume_token) - : () - ), - }}, - @pipeline, + my $op = MongoDB::Op::_ChangeStream->new( + pipeline => $self->pipeline, + all_changes_for_cluster => $self->all_changes_for_cluster, + changes_received => $self->_changes_received, + defined($self->start_at_operation_time) + ? (start_at_operation_time => $self->start_at_operation_time) + : (), + $self->_has_full_document + ? (full_document => $self->full_document) + : (), + $self->_has_resume_token + ? (resume_after => $self->_resume_token) + : (), + %{ $self->_op_args }, ); - return $self->collection->aggregate( - \@pipeline, - $self->aggregation_options, - ); + return $self->client->send_read_op($op); } =head1 STREAM METHODS @@ -160,6 +184,7 @@ sub next { if (exists $change->{_id}) { $self->_resume_token($change->{_id}); + $self->_changes_received(1); return $change; } else { diff --git a/lib/MongoDB/Collection.pm b/lib/MongoDB/Collection.pm index 0472a4fd..5b2906d2 100644 --- a/lib/MongoDB/Collection.pm +++ b/lib/MongoDB/Collection.pm @@ -1081,9 +1081,15 @@ The optional second argument is a hash reference with options: was changed from some time after the change occurred. * C - The logical starting point for this change stream. This value can be obtained from the C<_id> field of a document returned - by L. + by L. Cannot be specified together with + C * C - The maximum number of milliseconds for the server to wait before responding. +* C - A timestamp specifying at what point in time + changes will start being watched. Cannot be specified together with + C. +* C - the session to use for these operations. If not supplied, will + use an implicit session. For more information see L See L for more available options. 
@@ -1101,16 +1107,44 @@ sub watch { $pipeline ||= []; $options ||= {}; + my $session = $self->_get_session_from_hashref( $options ); + + # boolify some options + for my $k (qw/allowDiskUse explain/) { + $options->{$k} = ( $options->{$k} ? true : false ) if exists $options->{$k}; + } + + # possibly fallback to default maxTimeMS + if ( ! exists $options->{maxTimeMS} && $self->max_time_ms ) { + $options->{maxTimeMS} = $self->max_time_ms; + } + + # read preferences are ignored if the last stage is $out + my ($last_op) = keys %{ $pipeline->[-1] || {} }; + return MongoDB::ChangeStream->new( + exists($options->{startAtOperationTime}) + ? (start_at_operation_time => delete $options->{startAtOperationTime}) + : (), exists($options->{fullDocument}) ? (full_document => delete $options->{fullDocument}) : (full_document => 'default'), exists($options->{resumeAfter}) ? (resume_after => delete $options->{resumeAfter}) : (), + client => $self->client, collection => $self, pipeline => $pipeline, - aggregation_options => $options, + op_args => { + options => $options, + read_concern => $self->read_concern, + has_out => defined($last_op) && $last_op eq '$out', + exists($options->{maxAwaitTimeMS}) + ? (maxAwaitTimeMS => delete $options->{maxAwaitTimeMS}) + : (), + session => $session, + %{ $self->_op_args }, + }, ); } diff --git a/lib/MongoDB/Database.pm b/lib/MongoDB/Database.pm index e9bcba52..2340a0e3 100644 --- a/lib/MongoDB/Database.pm +++ b/lib/MongoDB/Database.pm @@ -483,6 +483,90 @@ sub _get_session_from_hashref { return $session; } +=method watch + +Watches for changes on this database. + +Perform an aggregation with an implicit initial C<$changeStream> stage +and returns a L result which can be used to +iterate over the changes in the database. This functionality is +available since MongoDB 4.0. 
+ + my $stream = $db->watch(); + my $stream = $db->watch( \@pipeline ); + my $stream = $db->watch( \@pipeline, \%options ); + + while (1) { + + # This inner loop will only run until no more changes are + # available. + while (my $change = $stream->next) { + # process $change + } + } + +The returned stream will not block forever waiting for changes. If you +want to respond to changes over a longer time use C and +regularly call C in a loop. + +See L for details on usage and available +options. + +=cut + +sub watch { + my ( $self, $pipeline, $options ) = @_; + + $pipeline ||= []; + $options ||= {}; + + my $session = $self->_get_session_from_hashref( $options ); + + # boolify some options + for my $k (qw/allowDiskUse explain/) { + $options->{$k} = ( $options->{$k} ? true : false ) if exists $options->{$k}; + } + + # possibly fallback to default maxTimeMS + if ( ! exists $options->{maxTimeMS} && $self->max_time_ms ) { + $options->{maxTimeMS} = $self->max_time_ms; + } + + # read preferences are ignored if the last stage is $out + my ($last_op) = keys %{ $pipeline->[-1] || {} }; + + return MongoDB::ChangeStream->new( + exists($options->{startAtOperationTime}) + ? (start_at_operation_time => delete $options->{startAtOperationTime}) + : (), + exists($options->{fullDocument}) + ? (full_document => delete $options->{fullDocument}) + : (full_document => 'default'), + exists($options->{resumeAfter}) + ? (resume_after => delete $options->{resumeAfter}) + : (), + client => $self->_client, + pipeline => $pipeline, + op_args => { + options => $options, + read_concern => $self->read_concern, + has_out => defined($last_op) && $last_op eq '$out', + exists($options->{maxAwaitTimeMS}) + ? (maxAwaitTimeMS => delete $options->{maxAwaitTimeMS}) + : (), + session => $session, + client => $self->_client, + bson_codec => $self->bson_codec, + db_name => $self->name, + coll_name => 1, # Magic not-an-actual-collection number + full_name => $self->name . 
".1", + read_preference => $self->read_preference, + write_concern => $self->write_concern, + monitoring_callback => $self->_client->monitoring_callback, + }, + ); +} + 1; __END__ diff --git a/lib/MongoDB/MongoClient.pm b/lib/MongoDB/MongoClient.pm index 0919f6f2..1a427379 100644 --- a/lib/MongoDB/MongoClient.pm +++ b/lib/MongoDB/MongoClient.pm @@ -1870,6 +1870,109 @@ sub fsync_unlock { return $self->send_primary_op($op); } +sub _get_session_from_hashref { + my ( $self, $hashref ) = @_; + + my $session = delete $hashref->{session}; + + if ( defined $session ) { + MongoDB::UsageError->throw( "Cannot use session from another client" ) + if ( $session->client->_id ne $self->_id ); + MongoDB::UsageError->throw( "Cannot use session which has ended" ) + if ! defined $session->session_id; + } else { + $session = $self->_maybe_get_implicit_session; + } + + return $session; +} + +=method watch + +Watches for changes on the cluster. + +Perform an aggregation with an implicit initial C<$changeStream> stage +and returns a L result which can be used to +iterate over the changes in the cluster. This functionality is +available since MongoDB 4.0. + + my $stream = $client->watch(); + my $stream = $client->watch( \@pipeline ); + my $stream = $client->watch( \@pipeline, \%options ); + + while (1) { + + # This inner loop will only run until no more changes are + # available. + while (my $change = $stream->next) { + # process $change + } + } + +The returned stream will not block forever waiting for changes. If you +want to respond to changes over a longer time use C and +regularly call C in a loop. + +See L for details on usage and available +options. + +=cut + +sub watch { + my ( $self, $pipeline, $options ) = @_; + + $pipeline ||= []; + $options ||= {}; + + my $session = $self->_get_session_from_hashref( $options ); + + # boolify some options + for my $k (qw/allowDiskUse explain/) { + $options->{$k} = ( $options->{$k} ? 
true : false ) if exists $options->{$k}; + } + + # possibly fallback to default maxTimeMS + if ( ! exists $options->{maxTimeMS} && $self->max_time_ms ) { + $options->{maxTimeMS} = $self->max_time_ms; + } + + # read preferences are ignored if the last stage is $out + my ($last_op) = keys %{ $pipeline->[-1] || {} }; + + return MongoDB::ChangeStream->new( + exists($options->{startAtOperationTime}) + ? (start_at_operation_time => delete $options->{startAtOperationTime}) + : (), + exists($options->{fullDocument}) + ? (full_document => delete $options->{fullDocument}) + : (full_document => 'default'), + exists($options->{resumeAfter}) + ? (resume_after => delete $options->{resumeAfter}) + : (), + client => $self, + all_changes_for_cluster => 1, + pipeline => $pipeline, + op_args => { + options => $options, + read_concern => $self->read_concern, + has_out => defined($last_op) && $last_op eq '$out', + exists($options->{maxAwaitTimeMS}) + ? (maxAwaitTimeMS => delete $options->{maxAwaitTimeMS}) + : (), + session => $session, + client => $self, + db_name => 'admin',, + coll_name => 1, + full_name => 'admin.1', + bson_codec => $self->bson_codec, + write_concern => $self->write_concern, + read_concern => $self->read_concern, + read_preference => $self->read_preference, + monitoring_callback => $self->monitoring_callback, + }, + ); +} + 1; diff --git a/lib/MongoDB/Op/_ChangeStream.pm b/lib/MongoDB/Op/_ChangeStream.pm new file mode 100644 index 00000000..47ac6420 --- /dev/null +++ b/lib/MongoDB/Op/_ChangeStream.pm @@ -0,0 +1,278 @@ +# Copyright 2015 - present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +use strict; +use warnings; +package MongoDB::Op::_ChangeStream; + +# Encapsulate aggregate operation; return MongoDB::QueryResult + +use version; +our $VERSION = 'v1.999.1'; + +use Moo; + +use boolean; +use BSON::Timestamp; +use MongoDB::Op::_Command; +use MongoDB::_Types qw( + ArrayOfHashRef + Boolish +); +use Types::Standard qw( + HashRef + InstanceOf + Num + Str + Maybe + Int + Bool +); + +use namespace::clean; + +has client => ( + is => 'ro', + required => 1, + isa => InstanceOf ['MongoDB::MongoClient'], +); + +has pipeline => ( + is => 'ro', + required => 1, + isa => ArrayOfHashRef, +); + +has options => ( + is => 'ro', + required => 1, + isa => HashRef, +); + +has has_out => ( + is => 'ro', + required => 1, + isa => Boolish, +); + +has maxAwaitTimeMS => ( + is => 'rw', + isa => Num, +); + +has full_document => ( + is => 'ro', + isa => Str, + predicate => 'has_full_document', +); + +has resume_after => ( + is => 'ro', + predicate => 'has_resume_after', +); + +has all_changes_for_cluster => ( + is => 'ro', + isa => Bool, + default => sub { 0 }, +); + +has start_at_operation_time => ( + is => 'ro', + isa => Maybe[Int], +); + +has changes_received => ( + is => 'ro', + default => sub { 0 }, +); + +with $_ for qw( + MongoDB::Role::_PrivateConstructor + MongoDB::Role::_CollectionOp + MongoDB::Role::_ReadOp + MongoDB::Role::_WriteOp + MongoDB::Role::_CommandCursorOp +); + +sub execute { + my ( $self, $link, $topology ) = @_; + + my $options = $self->options; + my $is_2_6 = $link->supports_write_commands; + + # maxTimeMS isn't available until 2.6 and the 
aggregate command + # will reject it as unrecognized + delete $options->{maxTimeMS} unless $is_2_6; + + # bypassDocumentValidation isn't available until 3.2 (wire version 4) + delete $options->{bypassDocumentValidation} unless $link->supports_document_validation; + + if ( defined $options->{collation} and !$link->supports_collation ) { + MongoDB::UsageError->throw( + "MongoDB host '" . $link->address . "' doesn't support collation" ); + } + + # If 'cursor' is explicitly false, we disable using cursors, even + # for MongoDB 2.6+. This allows users operating with a 2.6+ mongos + # and pre-2.6 mongod in shards to avoid fatal errors. This + # workaround should be removed once MongoDB 2.4 is no longer supported. + my $use_cursor = $is_2_6 + && ( !exists( $options->{cursor} ) || $options->{cursor} ); + + # batchSize is not a command parameter itself like other options + my $batchSize = delete $options->{batchSize}; + + # If we're doing cursors, we first respect an explicit batchSize option; + # next we fallback to the legacy (deprecated) cursor option batchSize; finally we + # just give an empty document. Other than batchSize we ignore any other + # legacy cursor options. If we're not doing cursors, don't send any + # cursor option at all, as servers will choke on it. + if ($use_cursor) { + if ( defined $batchSize ) { + $options->{cursor} = { batchSize => $batchSize }; + } + elsif ( ref $options->{cursor} eq 'HASH' ) { + $batchSize = $options->{cursor}{batchSize}; + $options->{cursor} = defined($batchSize) ? { batchSize => $batchSize } : {}; + } + else { + $options->{cursor} = {}; + } + } + else { + delete $options->{cursor}; + } + + my $has_out = $self->has_out; + + if ( $self->coll_name eq 1 && ! 
$link->supports_db_aggregation ) { + MongoDB::Error->throw( + "Calling aggregate with a collection name of '1' is not supported on Wire Version < 6" ); + } + + my $start_op_time; + + # preset startAtOperationTime + if (defined $self->start_at_operation_time) { + $start_op_time = $self->start_at_operation_time; + } + + # default startAtOperationTime when supported and not + if ( + !defined($start_op_time) && + !$self->changes_received && + $link->supports_4_0_changestreams + ) { + if ( + defined($link->server) && + defined(my $op_time = $link->server->is_master->{operationTime}) + ) { + $start_op_time = $op_time; + } + + if ( + !defined($start_op_time) && + defined($self->session) && + defined(my $latest_cl_time = $self->session->get_latest_cluster_time) + ) { + if (defined(my $cl_time = $latest_cl_time->{clusterTime})) { + $start_op_time = $cl_time; + } + } + } + + my @pipeline = ( + {'$changeStream' => { + (defined($self->start_at_operation_time) + ? (startAtOperationTime => BSON::Timestamp->new( + $self->start_at_operation_time, + )) + : () + ), + ($self->all_changes_for_cluster + ? (allChangesForCluster => true) + : () + ), + ($self->has_full_document + ? (fullDocument => $self->full_document) + : () + ), + ($self->has_resume_after + ? (resumeAfter => $self->resume_after) + : () + ), + }}, + @{ $self->pipeline }, + ); + + my @command = ( + aggregate => $self->coll_name, + pipeline => \@pipeline, + %$options, + ( + !$has_out && $link->supports_read_concern ? @{ $self->read_concern->as_args( $self->session) } : () + ), + ( + $has_out && $link->supports_helper_write_concern ? @{ $self->write_concern->as_args } : () + ), + ); + + my $op = MongoDB::Op::_Command->_new( + db_name => $self->db_name, + query => Tie::IxHash->new(@command), + query_flags => {}, + bson_codec => $self->bson_codec, + ( $has_out ? 
() : ( read_preference => $self->read_preference ) ), + session => $self->session, + monitoring_callback => $self->monitoring_callback, + ); + + my $res = $op->execute( $link, $topology ); + + $res->assert_no_write_concern_error if $has_out; + + # For explain, we give the whole response as fields have changed in + # different server versions + if ( $options->{explain} ) { + return MongoDB::QueryResult->_new( + _client => $self->client, + _address => $link->address, + _full_name => '', + _bson_codec => $self->bson_codec, + _batch_size => 1, + _cursor_at => 0, + _limit => 0, + _cursor_id => 0, + _cursor_start => 0, + _cursor_flags => {}, + _cursor_num => 1, + _docs => [ $res->output ], + ); + } + + # Fake up a single-batch cursor if we didn't get a cursor response. + # We use the 'results' fields as the first (and only) batch + if ( !$res->output->{cursor} ) { + $res->output->{cursor} = { + ns => '', + id => 0, + firstBatch => ( delete $res->output->{result} ) || [], + }; + } + + return $self->_build_result_from_cursor($res); +} + +1; diff --git a/lib/MongoDB/Role/_CommandCursorOp.pm b/lib/MongoDB/Role/_CommandCursorOp.pm index f62a1b62..fe70c0fd 100644 --- a/lib/MongoDB/Role/_CommandCursorOp.pm +++ b/lib/MongoDB/Role/_CommandCursorOp.pm @@ -44,7 +44,10 @@ sub _build_result_from_cursor { $self->options->{cursorType} eq 'tailable_await') { $max_time_ms = $self->options->{maxAwaitTimeMS} if $self->options->{maxAwaitTimeMS}; } - elsif ($self->isa('MongoDB::Op::_Aggregate')) { + elsif ( + $self->isa('MongoDB::Op::_Aggregate') || + $self->isa('MongoDB::Op::_ChangeStream') + ) { $max_time_ms = $self->maxAwaitTimeMS if $self->maxAwaitTimeMS; } diff --git a/lib/MongoDB/_Link.pm b/lib/MongoDB/_Link.pm index 7e4dc153..105d42ca 100644 --- a/lib/MongoDB/_Link.pm +++ b/lib/MongoDB/_Link.pm @@ -215,6 +215,12 @@ has supports_retryWrites => ( isa => Boolish, ); +has supports_4_0_changestreams => ( + is => 'rwp', + init_arg => undef, + isa => Boolish, +); + my 
@connection_state_fields = qw( fh connected rcvbuf last_used fdset is_ssl ); @@ -350,6 +356,9 @@ sub set_metadata { : 0 ); } + if ( $self->accepts_wire_version(7) ) { + $self->_set_supports_4_0_changestreams(1); + } return; } From d052069efbf8246f38aef5f20828b3a0cac6fb49 Mon Sep 17 00:00:00 2001 From: Robert Sedlacek Date: Wed, 20 Jun 2018 05:32:38 +0200 Subject: [PATCH 2/8] Run changestream tests on databases and clients as well, add tests for sessions --- t/changestreams.t | 268 ++++++++++++++++++++++++++++------------------ 1 file changed, 165 insertions(+), 103 deletions(-) diff --git a/t/changestreams.t b/t/changestreams.t index 39da1ad6..7a2c7125 100644 --- a/t/changestreams.t +++ b/t/changestreams.t @@ -20,129 +20,191 @@ use Test::More 0.96; use MongoDB; use lib "t/lib"; -use MongoDBTest qw/skip_unless_mongod build_client get_test_db server_version server_type/; +use MongoDBTest qw/ + skip_unless_mongod + skip_unless_sessions + uuid_to_string + build_client + get_test_db + server_version + server_type +/; skip_unless_mongod(); -my $conn = build_client(); +my @events; + +my $conn = build_client(monitoring_callback => sub { + push @events, shift; +}); + my $testdb = get_test_db($conn); my $server_version = server_version($conn); my $server_type = server_type($conn); my $coll = $testdb->get_collection('test_collection'); -subtest 'change streams' => sub { - plan skip_all => 'MongoDB version 3.6 or higher required' - unless $server_version >= version->parse('v3.6.0'); - plan skip_all => 'MongoDB replica set required' - unless $server_type eq 'RSPrimary'; - - $coll->drop; - $coll->insert_one({ value => 1 }); - $coll->insert_one({ value => 2 }); - - my $change_stream = $coll->watch(); - is $change_stream->next, undef, 'next without changes'; - - for my $index (1..10) { - $coll->insert_one({ value => 100 + $index }); - } - my %changed; - while (my $change = $change_stream->next) { - is $changed{ $change->{fullDocument}{value} }++, 0, - 'first seen 
'.$change->{fullDocument}{value}; - } - is scalar(keys %changed), 10, 'seen all changes'; +plan skip_all => 'MongoDB replica set required' + unless $server_type eq 'RSPrimary'; + +subtest 'client' => sub { + plan skip_all => 'MongoDB version 4.0 or higher required' + unless $server_version >= version->parse('v4.0.0'); + run_tests_for($conn); }; -subtest 'change streams w/ maxAwaitTimeMS' => sub { - plan skip_all => 'MongoDB version 3.6 or higher required' - unless $server_version >= version->parse('v3.6.0'); - plan skip_all => 'MongoDB replica set required' - unless $server_type eq 'RSPrimary'; - - $coll->drop; - my $change_stream = $coll->watch([], { maxAwaitTimeMS => 3000 }); - my $start = time; - is $change_stream->next, undef, 'next without changes'; - my $elapsed = time - $start; - my $min_elapsed = 2; - ok $elapsed > $min_elapsed, "waited for at least $min_elapsed secs"; +subtest 'database' => sub { + plan skip_all => 'MongoDB version 4.0 or higher required' + unless $server_version >= version->parse('v4.0.0'); + run_tests_for($testdb); }; -subtest 'change streams w/ fullDocument' => sub { +subtest 'collection' => sub { plan skip_all => 'MongoDB version 3.6 or higher required' unless $server_version >= version->parse('v3.6.0'); - plan skip_all => 'MongoDB replica set required' - unless $server_type eq 'RSPrimary'; - - $coll->drop; - $coll->insert_one({ value => 1 }); - my $change_stream = $coll->watch( - [], - { fullDocument => 'updateLookup' }, - ); - $coll->update_one( - { value => 1 }, - { '$set' => { updated => 3 }}, - ); - my $change = $change_stream->next; - is $change->{operationType}, 'update', 'change is an update'; - ok exists($change->{fullDocument}), 'delta contains full document'; + run_tests_for($coll); }; -subtest 'change streams w/ resumeAfter' => sub { - plan skip_all => 'MongoDB version 3.6 or higher required' - unless $server_version >= version->parse('v3.6.0'); - plan skip_all => 'MongoDB replica set required' - unless $server_type eq 
'RSPrimary'; - - $coll->drop; - my $id = do { - my $change_stream = $coll->watch(); - $coll->insert_one({ value => 200 }); - $coll->insert_one({ value => 201 }); - my $change = $change_stream->next; - ok $change, 'change exists'; - is $change->{fullDocument}{value}, 200, - 'correct change'; - $change->{_id} +done_testing; + +sub run_tests_for { + my ($watchable) = @_; + + subtest 'basic' => sub { + $coll->drop; + $coll->insert_one({ value => 1 }); + $coll->insert_one({ value => 2 }); + + my $change_stream = $watchable->watch(); + is $change_stream->next, undef, 'next without changes'; + + for my $index (1..10) { + $coll->insert_one({ value => 100 + $index }); + } + my %changed; + while (my $change = $change_stream->next) { + is $changed{ $change->{fullDocument}{value} }++, 0, + 'first seen '.$change->{fullDocument}{value}; + } + is scalar(keys %changed), 10, 'seen all changes'; + }; + + subtest 'change streams w/ maxAwaitTimeMS' => sub { + $coll->drop; + my $change_stream = $watchable->watch([], { maxAwaitTimeMS => 3000 }); + my $start = time; + is $change_stream->next, undef, 'next without changes'; + my $elapsed = time - $start; + my $min_elapsed = 2; + ok $elapsed > $min_elapsed, "waited for at least $min_elapsed secs"; }; - do { - my $change_stream = $coll->watch( + + subtest 'change streams w/ fullDocument' => sub { + $coll->drop; + $coll->insert_one({ value => 1 }); + my $change_stream = $watchable->watch( [], - { resumeAfter => $id }, + { fullDocument => 'updateLookup' }, + ); + $coll->update_one( + { value => 1 }, + { '$set' => { updated => 3 }}, ); my $change = $change_stream->next; - ok $change, 'change exists after resume'; - is $change->{fullDocument}{value}, 201, - 'correct change after resume'; - is $change_stream->next, undef, 'no more changes'; + is $change->{operationType}, 'update', 'change is an update'; + ok exists($change->{fullDocument}), 'delta contains full document'; }; -}; -subtest 'change streams w/ CursorNotFound reconnection' => sub { - 
plan skip_all => 'MongoDB version 3.6 or higher required' - unless $server_version >= version->parse('v3.6.0'); - plan skip_all => 'MongoDB replica set required' - unless $server_type eq 'RSPrimary'; - - $coll->drop; - - my $change_stream = $coll->watch; - $coll->insert_one({ value => 301 }); - my $change = $change_stream->next; - ok $change, 'change received'; - is $change->{fullDocument}{value}, 301, 'correct change'; - - $testdb->run_command([ - killCursors => $coll->name, - cursors => [$change_stream->_result->_cursor_id], - ]); - - $coll->insert_one({ value => 302 }); - $change = $change_stream->next; - ok $change, 'change received after reconnect'; - is $change->{fullDocument}{value}, 302, 'correct change'; -}; + subtest 'change streams w/ resumeAfter' => sub { + $coll->drop; + my $id = do { + my $change_stream = $watchable->watch(); + $coll->insert_one({ value => 200 }); + $coll->insert_one({ value => 201 }); + my $change = $change_stream->next; + ok $change, 'change exists'; + is $change->{fullDocument}{value}, 200, + 'correct change'; + $change->{_id} + }; + do { + my $change_stream = $watchable->watch( + [], + { resumeAfter => $id }, + ); + my $change = $change_stream->next; + ok $change, 'change exists after resume'; + is $change->{fullDocument}{value}, 201, + 'correct change after resume'; + is $change_stream->next, undef, 'no more changes'; + }; + }; -done_testing; + subtest 'change streams w/ CursorNotFound reconnection' => sub { + $coll->drop; + + my $change_stream = $watchable->watch; + $coll->insert_one({ value => 301 }); + my $change = $change_stream->next; + ok $change, 'change received'; + is $change->{fullDocument}{value}, 301, 'correct change'; + + $testdb->run_command([ + killCursors => $coll->name, + cursors => [$change_stream->_result->_cursor_id], + ]); + + $coll->insert_one({ value => 302 }); + $change = $change_stream->next; + ok $change, 'change received after reconnect'; + is $change->{fullDocument}{value}, 302, 'correct change'; + }; 
+ + subtest 'startAtOperationTime' => sub { + plan skip_all => 'MongoDB version 4.0 or higher required' + unless $server_version >= version->parse('v4.0.0'); + + $coll->drop; + + my $change_stream = $watchable->watch([], { + startAtOperationTime => scalar(time + 3), + }); + $coll->insert_one({ value => 401 }); + sleep 4; + $coll->insert_one({ value => 402 }); + + my $change = $change_stream->next; + ok $change, 'change received'; + is $change->{fullDocument}{value}, 402, 'correct change'; + + ok !defined($change_stream->next), 'no more changes'; + }; + + subtest 'sessions' => sub { + skip_unless_sessions(); + @events = (); + + my $session = $conn->start_session; + + my $change_stream = $watchable->watch([], { + session => $session, + }); + $change_stream->next; + + my ($event) = grep { + $_->{commandName} eq 'aggregate' and + $_->{type} eq 'command_started' + } @events; + + ok(defined($event), 'found event') + or return; + + my $lsid = uuid_to_string( + $session->_server_session->session_id->{id}->data, + ); + + my $command = $event->{command}; + my $command_sid = uuid_to_string($command->{lsid}{id}->data); + + is $command_sid, $lsid, 'command has correct session id'; + }; +} From a4308493b5f306d7e65ead7e3ad0e148d734e404 Mon Sep 17 00:00:00 2001 From: Robert Sedlacek Date: Wed, 20 Jun 2018 05:33:16 +0200 Subject: [PATCH 3/8] ChangeStream specification tests --- t/changestreams_spec.t | 276 +++++++++++ t/data/change-streams/README.rst | 156 ++++++ .../change-streams/change-streams-errors.json | 78 +++ .../change-streams/change-streams-errors.yml | 53 +++ t/data/change-streams/change-streams.json | 445 ++++++++++++++++++ t/data/change-streams/change-streams.yml | 299 ++++++++++++ t/lib/MongoDBSpecTest.pm | 55 +++ 7 files changed, 1362 insertions(+) create mode 100644 t/changestreams_spec.t create mode 100644 t/data/change-streams/README.rst create mode 100644 t/data/change-streams/change-streams-errors.json create mode 100644 
t/data/change-streams/change-streams-errors.yml create mode 100644 t/data/change-streams/change-streams.json create mode 100644 t/data/change-streams/change-streams.yml create mode 100644 t/lib/MongoDBSpecTest.pm diff --git a/t/changestreams_spec.t b/t/changestreams_spec.t new file mode 100644 index 00000000..e25fc941 --- /dev/null +++ b/t/changestreams_spec.t @@ -0,0 +1,276 @@ +# Copyright 2018 - present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +use strict; +use warnings; +use utf8; +use Test::More 0.96; +use Test::Deep; +use Safe::Isa; + +use MongoDB; + +use lib "t/lib"; +use MongoDBTest qw/skip_unless_mongod build_client get_test_db server_version server_type/; +use MongoDBSpecTest qw/foreach_spec_test/; + +skip_unless_mongod(); + +my $global_client = build_client(); +my $server_version = server_version($global_client); +my $server_type = server_type($global_client); +my $server_topology = + $server_type eq 'RSPrimary' ? 'replicaset' : + $server_type eq 'Standalone' ? 'single' : + 'unknown'; + +my ($db1, $db2); + +foreach_spec_test('t/data/change-streams', sub { + my ($test, $plan) = @_; + + plan skip_all => sprintf( + "Test only runs on (%s) topology", + join('|', @{ $test->{topology} || [] }), + ) unless grep { $_ eq $server_topology } @{ $test->{topology} || [] }; + + my $min_version = defined($test->{minServerVersion}) + ? 
version->parse('v'.$test->{minServerVersion}) + : undef; + plan skip_all => "Test requires version $min_version" + if defined($min_version) and $server_version < $min_version; + + $db1->drop if defined $db1; + $db2->drop if defined $db2; + + $db1 = get_test_db($global_client); + $db2 = get_test_db($global_client); + + my @events; + my $client = build_client(monitoring_callback => sub { + push @events, shift; + }); + $db1 = $client->get_database($db1->name); + $db2 = $client->get_database($db2->name); + + $db1->run_command([create => $plan->{database_name}]); + $db2->run_command([create => $plan->{database2_name}]); + + my $coll1 = $db1->get_collection($plan->{collection_name}); + my $coll2 = $db2->get_collection($plan->{collection2_name}); + + my $stream_target = + $test->{target} eq 'collection' ? $coll1 : + $test->{target} eq 'database' ? $db1 : + $test->{target} eq 'client' ? $client : + die "Unknown target: ".$test->{target}; + + my $resolve_db = sub { + $_[0] eq $plan->{database_name} ? $db1 : + $_[0] eq $plan->{database2_name} ? 
$db2 : + undef + }; + + my $stream; + eval { + $stream = $stream_target->watch( + $test->{changeStreamPipeline} || [], + $test->{changeStreamOptions} || {}, + ); + }; + my $stream_error = $@; + if ($stream_error) { + ok(defined($test->{result}{error}), 'expected error') + or diag("Stream Error: $stream_error"); + if (defined(my $code = $test->{result}{error}{code})) { + is $stream_error->code, $code, "error code $code"; + } + } + else { + ok(defined($stream), 'change stream') + or return; + } + + for my $operation (@{ $test->{operations} || [] }) { + my ($op_db, $op_coll, $op_name) + = @{ $operation }{qw( database collection name )}; + $op_db = $op_db->$resolve_db; + $op_coll = $op_db->get_collection($op_coll); + + my $op_sub = __PACKAGE__->can("operation_${op_name}"); + $op_sub->($op_db, $op_coll, $operation->{arguments}); + } + + if (my @expected_events = @{ $test->{expectations} || [] }) { + subtest 'events' => sub { + my $index = 0; + for my $expected (@expected_events) { + $index++; + my $found; + subtest "event (index $index)" => sub { + while (@events) { + my $current = shift @events; + my $data = event_matches($current, $expected); + if (defined $data) { + check_event( + $current, + prepare_spec($data, $resolve_db), + $resolve_db, + ); + return; + } + } + ok 0, 'missing expected event'; + }; + } + }; + } + + if (@{ $test->{result}{success} || [] }) { + subtest 'success' => sub { + my @changes; + while (defined(my $change = $stream->next)) { + push @changes, $change; + } + my @expected_changes = @{ $test->{result}{success} || [] }; + is scalar(@changes), scalar(@expected_changes), + 'expected number'; + if (@changes == @expected_changes) { + for my $index (0 .. 
$#changes) { + subtest "result (index $index)" => sub { + check_result( + $changes[$index], + prepare_spec( + $expected_changes[$index], + $resolve_db, + ), + $resolve_db + ); + }; + } + } + }; + } +}); + +sub event_matches { + my ($event, $expected) = @_; + + my $data; + if ($data = $expected->{command_started_event}) { + return undef + unless $event->{type} eq 'command_started'; + } + elsif ($data = $expected->{command_succeeded_event}) { + return undef + unless $event->{type} eq 'command_succeeded'; + } + else { + die "Unrecognized event"; + } + + return undef + unless $event->{commandName} eq $data->{command_name}; + + return $data; +} + +sub prepare_spec { + my ($data, $resolve_db) = @_; + if (not defined $data) { + return undef; + } + elsif ($data->$_isa('JSON::PP::Boolean')) { + my $value = !!$data; + return code(sub { + ($_[0] and $value) ? (1) : + (!$_[0] and !$value) ? (1) : + (0, "boolean mismatch") + }); + } + elsif ($data eq 42) { + return code(sub { + defined($_[0]) + ? (1) + : (0, 'value is defined'); + }); + } + elsif (ref $data eq 'HASH') { + if (exists $data->{'$numberInt'}) { + return 0+$data->{'$numberInt'}; + } + return +{map { + ($_, prepare_spec($data->{$_}, $resolve_db)); + } keys %$data}; + } + elsif (ref $data eq 'ARRAY') { + return [map { + prepare_spec($_, $resolve_db); + } @$data]; + } + elsif ( + not ref $data + and defined $data + and defined(my $real_db = $data->$resolve_db) + ) { + return $real_db->name; + } + else { + return $data; + } +} + +sub check_event { + my ($event, $expected) = @_; + + is $event->{databaseName}, $expected->{database_name}, + 'database name', + if exists $expected->{database_name}; + + if (my $command = $expected->{command}) { + for my $key (sort keys %$command) { + cmp_deeply( + ($event->{command} || $event->{reply})->{$key}, + $command->{$key}, + $key, + ); + } + } +} + +sub check_result { + my ($change, $expected, $resolve_db) = @_; + + for my $key (sort keys %$expected) { + if ($key eq 'fullDocument') { 
+ for my $doc_key (sort keys %{ $expected->{$key} }) { + cmp_deeply( + $change->{$key}{$doc_key}, + $expected->{$key}{$doc_key}, + "$key/$doc_key", + ); + } + } + else { + cmp_deeply($change->{$key}, $expected->{$key}, $key); + } + } +} + +sub operation_insertOne { + my ($db, $coll, $args) = @_; + $coll->insert_one($args->{document}); +} + +done_testing; diff --git a/t/data/change-streams/README.rst b/t/data/change-streams/README.rst new file mode 100644 index 00000000..3e45104c --- /dev/null +++ b/t/data/change-streams/README.rst @@ -0,0 +1,156 @@ +.. role:: javascript(code) + :language: javascript + +============== +Change Streams +============== + +.. contents:: + +-------- + +Introduction +============ + +The YAML and JSON files in this directory are platform-independent tests that +drivers can use to prove their conformance to the Change Streams Spec. + +Several prose tests, which are not easily expressed in YAML, are also presented +in this file. Those tests will need to be manually implemented by each driver. + +Spec Test Format +================ + +Each YAML file has the following keys: + +- ``database_name``: The default database +- ``collection_name``: The default collection +- ``database2_name``: Another database +- ``collection2_name``: Another collection +- ``tests``: An array of tests that are to be run independently of each other. + Each test will have some of the following fields: + + - ``description``: The name of the test. + - ``minServerVersion``: The minimum server version to run this test against. If not present, assume there is no minimum server version. + - ``maxServerVersion``: Reserved for later use + - ``failPoint``: Reserved for later use + - ``target``: The entity on which to run the change stream. 
Valid values are: + + - ``collection``: Watch changes on collection ``database_name.collection_name`` + - ``database``: Watch changes on database ``database_name`` + - ``client``: Watch changes on the entire cluster + - ``topology``: An array of server topologies against which to run the test. + Valid topologies are ``single``, ``replicaset``, and ``sharded``. + - ``changeStreamPipeline``: An array of additional aggregation pipeline stages to add to the change stream + - ``changeStreamOptions``: Additional options to add to the changeStream + - ``operations``: Array of documents, each describing an operation. Each document has the following fields: + + - ``database``: Database against which to run the operation + - ``collection``: Collection against which to run the operation + - ``name``: Name of the command to run + - ``arguments``: Object of arguments for the command (ex: document to insert) + + - ``expectations``: Optional list of command-started events in Extended JSON format + - ``result``: Document with ONE of the following fields: + + - ``error``: Describes an error received during the test + - ``success``: An Extended JSON array of documents expected to be received from the changeStream + +Spec Test Match Function +======================== + +The definition of MATCH or MATCHES in the Spec Test Runner is as follows: + +- MATCH takes two values, ``expected`` and ``actual`` +- Notation is "Assert [actual] MATCHES [expected]" +- Assertion passes if ``expected`` is a subset of ``actual``, with the values ``42`` and ``"42"`` acting as placeholders for "any value" + +Pseudocode implementation of ``actual`` MATCHES ``expected``: + +:: + + If expected is "42" or 42: + Assert that actual exists (is not null or undefined) + Else: + Assert that actual is of the same JSON type as expected + If expected is a JSON array: + For every idx/value in expected: + Assert that actual[idx] MATCHES value + Else if expected is a JSON object: + For every key/value in expected: + Assert 
that actual[key] MATCHES value + Else: + Assert that expected equals actual + +The expected values for ``result.success`` and ``expectations`` are written in Extended JSON. Drivers may adopt any of the following approaches to comparisons, as long as they are consistent: + +- Convert ``actual`` to Extended JSON and compare to ``expected`` +- Convert ``expected`` and ``actual`` to BSON, and compare them +- Convert ``expected`` and ``actual`` to native equivalents of JSON, and compare them + +Spec Test Runner +================ + +Before running the tests + +- Create a MongoClient ``globalClient``, and connect to the server + +For each YAML file, for each element in ``tests``: + +- If ``topology`` does not include the topology of the server instance(s), skip this test. +- Use ``globalClient`` to + + - Drop the database ``database_name`` + - Drop the database ``database2_name`` + - Create the database ``database_name`` and the collection ``database_name.collection_name`` + - Create the database ``database2_name`` and the collection ``database2_name.collection2_name`` + +- Create a new MongoClient ``client`` +- Begin monitoring all APM events for ``client``. (If the driver uses global listeners, filter out all events that do not originate with ``client``). Filter out any "internal" commands (e.g. ``isMaster``) +- Using ``client``, create a changeStream ``changeStream`` against the specified ``target``. Use ``changeStreamPipeline`` and ``changeStreamOptions`` if they are non-empty +- Using ``globalClient``, run every operation in ``operations`` in serial against the server +- Wait until either: + + - An error occurs + - All operations have been successful AND the changeStream has received as many changes as there are in ``result.success`` + +- Close ``changeStream`` +- If there was an error: + + - Assert that an error was expected for the test. 
+ - Assert that the error MATCHES ``result.error`` + +- Else: + + - Assert that no error was expected for the test + - Assert that the changes received from ``changeStream`` MATCH the results in ``result.success`` + +- If there are any ``expectations`` + + - For each (``expected``, ``idx``) in ``expectations`` + + - Assert that ``actual[idx]`` MATCHES ``expected`` + +- Close the MongoClient ``client`` + +After running all tests + +- Close the MongoClient ``globalClient`` +- Drop database ``database_name`` +- Drop database ``database2_name`` + + +Prose Tests +=========== + +The following tests have not yet been automated, but MUST still be tested + +1. ``ChangeStream`` must continuously track the last seen ``resumeToken`` +2. ``ChangeStream`` will throw an exception if the server response is missing the resume token +3. ``ChangeStream`` will automatically resume one time on a resumable error (including `not master`) with the initial pipeline and options, except for the addition/update of a ``resumeToken``. +4. ``ChangeStream`` will not attempt to resume on a server error +5. ``ChangeStream`` will perform server selection before attempting to resume, using initial ``readPreference`` +6. Ensure that a cursor returned from an aggregate command with a cursor id and an initial empty batch is not closed on the driver side. +7. The ``killCursors`` command sent during the "Resume Process" must not be allowed to throw an exception. +8. ``$changeStream`` stage for ``ChangeStream`` against a server ``>=4.0`` that has not received any results yet MUST include a ``startAtOperationTime`` option when resuming a changestream. +9. ``ChangeStream`` will resume after a ``killCursors`` command is issued for its child cursor. 
diff --git a/t/data/change-streams/change-streams-errors.json b/t/data/change-streams/change-streams-errors.json new file mode 100644 index 00000000..053ac43e --- /dev/null +++ b/t/data/change-streams/change-streams-errors.json @@ -0,0 +1,78 @@ +{ + "collection_name": "test", + "database_name": "change-stream-tests", + "collection2_name": "test2", + "database2_name": "change-stream-tests-2", + "tests": [ + { + "description": "The watch helper must not throw a custom exception when executed against a single server topology, but instead depend on a server error", + "minServerVersion": "3.6.0", + "target": "collection", + "topology": [ + "single" + ], + "changeStreamPipeline": [], + "changeStreamOptions": {}, + "operations": [], + "expectations": [], + "result": { + "error": { + "code": 40573 + } + } + }, + { + "description": "Change Stream should error when an invalid aggregation stage is passed in", + "minServerVersion": "3.6.0", + "target": "collection", + "topology": [ + "replicaset" + ], + "changeStreamPipeline": [ + { + "$unsupported": "foo" + } + ], + "changeStreamOptions": {}, + "operations": [ + { + "database": "change-stream-tests", + "collection": "test", + "name": "insertOne", + "arguments": { + "document": { + "z": 3 + } + } + } + ], + "expectations": [ + { + "command_started_event": { + "command": { + "aggregate": "test", + "cursor": {}, + "pipeline": [ + { + "$changeStream": { + "fullDocument": "default" + } + }, + { + "$unsupported": "foo" + } + ] + }, + "command_name": "aggregate", + "database_name": "change-stream-tests" + } + } + ], + "result": { + "error": { + "code": 40324 + } + } + } + ] +} diff --git a/t/data/change-streams/change-streams-errors.yml b/t/data/change-streams/change-streams-errors.yml new file mode 100644 index 00000000..1286e865 --- /dev/null +++ b/t/data/change-streams/change-streams-errors.yml @@ -0,0 +1,53 @@ +collection_name: &collection_name "test" +database_name: &database_name "change-stream-tests" +collection2_name: 
&collection2_name "test2" +database2_name: &database2_name "change-stream-tests-2" +tests: + - + description: The watch helper must not throw a custom exception when executed against a single server topology, but instead depend on a server error + minServerVersion: "3.6.0" + target: collection + topology: + - single + changeStreamPipeline: [] + changeStreamOptions: {} + operations: [] + expectations: [] + result: + error: + code: 40573 + - + description: Change Stream should error when an invalid aggregation stage is passed in + minServerVersion: "3.6.0" + target: collection + topology: + - replicaset + changeStreamPipeline: + - + $unsupported: foo + changeStreamOptions: {} + operations: + - + database: *database_name + collection: *collection_name + name: insertOne + arguments: + document: + z: 3 + expectations: + - + command_started_event: + command: + aggregate: *collection_name + cursor: {} + pipeline: + - + $changeStream: + fullDocument: default + - + $unsupported: foo + command_name: aggregate + database_name: *database_name + result: + error: + code: 40324 \ No newline at end of file diff --git a/t/data/change-streams/change-streams.json b/t/data/change-streams/change-streams.json new file mode 100644 index 00000000..5c28b627 --- /dev/null +++ b/t/data/change-streams/change-streams.json @@ -0,0 +1,445 @@ +{ + "collection_name": "test", + "database_name": "change-stream-tests", + "collection2_name": "test2", + "database2_name": "change-stream-tests-2", + "tests": [ + { + "description": "$changeStream must be the first stage in a change stream pipeline sent to the server", + "minServerVersion": "3.6.0", + "target": "collection", + "topology": [ + "replicaset" + ], + "changeStreamPipeline": [], + "changeStreamOptions": {}, + "operations": [ + { + "database": "change-stream-tests", + "collection": "test", + "name": "insertOne", + "arguments": { + "document": { + "x": 1 + } + } + } + ], + "expectations": [ + { + "command_started_event": { + "command": { + 
"aggregate": "test", + "cursor": {}, + "pipeline": [ + { + "$changeStream": { + "fullDocument": "default" + } + } + ] + }, + "command_name": "aggregate", + "database_name": "change-stream-tests" + } + } + ], + "result": { + "success": [] + } + }, + { + "description": "The server returns change stream responses in the specified server response format", + "minServerVersion": "3.6.0", + "target": "collection", + "topology": [ + "replicaset" + ], + "changeStreamPipeline": [], + "changeStreamOptions": {}, + "operations": [ + { + "database": "change-stream-tests", + "collection": "test", + "name": "insertOne", + "arguments": { + "document": { + "x": 1 + } + } + } + ], + "expectations": [], + "result": { + "success": [ + { + "_id": "42", + "documentKey": "42", + "operationType": "insert", + "ns": { + "db": "change-stream-tests", + "coll": "test" + }, + "fullDocument": { + "x": { + "$numberInt": "1" + } + } + } + ] + } + }, + { + "description": "Executing a watch helper on a Collection results in notifications for changes to the specified collection", + "minServerVersion": "3.6.0", + "target": "collection", + "topology": [ + "replicaset" + ], + "changeStreamPipeline": [], + "changeStreamOptions": {}, + "operations": [ + { + "database": "change-stream-tests", + "collection": "test2", + "name": "insertOne", + "arguments": { + "document": { + "x": 1 + } + } + }, + { + "database": "change-stream-tests-2", + "collection": "test", + "name": "insertOne", + "arguments": { + "document": { + "y": 2 + } + } + }, + { + "database": "change-stream-tests", + "collection": "test", + "name": "insertOne", + "arguments": { + "document": { + "z": 3 + } + } + } + ], + "expectations": [ + { + "command_started_event": { + "command": { + "aggregate": "test", + "cursor": {}, + "pipeline": [ + { + "$changeStream": { + "fullDocument": "default" + } + } + ] + }, + "command_name": "aggregate", + "database_name": "change-stream-tests" + } + } + ], + "result": { + "success": [ + { + "operationType": 
"insert", + "ns": { + "db": "change-stream-tests", + "coll": "test" + }, + "fullDocument": { + "z": { + "$numberInt": "3" + } + } + } + ] + } + }, + { + "description": "Change Stream should allow valid aggregate pipeline stages", + "minServerVersion": "3.6.0", + "target": "collection", + "topology": [ + "replicaset" + ], + "changeStreamPipeline": [ + { + "$match": { + "fullDocument.z": 3 + } + } + ], + "changeStreamOptions": {}, + "operations": [ + { + "database": "change-stream-tests", + "collection": "test", + "name": "insertOne", + "arguments": { + "document": { + "y": 2 + } + } + }, + { + "database": "change-stream-tests", + "collection": "test", + "name": "insertOne", + "arguments": { + "document": { + "z": 3 + } + } + } + ], + "expectations": [ + { + "command_started_event": { + "command": { + "aggregate": "test", + "cursor": {}, + "pipeline": [ + { + "$changeStream": { + "fullDocument": "default" + } + }, + { + "$match": { + "fullDocument.z": { + "$numberInt": "3" + } + } + } + ] + }, + "command_name": "aggregate", + "database_name": "change-stream-tests" + } + } + ], + "result": { + "success": [ + { + "operationType": "insert", + "ns": { + "db": "change-stream-tests", + "coll": "test" + }, + "fullDocument": { + "z": { + "$numberInt": "3" + } + } + } + ] + } + }, + { + "description": "Executing a watch helper on a Database results in notifications for changes to all collections in the specified database.", + "minServerVersion": "3.8.0", + "target": "database", + "topology": [ + "replicaset" + ], + "changeStreamPipeline": [], + "changeStreamOptions": {}, + "operations": [ + { + "database": "change-stream-tests", + "collection": "test2", + "name": "insertOne", + "arguments": { + "document": { + "x": 1 + } + } + }, + { + "database": "change-stream-tests-2", + "collection": "test", + "name": "insertOne", + "arguments": { + "document": { + "y": 2 + } + } + }, + { + "database": "change-stream-tests", + "collection": "test", + "name": "insertOne", + "arguments": { 
+ "document": { + "z": 3 + } + } + } + ], + "expectations": [ + { + "command_started_event": { + "command": { + "aggregate": { + "$numberInt": "1" + }, + "cursor": {}, + "pipeline": [ + { + "$changeStream": { + "fullDocument": "default" + } + } + ] + }, + "command_name": "aggregate", + "database_name": "change-stream-tests" + } + } + ], + "result": { + "success": [ + { + "operationType": "insert", + "ns": { + "db": "change-stream-tests", + "coll": "test2" + }, + "fullDocument": { + "x": { + "$numberInt": "1" + } + } + }, + { + "operationType": "insert", + "ns": { + "db": "change-stream-tests", + "coll": "test" + }, + "fullDocument": { + "z": { + "$numberInt": "3" + } + } + } + ] + } + }, + { + "description": "Executing a watch helper on a MongoClient results in notifications for changes to all collections in all databases in the cluster.", + "minServerVersion": "3.8.0", + "target": "client", + "topology": [ + "replicaset" + ], + "changeStreamPipeline": [], + "changeStreamOptions": {}, + "operations": [ + { + "database": "change-stream-tests", + "collection": "test2", + "name": "insertOne", + "arguments": { + "document": { + "x": 1 + } + } + }, + { + "database": "change-stream-tests-2", + "collection": "test", + "name": "insertOne", + "arguments": { + "document": { + "y": 2 + } + } + }, + { + "database": "change-stream-tests", + "collection": "test", + "name": "insertOne", + "arguments": { + "document": { + "z": 3 + } + } + } + ], + "expectations": [ + { + "command_started_event": { + "command": { + "aggregate": { + "$numberInt": "1" + }, + "cursor": {}, + "pipeline": [ + { + "$changeStream": { + "fullDocument": "default", + "allChangesForCluster": true + } + } + ] + }, + "command_name": "aggregate", + "database_name": "admin" + } + } + ], + "result": { + "success": [ + { + "operationType": "insert", + "ns": { + "db": "change-stream-tests", + "coll": "test2" + }, + "fullDocument": { + "x": { + "$numberInt": "1" + } + } + }, + { + "operationType": "insert", + "ns": { 
+ "db": "change-stream-tests-2", + "coll": "test" + }, + "fullDocument": { + "y": { + "$numberInt": "2" + } + } + }, + { + "operationType": "insert", + "ns": { + "db": "change-stream-tests", + "coll": "test" + }, + "fullDocument": { + "z": { + "$numberInt": "3" + } + } + } + ] + } + } + ] +} diff --git a/t/data/change-streams/change-streams.yml b/t/data/change-streams/change-streams.yml new file mode 100644 index 00000000..720a22f9 --- /dev/null +++ b/t/data/change-streams/change-streams.yml @@ -0,0 +1,299 @@ +collection_name: &collection_name "test" +database_name: &database_name "change-stream-tests" +collection2_name: &collection2_name "test2" +database2_name: &database2_name "change-stream-tests-2" +tests: + - + description: "$changeStream must be the first stage in a change stream pipeline sent to the server" + minServerVersion: "3.6.0" + target: collection + topology: + - replicaset + changeStreamPipeline: [] + changeStreamOptions: {} + operations: + - + database: *database_name + collection: *collection_name + name: insertOne + arguments: + document: + x: 1 + expectations: + - + command_started_event: + command: + aggregate: *collection_name + cursor: {} + pipeline: + - + $changeStream: + fullDocument: default + command_name: aggregate + database_name: *database_name + result: + success: [] + - + description: The server returns change stream responses in the specified server response format + minServerVersion: "3.6.0" + target: collection + topology: + - replicaset + changeStreamPipeline: [] + changeStreamOptions: {} + operations: + - + database: *database_name + collection: *collection_name + name: insertOne + arguments: + document: + x: 1 + expectations: [] + result: + success: + - + _id: "42" + documentKey: "42" + operationType: insert + ns: + db: *database_name + coll: *collection_name + fullDocument: + x: + $numberInt: "1" + - + description: Executing a watch helper on a Collection results in notifications for changes to the specified collection + 
minServerVersion: "3.6.0" + target: collection + topology: + - replicaset + changeStreamPipeline: [] + changeStreamOptions: {} + operations: + - + database: *database_name + collection: *collection2_name + name: insertOne + arguments: + document: + x: 1 + - + database: *database2_name + collection: *collection_name + name: insertOne + arguments: + document: + y: 2 + - + database: *database_name + collection: *collection_name + name: insertOne + arguments: + document: + z: 3 + expectations: + - + command_started_event: + command: + aggregate: *collection_name + cursor: {} + pipeline: + - + $changeStream: + fullDocument: default + command_name: aggregate + database_name: *database_name + result: + success: + - + operationType: insert + ns: + db: *database_name + coll: *collection_name + fullDocument: + z: + $numberInt: "3" + - + description: Change Stream should allow valid aggregate pipeline stages + minServerVersion: "3.6.0" + target: collection + topology: + - replicaset + changeStreamPipeline: + - + $match: + "fullDocument.z": 3 + changeStreamOptions: {} + operations: + - + database: *database_name + collection: *collection_name + name: insertOne + arguments: + document: + y: 2 + - + database: *database_name + collection: *collection_name + name: insertOne + arguments: + document: + z: 3 + expectations: + - + command_started_event: + command: + aggregate: *collection_name + cursor: {} + pipeline: + - + $changeStream: + fullDocument: default + - + $match: + "fullDocument.z": + $numberInt: "3" + command_name: aggregate + database_name: *database_name + result: + success: + - + operationType: insert + ns: + db: *database_name + coll: *collection_name + fullDocument: + z: + $numberInt: "3" + - + description: Executing a watch helper on a Database results in notifications for changes to all collections in the specified database. 
+ minServerVersion: "3.8.0" + target: database + topology: + - replicaset + changeStreamPipeline: [] + changeStreamOptions: {} + operations: + - + database: *database_name + collection: *collection2_name + name: insertOne + arguments: + document: + x: 1 + - + database: *database2_name + collection: *collection_name + name: insertOne + arguments: + document: + y: 2 + - + database: *database_name + collection: *collection_name + name: insertOne + arguments: + document: + z: 3 + expectations: + - + command_started_event: + command: + aggregate: + $numberInt: "1" + cursor: {} + pipeline: + - + $changeStream: + fullDocument: default + command_name: aggregate + database_name: *database_name + result: + success: + - + operationType: insert + ns: + db: *database_name + coll: *collection2_name + fullDocument: + x: + $numberInt: "1" + - + operationType: insert + ns: + db: *database_name + coll: *collection_name + fullDocument: + z: + $numberInt: "3" + - + description: Executing a watch helper on a MongoClient results in notifications for changes to all collections in all databases in the cluster. 
+ minServerVersion: "3.8.0" + target: client + topology: + - replicaset + changeStreamPipeline: [] + changeStreamOptions: {} + operations: + - + database: *database_name + collection: *collection2_name + name: insertOne + arguments: + document: + x: 1 + - + database: *database2_name + collection: *collection_name + name: insertOne + arguments: + document: + y: 2 + - + database: *database_name + collection: *collection_name + name: insertOne + arguments: + document: + z: 3 + expectations: + - + command_started_event: + command: + aggregate: + $numberInt: "1" + cursor: {} + pipeline: + - + $changeStream: + fullDocument: default + allChangesForCluster: true + command_name: aggregate + database_name: admin + result: + success: + - + operationType: insert + ns: + db: *database_name + coll: *collection2_name + fullDocument: + x: + $numberInt: "1" + - + operationType: insert + ns: + db: *database2_name + coll: *collection_name + fullDocument: + y: + $numberInt: "2" + - + operationType: insert + ns: + db: *database_name + coll: *collection_name + fullDocument: + z: + $numberInt: "3" diff --git a/t/lib/MongoDBSpecTest.pm b/t/lib/MongoDBSpecTest.pm new file mode 100644 index 00000000..3ae2966c --- /dev/null +++ b/t/lib/MongoDBSpecTest.pm @@ -0,0 +1,55 @@ +# Copyright 2013 - present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +package MongoDBSpecTest; + +use strict; +use warnings; + +use Exporter 'import'; +use Test::More; +use Path::Tiny; +use JSON::MaybeXS qw( is_bool decode_json ); + +our @EXPORT_OK = qw( + foreach_spec_test +); + +sub foreach_spec_test { + my ($dir, $callback) = @_; + + $dir = path($dir); + my $iterator = $dir->iterator( { recurse => 1 } ); + + while ( my $path = $iterator->() ) { + next unless -f $path && $path =~ /\.json$/; + + my $plan = eval { decode_json( $path->slurp_utf8 ) }; + if ($@) { + die "Error decoding $path: $@"; + } + + my $name = $path->relative($dir)->basename(".json"); + + subtest $name => sub { + for my $test ( @{ $plan->{tests} } ) { + subtest $test->{description} => sub { + $callback->($test, $plan); + }; + } + }; + } +} + +1; From f909536373b90b510e287c036006267dc31a9570 Mon Sep 17 00:00:00 2001 From: Robert Sedlacek Date: Wed, 20 Jun 2018 05:43:09 +0200 Subject: [PATCH 4/8] Fixed startAtOperationTime fallback and disabled it due to spec test failures --- lib/MongoDB/Op/_ChangeStream.pm | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/MongoDB/Op/_ChangeStream.pm b/lib/MongoDB/Op/_ChangeStream.pm index 47ac6420..125b966d 100644 --- a/lib/MongoDB/Op/_ChangeStream.pm +++ b/lib/MongoDB/Op/_ChangeStream.pm @@ -171,6 +171,7 @@ sub execute { # default startAtOperationTime when supported and not if ( + 0 && # TODO: Not in spec tests, throws when included !defined($start_op_time) && !$self->changes_received && $link->supports_4_0_changestreams @@ -195,9 +196,9 @@ sub execute { my @pipeline = ( {'$changeStream' => { - (defined($self->start_at_operation_time) + (defined($start_op_time) ? 
(startAtOperationTime => BSON::Timestamp->new( - $self->start_at_operation_time, + $start_op_time, )) : () ), From 866adcb0785d00423f8a9c6633df1d439dd3c04c Mon Sep 17 00:00:00 2001 From: Robert Sedlacek Date: Thu, 21 Jun 2018 17:33:39 +0200 Subject: [PATCH 5/8] ChangeStream properties fixes and refactoring cleanups --- lib/MongoDB/BulkWrite.pm | 2 +- lib/MongoDB/ChangeStream.pm | 80 +++++++++++++++++++++-------- lib/MongoDB/Collection.pm | 91 ++++++++++----------------------- lib/MongoDB/Database.pm | 55 ++++---------------- lib/MongoDB/MongoClient.pm | 25 ++------- lib/MongoDB/Op/_ChangeStream.pm | 55 ++++---------------- 6 files changed, 112 insertions(+), 196 deletions(-) diff --git a/lib/MongoDB/BulkWrite.pm b/lib/MongoDB/BulkWrite.pm index d2a03222..5a924865 100644 --- a/lib/MongoDB/BulkWrite.pm +++ b/lib/MongoDB/BulkWrite.pm @@ -265,7 +265,7 @@ sub execute { $write_concern ||= $self->collection->write_concern; - my $session = $self->collection->_get_session_from_hashref( $options ); + my $session = $self->_client->_get_session_from_hashref( $options ); my $op = MongoDB::Op::_BulkWrite->_new( client => $self->_client, diff --git a/lib/MongoDB/ChangeStream.pm b/lib/MongoDB/ChangeStream.pm index 3bb32c19..681c06c1 100644 --- a/lib/MongoDB/ChangeStream.pm +++ b/lib/MongoDB/ChangeStream.pm @@ -27,17 +27,21 @@ use MongoDB::Cursor; use MongoDB::Op::_ChangeStream; use MongoDB::Error; use Safe::Isa; +use BSON::Timestamp; use MongoDB::_Types qw( MongoDBCollection ArrayOfHashRef + Boolish + Intish + BSONTimestamp + ClientSession ); use Types::Standard qw( - Bool InstanceOf - Int HashRef Maybe Str + Num ); use namespace::clean -except => 'meta'; @@ -45,38 +49,37 @@ use namespace::clean -except => 'meta'; has _result => ( is => 'rw', isa => InstanceOf['MongoDB::QueryResult'], + init_arg => undef, lazy => 1, builder => '_build_result', clearer => '_clear_result', ); -has client => ( +has _client => ( is => 'ro', isa => InstanceOf['MongoDB::MongoClient'], + init_arg => 
'client', required => 1, ); has _op_args => ( is => 'ro', isa => HashRef, - default => sub { {} }, init_arg => 'op_args', + required => 1, ); -has collection => ( - is => 'ro', - isa => MongoDBCollection, -); - -has pipeline => ( +has _pipeline => ( is => 'ro', isa => ArrayOfHashRef, + init_arg => 'pipeline', required => 1, ); -has full_document => ( +has _full_document => ( is => 'ro', isa => Str, + init_arg => 'full_document', predicate => '_has_full_document', ); @@ -84,23 +87,50 @@ has _resume_token => ( is => 'rw', init_arg => 'resume_after', predicate => '_has_resume_token', - lazy => 1, ); -has all_changes_for_cluster => ( +has _all_changes_for_cluster => ( is => 'ro', - isa => Bool, + isa => Boolish, + init_arg => 'all_changes_for_cluster', default => sub { 0 }, ); has _changes_received => ( is => 'rw', + isa => Boolish, + init_arg => 'changes_received', default => sub { 0 }, ); -has start_at_operation_time => ( +has _start_at_operation_time => ( + is => 'ro', + isa => BSONTimestamp, + init_arg => 'start_at_operation_time', + predicate => '_has_start_at_operation_time', + coerce => sub { + ref($_[0]) ? $_[0] : BSON::Timestamp->new($_[0]) + }, +); + +has _session => ( is => 'ro', - isa => Maybe[Int], + isa => Maybe[ClientSession], + init_arg => 'session', +); + +has _options => ( + is => 'ro', + isa => HashRef, + init_arg => 'options', + default => sub { {} }, +); + +has _max_await_time_ms => ( + is => 'ro', + isa => Num, + init_arg => 'max_await_time_ms', + predicate => '_has_max_await_time_ms', ); sub BUILD { @@ -114,22 +144,28 @@ sub _build_result { my ($self) = @_; my $op = MongoDB::Op::_ChangeStream->new( - pipeline => $self->pipeline, - all_changes_for_cluster => $self->all_changes_for_cluster, + pipeline => $self->_pipeline, + all_changes_for_cluster => $self->_all_changes_for_cluster, changes_received => $self->_changes_received, - defined($self->start_at_operation_time) - ? 
(start_at_operation_time => $self->start_at_operation_time) + session => $self->_session, + options => $self->_options, + client => $self->_client, + $self->_has_start_at_operation_time + ? (start_at_operation_time => $self->_start_at_operation_time) : (), $self->_has_full_document - ? (full_document => $self->full_document) + ? (full_document => $self->_full_document) : (), $self->_has_resume_token ? (resume_after => $self->_resume_token) : (), + $self->_has_max_await_time_ms + ? (maxAwaitTimeMS => $self->_max_await_time_ms) + : (), %{ $self->_op_args }, ); - return $self->client->send_read_op($op); + return $self->_client->send_read_op($op); } =head1 STREAM METHODS diff --git a/lib/MongoDB/Collection.pm b/lib/MongoDB/Collection.pm index 5b2906d2..a6bceac4 100644 --- a/lib/MongoDB/Collection.pm +++ b/lib/MongoDB/Collection.pm @@ -372,7 +372,7 @@ sub insert_one { return $_[0]->client->send_retryable_write_op( MongoDB::Op::_InsertOne->_new( - session => $_[0]->_get_session_from_hashref( $_[2] ), + session => $_[0]->client->_get_session_from_hashref( $_[2] ), ( defined $_[2] ? (%{$_[2]}) : () ), document => $_[1], %{ $_[0]->_op_args }, @@ -426,7 +426,7 @@ sub insert_many { # default ordered => 1, # user overrides - session => $_[0]->_get_session_from_hashref( $_[2] ), + session => $_[0]->client->_get_session_from_hashref( $_[2] ), ( defined $_[2] ? ( %{ $_[2] } ) : () ), # un-overridable queue => [ map { [ insert => $_ ] } @{ $_[1] } ], @@ -478,7 +478,7 @@ sub delete_one { return $_[0]->client->send_retryable_write_op( MongoDB::Op::_Delete->_new( - session => $_[0]->_get_session_from_hashref( $_[2] ), + session => $_[0]->client->_get_session_from_hashref( $_[2] ), ( defined $_[2] ? 
(%{$_[2]}) : () ), filter => $_[1], just_one => 1, @@ -515,7 +515,7 @@ sub delete_many { return $_[0]->client->send_write_op( MongoDB::Op::_Delete->_new( - session => $_[0]->_get_session_from_hashref( $_[2] ), + session => $_[0]->client->_get_session_from_hashref( $_[2] ), ( defined $_[2] ? (%{$_[2]}) : () ), filter => $_[1], just_one => 0, @@ -560,7 +560,7 @@ sub replace_one { return $_[0]->client->send_retryable_write_op( MongoDB::Op::_Update->_new( - session => $_[0]->_get_session_from_hashref( $_[3] ), + session => $_[0]->client->_get_session_from_hashref( $_[3] ), ( defined $_[3] ? (%{$_[3]}) : () ), filter => $_[1], update => $_[2], @@ -611,7 +611,7 @@ sub update_one { return $_[0]->client->send_retryable_write_op( MongoDB::Op::_Update->_new( - session => $_[0]->_get_session_from_hashref( $_[3] ), + session => $_[0]->client->_get_session_from_hashref( $_[3] ), ( defined $_[3] ? (%{$_[3]}) : () ), filter => $_[1], update => $_[2], @@ -662,7 +662,7 @@ sub update_many { return $_[0]->client->send_write_op( MongoDB::Op::_Update->_new( - session => $_[0]->_get_session_from_hashref( $_[3] ), + session => $_[0]->client->_get_session_from_hashref( $_[3] ), ( defined $_[3] ? 
(%{$_[3]}) : () ), filter => $_[1], update => $_[2], @@ -771,7 +771,7 @@ sub find { $options->{maxTimeMS} = $self->max_time_ms; } - my $session = $self->_get_session_from_hashref( $options ); + my $session = $self->client->_get_session_from_hashref( $options ); # coerce to IxHash __ixhash( $options, 'sort' ); @@ -836,7 +836,7 @@ sub find_one { $options->{maxTimeMS} = $self->max_time_ms; } - my $session = $self->_get_session_from_hashref( $options ); + my $session = $self->client->_get_session_from_hashref( $options ); # coerce to IxHash __ixhash( $options, 'sort' ); @@ -919,7 +919,7 @@ sub find_one_and_delete { $options->{maxTimeMS} = $self->max_time_ms; } - my $session = $self->_get_session_from_hashref( $options ); + my $session = $self->client->_get_session_from_hashref( $options ); # coerce to IxHash __ixhash($options, 'sort'); @@ -1085,9 +1085,10 @@ The optional second argument is a hash reference with options: C * C - The maximum number of milliseconds for the server to wait before responding. -* C - A timestamp specifying at what point in time - changes will start being watched. Cannot be specified together with - C. +* C - A L specifying at what point + in time changes will start being watched. Cannot be specified together + with C. Plain values will be coerced to L + objects. * C - the session to use for these operations. If not supplied, will use an implicit session. For more information see L @@ -1107,20 +1108,7 @@ sub watch { $pipeline ||= []; $options ||= {}; - my $session = $self->_get_session_from_hashref( $options ); - - # boolify some options - for my $k (qw/allowDiskUse explain/) { - $options->{$k} = ( $options->{$k} ? true : false ) if exists $options->{$k}; - } - - # possibly fallback to default maxTimeMS - if ( ! 
exists $options->{maxTimeMS} && $self->max_time_ms ) { - $options->{maxTimeMS} = $self->max_time_ms; - } - - # read preferences are ignored if the last stage is $out - my ($last_op) = keys %{ $pipeline->[-1] || {} }; + my $session = $self->client->_get_session_from_hashref( $options ); return MongoDB::ChangeStream->new( exists($options->{startAtOperationTime}) @@ -1132,19 +1120,14 @@ sub watch { exists($options->{resumeAfter}) ? (resume_after => delete $options->{resumeAfter}) : (), + exists($options->{maxAwaitTimeMS}) + ? (max_await_time_ms => delete $options->{maxAwaitTimeMS}) + : (), client => $self->client, - collection => $self, pipeline => $pipeline, - op_args => { - options => $options, - read_concern => $self->read_concern, - has_out => defined($last_op) && $last_op eq '$out', - exists($options->{maxAwaitTimeMS}) - ? (maxAwaitTimeMS => delete $options->{maxAwaitTimeMS}) - : (), - session => $session, - %{ $self->_op_args }, - }, + session => $session, + options => $options, + op_args => $self->_op_args, ); } @@ -1214,7 +1197,7 @@ sub aggregate { my ( $self, $pipeline, $options ) = @_; $options ||= {}; - my $session = $self->_get_session_from_hashref( $options ); + my $session = $self->client->_get_session_from_hashref( $options ); # boolify some options for my $k (qw/allowDiskUse explain/) { @@ -1391,7 +1374,7 @@ sub distinct { $options->{maxTimeMS} = $self->max_time_ms; } - my $session = $self->_get_session_from_hashref( $options ); + my $session = $self->client->_get_session_from_hashref( $options ); my $op = MongoDB::Op::_Distinct->_new( fieldname => $fieldname, @@ -1495,7 +1478,7 @@ collection. sub rename { my ( $self, $new_name, $options ) = @_; - my $session = $self->_get_session_from_hashref( $options ); + my $session = $self->client->_get_session_from_hashref( $options ); my $op = MongoDB::Op::_RenameCollection->_new( src_ns => $self->full_name, @@ -1521,7 +1504,7 @@ Deletes a collection as well as all of its indexes. 
sub drop { my ( $self, $options ) = @_; - my $session = $self->_get_session_from_hashref( $options ); + my $session = $self->client->_get_session_from_hashref( $options ); $self->client->send_write_op( MongoDB::Op::_DropCollection->_new( @@ -1663,7 +1646,7 @@ sub bulk_write { my $ordered = exists $options->{ordered} ? delete $options->{ordered} : 1; - my $session = $self->_get_session_from_hashref( $options ); + my $session = $self->client->_get_session_from_hashref( $options ); my $bulk = $ordered ? $self->ordered_bulk($options) : $self->unordered_bulk($options); @@ -1794,7 +1777,7 @@ sub _find_one_and_update_or_replace { # pass separately for MongoDB::Role::_BypassValidation my $bypass = delete $options->{bypassDocumentValidation}; - my $session = $self->_get_session_from_hashref( $options ); + my $session = $self->client->_get_session_from_hashref( $options ); my $op = MongoDB::Op::_FindAndUpdate->_new( filter => $filter, @@ -1810,24 +1793,6 @@ sub _find_one_and_update_or_replace { : $self->client->send_write_op( $op ); } -# Extracts a session from a provided hashref, or returns an implicit session -sub _get_session_from_hashref { - my ( $self, $hashref ) = @_; - - my $session = delete $hashref->{session}; - - if ( defined $session ) { - MongoDB::UsageError->throw( "Cannot use session from another client" ) - if ( $session->client->_id ne $self->client->_id ); - MongoDB::UsageError->throw( "Cannot use session which has ended" ) - if ! 
defined $session->session_id; - } else { - $session = $self->client->_maybe_get_implicit_session; - } - - return $session; -} - #--------------------------------------------------------------------------# # Deprecated functions #--------------------------------------------------------------------------# @@ -1845,7 +1810,7 @@ sub count { $options->{maxTimeMS} = $self->max_time_ms; } - my $session = $self->_get_session_from_hashref($options); + my $session = $self->client->_get_session_from_hashref($options); # string is OK so we check ref, not just exists __ixhash( $options, 'hint' ) if ref $options->{hint}; diff --git a/lib/MongoDB/Database.pm b/lib/MongoDB/Database.pm index 2340a0e3..06e24e2a 100644 --- a/lib/MongoDB/Database.pm +++ b/lib/MongoDB/Database.pm @@ -196,7 +196,7 @@ sub list_collections { $options->{maxTimeMS} = $self->max_time_ms; } - my $session = $self->_get_session_from_hashref( $options ); + my $session = $self->_client->_get_session_from_hashref( $options ); my $op = MongoDB::Op::_ListCollections->_new( db_name => $self->name, @@ -333,7 +333,7 @@ Valid options include: sub drop { my ( $self, $options ) = @_; - my $session = $self->_get_session_from_hashref( $options ); + my $session = $self->_client->_get_session_from_hashref( $options ); return $self->_client->send_write_op( MongoDB::Op::_DropDatabase->_new( @@ -404,7 +404,7 @@ sub run_command { ref($read_pref) ? 
$read_pref : ( mode => $read_pref ) ) if $read_pref && ref($read_pref) ne 'MongoDB::ReadPreference'; - my $session = $self->_get_session_from_hashref( $options ); + my $session = $self->_client->_get_session_from_hashref( $options ); my $op = MongoDB::Op::_Command->_new( client => $self->_client, @@ -429,7 +429,7 @@ sub _aggregate { my ( $self, $pipeline, $options ) = @_; $options ||= {}; - my $session = $self->_get_session_from_hashref( $options ); + my $session = $self->_client->_get_session_from_hashref( $options ); # boolify some options for my $k (qw/allowDiskUse explain/) { @@ -463,26 +463,6 @@ sub _aggregate { return $self->_client->send_read_op($op); } -# Extracts a session from a provided hashref, or returns an implicit session -# Almost identical to same subroutine in Collection, however in Database the -# client attribute is private. -sub _get_session_from_hashref { - my ( $self, $hashref ) = @_; - - my $session = delete $hashref->{session}; - - if ( defined $session ) { - MongoDB::UsageError->throw( "Cannot use session from another client" ) - if ( $session->client->_id ne $self->_client->_id ); - MongoDB::UsageError->throw( "Cannot use session which has ended" ) - if ! defined $session->session_id; - } else { - $session = $self->_client->_maybe_get_implicit_session; - } - - return $session; -} - =method watch Watches for changes on this database. @@ -520,20 +500,7 @@ sub watch { $pipeline ||= []; $options ||= {}; - my $session = $self->_get_session_from_hashref( $options ); - - # boolify some options - for my $k (qw/allowDiskUse explain/) { - $options->{$k} = ( $options->{$k} ? true : false ) if exists $options->{$k}; - } - - # possibly fallback to default maxTimeMS - if ( ! 
exists $options->{maxTimeMS} && $self->max_time_ms ) { - $options->{maxTimeMS} = $self->max_time_ms; - } - - # read preferences are ignored if the last stage is $out - my ($last_op) = keys %{ $pipeline->[-1] || {} }; + my $session = $self->_client->_get_session_from_hashref( $options ); return MongoDB::ChangeStream->new( exists($options->{startAtOperationTime}) @@ -545,17 +512,15 @@ sub watch { exists($options->{resumeAfter}) ? (resume_after => delete $options->{resumeAfter}) : (), + exists($options->{maxAwaitTimeMS}) + ? (max_await_time_ms => delete $options->{maxAwaitTimeMS}) + : (), client => $self->_client, pipeline => $pipeline, + session => $session, + options => $options, op_args => { - options => $options, read_concern => $self->read_concern, - has_out => defined($last_op) && $last_op eq '$out', - exists($options->{maxAwaitTimeMS}) - ? (maxAwaitTimeMS => delete $options->{maxAwaitTimeMS}) - : (), - session => $session, - client => $self->_client, bson_codec => $self->bson_codec, db_name => $self->name, coll_name => 1, # Magic not-an-actual-collection number diff --git a/lib/MongoDB/MongoClient.pm b/lib/MongoDB/MongoClient.pm index 1a427379..1b08d9c3 100644 --- a/lib/MongoDB/MongoClient.pm +++ b/lib/MongoDB/MongoClient.pm @@ -1926,19 +1926,6 @@ sub watch { my $session = $self->_get_session_from_hashref( $options ); - # boolify some options - for my $k (qw/allowDiskUse explain/) { - $options->{$k} = ( $options->{$k} ? true : false ) if exists $options->{$k}; - } - - # possibly fallback to default maxTimeMS - if ( ! exists $options->{maxTimeMS} && $self->max_time_ms ) { - $options->{maxTimeMS} = $self->max_time_ms; - } - - # read preferences are ignored if the last stage is $out - my ($last_op) = keys %{ $pipeline->[-1] || {} }; - return MongoDB::ChangeStream->new( exists($options->{startAtOperationTime}) ? (start_at_operation_time => delete $options->{startAtOperationTime}) @@ -1949,18 +1936,16 @@ sub watch { exists($options->{resumeAfter}) ? 
(resume_after => delete $options->{resumeAfter}) : (), + exists($options->{maxAwaitTimeMS}) + ? (max_await_time_ms => delete $options->{maxAwaitTimeMS}) + : (), client => $self, all_changes_for_cluster => 1, pipeline => $pipeline, + session => $session, + options => $options, op_args => { - options => $options, read_concern => $self->read_concern, - has_out => defined($last_op) && $last_op eq '$out', - exists($options->{maxAwaitTimeMS}) - ? (maxAwaitTimeMS => delete $options->{maxAwaitTimeMS}) - : (), - session => $session, - client => $self, db_name => 'admin',, coll_name => 1, full_name => 'admin.1', diff --git a/lib/MongoDB/Op/_ChangeStream.pm b/lib/MongoDB/Op/_ChangeStream.pm index 125b966d..1cc787ce 100644 --- a/lib/MongoDB/Op/_ChangeStream.pm +++ b/lib/MongoDB/Op/_ChangeStream.pm @@ -1,4 +1,4 @@ -# Copyright 2015 - present MongoDB, Inc. +# Copyright 2018 - present MongoDB, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
@@ -16,7 +16,7 @@ use strict; use warnings; package MongoDB::Op::_ChangeStream; -# Encapsulate aggregate operation; return MongoDB::QueryResult +# Encapsulate changestream operation; return MongoDB::QueryResult use version; our $VERSION = 'v1.999.1'; @@ -29,6 +29,7 @@ use MongoDB::Op::_Command; use MongoDB::_Types qw( ArrayOfHashRef Boolish + BSONTimestamp ); use Types::Standard qw( HashRef @@ -36,8 +37,6 @@ use Types::Standard qw( Num Str Maybe - Int - Bool ); use namespace::clean; @@ -60,12 +59,6 @@ has options => ( isa => HashRef, ); -has has_out => ( - is => 'ro', - required => 1, - isa => Boolish, -); - has maxAwaitTimeMS => ( is => 'rw', isa => Num, @@ -84,13 +77,13 @@ has resume_after => ( has all_changes_for_cluster => ( is => 'ro', - isa => Bool, + isa => Boolish, default => sub { 0 }, ); has start_at_operation_time => ( is => 'ro', - isa => Maybe[Int], + isa => BSONTimestamp, ); has changes_received => ( @@ -155,8 +148,6 @@ sub execute { delete $options->{cursor}; } - my $has_out = $self->has_out; - if ( $self->coll_name eq 1 && ! $link->supports_db_aggregation ) { MongoDB::Error->throw( "Calling aggregate with a collection name of '1' is not supported on Wire Version < 6" ); @@ -197,9 +188,7 @@ sub execute { my @pipeline = ( {'$changeStream' => { (defined($start_op_time) - ? (startAtOperationTime => BSON::Timestamp->new( - $start_op_time, - )) + ? (startAtOperationTime => $start_op_time) : () ), ($self->all_changes_for_cluster @@ -222,12 +211,9 @@ sub execute { aggregate => $self->coll_name, pipeline => \@pipeline, %$options, - ( - !$has_out && $link->supports_read_concern ? @{ $self->read_concern->as_args( $self->session) } : () - ), - ( - $has_out && $link->supports_helper_write_concern ? @{ $self->write_concern->as_args } : () - ), + $link->supports_read_concern + ? 
@{ $self->read_concern->as_args( $self->session) } + : (), ); my $op = MongoDB::Op::_Command->_new( @@ -235,34 +221,13 @@ sub execute { query => Tie::IxHash->new(@command), query_flags => {}, bson_codec => $self->bson_codec, - ( $has_out ? () : ( read_preference => $self->read_preference ) ), + read_preference => $self->read_preference, session => $self->session, monitoring_callback => $self->monitoring_callback, ); my $res = $op->execute( $link, $topology ); - $res->assert_no_write_concern_error if $has_out; - - # For explain, we give the whole response as fields have changed in - # different server versions - if ( $options->{explain} ) { - return MongoDB::QueryResult->_new( - _client => $self->client, - _address => $link->address, - _full_name => '', - _bson_codec => $self->bson_codec, - _batch_size => 1, - _cursor_at => 0, - _limit => 0, - _cursor_id => 0, - _cursor_start => 0, - _cursor_flags => {}, - _cursor_num => 1, - _docs => [ $res->output ], - ); - } - # Fake up a single-batch cursor if we didn't get a cursor response. 
# We use the 'results' fields as the first (and only) batch if ( !$res->output->{cursor} ) { From b423da99823ae7b326aadba6038ff0212e473f76 Mon Sep 17 00:00:00 2001 From: Robert Sedlacek Date: Thu, 21 Jun 2018 17:36:07 +0200 Subject: [PATCH 6/8] Don't send ChangeStream preset when resuming --- lib/MongoDB/Op/_ChangeStream.pm | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/MongoDB/Op/_ChangeStream.pm b/lib/MongoDB/Op/_ChangeStream.pm index 1cc787ce..a40faf18 100644 --- a/lib/MongoDB/Op/_ChangeStream.pm +++ b/lib/MongoDB/Op/_ChangeStream.pm @@ -156,7 +156,7 @@ sub execute { my $start_op_time; # preset startAtOperationTime - if (defined $self->start_at_operation_time) { + if (!$self->changes_received && defined $self->start_at_operation_time) { $start_op_time = $self->start_at_operation_time; } From cad51397cc66370f0d903af102a15df1c4154ff5 Mon Sep 17 00:00:00 2001 From: Robert Sedlacek Date: Thu, 21 Jun 2018 19:55:25 +0200 Subject: [PATCH 7/8] PERL-940, rework changestream resume logic --- lib/MongoDB/ChangeStream.pm | 69 ++++++++++++++++++++------------- lib/MongoDB/Op/_ChangeStream.pm | 50 +++++------------------- 2 files changed, 52 insertions(+), 67 deletions(-) diff --git a/lib/MongoDB/ChangeStream.pm b/lib/MongoDB/ChangeStream.pm index 681c06c1..8181b18d 100644 --- a/lib/MongoDB/ChangeStream.pm +++ b/lib/MongoDB/ChangeStream.pm @@ -32,7 +32,6 @@ use MongoDB::_Types qw( MongoDBCollection ArrayOfHashRef Boolish - Intish BSONTimestamp ClientSession ); @@ -50,9 +49,6 @@ has _result => ( is => 'rw', isa => InstanceOf['MongoDB::QueryResult'], init_arg => undef, - lazy => 1, - builder => '_build_result', - clearer => '_clear_result', ); has _client => ( @@ -83,10 +79,10 @@ has _full_document => ( predicate => '_has_full_document', ); -has _resume_token => ( - is => 'rw', +has _resume_after => ( + is => 'ro', init_arg => 'resume_after', - predicate => '_has_resume_token', + predicate => '_has_resume_after', ); has _all_changes_for_cluster => ( @@ 
-96,13 +92,6 @@ has _all_changes_for_cluster => ( default => sub { 0 }, ); -has _changes_received => ( - is => 'rw', - isa => Boolish, - init_arg => 'changes_received', - default => sub { 0 }, -); - has _start_at_operation_time => ( is => 'ro', isa => BSONTimestamp, @@ -133,39 +122,66 @@ has _max_await_time_ms => ( predicate => '_has_max_await_time_ms', ); +has _last_operation_time => ( + is => 'rw', + init_arg => undef, + predicate => '_has_last_operation_time', +); + +has _last_resume_token => ( + is => 'rw', + init_arg => undef, + predicate => '_has_last_resume_token', +); + sub BUILD { my ($self) = @_; # starting point is construction time instead of first next call - $self->_result; + $self->_execute_query; } -sub _build_result { +sub _execute_query { my ($self) = @_; + my $resume_opt = {}; + + # seen prior results, continuing after last resume token + if ($self->_has_last_resume_token) { + $resume_opt->{resume_after} = $self->_last_resume_token; + } + # no results yet, but we have operation time from prior query + elsif ($self->_has_last_operation_time) { + $resume_opt->{start_at_operation_time} = $self->_last_operation_time; + } + # no results and no prior operation time, send specified options + else { + $resume_opt->{start_at_operation_time} = $self->_start_at_operation_time + if $self->_has_start_at_operation_time; + $resume_opt->{resume_after} = $self->_resume_after + if $self->_has_resume_after; + } + my $op = MongoDB::Op::_ChangeStream->new( pipeline => $self->_pipeline, all_changes_for_cluster => $self->_all_changes_for_cluster, - changes_received => $self->_changes_received, session => $self->_session, options => $self->_options, client => $self->_client, - $self->_has_start_at_operation_time - ? (start_at_operation_time => $self->_start_at_operation_time) - : (), $self->_has_full_document ? (full_document => $self->_full_document) : (), - $self->_has_resume_token - ? (resume_after => $self->_resume_token) - : (), $self->_has_max_await_time_ms ? 
(maxAwaitTimeMS => $self->_max_await_time_ms) : (), + %$resume_opt, %{ $self->_op_args }, ); - return $self->_client->send_read_op($op); + my $res = $self->_client->send_read_op($op); + $self->_result($res->{result}); + $self->_last_operation_time($res->{operationTime}) + if exists $res->{operationTime}; } =head1 STREAM METHODS @@ -203,7 +219,7 @@ sub next { and $error->_is_resumable ) { $retried = 1; - $self->_result($self->_build_result); + $self->_execute_query; } else { die $error; @@ -219,8 +235,7 @@ sub next { } if (exists $change->{_id}) { - $self->_resume_token($change->{_id}); - $self->_changes_received(1); + $self->_last_resume_token($change->{_id}); return $change; } else { diff --git a/lib/MongoDB/Op/_ChangeStream.pm b/lib/MongoDB/Op/_ChangeStream.pm index a40faf18..dfb8ea44 100644 --- a/lib/MongoDB/Op/_ChangeStream.pm +++ b/lib/MongoDB/Op/_ChangeStream.pm @@ -17,6 +17,7 @@ use warnings; package MongoDB::Op::_ChangeStream; # Encapsulate changestream operation; return MongoDB::QueryResult +# and operationTime if supported use version; our $VERSION = 'v1.999.1'; @@ -84,11 +85,7 @@ has all_changes_for_cluster => ( has start_at_operation_time => ( is => 'ro', isa => BSONTimestamp, -); - -has changes_received => ( - is => 'ro', - default => sub { 0 }, + predicate => 'has_start_at_operation_time', ); with $_ for qw( @@ -153,42 +150,10 @@ sub execute { "Calling aggregate with a collection name of '1' is not supported on Wire Version < 6" ); } - my $start_op_time; - - # preset startAtOperationTime - if (!$self->changes_received && defined $self->start_at_operation_time) { - $start_op_time = $self->start_at_operation_time; - } - - # default startAtOperationTime when supported and not - if ( - 0 && # TODO: Not in spec tests, throws when included - !defined($start_op_time) && - !$self->changes_received && - $link->supports_4_0_changestreams - ) { - if ( - defined($link->server) && - defined(my $op_time = $link->server->is_master->{operationTime}) - ) { - 
$start_op_time = $op_time; - } - - if ( - !defined($start_op_time) && - defined($self->session) && - defined(my $latest_cl_time = $self->session->get_latest_cluster_time) - ) { - if (defined(my $cl_time = $latest_cl_time->{clusterTime})) { - $start_op_time = $cl_time; - } - } - } - my @pipeline = ( {'$changeStream' => { - (defined($start_op_time) - ? (startAtOperationTime => $start_op_time) + ($self->has_start_at_operation_time + ? (startAtOperationTime => $self->start_at_operation_time) : () ), ($self->all_changes_for_cluster @@ -238,7 +203,12 @@ sub execute { }; } - return $self->_build_result_from_cursor($res); + return { + result => $self->_build_result_from_cursor($res), + $link->supports_4_0_changestreams + ? (operationTime => $res->output->{operationTime}) + : (), + }; } 1; From 6c1b0c5a8f638e5f1e11a9874d4e7a7b053533de Mon Sep 17 00:00:00 2001 From: Robert Sedlacek Date: Thu, 21 Jun 2018 20:07:22 +0200 Subject: [PATCH 8/8] move BSON::Timestamp constructor in coercion to use explicit seconds argument name --- lib/MongoDB/ChangeStream.pm | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/MongoDB/ChangeStream.pm b/lib/MongoDB/ChangeStream.pm index 8181b18d..04792eca 100644 --- a/lib/MongoDB/ChangeStream.pm +++ b/lib/MongoDB/ChangeStream.pm @@ -98,7 +98,7 @@ has _start_at_operation_time => ( init_arg => 'start_at_operation_time', predicate => '_has_start_at_operation_time', coerce => sub { - ref($_[0]) ? $_[0] : BSON::Timestamp->new($_[0]) + ref($_[0]) ? $_[0] : BSON::Timestamp->new(seconds => $_[0]) }, );