From 62e73825b619d83943d0b8b7728d48a048571d2b Mon Sep 17 00:00:00 2001 From: Hana Pearlman Date: Tue, 4 Jun 2019 15:42:18 -0400 Subject: [PATCH 1/2] RUBY-1811 ChangeStream spec's Resumable Error definition is too broad --- lib/mongo/error/operation_failure.rb | 20 +++++++- spec/mongo/error/operation_failure_spec.rb | 46 +++++++++++++++++++ .../change_streams/change-streams-errors.yml | 25 +++++++++- spec/support/change_streams.rb | 40 +++++++++++----- 4 files changed, 116 insertions(+), 15 deletions(-) diff --git a/lib/mongo/error/operation_failure.rb b/lib/mongo/error/operation_failure.rb index 5e6b222893..f681badf33 100644 --- a/lib/mongo/error/operation_failure.rb +++ b/lib/mongo/error/operation_failure.rb @@ -124,6 +124,11 @@ def write_retryable_code? {:code_name => 'Interrupted', :code => 11601}, ].freeze + # Error label that should result in a failing getMore command + # + # @api private + CHANGE_STREAM_NOT_RESUME_LABEL = 'NonResumableChangeStreamError'.freeze + # Change stream can be resumed when these error messages are encountered. # # @since 2.6.0 @@ -141,8 +146,9 @@ def write_retryable_code? # @since 2.6.0 def change_stream_resumable? if @result && @result.is_a?(Mongo::Operation::GetMore::Result) - change_stream_resumable_message? || - change_stream_resumable_code? + change_stream_resumable_label? && + (change_stream_resumable_message? || + change_stream_resumable_code?) else false end @@ -162,6 +168,16 @@ def change_stream_resumable_code? end private :change_stream_resumable_code? + def change_stream_resumable_label? + if labels + !labels.include? CHANGE_STREAM_NOT_RESUME_LABEL + else + true + end + end + private :change_stream_resumable_label? + + # Create the operation failure. # # @example Create the error object diff --git a/spec/mongo/error/operation_failure_spec.rb b/spec/mongo/error/operation_failure_spec.rb index 1078a04274..301e00c5f2 100644 --- a/spec/mongo/error/operation_failure_spec.rb +++ b/spec/mongo/error/operation_failure_spec.rb @@ -148,6 +148,52 @@ end end end + + context 'when there is a non-resumable label' do + context 'getMore response' do + let(:error) { Mongo::Error::OperationFailure.new('no message', + Mongo::Operation::GetMore::Result.new([]), + :code => 91, :code_name => 'ShutdownInProgress', + :labels => ['NonResumableChangeStreamError']) } + + it 'returns false' do + expect(error.change_stream_resumable?).to eql(false) + end + end + + context 'not a getMore response' do + let(:error) { Mongo::Error::OperationFailure.new('no message', nil, + :code => 91, :code_name => 'ShutdownInProgress', + :labels => ['NonResumableChangeStreamError']) } + + it 'returns false' do + expect(error.change_stream_resumable?).to eql(false) + end + end + end + + context 'when there is another label' do + context 'getMore response' do + let(:error) { Mongo::Error::OperationFailure.new('no message', + Mongo::Operation::GetMore::Result.new([]), + :code => 91, :code_name => 'ShutdownInProgress', + :labels => [ Mongo::Error::TRANSIENT_TRANSACTION_ERROR_LABEL ]) } + + it 'returns true' do + expect(error.change_stream_resumable?).to eql(true) + end + end + + context 'not a getMore response' do + let(:error) { Mongo::Error::OperationFailure.new('no message', nil, + :code => 91, :code_name => 'ShutdownInProgress', + :labels => [ Mongo::Error::TRANSIENT_TRANSACTION_ERROR_LABEL ]) } + + it 'returns false' do + expect(error.change_stream_resumable?).to eql(false) + end + end + end end describe '#labels' do diff --git a/spec/spec_tests/data/change_streams/change-streams-errors.yml b/spec/spec_tests/data/change_streams/change-streams-errors.yml index 1286e86588..f50c80cd06 100644 --- a/spec/spec_tests/data/change_streams/change-streams-errors.yml +++ b/spec/spec_tests/data/change_streams/change-streams-errors.yml @@ -50,4 +50,27 @@ tests: database_name: *database_name result: error: - code: 40324 \ No newline at end of file + code: 40324 + - + description: Change Stream should error when _id is projected out + minServerVersion: "4.1.11" + target: collection + topology: + - replicaset + - sharded + changeStreamPipeline: + - + $project: { _id: 0 } + changeStreamOptions: {} + operations: + - + database: *database_name + collection: *collection_name + name: insertOne + arguments: + document: + z: 3 + result: + error: + code: 280 + errorLabels: [ "NonResumableChangeStreamError" ] diff --git a/spec/support/change_streams.rb b/spec/support/change_streams.rb index 6758ab311e..d59b5539e4 100644 --- a/spec/support/change_streams.rb +++ b/spec/support/change_streams.rb @@ -136,18 +136,30 @@ def run op.execute(@db1, @db2) end - changes = [].tap do |changes| - next unless @result['success'] + changes = [] - unless @result['success'].empty? - change_stream.take_while do |change| - changes << change - changes.length < @result['success'].length - end - end + # attempt first next call (catch NonResumableChangeStreamError errors) + begin + change = change_stream.to_enum.next + changes << change + rescue Mongo::Error::OperationFailure => e + return { + result: { 'error' => { 'code' => e.code, 'errorLabels' => e.labels} }, + events: events + } + end + + # continue until changeStream has received as many changes as there + # are in result.success + if @result['success'] && changes.length < @result['success'].length + change_stream.take_while do |change| + changes << change + changes.length < @result['success'].length + end + end - change_stream.close - end + change_stream.close + { result: { 'success' => changes }, @@ -165,8 +177,12 @@ def match_result?(result) end def match_commands?(actual) - @expectations.each_with_index.all? do |e, i| - actual[i] && match?(e, actual[i]) + if @expectations + @expectations.each_with_index.all? do |e, i| + actual[i] && match?(e, actual[i]) + end + else + true end end From 459d954279810f65675f46a3a6bb3989ddd5f0d5 Mon Sep 17 00:00:00 2001 From: Hana Pearlman Date: Wed, 5 Jun 2019 13:17:55 -0400 Subject: [PATCH 2/2] implemented feedback --- lib/mongo/error/operation_failure.rb | 15 +++++---------- spec/spec_tests/change_streams_spec.rb | 2 +- spec/support/change_streams.rb | 13 +++++++------ 3 files changed, 13 insertions(+), 17 deletions(-) diff --git a/lib/mongo/error/operation_failure.rb b/lib/mongo/error/operation_failure.rb index f681badf33..c4b5dd791a 100644 --- a/lib/mongo/error/operation_failure.rb +++ b/lib/mongo/error/operation_failure.rb @@ -124,11 +124,6 @@ def write_retryable_code? {:code_name => 'Interrupted', :code => 11601}, ].freeze - # Error label that should result in a failing getMore command - # - # @api private - CHANGE_STREAM_NOT_RESUME_LABEL = 'NonResumableChangeStreamError'.freeze - # Change stream can be resumed when these error messages are encountered. # # @since 2.6.0 @@ -146,7 +141,7 @@ def write_retryable_code? # @since 2.6.0 def change_stream_resumable? if @result && @result.is_a?(Mongo::Operation::GetMore::Result) - change_stream_resumable_label? && + !change_stream_not_resumable_label? && (change_stream_resumable_message? || change_stream_resumable_code?) else @@ -168,14 +163,14 @@ def change_stream_resumable_code? end private :change_stream_resumable_code? - def change_stream_resumable_label? + def change_stream_not_resumable_label? if labels - !labels.include? CHANGE_STREAM_NOT_RESUME_LABEL + labels.include? 'NonResumableChangeStreamError' else - true + false end end - private :change_stream_resumable_label? + private :change_stream_not_resumable_label? # Create the operation failure. diff --git a/spec/spec_tests/change_streams_spec.rb b/spec/spec_tests/change_streams_spec.rb index 2060437621..0089222a61 100644 --- a/spec/spec_tests/change_streams_spec.rb +++ b/spec/spec_tests/change_streams_spec.rb @@ -29,7 +29,7 @@ expect(result[:result]).to match_result(test) end - it 'has the correct command_started events' do + it 'has the correct command_started events', if: test.expectations do expect(result[:events]).to match_commands(test) end end diff --git a/spec/support/change_streams.rb b/spec/support/change_streams.rb index d59b5539e4..95a20d4bf0 100644 --- a/spec/support/change_streams.rb +++ b/spec/support/change_streams.rb @@ -77,6 +77,11 @@ class ChangeStreamsTest # @since 2.0.0 attr_reader :description + # Optional list of command-started events in Extended JSON format + # + # @return [ Array ] The list of command-started events + attr_reader :expectations + def initialize(test, coll1, coll2, db1, db2) @description = test['description'] @min_server_version = test['minServerVersion'] @@ -177,12 +182,8 @@ def match_result?(result) end def match_commands?(actual) - if @expectations - @expectations.each_with_index.all? do |e, i| - actual[i] && match?(e, actual[i]) - end - else - true + @expectations.each_with_index.all? do |e, i| + actual[i] && match?(e, actual[i]) end end