diff --git a/Makefile.PL b/Makefile.PL index 2e782a80..41e10ab8 100644 --- a/Makefile.PL +++ b/Makefile.PL @@ -34,6 +34,7 @@ requires 'ExtUtils::ParseXS' => 3.21; requires 'IO::File'; requires 'IO::Socket' => ( $^O eq 'MSWin32' ? '1.31' : '0' ); requires 'List::Util'; +requires 'Math::BigInt'; requires 'MIME::Base64'; requires 'Moo' => '2'; requires 'Moo::Role'; @@ -68,7 +69,6 @@ test_requires 'File::Spec'; test_requires 'File::Temp' => '0.17'; test_requires 'FileHandle'; test_requires 'JSON::MaybeXS' => '1.002005'; -test_requires 'Math::BigInt'; test_requires 'Path::Tiny' => '0.054'; test_requires 'Test::Deep' => '0.111'; test_requires 'Test::Fatal'; diff --git a/devel/config/replicaset-multi-3.6.yml b/devel/config/replicaset-multi-3.6.yml new file mode 100644 index 00000000..b23264ae --- /dev/null +++ b/devel/config/replicaset-multi-3.6.yml @@ -0,0 +1,11 @@ +--- +type: replica +setName: foo +default_args: -v --noprealloc --smallfiles --bind_ip 0.0.0.0 --nssize 6 --quiet +default_version: 3.6 +mongod: + - name: host1 + - name: host2 + - name: host3 + +# vim: ts=4 sts=4 sw=4 et: diff --git a/devel/lib/MongoDBTest/Role/Server.pm b/devel/lib/MongoDBTest/Role/Server.pm index 34e2065b..ceb87a1f 100644 --- a/devel/lib/MongoDBTest/Role/Server.pm +++ b/devel/lib/MongoDBTest/Role/Server.pm @@ -214,6 +214,7 @@ sub _build_tempdir { return Path::Tiny->tempdir( TEMPLATE => $self->name . "-XXXXXX", ($ENV{DATA_DIR} ? (DIR => $ENV{DATA_DIR}) : ()), + ($ENV{MONGO_PRESERVE} ? (CLEANUP => 0) : ()), ); } diff --git a/devel/t-dynamic/PERL-792-replica-set-failover.t b/devel/t-dynamic/PERL-792-replica-set-failover.t new file mode 100644 index 00000000..f8a1213e --- /dev/null +++ b/devel/t-dynamic/PERL-792-replica-set-failover.t @@ -0,0 +1,156 @@ +# +# Copyright 2009-2013 MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Test in t-dynamic as not sure if failover should be tested on install? + +use strict; +use warnings; +use JSON::MaybeXS; +use Path::Tiny 0.054; # basename with suffix +use Test::More 0.88; +use Test::Fatal; +use boolean; + +use lib "t/lib"; +use lib "devel/lib"; + +use MongoDBTest::Orchestrator; + +use MongoDBTest qw/ + build_client + get_test_db + clear_testdbs + get_unique_collection + server_version + server_type + check_min_server_version + get_feature_compat_version +/; + +my $orc = +MongoDBTest::Orchestrator->new( + config_file => "devel/config/replicaset-multi-3.6.yml" ); +$orc->start; + +$ENV{MONGOD} = $orc->as_uri; + +my @events; + +sub clear_events { @events = () } +sub event_count { scalar @events } +sub event_cb { push @events, $_[0] } + +my $conn = build_client( + retry_writes => 1, + heartbeat_frequency_ms => 60 * 1000, + # build client modifies this so we set it explicitly to the default + server_selection_timeout_ms => 30 * 1000, + server_selection_try_once => 0, + monitoring_callback => \&event_cb, +); +my $testdb = get_test_db($conn); +my $coll = get_unique_collection( $testdb, 'retry_failover' ); +my $server_version = server_version($conn); +my $server_type = server_type($conn); +my $feat_compat_ver = get_feature_compat_version($conn); + +plan skip_all => "standalone servers dont support retryableWrites" + if $server_type eq 'Standalone'; + +plan skip_all => "retryableWrites requires featureCompatibilityVersion 3.6 - got $feat_compat_ver" + if ( $feat_compat_ver < 3.6 ); + +my $primary = $conn->_topology->current_primary; + +my 
$fail_conn = build_client( host => $primary->address ); + +my $step_down_conn = build_client(); + +my $ret = $coll->insert_one( { _id => 1, test => 'value' } ); + +is $ret->inserted_id, 1, 'write succeeded'; + +my $result = $coll->find_one( { _id => 1 } ); + +is $result->{test}, 'value', 'Successful write'; + +$fail_conn->send_admin_command([ + configureFailPoint => 'onPrimaryTransactionalWrite', + mode => 'alwaysOn', +]); + +# wrapped in eval as this will just drop connection +eval { + $step_down_conn->send_admin_command([ + replSetStepDown => 60, + force => true, + ]); +}; +my $err = $@; +isa_ok( $err, 'MongoDB::NetworkError', 'Step down successfully errored' ); + +clear_events(); + +my $post_stepdown_ret = $coll->insert_one( { _id => 2, test => 'again' } ); + +is $post_stepdown_ret->inserted_id, 2, 'write succeeded'; + +# All this is to make sure we dont make assumptions on position of the actual +# event, just that the failed one comes first. +my $first_insert_index; + +for my $f_idx ( 0 .. $#events - 1 ) { + my $event = $events[ $f_idx ]; + if ( $event->{ commandName } eq 'insert' + && $event->{ type } eq 'command_started' ) { + my $next_event = $events[ $f_idx + 1 ]; + is $next_event->{ commandName }, 'insert', 'found insert reply'; + is $next_event->{ type }, 'command_failed', 'found failed reply'; + $first_insert_index = $f_idx; + last; + } +} + +ok defined( $first_insert_index ), 'found first command'; + +my $second_insert_index; + +if ( $first_insert_index + 2 > $#events - 1 ) { + fail 'not enough events captured'; +} + +for my $s_idx ( $first_insert_index + 2 .. 
$#events - 1 ) { + my $event = $events[ $s_idx ]; + if ( $event->{ commandName } eq 'insert' + && $event->{ type } eq 'command_started' ) { + my $next_event = $events[ $s_idx + 1 ]; + is $next_event->{ commandName }, 'insert', 'found insert reply'; + is $next_event->{ type }, 'command_succeeded', 'found success reply'; + $second_insert_index = $s_idx; + last; + } +} + +ok defined( $second_insert_index ), 'found second command'; + +$fail_conn->send_admin_command([ + configureFailPoint => 'onPrimaryTransactionalWrite', + mode => 'off', +]); + +clear_testdbs; + +done_testing; diff --git a/lib/MongoDB/BulkWrite.pm b/lib/MongoDB/BulkWrite.pm index c8b34471..69324f18 100644 --- a/lib/MongoDB/BulkWrite.pm +++ b/lib/MongoDB/BulkWrite.pm @@ -121,6 +121,12 @@ sub _build__client { return $self->_database->_client; } +has _retryable => ( + is => 'rw', + isa => Bool, + default => 1, +); + with $_ for qw( MongoDB::Role::_DeprecationWarner ); @@ -268,6 +274,7 @@ sub execute { my $session = $self->collection->_get_session_from_hashref( $options ); my $op = MongoDB::Op::_BulkWrite->_new( + client => $self->_client, db_name => $self->_database->name, coll_name => $self->collection->name, full_name => $self->collection->full_name, @@ -278,8 +285,10 @@ sub execute { write_concern => $write_concern, session => $session, monitoring_callback => $self->_client->monitoring_callback, + _retryable => $self->_retryable, ); + # Op::_BulkWrite internally does retryable writes return $self->_client->send_write_op( $op ); } diff --git a/lib/MongoDB/BulkWriteView.pm b/lib/MongoDB/BulkWriteView.pm index fa9dfc69..96474df9 100644 --- a/lib/MongoDB/BulkWriteView.pm +++ b/lib/MongoDB/BulkWriteView.pm @@ -126,6 +126,9 @@ sub _update { $doc = Tie::IxHash->new(%$doc); } + $self->_bulk->_retryable( 0 ) if $method eq 'update_many'; + $self->_bulk->_retryable( 0 ) if $method eq 'delete_many'; + my $update = { q => $self->_query, u => $doc, diff --git a/lib/MongoDB/Collection.pm b/lib/MongoDB/Collection.pm 
index 95361c9e..c74d8a22 100644 --- a/lib/MongoDB/Collection.pm +++ b/lib/MongoDB/Collection.pm @@ -372,7 +372,7 @@ sub insert_one { MongoDB::UsageError->throw("document argument must be a reference") unless ref( $_[1] ); - return $_[0]->client->send_write_op( + return $_[0]->client->send_retryable_write_op( MongoDB::Op::_InsertOne->_new( session => $_[0]->_get_session_from_hashref( $_[2] ), ( defined $_[2] ? (%{$_[2]}) : () ), @@ -422,6 +422,7 @@ sub insert_many { MongoDB::UsageError->throw("documents argument must be an array reference") unless ref( $_[1] ) eq 'ARRAY'; + # internally ends up performing a retryable write if possible, see OP::_BulkWrite my $res = $_[0]->client->send_write_op( MongoDB::Op::_BulkWrite->_new( # default @@ -431,6 +432,8 @@ sub insert_many { ( defined $_[2] ? ( %{ $_[2] } ) : () ), # un-overridable queue => [ map { [ insert => $_ ] } @{ $_[1] } ], + # insert_many is specifically retryable (PERL-792) + _retryable => 1, %{ $_[0]->_op_args }, ) ); @@ -475,7 +478,7 @@ sub delete_one { MongoDB::UsageError->throw("filter argument must be a reference") unless ref( $_[1] ); - return $_[0]->client->send_write_op( + return $_[0]->client->send_retryable_write_op( MongoDB::Op::_Delete->_new( session => $_[0]->_get_session_from_hashref( $_[2] ), ( defined $_[2] ? (%{$_[2]}) : () ), @@ -557,7 +560,7 @@ sub replace_one { MongoDB::UsageError->throw("filter and replace arguments must be references") unless ref( $_[1] ) && ref( $_[2] ); - return $_[0]->client->send_write_op( + return $_[0]->client->send_retryable_write_op( MongoDB::Op::_Update->_new( session => $_[0]->_get_session_from_hashref( $_[3] ), ( defined $_[3] ? 
(%{$_[3]}) : () ), @@ -608,7 +611,7 @@ sub update_one { MongoDB::UsageError->throw("filter and update arguments must be references") unless ref( $_[1] ) && ref( $_[2] ); - return $_[0]->client->send_write_op( + return $_[0]->client->send_retryable_write_op( MongoDB::Op::_Update->_new( session => $_[0]->_get_session_from_hashref( $_[3] ), ( defined $_[3] ? (%{$_[3]}) : () ), @@ -940,7 +943,9 @@ sub find_one_and_delete { session => $session, ); - return $self->client->send_write_op($op); + return $self->write_concern->is_acknowledged + ? $self->client->send_retryable_write_op( $op ) + : $self->client->send_write_op( $op ); } =method find_one_and_replace @@ -1647,6 +1652,7 @@ sub bulk_write { } elsif ( $method eq 'delete_many' ) { $view->delete_many; + $bulk->_retryable( 0 ); next; } @@ -1662,6 +1668,7 @@ sub bulk_write { } elsif ( $method eq 'update_many' ) { $view->update_many($update_doc); + $bulk->_retryable( 0 ); } else { MongoDB::UsageError->throw("unknown bulk operation '$method'"); @@ -1733,7 +1740,9 @@ sub _find_one_and_update_or_replace { %{ $self->_op_args }, ); - return $self->client->send_write_op($op); + return $self->write_concern->is_acknowledged + ? $self->client->send_retryable_write_op( $op ) + : $self->client->send_write_op( $op ); } # Extracts a session from a provided hashref, or returns an implicit session diff --git a/lib/MongoDB/MongoClient.pm b/lib/MongoDB/MongoClient.pm index d7c0389e..582810a4 100644 --- a/lib/MongoDB/MongoClient.pm +++ b/lib/MongoDB/MongoClient.pm @@ -956,6 +956,61 @@ sub _build_read_concern_level { ); } +=attr retry_writes + +Whether the client should use retryable writes for supported commands. The +default value is false, which means that no write commands will be retried. + +If this is set to a true value, then commands which support retryable writes +will be retried on certain errors, such as C and C errors. + +This may be set in a connection string with the C option. 
+ +Note that this is only supported on MongoDB > 3.6 in Replica Set or Shard +Clusters, and will be ignored on other deployments. + +Unacknowledged write operations also do not support retryable writes, even when +retry_writes has been enabled. + +The supported single statement write operations are currently as follows: + +=for :list +* C<insert_one> +* C<delete_one> +* C<replace_one> +* C<update_one> +* C<find_one_and_delete> +* C<find_one_and_replace> +* C<find_one_and_update> + +The supported multi statement write operations are as follows: + +=for :list +* C<insert_many> +* C<bulk_write> + +The multi statement operations may be either ordered or unordered. Note that for +C<bulk_write> operations, the request may not include update_many or +delete_many operations. + +=cut + +has retry_writes => ( + is => 'lazy', + isa => Bool, + builder => '_build_retry_writes', +); + +sub _build_retry_writes { + my ( $self ) = @_; + return $self->__uri_or_else( + u => 'retrywrites', + e => 'retry_writes', + d => 0, + ); +} + #--------------------------------------------------------------------------# # deprecated public attributes #--------------------------------------------------------------------------# @@ -1294,6 +1349,7 @@ my @deferred_options = qw( read_pref_mode read_pref_tag_sets replica_set_name + retry_writes server_selection_timeout_ms server_selection_try_once socket_check_interval_ms @@ -1602,6 +1658,78 @@ BEGIN { *send_primary_op = \&send_write_op; } +sub send_retryable_write_op { + my ( $self, $op ) = @_; + + return $self->send_write_op( $op ) unless $self->retry_writes; + + my $result; + my $link = $self->{_topology}->get_writable_link; + + # If server doesn't support retryable writes, pretend it's not enabled + unless ( $link->supports_retryWrites ) { + eval { ($result) = $self->_try_write_op_for_link( $link, $op ); 1 } or do { + my $err = length($@) ? $@ : "caught error, but it was lost in eval unwind"; + WITH_ASSERTS ? ( confess $err ) : ( die $err ); + }; + return $result; + } + + # If we get this far and there is no session, then something's gone really + # wrong, so probably not worth worrying about.
+ # + # increment transaction id before write, but otherwise is the same for both attempts + $op->session->_server_session->_increment_transaction_id; + $op->retryable_write( 1 ); + + # attempt the op the first time + eval { ($result) = $self->_try_write_op_for_link( $link, $op ); 1 } or do { + my $err = length($@) ? $@ : "caught error, but it was lost in eval unwind"; + my $retry_link = $self->{_topology}->get_writable_link; + + # Rare chance that the new link is not retryable + unless ( $retry_link->supports_retryWrites ) { + WITH_ASSERTS ? ( confess $err ) : ( die $err ); + } + + # Second attempt + eval { ($result) = $self->_try_write_op_for_link( $retry_link, $op ); 1 } or do { + my $retry_err = length($@) ? $@ : "caught error, but it was lost in eval unwind"; + # Only network or not_master errors should propagate on second attempt + if ( $retry_err->$_isa("MongoDB::ConnectionError") + || $retry_err->$_isa("MongoDB::NotMasterError") ) { + WITH_ASSERTS ? ( confess $retry_err ) : ( die $retry_err ); + } + # die with original error otherwise + WITH_ASSERTS ? ( confess $err ) : ( die $err ); + }; + }; + # just in case this gets reused for some reason + $op->retryable_write( 0 ); + return $result; +} + +# op dispatcher written in highly optimized style +sub _try_write_op_for_link { + my ( $self, $link, $op ) = @_; + my $result; + ( + eval { ($result) = $op->execute($link, $self->{_topology}->type); 1 } or do { + my $err = length($@) ?
$@ : "caught error, but it was lost in eval unwind"; + if ( $err->$_isa("MongoDB::ConnectionError") ) { + $self->{_topology}->mark_server_unknown( $link->server, $err ); + } + elsif ( $err->$_isa("MongoDB::NotMasterError") ) { + $self->{_topology}->mark_server_unknown( $link->server, $err ); + $self->{_topology}->mark_stale; + } + # normal die here instead of assert, which is used later + die $err; + } + ), + return $result; +} + # op dispatcher written in highly optimized style sub send_read_op { my ( $self, $op ) = @_; diff --git a/lib/MongoDB/Op/_BatchInsert.pm b/lib/MongoDB/Op/_BatchInsert.pm index 90200b52..5b71079f 100644 --- a/lib/MongoDB/Op/_BatchInsert.pm +++ b/lib/MongoDB/Op/_BatchInsert.pm @@ -79,7 +79,7 @@ sub execute { my $last_idx = $#$documents; for ( my $i = 0; $i <= $last_idx; $i++ ) { - push @insert_docs, $self->_pre_encode_insert( $link, $documents->[$i], $invalid_chars ); + push @insert_docs, $self->_pre_encode_insert( $link->max_bson_object_size, $documents->[$i], $invalid_chars ); push @ids, $insert_docs[-1]{metadata}{_id}; } diff --git a/lib/MongoDB/Op/_BulkWrite.pm b/lib/MongoDB/Op/_BulkWrite.pm index 0ba773ce..45be6f2d 100644 --- a/lib/MongoDB/Op/_BulkWrite.pm +++ b/lib/MongoDB/Op/_BulkWrite.pm @@ -38,6 +38,7 @@ use MongoDB::_Constants; use Types::Standard qw( ArrayRef Bool + InstanceOf ); use Safe::Isa; use Try::Tiny; @@ -57,6 +58,18 @@ has ordered => ( isa => Bool, ); +has client => ( + is => 'ro', + required => 1, + isa => InstanceOf['MongoDB::MongoClient'], +); + +has _retryable => ( + is => 'rw', + isa => Bool, + default => 1, +); + with $_ for qw( MongoDB::Role::_PrivateConstructor MongoDB::Role::_CollectionOp @@ -66,6 +79,11 @@ with $_ for qw( MongoDB::Role::_BypassValidation ); +sub _is_retryable { + my $self = shift; + return $self->write_concern->is_acknowledged && $self->_retryable; +} + sub has_collation { my $self = shift; return !!grep { @@ -160,9 +178,11 @@ sub _execute_write_command_batch { my @left_to_send = ($docs); + my 
$max_bson_size = $link->max_bson_object_size; + my $supports_doc_validation = $link->supports_doc_validation; + while (@left_to_send) { my $chunk = shift @left_to_send; - # for update/insert, pre-encode docs as they need custom BSON handling # that can't be applied to an entire write command at once if ( $cmd eq 'update' ) { @@ -173,7 +193,7 @@ sub _execute_write_command_batch { for ( my $i = 0; $i <= $#$chunk; $i++ ) { next if ref( $chunk->[$i]{u} ) eq 'MongoDB::BSON::_EncodedDoc'; my $is_replace = delete $chunk->[$i]{is_replace}; - $chunk->[$i]{u} = $self->_pre_encode_update( $link->max_bson_object_size, $chunk->[$i]{u}, $is_replace ); + $chunk->[$i]{u} = $self->_pre_encode_update( $max_bson_size, $chunk->[$i]{u}, $is_replace ); } } elsif ( $cmd eq 'insert' ) { @@ -182,7 +202,7 @@ sub _execute_write_command_batch { # split, check if the doc is already encoded for ( my $i = 0; $i <= $#$chunk; $i++ ) { unless ( ref( $chunk->[$i] ) eq 'MongoDB::BSON::_EncodedDoc' ) { - $chunk->[$i] = $self->_pre_encode_insert( $link, $chunk->[$i], '.' ); + $chunk->[$i] = $self->_pre_encode_insert( $max_bson_size, $chunk->[$i], '.' ); }; } } @@ -195,7 +215,7 @@ sub _execute_write_command_batch { ]; if ( $cmd eq 'insert' || $cmd eq 'update' ) { - (undef, $cmd_doc) = $self->_maybe_bypass($link, $cmd_doc); + $cmd_doc = $self->_maybe_bypass( $supports_doc_validation, $cmd_doc ); } my $op = MongoDB::Op::_Command->_new( @@ -204,13 +224,17 @@ sub _execute_write_command_batch { query_flags => {}, bson_codec => $self->bson_codec, session => $self->session, + retryable_write => $self->retryable_write, monitoring_callback => $self->monitoring_callback, ); my $cmd_result = try { - $op->execute($link) + $self->_is_retryable + ? $self->client->send_retryable_write_op( $op ) + : $self->client->send_write_op( $op ); } catch { + # This error never touches the database!.... so is before any retryable writes errors etc. 
if ( $_->$_isa("MongoDB::_CommandSizeError") ) { if ( @$chunk == 1 ) { MongoDB::DocumentError->throw( @@ -219,7 +243,7 @@ sub _execute_write_command_batch { ); } else { - unshift @left_to_send, $self->_split_chunk( $link, $chunk, $_->size ); + unshift @left_to_send, $self->_split_chunk( $chunk, $_->size ); } } else { @@ -252,7 +276,7 @@ sub _execute_write_command_batch { } sub _split_chunk { - my ( $self, $link, $chunk, $size ) = @_; + my ( $self, $chunk, $size ) = @_; my $avg_cmd_size = $size / @$chunk; my $new_cmds_per_chunk = int( MAX_BSON_WIRE_SIZE / $avg_cmd_size ); diff --git a/lib/MongoDB/Op/_FindAndDelete.pm b/lib/MongoDB/Op/_FindAndDelete.pm index 1f618031..e89d92ba 100644 --- a/lib/MongoDB/Op/_FindAndDelete.pm +++ b/lib/MongoDB/Op/_FindAndDelete.pm @@ -77,6 +77,7 @@ sub execute { query_flags => {}, bson_codec => $self->bson_codec, session => $self->session, + retryable_write => $self->retryable_write, monitoring_callback => $self->monitoring_callback, ); diff --git a/lib/MongoDB/Op/_FindAndUpdate.pm b/lib/MongoDB/Op/_FindAndUpdate.pm index bb0ce33f..f3f66f81 100644 --- a/lib/MongoDB/Op/_FindAndUpdate.pm +++ b/lib/MongoDB/Op/_FindAndUpdate.pm @@ -67,8 +67,8 @@ sub execute { "MongoDB host '" . $link->address . 
"' doesn't support collation" ); } - my ( undef, $command ) = $self->_maybe_bypass( - $link, + my $command = $self->_maybe_bypass( + $link->supports_doc_validation, [ findAndModify => $self->coll_name, query => $self->filter, @@ -88,6 +88,7 @@ sub execute { query_flags => {}, bson_codec => $self->bson_codec, session => $self->session, + retryable_write => $self->retryable_write, monitoring_callback => $self->monitoring_callback, ); diff --git a/lib/MongoDB/Op/_InsertOne.pm b/lib/MongoDB/Op/_InsertOne.pm index 9230c23f..01a4d579 100644 --- a/lib/MongoDB/Op/_InsertOne.pm +++ b/lib/MongoDB/Op/_InsertOne.pm @@ -56,35 +56,32 @@ sub execute { my ( $self, $link ) = @_; my ( $orig_doc, $insert_doc ) = ( $self->document ); - ( $insert_doc = $self->_pre_encode_insert( $link, $orig_doc, '.' ) ), + ( $insert_doc = $self->_pre_encode_insert( $link->max_bson_object_size, $orig_doc, '.' ) ), ( $self->_set_doc_id( $insert_doc->{metadata}{_id} ) ); - return ! $self->write_concern->is_acknowledged - ? ( - $self->_send_legacy_op_noreply( $link, - MongoDB::_Protocol::write_insert( $self->full_name, $insert_doc->{bson} ), - $insert_doc, "MongoDB::UnacknowledgedResult", "insert" ) - ) - : $link->does_write_commands - ? ( - $self->_send_write_command( - $self->_maybe_bypass( - $link, - [ - insert => $self->coll_name, - documents => [$insert_doc], - @{ $self->write_concern->as_args }, - ], - ), - $orig_doc, - "MongoDB::InsertOneResult", - )->assert - ) - : ( - $self->_send_legacy_op_with_gle( $link, - MongoDB::_Protocol::write_insert( $self->full_name, $insert_doc->{bson} ), - $insert_doc, "MongoDB::InsertOneResult", "insert" )->assert - ); + return $self->_send_legacy_op_noreply( $link, + MongoDB::_Protocol::write_insert( $self->full_name, $insert_doc->{bson} ), + $orig_doc, "MongoDB::UnacknowledgedResult", "insert" ) + if ! 
$self->write_concern->is_acknowledged; + + return $self->_send_write_command( + $link, + $self->_maybe_bypass( + $link->supports_doc_validation, + [ + insert => $self->coll_name, + documents => [$insert_doc], + @{ $self->write_concern->as_args }, + ] + ), + $orig_doc, + "MongoDB::InsertOneResult" + )->assert + if $link->does_write_commands; + + return $self->_send_legacy_op_with_gle( $link, + MongoDB::_Protocol::write_insert( $self->full_name, $insert_doc->{bson} ), + $orig_doc, "MongoDB::InsertOneResult", "insert" )->assert; } sub _parse_cmd { diff --git a/lib/MongoDB/Op/_Update.pm b/lib/MongoDB/Op/_Update.pm index 43679eb4..7ad71f02 100644 --- a/lib/MongoDB/Op/_Update.pm +++ b/lib/MongoDB/Op/_Update.pm @@ -123,56 +123,62 @@ sub execute { ( defined $self->arrayFilters ? ( arrayFilters => $self->arrayFilters ) : () ), }; - return ! $self->write_concern->is_acknowledged - ? ( - $self->_send_legacy_op_noreply( - $link, - MongoDB::_Protocol::write_update( - $self->full_name, - $self->bson_codec->encode_one( $orig_op->{q}, { invalid_chars => '' } ), - $orig_op->{u}->{bson}, - { - upsert => $orig_op->{upsert}, - multi => $orig_op->{multi}, - }, - ), - $orig_op, - "MongoDB::UpdateResult", - "update", - ) - ) - : $link->does_write_commands - ? 
( - $self->_send_write_command( - $self->_maybe_bypass( - $link, - [ - update => $self->coll_name, - updates => [ $orig_op ], - @{ $self->write_concern->as_args }, + return $self->_send_legacy_op_noreply( + $link, + MongoDB::_Protocol::write_update( + $self->full_name, + $self->bson_codec->encode_one( $orig_op->{q}, { invalid_chars => '' } ), + $self->_pre_encode_update( $link->max_bson_object_size, + $orig_op->{u}, $self->is_replace )->{bson}, + { + upsert => $orig_op->{upsert}, + multi => $orig_op->{multi}, + }, + ), + $orig_op, + "MongoDB::UpdateResult", + "update", + ) if !$self->write_concern->is_acknowledged; + + return $self->_send_write_command( + $link, + $self->_maybe_bypass( + $link->supports_doc_validation, + [ + update => $self->coll_name, + updates => [ + { + %$orig_op, + u => $self->_pre_encode_update( + $link->max_bson_object_size, + $orig_op->{u}, $self->is_replace + ), + } ], - ), - $orig_op, - "MongoDB::UpdateResult" - )->assert - ) - : ( - $self->_send_legacy_op_with_gle( - $link, - MongoDB::_Protocol::write_update( - $self->full_name, - $self->bson_codec->encode_one( $orig_op->{q}, { invalid_chars => '' } ), - $orig_op->{u}->{bson}, - { - upsert => $orig_op->{upsert}, - multi => $orig_op->{multi}, - }, - ), - $orig_op, - "MongoDB::UpdateResult", - "update", - )->assert - ); + @{ $self->write_concern->as_args }, + ], + ), + $orig_op, + "MongoDB::UpdateResult" + )->assert + if $link->does_write_commands; + + return $self->_send_legacy_op_with_gle( + $link, + MongoDB::_Protocol::write_update( + $self->full_name, + $self->bson_codec->encode_one( $orig_op->{q}, { invalid_chars => '' } ), + $self->_pre_encode_update( $link->max_bson_object_size, + $orig_op->{u}, $self->is_replace )->{bson}, + { + upsert => $orig_op->{upsert}, + multi => $orig_op->{multi}, + }, + ), + $orig_op, + "MongoDB::UpdateResult", + "update", + )->assert; } sub _parse_cmd { diff --git a/lib/MongoDB/Role/_BypassValidation.pm b/lib/MongoDB/Role/_BypassValidation.pm index 
30cc8384..0fc82241 100644 --- a/lib/MongoDB/Role/_BypassValidation.pm +++ b/lib/MongoDB/Role/_BypassValidation.pm @@ -38,13 +38,13 @@ has bypassDocumentValidation => ( isa => Bool ); -# args not unpacked for efficiency; args are self, link, command; -# returns (unmodified) link and command +# args not unpacked for efficiency; args are self, validation supported +# flag, original command; returns (possibly modified) command sub _maybe_bypass { push @{ $_[2] }, bypassDocumentValidation => ( $_[0]->bypassDocumentValidation ? true : false ) - if defined $_[0]->bypassDocumentValidation && $_[1]->accepts_wire_version(4); - return $_[1], $_[2]; + if $_[1] && defined $_[0]->bypassDocumentValidation; + return $_[2]; } 1; diff --git a/lib/MongoDB/Role/_DatabaseOp.pm b/lib/MongoDB/Role/_DatabaseOp.pm index 6ae95181..29a9dffd 100644 --- a/lib/MongoDB/Role/_DatabaseOp.pm +++ b/lib/MongoDB/Role/_DatabaseOp.pm @@ -33,6 +33,7 @@ use MongoDB::_Types qw( use Types::Standard qw( CodeRef Str + Bool Maybe ); @@ -64,4 +65,11 @@ has session => ( isa => Maybe[ClientSession], ); +# set during retryable writes on supported operations +has retryable_write => ( + is => 'rw', + isa => Bool, + default => 0, +); + 1; diff --git a/lib/MongoDB/Role/_InsertPreEncoder.pm b/lib/MongoDB/Role/_InsertPreEncoder.pm index ff7bff40..8471e02a 100644 --- a/lib/MongoDB/Role/_InsertPreEncoder.pm +++ b/lib/MongoDB/Role/_InsertPreEncoder.pm @@ -35,7 +35,7 @@ requires qw/bson_codec/; # takes MongoDB::_Link and ref of type Document; returns # blessed BSON encode doc and the original/generated _id sub _pre_encode_insert { - my ( $self, $link, $doc, $invalid_chars ) = @_; + my ( $self, $max_bson_size, $doc, $invalid_chars ) = @_; my $type = ref($doc); @@ -60,7 +60,7 @@ sub _pre_encode_insert { $doc, { invalid_chars => $invalid_chars, - max_length => $link->max_bson_object_size, + max_length => $max_bson_size, first_key => '_id', first_value => $id, } diff --git a/lib/MongoDB/Role/_SessionSupport.pm 
b/lib/MongoDB/Role/_SessionSupport.pm index 0c9435c8..30c1b1ac 100644 --- a/lib/MongoDB/Role/_SessionSupport.pm +++ b/lib/MongoDB/Role/_SessionSupport.pm @@ -25,7 +25,7 @@ use MongoDB::_Types -types, 'to_IxHash'; use Safe::Isa; use namespace::clean; -requires qw/ session /; +requires qw/ session retryable_write /; sub _apply_session_and_cluster_time { my ( $self, $link, $query_ref ) = @_; @@ -42,6 +42,10 @@ sub _apply_session_and_cluster_time { $$query_ref = to_IxHash( $$query_ref ); ($$query_ref)->Push( 'lsid' => $self->session->session_id ); + if ( $self->retryable_write ) { + ($$query_ref)->Push( 'txnNumber' => $self->session->_server_session->transaction_id ); + } + $self->session->_server_session->update_last_use; my $cluster_time = $self->session->get_latest_cluster_time; diff --git a/lib/MongoDB/_Link.pm b/lib/MongoDB/_Link.pm index 0da85997..5f970847 100644 --- a/lib/MongoDB/_Link.pm +++ b/lib/MongoDB/_Link.pm @@ -122,6 +122,13 @@ has does_write_commands => ( isa => Bool, ); +# wire version 4 +has supports_doc_validation => ( + is => 'rwp', + init_arg => undef, + isa => Bool, +); + # for caching wire version >= 5 has supports_collation => ( is => 'rwp', @@ -135,6 +142,12 @@ has supports_arrayFilters => ( isa => Bool, ); +has supports_retryWrites => ( + is => 'rwp', + init_arg => undef, + isa => Bool, +); + my @connection_state_fields = qw( fh connected rcvbuf last_used fdset is_ssl ); @@ -240,8 +253,16 @@ sub set_metadata { || 2 * $self->max_bson_object_size ); $self->_set_does_write_commands( $self->accepts_wire_version(2) ); + $self->_set_supports_doc_validation( $self->accepts_wire_version(4) ); $self->_set_supports_collation( $self->accepts_wire_version(5) ); $self->_set_supports_arrayFilters( $self->accepts_wire_version(6) ); + $self->_set_supports_retryWrites( + $self->accepts_wire_version(6) + && defined( $server->logical_session_timeout_minutes ) + && ( $server->type ne 'Standalone' ) + ? 
1 + : 0 + ); return; } diff --git a/lib/MongoDB/_Server.pm b/lib/MongoDB/_Server.pm index 851a3f03..9d99b39f 100644 --- a/lib/MongoDB/_Server.pm +++ b/lib/MongoDB/_Server.pm @@ -138,7 +138,7 @@ for my $s (qw/hosts passives arbiters/) { builder => "_build_$s", ); - no strict 'refs'; + no strict 'refs'; ## no critic *{"_build_$s"} = sub { [ map { lc $_ } ( @{ $_[0]->is_master->{$s} || [] } ) ]; }; diff --git a/lib/MongoDB/_ServerSession.pm b/lib/MongoDB/_ServerSession.pm index 097ea95d..e66d2a6b 100644 --- a/lib/MongoDB/_ServerSession.pm +++ b/lib/MongoDB/_ServerSession.pm @@ -24,6 +24,7 @@ use MongoDB::Error; use Moo; use UUID::URandom; +use Math::BigInt; use MongoDB::BSON::Binary; use MongoDB::_Types qw( Document @@ -35,7 +36,7 @@ use Types::Standard qw( ); use namespace::clean -except => 'meta'; -=method session_id +=attr session_id $server_session->session_id; @@ -61,9 +62,9 @@ sub _build_session_id { return { id => $uuid }; } -=method last_use +=attr last_use - $sever_session->last_use; + $server_session->last_use; Returns the unix time that this server session was last used. Used for checking expiry of a server session. If undefined, then the session has (probably) not @@ -77,6 +78,22 @@ has last_use => ( isa => Maybe[Int], ); +=attr transaction_id + + $server_session->transaction_id + +Returns the current transaction id for this server session. This is a ratcheted +incrementing ID number, which when combined with the session id allows for +retrying transactions in the correct order. 
+ +=cut + +has transaction_id => ( + is => 'rwp', + init_arg => undef, + default => sub { Math::BigInt->new('0') }, +); + =method update_last_use $server_session->update_last_use; @@ -104,6 +121,11 @@ sub _is_expiring { return; } +sub _increment_transaction_id { + my $self = shift; + $self->transaction_id->binc(); +} + 1; __END__ diff --git a/lib/MongoDB/_Topology.pm b/lib/MongoDB/_Topology.pm index 6c9e5cae..95f5cebe 100644 --- a/lib/MongoDB/_Topology.pm +++ b/lib/MongoDB/_Topology.pm @@ -620,6 +620,22 @@ sub _supports_sessions { return; } +# Used for bulkWrite for shortcutting to original execute command +sub _supports_retry_writes { + my ( $self ) = @_; + + # retryWrites arent supported in standalone servers + return if $self->type eq 'Single'; + + # retryWrites require a wire version of at least six + return if $self->wire_version_ceil < 6; + + # must have lstm present + return 1 if defined $self->logical_session_timeout_minutes; + return; +} + + sub _check_staleness_compatibility { my ($self, $read_pref) = @_; my $max_staleness_sec = $read_pref ? $read_pref->max_staleness_seconds : -1; diff --git a/lib/MongoDB/_URI.pm b/lib/MongoDB/_URI.pm index d3087411..63ff0af6 100644 --- a/lib/MongoDB/_URI.pm +++ b/lib/MongoDB/_URI.pm @@ -109,6 +109,7 @@ sub _build_valid_options { readPreference readPreferenceTags replicaSet + retryWrites serverSelectionTimeoutMS serverSelectionTryOnce socketCheckIntervalMS diff --git a/t/data/retryable-writes/README.rst b/t/data/retryable-writes/README.rst new file mode 100644 index 00000000..b9f1008c --- /dev/null +++ b/t/data/retryable-writes/README.rst @@ -0,0 +1,314 @@ +===================== +Retryable Write Tests +===================== + +.. contents:: + +---- + +Introduction +============ + +The YAML and JSON files in this directory tree are platform-independent tests +that drivers can use to prove their conformance to the Retryable Writes spec. 
+ +Several prose tests, which are not easily expressed in YAML, are also presented +in this file. Those tests will need to be manually implemented by each driver. + +Tests will require a MongoClient with ``retryWrites`` enabled. Integration tests +will require a running MongoDB cluster with server versions 3.6.0 or later. The +``{setFeatureCompatibilityVersion: 3.6}`` admin command will also need to have +been executed to enable support for retryable writes on the cluster. + +Server Fail Point +================= + +The tests depend on a server fail point, ``onPrimaryTransactionalWrite``, which +allows us to force a network error before the server would return a write result +to the client. The fail point also allows control whether the server will +successfully commit the write via its ``failBeforeCommitExceptionCode`` option. +Keep in mind that the fail point only triggers for transaction writes (i.e. write +commands including ``txnNumber`` and ``lsid`` fields). See `SERVER-29606`_ for +more information. + +.. _SERVER-29606: https://jira.mongodb.org/browse/SERVER-29606 + +The fail point may be configured like so:: + + db.runCommand({ + configureFailPoint: "onPrimaryTransactionalWrite", + mode: , + data: + }); + +``mode`` is a generic fail point option and may be assigned a string or document +value. The string values ``"alwaysOn"`` and ``"off"`` may be used to enable or +disable the fail point, respectively. A document may be used to specify either +``times`` or ``skip``, which are mutually exclusive: + +- ``{ times: }`` may be used to limit the number of times the fail + point may trigger before transitioning to ``"off"``. +- ``{ skip: }`` may be used to defer the first trigger of a fail + point, after which it will transition to ``"alwaysOn"``. + +The ``data`` option is a document that may be used to specify options that +control the fail point's behavior. 
As noted in `SERVER-29606`_, +``onPrimaryTransactionalWrite`` supports the following ``data`` options, which +may be combined if desired: + +- ``closeConnection``: Boolean option, which defaults to ``true``. If ``true``, + the connection on which the write is executed will be closed before a result + can be returned. +- ``failBeforeCommitExceptionCode``: Integer option, which is unset by default. + If set, the specified exception code will be thrown and the write will not be + committed. If unset, the write will be allowed to commit. + +Disabling Fail Point after Test Execution +----------------------------------------- + +After each test that configures a fail point, drivers should disable the +``onPrimaryTransactionalWrite`` fail point to avoid spurious failures in +subsequent tests. The fail point may be disabled like so:: + + db.runCommand({ + configureFailPoint: "onPrimaryTransactionalWrite", + mode: "off" + }); + +Network Error Tests +=================== + +Network error tests are expressed in YAML and should be run against a replica +set. These tests cannot be run against a shard cluster because mongos does not +support the necessary fail point. + +The tests exercise the following scenarios: + +- Single-statement write operations + + - Each test expecting a write result will encounter at-most one network error + for the write command. Retry attempts should return without error and allow + operation to succeed. Observation of the collection state will assert that + the write occurred at-most once. + + - Each test expecting an error will encounter successive network errors for + the write command. Observation of the collection state will assert that the + write was never committed on the server. + +- Multi-statement write operations + + - Each test expecting a write result will encounter at-most one network error + for some write command(s) in the batch. Retry attempts should return without + error and allow the batch to ultimately succeed. 
Observation of the + collection state will assert that each write occurred at-most once. + + - Each test expecting an error will encounter successive network errors for + some write command in the batch. The batch will ultimately fail with an + error, but observation of the collection state will assert that the failing + write was never committed on the server. We may observe that earlier writes + in the batch occurred at-most once. + +We cannot test a scenario where the first and second attempts both encounter +network errors but the write does actually commit during one of those attempts. +This is because (1) the fail point only triggers when a write would be committed +and (2) the skip and times options are mutually exclusive. That said, such a +test would mainly assert the server's correctness for at-most once semantics and +is not essential to assert driver correctness. + +Test Format +----------- + +Each YAML file has the following keys: + +- ``data``: The data that should exist in the collection under test before each + test run. + +- ``minServerVersion`` (optional): The minimum server version (inclusive) + required to successfully run the test. If this field is not present, it should + be assumed that there is no lower bound on the required server version. + +- ``maxServerVersion`` (optional): The maximum server version (exclusive) + against which this test can run successfully. If this field is not present, + it should be assumed that there is no upper bound on the required server + version. + +- ``tests``: An array of tests that are to be run independently of each other. + Each test will have some or all of the following fields: + + - ``description``: The name of the test. + + - ``failPoint``: Document describing options for configuring the + ``onPrimaryTransactionalWrite`` fail point on the primary server. This + document should be merged with the + ``{ configureFailPoint: "onPrimaryTransactionalWrite" }`` command document. 
+ + - ``operation``: Document describing the operation to be executed. The + operation should be executed through a collection object derived from a + client that has been created with the ``retryWrites=true`` option. + This will have some or all of the following fields: + + - ``name``: The name of the operation as defined in the CRUD specification. + + - ``arguments``: The names and values of arguments from the CRUD + specification. + + - ``outcome``: Document describing the return value and/or expected state of + the collection after the operation is executed. This will have some or all + of the following fields: + + - ``error``: If ``true``, the test should expect an error or exception. Note + that some drivers may report server-side errors as a write error within a + write result object. + + - ``result``: The return value from the operation. This will correspond to + an operation's result object as defined in the CRUD specification. This + field may be omitted if ``error`` is ``true``. If this field is present + and ``error`` is ``true`` (generally for multi-statement tests), the + result reports information about operations that succeeded before an + unrecoverable failure. In that case, drivers may choose to check the + result object if their BulkWriteException (or equivalent) provides access + to a write result object. + + - ``collection``: + + - ``name`` (optional): The name of the collection to verify. If this isn't + present then use the collection under test. + + - ``data``: The data that should exist in the collection after the + operation has been run. + +Split Batch Tests +================= + +The YAML tests specify bulk write operations that are split by command type +(e.g. sequence of insert, update, and delete commands). Multi-statement write +operations may also be split due to ``maxWriteBatchSize``, +``maxBsonObjectSize``, or ``maxMessageSizeBytes``. 
+ +For instance, an insertMany operation with five 10 MB documents executed using +OP_MSG payload type 0 (i.e. entire command in one document) would be split into +five insert commands in order to respect the 16 MB ``maxBsonObjectSize`` limit. +The same insertMany operation executed using OP_MSG payload type 1 (i.e. command +arguments pulled out into a separate payload vector) would be split into two +insert commands in order to respect the 48 MB ``maxMessageSizeBytes`` limit. + +Noting when a driver might split operations, the ``onPrimaryTransactionalWrite`` +fail point's ``skip`` option may be used to control when the fail point first +triggers. Once triggered, the fail point will transition to the ``alwaysOn`` +state until disabled. Driver authors should also note that the server attempts +to process all documents in a single insert command within a single commit (i.e. +one insert command with five documents may only trigger the fail point once). +This behavior is unique to insert commands (each statement in an update and +delete command is processed independently). + +If testing an insert that is split into two commands, a ``skip`` of one will +allow the fail point to trigger on the second insert command (because all +documents in the first command will be processed in the same commit). When +testing an update or delete that is split into two commands, the ``skip`` should +be set to the number of statements in the first command to allow the fail point +to trigger on the second command. + +Replica Set Failover Test +========================= + +In addition to network errors, writes should also be retried in the event of a +primary failover, which results in a "not master" command error (or similar). +The ``stepdownHangBeforePerformingPostMemberStateUpdateActions`` fail point +implemented in `d4eb562`_ for `SERVER-31355`_ may be used for this test, as it +allows a primary to keep its client connections open after a step down. 
This +fail point operates by hanging the step down procedure (i.e. ``replSetStepDown`` +command) until the fail point is later deactivated. + +.. _d4eb562: https://github.com/mongodb/mongo/commit/d4eb562ac63717904f24de4a22e395070687bc62 +.. _SERVER-31355: https://jira.mongodb.org/browse/SERVER-31355 + +The following test requires three MongoClient instances and will generally +require two execution contexts (async drivers may get by with a single thread). + +- The client under test will connect to the replica set and be used to execute + write operations. +- The fail point client will connect directly to the initial primary and be used + to toggle the fail point. +- The step down client will connect to the replica set and be used to step down + the primary. This client will generally require its own execution context, + since the step down will hang. + +In order to guarantee that the client under test does not detect the stepped +down primary's state change via SDAM, it must be configured with a large +`heartbeatFrequencyMS`_ value (e.g. 60 seconds). Single-threaded drivers may +also need to set `serverSelectionTryOnce`_ to ``false`` to ensure that server +selection for the retry attempt waits until a new primary is elected. + +.. _heartbeatFrequencyMS: https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#heartbeatfrequencyms +.. _serverSelectionTryOnce: https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.rst#serverselectiontryonce + +The test proceeds as follows: + +- Using the client under test, insert a document and observe a successful write + result. This will ensure that initial discovery takes place. +- Using the fail point client, activate the fail point by setting ``mode`` + to ``"alwaysOn"``. +- Using the step down client, step down the primary by executing the command + ``{ replSetStepDown: 60, force: true}``. 
This operation will hang so long as + the fail point is activated. When the fail point is later deactivated, the + step down will complete and the primary's client connections will be dropped. + At that point, any ensuing network error should be ignored. +- Using the client under test, insert a document and observe a successful write + result. The test MUST assert that the insert command fails once against the + stepped down node and is successfully retried on the newly elected primary + (after SDAM discovers the topology change). The test MAY use APM or another + means to observe both attempts. +- Using the fail point client, deactivate the fail point by setting ``mode`` + to ``"off"``. + +Command Construction Tests +========================== + +Drivers should also assert that command documents are properly constructed with +or without a transaction ID, depending on whether the write operation is +supported. `Command Monitoring`_ may be used to check for the presence of a +``txnNumber`` field in the command document. Note that command documents may +always include an ``lsid`` field per the `Driver Session`_ specification. + +.. _Command Monitoring: ../../command-monitoring/command-monitoring.rst +.. _Driver Session: ../../sessions/driver-sessions.rst + +These tests may be run against both a replica set and shard cluster. + +Drivers should test that transaction IDs are never included in commands for +unsupported write operations: + +* Write commands with unacknowledged write concerns (e.g. 
``{w: 0}``) + +* Unsupported single-statement write operations + + - ``updateMany()`` + - ``deleteMany()`` + +* Unsupported multi-statement write operations + + - ``bulkWrite()`` that includes ``UpdateMany`` or ``DeleteMany`` + +* Unsupported write commands + + - ``aggregate`` with ``$out`` pipeline operator + +Drivers should test that transactions IDs are always included in commands for +supported write operations: + +* Supported single-statement write operations + + - ``insertOne()`` + - ``updateOne()`` + - ``replaceOne()`` + - ``deleteOne()`` + - ``findOneAndDelete()`` + - ``findOneAndReplace()`` + - ``findOneAndUpdate()`` + +* Supported multi-statement write operations + + - ``insertMany()`` with ``ordered=true`` + - ``insertMany()`` with ``ordered=false`` + - ``bulkWrite()`` with ``ordered=true`` (no ``UpdateMany`` or ``DeleteMany``) + - ``bulkWrite()`` with ``ordered=false`` (no ``UpdateMany`` or ``DeleteMany``) diff --git a/t/data/retryable-writes/bulkWrite.json b/t/data/retryable-writes/bulkWrite.json new file mode 100644 index 00000000..7b88ffb3 --- /dev/null +++ b/t/data/retryable-writes/bulkWrite.json @@ -0,0 +1,654 @@ +{ + "data": [ + { + "_id": 1, + "x": 11 + } + ], + "minServerVersion": "3.6", + "tests": [ + { + "description": "First command is retried", + "failPoint": { + "mode": { + "times": 1 + } + }, + "operation": { + "name": "bulkWrite", + "arguments": { + "requests": [ + { + "name": "insertOne", + "arguments": { + "document": { + "_id": 2, + "x": 22 + } + } + }, + { + "name": "updateOne", + "arguments": { + "filter": { + "_id": 2 + }, + "update": { + "$inc": { + "x": 1 + } + } + } + }, + { + "name": "deleteOne", + "arguments": { + "filter": { + "_id": 1 + } + } + } + ], + "options": { + "ordered": true + } + } + }, + "outcome": { + "result": { + "deletedCount": 1, + "insertedIds": { + "0": 2 + }, + "matchedCount": 1, + "modifiedCount": 1, + "upsertedCount": 0, + "upsertedIds": {} + }, + "collection": { + "data": [ + { + "_id": 2, + "x": 23 + } 
+ ] + } + } + }, + { + "description": "All commands are retried", + "failPoint": { + "mode": { + "times": 7 + } + }, + "operation": { + "name": "bulkWrite", + "arguments": { + "requests": [ + { + "name": "insertOne", + "arguments": { + "document": { + "_id": 2, + "x": 22 + } + } + }, + { + "name": "updateOne", + "arguments": { + "filter": { + "_id": 2 + }, + "update": { + "$inc": { + "x": 1 + } + } + } + }, + { + "name": "insertOne", + "arguments": { + "document": { + "_id": 3, + "x": 33 + } + } + }, + { + "name": "updateOne", + "arguments": { + "filter": { + "_id": 4, + "x": 44 + }, + "update": { + "$inc": { + "x": 1 + } + }, + "upsert": true + } + }, + { + "name": "insertOne", + "arguments": { + "document": { + "_id": 5, + "x": 55 + } + } + }, + { + "name": "replaceOne", + "arguments": { + "filter": { + "_id": 3 + }, + "replacement": { + "_id": 3, + "x": 333 + } + } + }, + { + "name": "deleteOne", + "arguments": { + "filter": { + "_id": 1 + } + } + } + ], + "options": { + "ordered": true + } + } + }, + "outcome": { + "result": { + "deletedCount": 1, + "insertedIds": { + "0": 2, + "2": 3, + "4": 5 + }, + "matchedCount": 2, + "modifiedCount": 2, + "upsertedCount": 1, + "upsertedIds": { + "3": 4 + } + }, + "collection": { + "data": [ + { + "_id": 2, + "x": 23 + }, + { + "_id": 3, + "x": 333 + }, + { + "_id": 4, + "x": 45 + }, + { + "_id": 5, + "x": 55 + } + ] + } + } + }, + { + "description": "Both commands are retried after their first statement fails", + "failPoint": { + "mode": { + "times": 2 + } + }, + "operation": { + "name": "bulkWrite", + "arguments": { + "requests": [ + { + "name": "insertOne", + "arguments": { + "document": { + "_id": 2, + "x": 22 + } + } + }, + { + "name": "updateOne", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$inc": { + "x": 1 + } + } + } + }, + { + "name": "updateOne", + "arguments": { + "filter": { + "_id": 2 + }, + "update": { + "$inc": { + "x": 1 + } + } + } + } + ], + "options": { + "ordered": true + } + } + }, 
+ "outcome": { + "result": { + "deletedCount": 0, + "insertedIds": { + "0": 2 + }, + "matchedCount": 2, + "modifiedCount": 2, + "upsertedCount": 0, + "upsertedIds": {} + }, + "collection": { + "data": [ + { + "_id": 1, + "x": 12 + }, + { + "_id": 2, + "x": 23 + } + ] + } + } + }, + { + "description": "Second command is retried after its second statement fails", + "failPoint": { + "mode": { + "skip": 2 + } + }, + "operation": { + "name": "bulkWrite", + "arguments": { + "requests": [ + { + "name": "insertOne", + "arguments": { + "document": { + "_id": 2, + "x": 22 + } + } + }, + { + "name": "updateOne", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$inc": { + "x": 1 + } + } + } + }, + { + "name": "updateOne", + "arguments": { + "filter": { + "_id": 2 + }, + "update": { + "$inc": { + "x": 1 + } + } + } + } + ], + "options": { + "ordered": true + } + } + }, + "outcome": { + "result": { + "deletedCount": 0, + "insertedIds": { + "0": 2 + }, + "matchedCount": 2, + "modifiedCount": 2, + "upsertedCount": 0, + "upsertedIds": {} + }, + "collection": { + "data": [ + { + "_id": 1, + "x": 12 + }, + { + "_id": 2, + "x": 23 + } + ] + } + } + }, + { + "description": "BulkWrite with unordered execution", + "failPoint": { + "mode": { + "times": 1 + } + }, + "operation": { + "name": "bulkWrite", + "arguments": { + "requests": [ + { + "name": "insertOne", + "arguments": { + "document": { + "_id": 2, + "x": 22 + } + } + }, + { + "name": "insertOne", + "arguments": { + "document": { + "_id": 3, + "x": 33 + } + } + } + ], + "options": { + "ordered": false + } + } + }, + "outcome": { + "result": { + "deletedCount": 0, + "insertedIds": { + "0": 2, + "1": 3 + }, + "matchedCount": 0, + "modifiedCount": 0, + "upsertedCount": 0, + "upsertedIds": {} + }, + "collection": { + "data": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + }, + { + "_id": 3, + "x": 33 + } + ] + } + } + }, + { + "description": "First insertOne is never committed", + "failPoint": { + "mode": { 
+ "times": 2 + }, + "data": { + "failBeforeCommitExceptionCode": 1 + } + }, + "operation": { + "name": "bulkWrite", + "arguments": { + "requests": [ + { + "name": "insertOne", + "arguments": { + "document": { + "_id": 2, + "x": 22 + } + } + }, + { + "name": "updateOne", + "arguments": { + "filter": { + "_id": 2 + }, + "update": { + "$inc": { + "x": 1 + } + } + } + }, + { + "name": "deleteOne", + "arguments": { + "filter": { + "_id": 1 + } + } + } + ], + "options": { + "ordered": true + } + } + }, + "outcome": { + "error": true, + "result": { + "deletedCount": 0, + "insertedIds": {}, + "matchedCount": 0, + "modifiedCount": 0, + "upsertedCount": 0, + "upsertedIds": {} + }, + "collection": { + "data": [ + { + "_id": 1, + "x": 11 + } + ] + } + } + }, + { + "description": "Second updateOne is never committed", + "failPoint": { + "mode": { + "skip": 1 + }, + "data": { + "failBeforeCommitExceptionCode": 1 + } + }, + "operation": { + "name": "bulkWrite", + "arguments": { + "requests": [ + { + "name": "insertOne", + "arguments": { + "document": { + "_id": 2, + "x": 22 + } + } + }, + { + "name": "updateOne", + "arguments": { + "filter": { + "_id": 2 + }, + "update": { + "$inc": { + "x": 1 + } + } + } + }, + { + "name": "deleteOne", + "arguments": { + "filter": { + "_id": 1 + } + } + } + ], + "options": { + "ordered": true + } + } + }, + "outcome": { + "error": true, + "result": { + "deletedCount": 0, + "insertedIds": { + "0": 2 + }, + "matchedCount": 0, + "modifiedCount": 0, + "upsertedCount": 0, + "upsertedIds": {} + }, + "collection": { + "data": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + } + ] + } + } + }, + { + "description": "Third updateOne is never committed", + "failPoint": { + "mode": { + "skip": 2 + }, + "data": { + "failBeforeCommitExceptionCode": 1 + } + }, + "operation": { + "name": "bulkWrite", + "arguments": { + "requests": [ + { + "name": "updateOne", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$inc": { + "x": 1 + } + } + 
} + }, + { + "name": "insertOne", + "arguments": { + "document": { + "_id": 2, + "x": 22 + } + } + }, + { + "name": "updateOne", + "arguments": { + "filter": { + "_id": 2 + }, + "update": { + "$inc": { + "x": 1 + } + } + } + } + ], + "options": { + "ordered": true + } + } + }, + "outcome": { + "error": true, + "result": { + "deletedCount": 0, + "insertedIds": { + "0": 2 + }, + "matchedCount": 1, + "modifiedCount": 1, + "upsertedCount": 0, + "upsertedIds": {} + }, + "collection": { + "data": [ + { + "_id": 1, + "x": 12 + }, + { + "_id": 2, + "x": 22 + } + ] + } + } + } + ] +} diff --git a/t/data/retryable-writes/bulkWrite.yml b/t/data/retryable-writes/bulkWrite.yml new file mode 100644 index 00000000..167f8943 --- /dev/null +++ b/t/data/retryable-writes/bulkWrite.yml @@ -0,0 +1,305 @@ +data: + - { _id: 1, x: 11 } + +minServerVersion: '3.6' + +tests: + - + description: "First command is retried" + failPoint: + mode: { times: 1 } + operation: + name: "bulkWrite" + arguments: + requests: + - + name: "insertOne" + arguments: + document: { _id: 2, x: 22 } + - + name: "updateOne" + arguments: + filter: { _id: 2 } + update: { $inc: { x : 1 }} + - + name: "deleteOne" + arguments: + filter: { _id: 1 } + options: { ordered: true } + outcome: + result: + deletedCount: 1 + insertedIds: { 0: 2 } + matchedCount: 1 + modifiedCount: 1 + upsertedCount: 0 + upsertedIds: { } + collection: + data: + - { _id: 2, x: 23 } + - + # Write operations in this ordered batch are intentionally sequenced so + # that each write command consists of a single statement, which will + # fail on the first attempt and succeed on the second, retry attempt. 
+ description: "All commands are retried" + failPoint: + mode: { times: 7 } + operation: + name: "bulkWrite" + arguments: + requests: + - + name: "insertOne" + arguments: + document: { _id: 2, x: 22 } + - + name: "updateOne" + arguments: + filter: { _id: 2 } + update: { $inc: { x : 1 }} + - + name: "insertOne" + arguments: + document: { _id: 3, x: 33 } + - + name: "updateOne" + arguments: + filter: { _id: 4, x: 44 } + update: { $inc: { x : 1 }} + upsert: true + - + name: "insertOne" + arguments: + document: { _id: 5, x: 55 } + - + name: "replaceOne" + arguments: + filter: { _id: 3 } + replacement: { _id: 3, x: 333 } + - + name: "deleteOne" + arguments: + filter: { _id: 1 } + options: { ordered: true } + outcome: + result: + deletedCount: 1 + insertedIds: { 0: 2, 2: 3, 4: 5 } + matchedCount: 2 + modifiedCount: 2 + upsertedCount: 1 + upsertedIds: { 3: 4 } + collection: + data: + - { _id: 2, x: 23 } + - { _id: 3, x: 333 } + - { _id: 4, x: 45 } + - { _id: 5, x: 55 } + - + description: "Both commands are retried after their first statement fails" + failPoint: + mode: { times: 2 } + operation: + name: "bulkWrite" + arguments: + requests: + - + name: "insertOne" + arguments: + document: { _id: 2, x: 22 } + - + name: "updateOne" + arguments: + filter: { _id: 1 } + update: { $inc: { x : 1 }} + - + name: "updateOne" + arguments: + filter: { _id: 2 } + update: { $inc: { x : 1 }} + options: { ordered: true } + outcome: + result: + deletedCount: 0 + insertedIds: { 0: 2 } + matchedCount: 2 + modifiedCount: 2 + upsertedCount: 0 + upsertedIds: { } + collection: + data: + - { _id: 1, x: 12 } + - { _id: 2, x: 23 } + - + description: "Second command is retried after its second statement fails" + failPoint: + mode: { skip: 2 } + operation: + name: "bulkWrite" + arguments: + requests: + - + name: "insertOne" + arguments: + document: { _id: 2, x: 22 } + - + name: "updateOne" + arguments: + filter: { _id: 1 } + update: { $inc: { x : 1 }} + - + name: "updateOne" + arguments: + filter: { 
_id: 2 } + update: { $inc: { x : 1 }} + options: { ordered: true } + outcome: + result: + deletedCount: 0 + insertedIds: { 0: 2 } + matchedCount: 2 + modifiedCount: 2 + upsertedCount: 0 + upsertedIds: { } + collection: + data: + - { _id: 1, x: 12 } + - { _id: 2, x: 23 } + - + description: "BulkWrite with unordered execution" + failPoint: + mode: { times: 1 } + operation: + name: "bulkWrite" + arguments: + requests: + - + name: "insertOne" + arguments: + document: { _id: 2, x: 22 } + - + name: "insertOne" + arguments: + document: { _id: 3, x: 33 } + options: { ordered: false } + outcome: + result: + deletedCount: 0 + insertedIds: { 0: 2, 1: 3 } + matchedCount: 0 + modifiedCount: 0 + upsertedCount: 0 + upsertedIds: { } + collection: + data: + - { _id: 1, x: 11 } + - { _id: 2, x: 22 } + - { _id: 3, x: 33 } + - + description: "First insertOne is never committed" + failPoint: + mode: { times: 2 } + data: { failBeforeCommitExceptionCode: 1 } + operation: + name: "bulkWrite" + arguments: + requests: + - + name: "insertOne" + arguments: + document: { _id: 2, x: 22 } + - + name: "updateOne" + arguments: + filter: { _id: 2 } + update: { $inc: { x : 1 }} + - + name: "deleteOne" + arguments: + filter: { _id: 1 } + options: { ordered: true } + outcome: + error: true + result: + deletedCount: 0 + insertedIds: { } + matchedCount: 0 + modifiedCount: 0 + upsertedCount: 0 + upsertedIds: { } + collection: + data: + - { _id: 1, x: 11 } + - + description: "Second updateOne is never committed" + failPoint: + mode: { skip: 1 } + data: { failBeforeCommitExceptionCode: 1 } + operation: + name: "bulkWrite" + arguments: + requests: + - + name: "insertOne" + arguments: + document: { _id: 2, x: 22 } + - + name: "updateOne" + arguments: + filter: { _id: 2 } + update: { $inc: { x : 1 }} + - + name: "deleteOne" + arguments: + filter: { _id: 1 } + options: { ordered: true } + outcome: + error: true + result: + deletedCount: 0 + insertedIds: { 0: 2 } + matchedCount: 0 + modifiedCount: 0 + 
upsertedCount: 0 + upsertedIds: { } + collection: + data: + - { _id: 1, x: 11 } + - { _id: 2, x: 22 } + - + description: "Third updateOne is never committed" + failPoint: + mode: { skip: 2 } + data: { failBeforeCommitExceptionCode: 1 } + operation: + name: "bulkWrite" + arguments: + requests: + - + name: "updateOne" + arguments: + filter: { _id: 1 } + update: { $inc: { x : 1 }} + - + name: "insertOne" + arguments: + document: { _id: 2, x: 22 } + - + name: "updateOne" + arguments: + filter: { _id: 2 } + update: { $inc: { x : 1 }} + options: { ordered: true } + outcome: + error: true + result: + deletedCount: 0 + insertedIds: { 0: 2 } + matchedCount: 1 + modifiedCount: 1 + upsertedCount: 0 + upsertedIds: { } + collection: + data: + - { _id: 1, x: 12 } + - { _id: 2, x: 22 } diff --git a/t/data/retryable-writes/deleteOne.json b/t/data/retryable-writes/deleteOne.json new file mode 100644 index 00000000..a552b893 --- /dev/null +++ b/t/data/retryable-writes/deleteOne.json @@ -0,0 +1,110 @@ +{ + "data": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + } + ], + "minServerVersion": "3.6", + "tests": [ + { + "description": "DeleteOne is committed on first attempt", + "failPoint": { + "mode": { + "times": 1 + } + }, + "operation": { + "name": "deleteOne", + "arguments": { + "filter": { + "_id": 1 + } + } + }, + "outcome": { + "result": { + "deletedCount": 1 + }, + "collection": { + "data": [ + { + "_id": 2, + "x": 22 + } + ] + } + } + }, + { + "description": "DeleteOne is not committed on first attempt", + "failPoint": { + "mode": { + "times": 1 + }, + "data": { + "failBeforeCommitExceptionCode": 1 + } + }, + "operation": { + "name": "deleteOne", + "arguments": { + "filter": { + "_id": 1 + } + } + }, + "outcome": { + "result": { + "deletedCount": 1 + }, + "collection": { + "data": [ + { + "_id": 2, + "x": 22 + } + ] + } + } + }, + { + "description": "DeleteOne is never committed", + "failPoint": { + "mode": { + "times": 2 + }, + "data": { + 
"failBeforeCommitExceptionCode": 1 + } + }, + "operation": { + "name": "deleteOne", + "arguments": { + "filter": { + "_id": 1 + } + } + }, + "outcome": { + "error": true, + "collection": { + "data": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + } + ] + } + } + } + ] +} diff --git a/t/data/retryable-writes/deleteOne.yml b/t/data/retryable-writes/deleteOne.yml new file mode 100644 index 00000000..20d61966 --- /dev/null +++ b/t/data/retryable-writes/deleteOne.yml @@ -0,0 +1,51 @@ +data: + - { _id: 1, x: 11 } + - { _id: 2, x: 22 } + +minServerVersion: '3.6' + +tests: + - + description: "DeleteOne is committed on first attempt" + failPoint: + mode: { times: 1 } + operation: + name: "deleteOne" + arguments: + filter: { _id: 1 } + outcome: + result: + deletedCount: 1 + collection: + data: + - { _id: 2, x: 22 } + - + description: "DeleteOne is not committed on first attempt" + failPoint: + mode: { times: 1 } + data: { failBeforeCommitExceptionCode: 1 } + operation: + name: "deleteOne" + arguments: + filter: { _id: 1 } + outcome: + result: + deletedCount: 1 + collection: + data: + - { _id: 2, x: 22 } + - + description: "DeleteOne is never committed" + failPoint: + mode: { times: 2 } + data: { failBeforeCommitExceptionCode: 1 } + operation: + name: "deleteOne" + arguments: + filter: { _id: 1 } + outcome: + error: true + collection: + data: + - { _id: 1, x: 11 } + - { _id: 2, x: 22 } diff --git a/t/data/retryable-writes/findOneAndDelete.json b/t/data/retryable-writes/findOneAndDelete.json new file mode 100644 index 00000000..d8f6c8fa --- /dev/null +++ b/t/data/retryable-writes/findOneAndDelete.json @@ -0,0 +1,127 @@ +{ + "data": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + } + ], + "minServerVersion": "3.6", + "tests": [ + { + "description": "FindOneAndDelete is committed on first attempt", + "failPoint": { + "mode": { + "times": 1 + } + }, + "operation": { + "name": "findOneAndDelete", + "arguments": { + "filter": { + "x": { + "$gte": 11 + } + 
}, + "sort": { + "x": 1 + } + } + }, + "outcome": { + "result": { + "_id": 1, + "x": 11 + }, + "collection": { + "data": [ + { + "_id": 2, + "x": 22 + } + ] + } + } + }, + { + "description": "FindOneAndDelete is not committed on first attempt", + "failPoint": { + "mode": { + "times": 1 + }, + "data": { + "failBeforeCommitExceptionCode": 1 + } + }, + "operation": { + "name": "findOneAndDelete", + "arguments": { + "filter": { + "x": { + "$gte": 11 + } + }, + "sort": { + "x": 1 + } + } + }, + "outcome": { + "result": { + "_id": 1, + "x": 11 + }, + "collection": { + "data": [ + { + "_id": 2, + "x": 22 + } + ] + } + } + }, + { + "description": "FindOneAndDelete is never committed", + "failPoint": { + "mode": { + "times": 2 + }, + "data": { + "failBeforeCommitExceptionCode": 1 + } + }, + "operation": { + "name": "findOneAndDelete", + "arguments": { + "filter": { + "x": { + "$gte": 11 + } + }, + "sort": { + "x": 1 + } + } + }, + "outcome": { + "error": true, + "collection": { + "data": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + } + ] + } + } + } + ] +} diff --git a/t/data/retryable-writes/findOneAndDelete.yml b/t/data/retryable-writes/findOneAndDelete.yml new file mode 100644 index 00000000..d945bc4d --- /dev/null +++ b/t/data/retryable-writes/findOneAndDelete.yml @@ -0,0 +1,52 @@ +data: + - { _id: 1, x: 11 } + - { _id: 2, x: 22 } + +minServerVersion: '3.6' + +tests: + - + description: "FindOneAndDelete is committed on first attempt" + failPoint: + mode: { times: 1 } + operation: + name: "findOneAndDelete" + arguments: + filter: { x: { $gte: 11 }} + sort: { x: 1 } + outcome: + result: { _id: 1, x: 11 } + collection: + data: + - { _id: 2, x: 22 } + - + description: "FindOneAndDelete is not committed on first attempt" + failPoint: + mode: { times: 1 } + data: { failBeforeCommitExceptionCode: 1 } + operation: + name: "findOneAndDelete" + arguments: + filter: { x: { $gte: 11 }} + sort: { x: 1 } + outcome: + result: { _id: 1, x: 11 } + collection: + data: + - 
{ _id: 2, x: 22 } + - + description: "FindOneAndDelete is never committed" + failPoint: + mode: { times: 2 } + data: { failBeforeCommitExceptionCode: 1 } + operation: + name: "findOneAndDelete" + arguments: + filter: { x: { $gte: 11 }} + sort: { x: 1 } + outcome: + error: true + collection: + data: + - { _id: 1, x: 11 } + - { _id: 2, x: 22 } diff --git a/t/data/retryable-writes/findOneAndReplace.json b/t/data/retryable-writes/findOneAndReplace.json new file mode 100644 index 00000000..22c03d74 --- /dev/null +++ b/t/data/retryable-writes/findOneAndReplace.json @@ -0,0 +1,135 @@ +{ + "data": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + } + ], + "minServerVersion": "3.6", + "tests": [ + { + "description": "FindOneAndReplace is committed on first attempt", + "failPoint": { + "mode": { + "times": 1 + } + }, + "operation": { + "name": "findOneAndReplace", + "arguments": { + "filter": { + "_id": 1 + }, + "replacement": { + "_id": 1, + "x": 111 + }, + "returnDocument": "Before" + } + }, + "outcome": { + "result": { + "_id": 1, + "x": 11 + }, + "collection": { + "data": [ + { + "_id": 1, + "x": 111 + }, + { + "_id": 2, + "x": 22 + } + ] + } + } + }, + { + "description": "FindOneAndReplace is not committed on first attempt", + "failPoint": { + "mode": { + "times": 1 + }, + "data": { + "failBeforeCommitExceptionCode": 1 + } + }, + "operation": { + "name": "findOneAndReplace", + "arguments": { + "filter": { + "_id": 1 + }, + "replacement": { + "_id": 1, + "x": 111 + }, + "returnDocument": "Before" + } + }, + "outcome": { + "result": { + "_id": 1, + "x": 11 + }, + "collection": { + "data": [ + { + "_id": 1, + "x": 111 + }, + { + "_id": 2, + "x": 22 + } + ] + } + } + }, + { + "description": "FindOneAndReplace is never committed", + "failPoint": { + "mode": { + "times": 2 + }, + "data": { + "failBeforeCommitExceptionCode": 1 + } + }, + "operation": { + "name": "findOneAndReplace", + "arguments": { + "filter": { + "_id": 1 + }, + "replacement": { + "_id": 1, + "x": 
111 + }, + "returnDocument": "Before" + } + }, + "outcome": { + "error": true, + "collection": { + "data": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + } + ] + } + } + } + ] +} diff --git a/t/data/retryable-writes/findOneAndReplace.yml b/t/data/retryable-writes/findOneAndReplace.yml new file mode 100644 index 00000000..f2dfc572 --- /dev/null +++ b/t/data/retryable-writes/findOneAndReplace.yml @@ -0,0 +1,57 @@ +data: + - { _id: 1, x: 11 } + - { _id: 2, x: 22 } + +minServerVersion: '3.6' + +tests: + - + description: "FindOneAndReplace is committed on first attempt" + failPoint: + mode: { times: 1 } + operation: + name: "findOneAndReplace" + arguments: + filter: { _id: 1 } + replacement: { _id: 1, x: 111 } + returnDocument: "Before" + outcome: + result: { _id: 1, x: 11 } + collection: + data: + - { _id: 1, x: 111 } + - { _id: 2, x: 22 } + - + description: "FindOneAndReplace is not committed on first attempt" + failPoint: + mode: { times: 1 } + data: { failBeforeCommitExceptionCode: 1 } + operation: + name: "findOneAndReplace" + arguments: + filter: { _id: 1 } + replacement: { _id: 1, x: 111 } + returnDocument: "Before" + outcome: + result: { _id: 1, x: 11 } + collection: + data: + - { _id: 1, x: 111 } + - { _id: 2, x: 22 } + - + description: "FindOneAndReplace is never committed" + failPoint: + mode: { times: 2 } + data: { failBeforeCommitExceptionCode: 1 } + operation: + name: "findOneAndReplace" + arguments: + filter: { _id: 1 } + replacement: { _id: 1, x: 111 } + returnDocument: "Before" + outcome: + error: true + collection: + data: + - { _id: 1, x: 11 } + - { _id: 2, x: 22 } diff --git a/t/data/retryable-writes/findOneAndUpdate.json b/t/data/retryable-writes/findOneAndUpdate.json new file mode 100644 index 00000000..11a76ab1 --- /dev/null +++ b/t/data/retryable-writes/findOneAndUpdate.json @@ -0,0 +1,137 @@ +{ + "data": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + } + ], + "minServerVersion": "3.6", + "tests": [ + { + 
"description": "FindOneAndUpdate is committed on first attempt", + "failPoint": { + "mode": { + "times": 1 + } + }, + "operation": { + "name": "findOneAndUpdate", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$inc": { + "x": 1 + } + }, + "returnDocument": "Before" + } + }, + "outcome": { + "result": { + "_id": 1, + "x": 11 + }, + "collection": { + "data": [ + { + "_id": 1, + "x": 12 + }, + { + "_id": 2, + "x": 22 + } + ] + } + } + }, + { + "description": "FindOneAndUpdate is not committed on first attempt", + "failPoint": { + "mode": { + "times": 1 + }, + "data": { + "failBeforeCommitExceptionCode": 1 + } + }, + "operation": { + "name": "findOneAndUpdate", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$inc": { + "x": 1 + } + }, + "returnDocument": "Before" + } + }, + "outcome": { + "result": { + "_id": 1, + "x": 11 + }, + "collection": { + "data": [ + { + "_id": 1, + "x": 12 + }, + { + "_id": 2, + "x": 22 + } + ] + } + } + }, + { + "description": "FindOneAndUpdate is never committed", + "failPoint": { + "mode": { + "times": 2 + }, + "data": { + "failBeforeCommitExceptionCode": 1 + } + }, + "operation": { + "name": "findOneAndUpdate", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$inc": { + "x": 1 + } + } + } + }, + "outcome": { + "error": true, + "collection": { + "data": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + } + ] + } + } + } + ] +} diff --git a/t/data/retryable-writes/findOneAndUpdate.yml b/t/data/retryable-writes/findOneAndUpdate.yml new file mode 100644 index 00000000..c9fc7672 --- /dev/null +++ b/t/data/retryable-writes/findOneAndUpdate.yml @@ -0,0 +1,56 @@ +data: + - { _id: 1, x: 11 } + - { _id: 2, x: 22 } + +minServerVersion: '3.6' + +tests: + - + description: "FindOneAndUpdate is committed on first attempt" + failPoint: + mode: { times: 1 } + operation: + name: "findOneAndUpdate" + arguments: + filter: { _id: 1 } + update: { $inc: { x : 1 }} + returnDocument: "Before" + outcome: + 
result: { _id: 1, x: 11 } + collection: + data: + - { _id: 1, x: 12 } + - { _id: 2, x: 22 } + - + description: "FindOneAndUpdate is not committed on first attempt" + failPoint: + mode: { times: 1 } + data: { failBeforeCommitExceptionCode: 1 } + operation: + name: "findOneAndUpdate" + arguments: + filter: { _id: 1 } + update: { $inc: { x : 1 }} + returnDocument: "Before" + outcome: + result: { _id: 1, x: 11 } + collection: + data: + - { _id: 1, x: 12 } + - { _id: 2, x: 22 } + - + description: "FindOneAndUpdate is never committed" + failPoint: + mode: { times: 2 } + data: { failBeforeCommitExceptionCode: 1 } + operation: + name: "findOneAndUpdate" + arguments: + filter: { _id: 1 } + update: { $inc: { x : 1 }} + outcome: + error: true + collection: + data: + - { _id: 1, x: 11 } + - { _id: 2, x: 22 } diff --git a/t/data/retryable-writes/insertMany.json b/t/data/retryable-writes/insertMany.json new file mode 100644 index 00000000..2d71cb91 --- /dev/null +++ b/t/data/retryable-writes/insertMany.json @@ -0,0 +1,153 @@ +{ + "data": [ + { + "_id": 1, + "x": 11 + } + ], + "minServerVersion": "3.6", + "tests": [ + { + "description": "InsertMany succeeds after one network error", + "failPoint": { + "mode": { + "times": 1 + } + }, + "operation": { + "name": "insertMany", + "arguments": { + "documents": [ + { + "_id": 2, + "x": 22 + }, + { + "_id": 3, + "x": 33 + } + ], + "options": { + "ordered": true + } + } + }, + "outcome": { + "result": { + "insertedIds": { + "0": 2, + "1": 3 + } + }, + "collection": { + "data": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + }, + { + "_id": 3, + "x": 33 + } + ] + } + } + }, + { + "description": "InsertMany with unordered execution", + "failPoint": { + "mode": { + "times": 1 + } + }, + "operation": { + "name": "insertMany", + "arguments": { + "documents": [ + { + "_id": 2, + "x": 22 + }, + { + "_id": 3, + "x": 33 + } + ], + "options": { + "ordered": false + } + } + }, + "outcome": { + "result": { + "insertedIds": { + "0": 2, + 
"1": 3 + } + }, + "collection": { + "data": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + }, + { + "_id": 3, + "x": 33 + } + ] + } + } + }, + { + "description": "InsertMany fails after multiple network errors", + "failPoint": { + "mode": "alwaysOn", + "data": { + "failBeforeCommitExceptionCode": 1 + } + }, + "operation": { + "name": "insertMany", + "arguments": { + "documents": [ + { + "_id": 2, + "x": 22 + }, + { + "_id": 3, + "x": 33 + }, + { + "_id": 4, + "x": 44 + } + ], + "options": { + "ordered": true + } + } + }, + "outcome": { + "error": true, + "collection": { + "data": [ + { + "_id": 1, + "x": 11 + } + ] + } + } + } + ] +} diff --git a/t/data/retryable-writes/insertMany.yml b/t/data/retryable-writes/insertMany.yml new file mode 100644 index 00000000..f3ff5d2f --- /dev/null +++ b/t/data/retryable-writes/insertMany.yml @@ -0,0 +1,68 @@ +data: + - { _id: 1, x: 11 } + +minServerVersion: '3.6' + +tests: + - + description: "InsertMany succeeds after one network error" + failPoint: + mode: { times: 1 } + operation: + name: "insertMany" + arguments: + documents: + - { _id: 2, x: 22 } + - { _id: 3, x: 33 } + options: { ordered: true } + outcome: + result: + insertedIds: { 0: 2, 1: 3 } + collection: + data: + - { _id: 1, x: 11 } + - { _id: 2, x: 22 } + - { _id: 3, x: 33 } + - + description: "InsertMany with unordered execution" + failPoint: + mode: { times: 1 } + operation: + name: "insertMany" + arguments: + documents: + - { _id: 2, x: 22 } + - { _id: 3, x: 33 } + options: { ordered: false } + outcome: + result: + insertedIds: { 0: 2, 1: 3 } + collection: + data: + - { _id: 1, x: 11 } + - { _id: 2, x: 22 } + - { _id: 3, x: 33 } + - + description: "InsertMany fails after multiple network errors" + failPoint: + # Normally, a mongod will insert the documents as a batch with a + # single commit. If this fails, mongod may try to insert each + # document one at a time depending on the failure. 
Therefore our + # single insert command may trigger the failpoint twice on each + # driver attempt. This test permanently enables the fail point to + # ensure the retry attempt always fails. + mode: "alwaysOn" + data: { failBeforeCommitExceptionCode: 1 } + operation: + name: "insertMany" + arguments: + documents: + - { _id: 2, x: 22 } + - { _id: 3, x: 33 } + - { _id: 4, x: 44 } + options: { ordered: true } + outcome: + error: true + collection: + data: + - { _id: 1, x: 11 } diff --git a/t/data/retryable-writes/insertOne.json b/t/data/retryable-writes/insertOne.json new file mode 100644 index 00000000..58ad6cc6 --- /dev/null +++ b/t/data/retryable-writes/insertOne.json @@ -0,0 +1,129 @@ +{ + "data": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + } + ], + "minServerVersion": "3.6", + "tests": [ + { + "description": "InsertOne is committed on first attempt", + "failPoint": { + "mode": { + "times": 1 + } + }, + "operation": { + "name": "insertOne", + "arguments": { + "document": { + "_id": 3, + "x": 33 + } + } + }, + "outcome": { + "result": { + "insertedId": 3 + }, + "collection": { + "data": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + }, + { + "_id": 3, + "x": 33 + } + ] + } + } + }, + { + "description": "InsertOne is not committed on first attempt", + "failPoint": { + "mode": { + "times": 1 + }, + "data": { + "failBeforeCommitExceptionCode": 1 + } + }, + "operation": { + "name": "insertOne", + "arguments": { + "document": { + "_id": 3, + "x": 33 + } + } + }, + "outcome": { + "result": { + "insertedId": 3 + }, + "collection": { + "data": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + }, + { + "_id": 3, + "x": 33 + } + ] + } + } + }, + { + "description": "InsertOne is never committed", + "failPoint": { + "mode": { + "times": 2 + }, + "data": { + "failBeforeCommitExceptionCode": 1 + } + }, + "operation": { + "name": "insertOne", + "arguments": { + "document": { + "_id": 3, + "x": 33 + } + } + }, + "outcome": { + "error": 
true, + "collection": { + "data": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + } + ] + } + } + } + ] +} diff --git a/t/data/retryable-writes/insertOne.yml b/t/data/retryable-writes/insertOne.yml new file mode 100644 index 00000000..5a303ddc --- /dev/null +++ b/t/data/retryable-writes/insertOne.yml @@ -0,0 +1,55 @@ +data: + - { _id: 1, x: 11 } + - { _id: 2, x: 22 } + +minServerVersion: '3.6' + +tests: + - + description: "InsertOne is committed on first attempt" + failPoint: + mode: { times: 1 } + operation: + name: "insertOne" + arguments: + document: { _id: 3, x: 33 } + outcome: + result: + insertedId: 3 + collection: + data: + - { _id: 1, x: 11 } + - { _id: 2, x: 22 } + - { _id: 3, x: 33 } + - + description: "InsertOne is not committed on first attempt" + failPoint: + mode: { times: 1 } + data: { failBeforeCommitExceptionCode: 1 } + operation: + name: "insertOne" + arguments: + document: { _id: 3, x: 33 } + outcome: + result: + insertedId: 3 + collection: + data: + - { _id: 1, x: 11 } + - { _id: 2, x: 22 } + - { _id: 3, x: 33 } + - + description: "InsertOne is never committed" + failPoint: + mode: { times: 2 } + data: { failBeforeCommitExceptionCode: 1 } + operation: + name: "insertOne" + arguments: + document: { _id: 3, x: 33 } + outcome: + error: true + collection: + data: + - { _id: 1, x: 11 } + - { _id: 2, x: 22 } diff --git a/t/data/retryable-writes/replaceOne.json b/t/data/retryable-writes/replaceOne.json new file mode 100644 index 00000000..f31966c3 --- /dev/null +++ b/t/data/retryable-writes/replaceOne.json @@ -0,0 +1,134 @@ +{ + "data": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + } + ], + "minServerVersion": "3.6", + "tests": [ + { + "description": "ReplaceOne is committed on first attempt", + "failPoint": { + "mode": { + "times": 1 + } + }, + "operation": { + "name": "replaceOne", + "arguments": { + "filter": { + "_id": 1 + }, + "replacement": { + "_id": 1, + "x": 111 + } + } + }, + "outcome": { + "result": { + 
"matchedCount": 1, + "modifiedCount": 1, + "upsertedCount": 0 + }, + "collection": { + "data": [ + { + "_id": 1, + "x": 111 + }, + { + "_id": 2, + "x": 22 + } + ] + } + } + }, + { + "description": "ReplaceOne is not committed on first attempt", + "failPoint": { + "mode": { + "times": 1 + }, + "data": { + "failBeforeCommitExceptionCode": 1 + } + }, + "operation": { + "name": "replaceOne", + "arguments": { + "filter": { + "_id": 1 + }, + "replacement": { + "_id": 1, + "x": 111 + } + } + }, + "outcome": { + "result": { + "matchedCount": 1, + "modifiedCount": 1, + "upsertedCount": 0 + }, + "collection": { + "data": [ + { + "_id": 1, + "x": 111 + }, + { + "_id": 2, + "x": 22 + } + ] + } + } + }, + { + "description": "ReplaceOne is never committed", + "failPoint": { + "mode": { + "times": 2 + }, + "data": { + "failBeforeCommitExceptionCode": 1 + } + }, + "operation": { + "name": "replaceOne", + "arguments": { + "filter": { + "_id": 1 + }, + "replacement": { + "_id": 1, + "x": 111 + } + } + }, + "outcome": { + "error": true, + "collection": { + "data": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + } + ] + } + } + } + ] +} diff --git a/t/data/retryable-writes/replaceOne.yml b/t/data/retryable-writes/replaceOne.yml new file mode 100644 index 00000000..08fbda9a --- /dev/null +++ b/t/data/retryable-writes/replaceOne.yml @@ -0,0 +1,60 @@ +data: + - { _id: 1, x: 11 } + - { _id: 2, x: 22 } + +minServerVersion: '3.6' + +tests: + - + description: "ReplaceOne is committed on first attempt" + failPoint: + mode: { times: 1 } + operation: + name: "replaceOne" + arguments: + filter: { _id: 1 } + replacement: { _id: 1, x: 111 } + outcome: + result: + matchedCount: 1 + modifiedCount: 1 + upsertedCount: 0 + collection: + data: + - { _id: 1, x: 111 } + - { _id: 2, x: 22 } + - + description: "ReplaceOne is not committed on first attempt" + failPoint: + mode: { times: 1 } + data: { failBeforeCommitExceptionCode: 1 } + operation: + name: "replaceOne" + arguments: + filter: { 
_id: 1 } + replacement: { _id: 1, x: 111 } + outcome: + result: + matchedCount: 1 + modifiedCount: 1 + upsertedCount: 0 + collection: + data: + - { _id: 1, x: 111 } + - { _id: 2, x: 22 } + - + description: "ReplaceOne is never committed" + failPoint: + mode: { times: 2 } + data: { failBeforeCommitExceptionCode: 1 } + operation: + name: "replaceOne" + arguments: + filter: { _id: 1 } + replacement: { _id: 1, x: 111 } + outcome: + error: true + collection: + data: + - { _id: 1, x: 11 } + - { _id: 2, x: 22 } diff --git a/t/data/retryable-writes/updateOne.json b/t/data/retryable-writes/updateOne.json new file mode 100644 index 00000000..96aa2bde --- /dev/null +++ b/t/data/retryable-writes/updateOne.json @@ -0,0 +1,275 @@ +{ + "data": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + } + ], + "minServerVersion": "3.6", + "tests": [ + { + "description": "UpdateOne is committed on first attempt", + "failPoint": { + "mode": { + "times": 1 + } + }, + "operation": { + "name": "updateOne", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$inc": { + "x": 1 + } + } + } + }, + "outcome": { + "result": { + "matchedCount": 1, + "modifiedCount": 1, + "upsertedCount": 0 + }, + "collection": { + "data": [ + { + "_id": 1, + "x": 12 + }, + { + "_id": 2, + "x": 22 + } + ] + } + } + }, + { + "description": "UpdateOne is not committed on first attempt", + "failPoint": { + "mode": { + "times": 1 + }, + "data": { + "failBeforeCommitExceptionCode": 1 + } + }, + "operation": { + "name": "updateOne", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$inc": { + "x": 1 + } + } + } + }, + "outcome": { + "result": { + "matchedCount": 1, + "modifiedCount": 1, + "upsertedCount": 0 + }, + "collection": { + "data": [ + { + "_id": 1, + "x": 12 + }, + { + "_id": 2, + "x": 22 + } + ] + } + } + }, + { + "description": "UpdateOne is never committed", + "failPoint": { + "mode": { + "times": 2 + }, + "data": { + "failBeforeCommitExceptionCode": 1 + } + }, + "operation": 
{ + "name": "updateOne", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$inc": { + "x": 1 + } + } + } + }, + "outcome": { + "error": true, + "collection": { + "data": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + } + ] + } + } + }, + { + "description": "UpdateOne with upsert is committed on first attempt", + "failPoint": { + "mode": { + "times": 1 + } + }, + "operation": { + "name": "updateOne", + "arguments": { + "filter": { + "_id": 3, + "x": 33 + }, + "update": { + "$inc": { + "x": 1 + } + }, + "upsert": true + } + }, + "outcome": { + "result": { + "matchedCount": 0, + "modifiedCount": 0, + "upsertedCount": 1, + "upsertedId": 3 + }, + "collection": { + "data": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + }, + { + "_id": 3, + "x": 34 + } + ] + } + } + }, + { + "description": "UpdateOne with upsert is not committed on first attempt", + "failPoint": { + "mode": { + "times": 1 + }, + "data": { + "failBeforeCommitExceptionCode": 1 + } + }, + "operation": { + "name": "updateOne", + "arguments": { + "filter": { + "_id": 3, + "x": 33 + }, + "update": { + "$inc": { + "x": 1 + } + }, + "upsert": true + } + }, + "outcome": { + "result": { + "matchedCount": 0, + "modifiedCount": 0, + "upsertedCount": 1, + "upsertedId": 3 + }, + "collection": { + "data": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + }, + { + "_id": 3, + "x": 34 + } + ] + } + } + }, + { + "description": "UpdateOne with upsert is never committed", + "failPoint": { + "mode": { + "times": 2 + }, + "data": { + "failBeforeCommitExceptionCode": 1 + } + }, + "operation": { + "name": "updateOne", + "arguments": { + "filter": { + "_id": 3, + "x": 33 + }, + "update": { + "$inc": { + "x": 1 + } + }, + "upsert": true + } + }, + "outcome": { + "error": true, + "collection": { + "data": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + } + ] + } + } + } + ] +} diff --git a/t/data/retryable-writes/updateOne.yml b/t/data/retryable-writes/updateOne.yml 
new file mode 100644 index 00000000..810e49f2 --- /dev/null +++ b/t/data/retryable-writes/updateOne.yml @@ -0,0 +1,120 @@ +data: + - { _id: 1, x: 11 } + - { _id: 2, x: 22 } + +minServerVersion: '3.6' + +tests: + - + description: "UpdateOne is committed on first attempt" + failPoint: + mode: { times: 1 } + operation: + name: "updateOne" + arguments: + filter: { _id: 1 } + update: { $inc: { x : 1 }} + outcome: + result: + matchedCount: 1 + modifiedCount: 1 + upsertedCount: 0 + collection: + data: + - { _id: 1, x: 12 } + - { _id: 2, x: 22 } + - + description: "UpdateOne is not committed on first attempt" + failPoint: + mode: { times: 1 } + data: { failBeforeCommitExceptionCode: 1 } + operation: + name: "updateOne" + arguments: + filter: { _id: 1 } + update: { $inc: { x : 1 }} + outcome: + result: + matchedCount: 1 + modifiedCount: 1 + upsertedCount: 0 + collection: + data: + - { _id: 1, x: 12 } + - { _id: 2, x: 22 } + - + description: "UpdateOne is never committed" + failPoint: + mode: { times: 2 } + data: { failBeforeCommitExceptionCode: 1 } + operation: + name: "updateOne" + arguments: + filter: { _id: 1 } + update: { $inc: { x : 1 }} + outcome: + error: true + collection: + data: + - { _id: 1, x: 11 } + - { _id: 2, x: 22 } + - + description: "UpdateOne with upsert is committed on first attempt" + failPoint: + mode: { times: 1 } + operation: + name: "updateOne" + arguments: + filter: { _id: 3, x: 33 } + update: { $inc: { x : 1 }} + upsert: true + outcome: + result: + matchedCount: 0 + modifiedCount: 0 + upsertedCount: 1 + upsertedId: 3 + collection: + data: + - { _id: 1, x: 11 } + - { _id: 2, x: 22 } + - { _id: 3, x: 34 } + - + description: "UpdateOne with upsert is not committed on first attempt" + failPoint: + mode: { times: 1 } + data: { failBeforeCommitExceptionCode: 1 } + operation: + name: "updateOne" + arguments: + filter: { _id: 3, x: 33 } + update: { $inc: { x : 1 }} + upsert: true + outcome: + result: + matchedCount: 0 + modifiedCount: 0 + upsertedCount: 1 
+ upsertedId: 3 + collection: + data: + - { _id: 1, x: 11 } + - { _id: 2, x: 22 } + - { _id: 3, x: 34 } + - + description: "UpdateOne with upsert is never committed" + failPoint: + mode: { times: 2 } + data: { failBeforeCommitExceptionCode: 1 } + operation: + name: "updateOne" + arguments: + filter: { _id: 3, x: 33 } + update: { $inc: { x : 1 }} + upsert: true + outcome: + error: true + collection: + data: + - { _id: 1, x: 11 } + - { _id: 2, x: 22 } diff --git a/t/lib/MongoDBTest.pm b/t/lib/MongoDBTest.pm index 32a66a33..7d51ff3c 100644 --- a/t/lib/MongoDBTest.pm +++ b/t/lib/MongoDBTest.pm @@ -39,6 +39,7 @@ our @EXPORT_OK = qw( uri_escape get_unique_collection get_feature_compat_version + check_min_server_version uuid_to_string ); @@ -87,9 +88,10 @@ sub get_test_db { } sub get_unique_collection { - my ( $db, $prefix ) = @_; + my ( $db, $prefix, $opts ) = @_; return $db->get_collection( - sprintf( '%s_%d_%d', $prefix, time(), int(rand(999999)) ) + sprintf( '%s_%d_%d', $prefix, time(), int(rand(999999)) ), + $opts ); } @@ -154,6 +156,18 @@ sub server_version { return version->parse("v$version_str"); } +sub check_min_server_version { + my ( $conn, $min_version ) = @_; + $min_version = "v$min_version" unless $min_version =~ /^v/; + $min_version .= ".0" unless $min_version =~ /^v\d+\.\d+.\d+$/; + $min_version = version->new($min_version); + my $server_version = server_version( $conn ); + if ( $min_version > $server_version ) { + return 1; + } + return 0; +} + sub server_type { my $conn = shift; diff --git a/t/retryable-writes-command-construction.t b/t/retryable-writes-command-construction.t new file mode 100644 index 00000000..cf1c93c0 --- /dev/null +++ b/t/retryable-writes-command-construction.t @@ -0,0 +1,320 @@ +# +# Copyright 2009-2013 MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Test in t-dynamic as not sure if failover should be tested on install? + +use strict; +use warnings; +use JSON::MaybeXS; +use Path::Tiny 0.054; # basename with suffix +use Test::More 0.88; +use Test::Fatal; +use boolean; + +use lib "t/lib"; + +use MongoDBTest qw/ + build_client + get_test_db + clear_testdbs + get_unique_collection + server_version + server_type + check_min_server_version + get_feature_compat_version +/; + +my @events; + +sub clear_events { @events = () } +sub event_count { scalar @events } +sub event_cb { push @events, $_[0] } + +my $conn = build_client( + retry_writes => 1, + monitoring_callback => \&event_cb, +); +my $testdb = get_test_db($conn); +my $server_version = server_version($conn); +my $server_type = server_type($conn); +my $feat_compat_ver = get_feature_compat_version($conn); + +plan skip_all => "standalone servers dont support retryableWrites" + if $server_type eq 'Standalone'; + +plan skip_all => "retryableWrites requires featureCompatibilityVersion 3.6 - got $feat_compat_ver" + if ( $feat_compat_ver < 3.6 ); + +sub check_event_no_txn { + my $cmd = shift; + my $op = shift; + is $events[-2]->{ commandName }, $cmd, "$op command correct"; + is $events[-2]->{ type }, 'command_started', "$op command started"; + ok ! 
exists $events[-2]->{ command }->{ txnNumber }, "$op no transaction number"; +}; + +subtest 'unacknowledged writes no transaction' => sub { + my $coll = get_unique_collection( $testdb, 'cmd_con_792_unac', { write_concern => { w => 0 } } ); + + clear_events(); + $coll->insert_one( { _id => 1 } ); + check_event_no_txn( 'insert', 'insert_one' ); + + clear_events(); + $coll->insert_many( [ + { _id => 2 }, + { _id => 3 }, + { _id => 4 }, + { _id => 5 }, + { _id => 6 }, + { _id => 7 }, + { _id => 8 }, + ] ); + check_event_no_txn( 'insert', 'insert_many' ); + + clear_events(); + $coll->replace_one( + { _id => 1 }, + { _id => 1, foo => 'bar' } + ); + check_event_no_txn( 'update', 'replace_one' ); + + clear_events(); + $coll->update_one( + { _id => 1 }, + { '$set' => { foo => 'qux' } }, + ); + check_event_no_txn( 'update', 'update_one' ); + + clear_events(); + $coll->update_many( + { _id => { '$in' => [1,2,3] } }, + { '$set' => { foo => 'qux' } }, + ); + check_event_no_txn( 'update', 'update_many' ); + + clear_events(); + $coll->delete_one( + { _id => 1 }, + ); + check_event_no_txn( 'delete', 'delete_one' ); + + clear_events(); + $coll->delete_many( + { _id => { '$in' => [2,3] } }, + ); + check_event_no_txn( 'delete', 'delete_many' ); + + clear_events(); + $coll->find_one_and_delete( + { _id => 4 }, + ); + check_event_no_txn( 'findAndModify', 'find_one_and_delete' ); + + clear_events(); + $coll->find_one_and_replace( + { _id => 5 }, + { _id => 5, flibble => 'bee' }, + ); + check_event_no_txn( 'findAndModify', 'find_one_and_replace' ); + + clear_events(); + $coll->find_one_and_update( + { _id => 6 }, + { '$set' => { bar => 'baz' } }, + ); + check_event_no_txn( 'findAndModify', 'find_one_and_update' ); + + # building an (un)ordered bulk is the same as using bulkWrite + clear_events(); + $coll->bulk_write( [ + insert_one => [ { _id => 1 } ], + insert_one => [ { _id => 2 } ], + ] ); + check_event_no_txn( 'insert', 'bulk_write ordered' ); + + clear_events(); + $coll->bulk_write( 
[ + insert_one => [ { _id => 1 } ], + insert_one => [ { _id => 2 } ], + ], { ordered => 0 } ); + check_event_no_txn( 'insert', 'bulk_write unordered' ); +}; + + +subtest 'unsupported single statement writes' => sub { + my $coll = get_unique_collection( $testdb, 'cmd_con_792_unsup' ); + + $coll->insert_many( [ + { _id => 1 }, + { _id => 2 }, + { _id => 3 }, + ] ); + + clear_events(); + $coll->update_many( + { _id => { '$in' => [1,2,3] } }, + { '$set' => { foo => 'qux' } }, + ); + check_event_no_txn( 'update', 'update_many' ); + + clear_events(); + $coll->delete_many( + { _id => { '$in' => [2,3] } }, + ); + check_event_no_txn( 'delete', 'delete_many' ); +}; + +subtest 'unsupported multi statement writes' => sub { + my $coll = get_unique_collection( $testdb, 'cmd_con_792_u_multi' ); + + $coll->insert_many( [ + { _id => 1 }, + { _id => 2 }, + { _id => 3 }, + ] ); + + clear_events(); + $coll->bulk_write( [ + update_many => [ + { _id => { '$in' => [1,2,3] } }, + { '$set' => { foo => 'qux' } }, + ], + ] ); + check_event_no_txn( 'update', 'bulk_write update_many' ); + + clear_events(); + $coll->bulk_write( [ + delete_many => [ + { _id => { '$in' => [1,2,3] } }, + ], + ] ); + check_event_no_txn( 'delete', 'bulk_write delete_many' ); +}; + +subtest 'unsupported write commands' => sub { + my $coll = get_unique_collection( $testdb, 'cmd_con_792_u_write' ); + + $coll->insert_many( [ + map { { count => $_ } } 1..20 + ] ); + + clear_events(); + my $result = $coll->aggregate( [ + { '$match' => { count => { '$gt' => 0 } } }, + { '$out' => 'test_out' } + ] ); + check_event_no_txn( 'aggregate', 'aggregate with $out' ); +}; + +sub check_event_with_txn { + my $cmd = shift; + my $op = shift; + is $events[-2]->{ commandName }, $cmd, "$op command correct"; + is $events[-2]->{ type }, 'command_started', "$op command started"; + isa_ok $events[-2]->{ command }->{ txnNumber }, "Math::BigInt", "$op has transaction number"; +} + +subtest 'supported single statement writes' => sub { + my $coll 
= get_unique_collection( $testdb, 'cmd_con_792_sup' ); + + $coll->insert_many( [ + { _id => 2 }, + { _id => 3 }, + { _id => 4 }, + { _id => 5 }, + { _id => 6 }, + { _id => 7 }, + { _id => 8 }, + ] ); + + clear_events(); + $coll->insert_one( { _id => 1 } ); + check_event_with_txn( 'insert', 'insert_one' ); + + clear_events(); + $coll->replace_one( + { _id => 1 }, + { _id => 1, foo => 'bar' } + ); + check_event_with_txn( 'update', 'replace_one' ); + + clear_events(); + $coll->update_one( + { _id => 1 }, + { '$set' => { foo => 'qux' } }, + ); + check_event_with_txn( 'update', 'update_one' ); + + clear_events(); + $coll->delete_one( + { _id => 1 }, + ); + check_event_with_txn( 'delete', 'delete_one' ); + + clear_events(); + $coll->find_one_and_delete( + { _id => 4 }, + ); + check_event_with_txn( 'findAndModify', 'find_one_and_delete' ); + + clear_events(); + $coll->find_one_and_replace( + { _id => 5 }, + { _id => 5, flibble => 'bee' }, + ); + check_event_with_txn( 'findAndModify', 'find_one_and_replace' ); + + clear_events(); + $coll->find_one_and_update( + { _id => 6 }, + { '$set' => { bar => 'baz' } }, + ); + check_event_with_txn( 'findAndModify', 'find_one_and_update' ); +}; + +subtest 'supported multi statement writes' => sub { + my $coll = get_unique_collection( $testdb, 'cmd_con_792_s_multi' ); + + clear_events(); + $coll->insert_many( [ + map { { _id => $_ } } 1..5 + ], { ordered => 1 } ); + check_event_with_txn( 'insert', 'insert_many' ); + + clear_events(); + $coll->insert_many( [ + map { { _id => $_ } } 6..10 + ], { ordered => 0 } ); + check_event_with_txn( 'insert', 'insert_many' ); + + # building an (un)ordered bulk is the same as using bulkWrite + clear_events(); + $coll->bulk_write( [ + insert_one => [ { _id => 11 } ], + insert_one => [ { _id => 12 } ], + ] ); + check_event_with_txn( 'insert', 'bulk_write ordered' ); + + clear_events(); + $coll->bulk_write( [ + insert_one => [ { _id => 13 } ], + insert_one => [ { _id => 14 } ], + ], { ordered => 0 } ); + 
check_event_with_txn( 'insert', 'bulk_write unordered' ); +}; + +done_testing; diff --git a/t/retryable-writes-spec.t b/t/retryable-writes-spec.t new file mode 100644 index 00000000..3a26641a --- /dev/null +++ b/t/retryable-writes-spec.t @@ -0,0 +1,260 @@ +# +# Copyright 2009-2013 MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +use strict; +use warnings; +use JSON::MaybeXS; +use Path::Tiny 0.054; # basename with suffix +use Test::More 0.88; +use Test::Fatal; + +use lib "t/lib"; + +use MongoDBTest qw/ + build_client + get_test_db + clear_testdbs + get_unique_collection + server_version + server_type + check_min_server_version + get_feature_compat_version +/; + +my $conn = build_client(retry_writes => 1); +my $testdb = get_test_db($conn); +my $server_version = server_version($conn); +my $server_type = server_type($conn); +my $feat_compat_ver = get_feature_compat_version($conn); + +plan skip_all => "standalone servers dont support retryableWrites" + if $server_type eq 'Standalone'; + +plan skip_all => "retryableWrites requires featureCompatibilityVersion 3.6 - got $feat_compat_ver" + if ( $feat_compat_ver < 3.6 ); + +sub run_test { + my ( $coll, $test ) = @_; + enable_failpoint( $test->{failPoint} ) if exists $test->{failPoint}; + + my $op = $test->{operation}; + my $method = $op->{name}; + $method =~ s{([A-Z])}{_\L$1}g; + + my $func_name = 'do_' . 
$method; + + my $ret = eval { main->$func_name( $coll, $op->{arguments} ) }; + my $err = $@; + + if ( exists $test->{outcome}->{error} && $test->{outcome}->{error} ) { + ok $err, 'Exception occured'; + } + + if ( !exists $test->{outcome}{error} && exists $test->{outcome}->{result} ) { + + #Dwarn $ret; + #Dwarn $test->{outcome}; + for my $res_key ( keys %{ $test->{outcome}->{result} } ) { + next if $res_key eq 'upsertedCount' && ! $ret->can('upserted_count'); # Driver does not parse this value on all things? + # next if $res_key eq 'upsertedId' && ! defined $ret->upserted_id; # upserted id is always present + my $res = $test->{outcome}->{result}->{$res_key}; + + if ( $res_key eq 'insertedIds' ) { + my $ret_parsed = {}; + for my $item ( @{ $ret->inserted } ) { + $ret_parsed->{$item->{index}} = $item->{_id}; + } + is_deeply $ret_parsed, $test->{outcome}->{result}->{insertedIds}, 'insertedIds correct in result'; + next; + } + if ( $res_key eq 'upsertedIds' ) { + my $ret_parsed = {}; + for my $item ( @{ $ret->upserted } ) { + $ret_parsed->{$item->{index}} = $item->{_id}; + } + is_deeply $ret_parsed, $test->{outcome}->{result}->{upsertedIds}, 'upsertedIds correct in result'; + next; + } + my $ret_key = $res_key; + $ret_key =~ s{([A-Z])}{_\L$1}g; + + is $ret->{$ret_key}, $res, "$res_key correct in result"; + } + } + + my @coll_outcome = $coll->find()->all; + my $coll_expected = $test->{outcome}->{collection}->{data}; + + is_deeply \@coll_outcome, $coll_expected, 'Collection has correct outcome'; + disable_failpoint() if exists $test->{failPoint}; +} + +sub do_delete_one { + my ( $self, $coll, $args ) = @_; + $args //= {}; + my $filter = defined $args->{filter} ? $args->{filter} : {}; + return $coll->delete_one( $filter ); +} + +sub do_replace_one { + my ( $self, $coll, $args ) = @_; + $args //= {}; + my $filter = defined $args->{filter} ? $args->{filter} : {}; + my $replacement = defined $args->{replacement} ? 
$args->{replacement} : {}; + return $coll->replace_one( $filter, $replacement ); +} + +sub do_find_one_and_update { + my ( $self, $coll, $args ) = @_; + $args //= {}; + my $filter = defined $args->{filter} ? $args->{filter} : {}; + my $update = defined $args->{update} ? $args->{update} : {}; + my $options = { + ( defined $args->{returnDocument} ? ( returnDocument => lc $args->{returnDocument} ) : () ) + }; + return $coll->find_one_and_update( $filter, $update, $options ); +} + +sub do_find_one_and_replace { + my ( $self, $coll, $args ) = @_; + $args //= {}; + my $filter = defined $args->{filter} ? $args->{filter} : {}; + my $replace = defined $args->{replacement} ? $args->{replacement} : {}; + my $options = { + ( defined $args->{returnDocument} ? ( returnDocument => lc $args->{returnDocument} ) : () ) + }; + return $coll->find_one_and_replace( $filter, $replace, $options ); +} + +sub do_insert_one { + my ( $self, $coll, $args ) = @_; + return $coll->insert_one( $args->{document} ); +} + +sub do_find_one_and_delete { + my ( $self, $coll, $args ) = @_; + $args //= {}; + my $filter = defined $args->{filter} ? $args->{filter} : {}; + my $options = { + ( defined $args->{sort} ? ( sort => $args->{sort} ) : () ) + }; + return $coll->find_one_and_delete( $filter, $options ); +} + +sub do_bulk_write { + my ( $self, $coll, $args ) = @_; + my $options = { + ( defined $args->{options} + && defined $args->{options}->{ordered} + && $args->{options}->{ordered} + ? ( ordered => 1 ) + : ( ordered => 0 ) ) + }; + + my @arguments; + for my $request ( @{ $args->{requests} } ) { + if ( $request->{name} eq 'insertOne' ) { + push @arguments, { insert_one => [ $request->{arguments}->{document} ] }; + } elsif ( $request->{name} eq 'updateOne' ) { + push @arguments, { update_one => [ + $request->{arguments}->{filter}, + $request->{arguments}->{update}, + ( defined $request->{arguments}->{upsert} + ? ( { upsert => $request->{arguments}->{upsert} ? 
1 : 0 } ) + : () ) + ] }; + } elsif ( $request->{name} eq 'deleteOne' ) { + push @arguments, { delete_one => [ $request->{arguments}->{filter} ] }; + } elsif ( $request->{name} eq 'replaceOne' ) { + push @arguments, { replace_one => [ + $request->{arguments}->{filter}, + $request->{arguments}->{replacement} + ] }; + } + } + return $coll->bulk_write( \@arguments, $options ); +} + +sub do_update_one { + my ( $self, $coll, $args ) = @_; + $args //= {}; + my $filter = defined $args->{filter} ? $args->{filter} : {}; + my $update = defined $args->{update} ? $args->{update} : {}; + my $options = { + ( defined $args->{upsert} ? $args->{upsert} ? ( upsert => 1 ) : ( upsert => 0 ) : () ) + }; + return $coll->update_one( $filter, $update, $options ); +} + +sub do_insert_many { + my ( $self, $coll, $args ) = @_; + $args //= {}; + my $options = { + ( defined $args->{options} + && defined $args->{options}->{ordered} + && $args->{options}->{ordered} + ? ( ordered => 1 ) + : ( ordered => 0 ) ) + }; + return $coll->insert_many( $args->{documents}, $options ); +} + +my $dir = path("t/data/retryable-writes"); +my $iterator = $dir->iterator; +while ( my $path = $iterator->() ) { + next unless $path =~ /\.json$/; + my $plan = eval { decode_json( $path->slurp_utf8 ) }; + if ($@) { + die "Error decoding $path: $@"; + } + + subtest $path => sub { + if ( exists $plan->{minServerVersion} ) { + my $min_version = $plan->{minServerVersion}; + plan skip_all => "Requires MongoDB $min_version" + if check_min_server_version( $conn, $min_version ); + } + + for my $test ( @{ $plan->{tests} } ) { + my $coll = get_unique_collection( $testdb, 'retry_write' ); + my $ret = $coll->insert_many( $plan->{data} ); + my $description = $test->{description}; + subtest $description => sub { + run_test( $coll, $test ); + } + } + }; +} + +sub enable_failpoint { + my $doc = shift; + $conn->send_admin_command([ + configureFailPoint => 'onPrimaryTransactionalWrite', + %$doc, + ]); +} + +sub disable_failpoint { + my 
$doc = shift; + $conn->send_admin_command([ + configureFailPoint => 'onPrimaryTransactionalWrite', + mode => 'off', + ]); +} + +clear_testdbs; + +done_testing; diff --git a/t/retryable-writes-split-batch.t b/t/retryable-writes-split-batch.t new file mode 100644 index 00000000..41f43991 --- /dev/null +++ b/t/retryable-writes-split-batch.t @@ -0,0 +1,101 @@ +# +# Copyright 2009-2013 MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +use strict; +use warnings; +use JSON::MaybeXS; +use Path::Tiny 0.054; # basename with suffix +use Test::More 0.88; +use Test::Fatal; + +use lib "t/lib"; + +use MongoDBTest qw/ + build_client + get_test_db + clear_testdbs + get_unique_collection + server_version + server_type + check_min_server_version + get_feature_compat_version +/; + +my $conn = build_client( + retry_writes => 1, +); +my $testdb = get_test_db($conn); +my $coll = get_unique_collection( $testdb, 'retry_split_batch' ); +my $server_version = server_version($conn); +my $server_type = server_type($conn); +my $feat_compat_ver = get_feature_compat_version($conn); + +plan skip_all => "standalone servers dont support retryableWrites" + if $server_type eq 'Standalone'; + +plan skip_all => "retryableWrites requires featureCompatibilityVersion 3.6 - got $feat_compat_ver" + if ( $feat_compat_ver < 3.6 ); + +subtest "ordered batch split on size" => sub { + + enable_failpoint( { mode => { skip => 1 } } ); + + { + # 10MB (ish) doc + my $big_string = "a" x ( 1024 * 1024 * 10 
);
+    # Insert six docs (_id 0 .. 5) to blow past both the 16MB maxBsonObjectSize and maxMessageSizeBytes
+    my $ret = $coll->insert_many( [ map { { _id => $_, a => $big_string } } 0 .. 5 ] );
+
+    is scalar( @{ $ret->inserted } ), 6, 'successfully inserted 6 items';
+  }
+  disable_failpoint();
+
+  is( $coll->count, 6, "collection count" );
+
+  enable_failpoint( { mode => { skip => 2 } } );
+  {
+    my $big_string_b = "b" x ( 1024 * 1024 * 10 );
+
+    my $ret = $coll->bulk_write( [
+      { update_one => [ { _id => 1 }, { '$set' => { a => $big_string_b } } ] },
+      { update_one => [ { _id => 3 }, { '$set' => { a => $big_string_b } } ] },
+      { update_one => [ { _id => 5 }, { '$set' => { a => $big_string_b } } ] },
+    ] );
+
+    is $ret->modified_count, 3, 'successfully modified 3 items';
+  }
+  disable_failpoint();
+
+  is( $coll->count, 6, "collection count" );
+};
+
+sub enable_failpoint {
+  my $doc = shift;
+  $conn->send_admin_command([
+    configureFailPoint => 'onPrimaryTransactionalWrite',
+    %$doc,
+  ]);
+}
+
+sub disable_failpoint {
+  my $doc = shift;
+  $conn->send_admin_command([
+    configureFailPoint => 'onPrimaryTransactionalWrite',
+    mode => 'off',
+  ]);
+}
+
+done_testing; diff --git a/t/testrules.yml b/t/testrules.yml index 0d2bcdad..56601d49 100644 --- a/t/testrules.yml +++ b/t/testrules.yml @@ -5,4 +5,5 @@ seq: - seq: t/using_profiler.t - seq: t/changestreams.t - seq: t/examples/changestream.t + - seq: t/retryable-writes-spec.t - par: **