diff --git a/lib/mongo/error/operation_failure.rb b/lib/mongo/error/operation_failure.rb index 5e6b222893..c4b5dd791a 100644 --- a/lib/mongo/error/operation_failure.rb +++ b/lib/mongo/error/operation_failure.rb @@ -141,8 +141,9 @@ def write_retryable_code? # @since 2.6.0 def change_stream_resumable? if @result && @result.is_a?(Mongo::Operation::GetMore::Result) - change_stream_resumable_message? || - change_stream_resumable_code? + !change_stream_not_resumable_label? && + (change_stream_resumable_message? || + change_stream_resumable_code?) else false end @@ -162,6 +163,16 @@ def change_stream_resumable_code? end private :change_stream_resumable_code? + def change_stream_not_resumable_label? + if labels + labels.include? 'NonResumableChangeStreamError' + else + false + end + end + private :change_stream_not_resumable_label? + + # Create the operation failure. # # @example Create the error object diff --git a/spec/mongo/error/operation_failure_spec.rb b/spec/mongo/error/operation_failure_spec.rb index 1078a04274..301e00c5f2 100644 --- a/spec/mongo/error/operation_failure_spec.rb +++ b/spec/mongo/error/operation_failure_spec.rb @@ -148,6 +148,52 @@ end end end + + context 'when there is a non-resumable label' do + context 'getMore response' do + let(:error) { Mongo::Error::OperationFailure.new('no message', + Mongo::Operation::GetMore::Result.new([]), + :code => 91, :code_name => 'ShutdownInProgress', + :labels => ['NonResumableChangeStreamError']) } + + it 'returns false' do + expect(error.change_stream_resumable?).to eql(false) + end + end + + context 'not a getMore response' do + let(:error) { Mongo::Error::OperationFailure.new('no message', nil, + :code => 91, :code_name => 'ShutdownInProgress', + :labels => ['NonResumableChangeStreamError']) } + + it 'returns false' do + expect(error.change_stream_resumable?).to eql(false) + end + end + end + + context 'when there is another label' do + context 'getMore response' do + let(:error) { Mongo::Error::OperationFailure.new('no message', + Mongo::Operation::GetMore::Result.new([]), + :code => 91, :code_name => 'ShutdownInProgress', + :labels => [ Mongo::Error::TRANSIENT_TRANSACTION_ERROR_LABEL ]) } + + it 'returns true' do + expect(error.change_stream_resumable?).to eql(true) + end + end + + context 'not a getMore response' do + let(:error) { Mongo::Error::OperationFailure.new('no message', nil, + :code => 91, :code_name => 'ShutdownInProgress', + :labels => [ Mongo::Error::TRANSIENT_TRANSACTION_ERROR_LABEL ]) } + + it 'returns false' do + expect(error.change_stream_resumable?).to eql(false) + end + end + end end describe '#labels' do diff --git a/spec/spec_tests/change_streams_spec.rb b/spec/spec_tests/change_streams_spec.rb index 2060437621..0089222a61 100644 --- a/spec/spec_tests/change_streams_spec.rb +++ b/spec/spec_tests/change_streams_spec.rb @@ -29,7 +29,7 @@ expect(result[:result]).to match_result(test) end - it 'has the correct command_started events' do + it 'has the correct command_started events', if: test.expectations do expect(result[:events]).to match_commands(test) end end diff --git a/spec/spec_tests/data/change_streams/change-streams-errors.yml b/spec/spec_tests/data/change_streams/change-streams-errors.yml index 1286e86588..f50c80cd06 100644 --- a/spec/spec_tests/data/change_streams/change-streams-errors.yml +++ b/spec/spec_tests/data/change_streams/change-streams-errors.yml @@ -50,4 +50,27 @@ tests: database_name: *database_name result: error: - code: 40324 \ No newline at end of file + code: 40324 + - + description: Change Stream should error when _id is projected out + minServerVersion: "4.1.11" + target: collection + topology: + - replicaset + - sharded + changeStreamPipeline: + - + $project: { _id: 0 } + changeStreamOptions: {} + operations: + - + database: *database_name + collection: *collection_name + name: insertOne + arguments: + document: + z: 3 + result: + error: + code: 280 + errorLabels: [ "NonResumableChangeStreamError" ] diff --git a/spec/support/change_streams.rb b/spec/support/change_streams.rb index 6758ab311e..95a20d4bf0 100644 --- a/spec/support/change_streams.rb +++ b/spec/support/change_streams.rb @@ -77,6 +77,11 @@ class ChangeStreamsTest # @since 2.0.0 attr_reader :description + # Optional list of command-started events in Extended JSON format + # + # @return [ Array ] The list of command-started events + attr_reader :expectations + def initialize(test, coll1, coll2, db1, db2) @description = test['description'] @min_server_version = test['minServerVersion'] @@ -136,18 +141,30 @@ def run op.execute(@db1, @db2) end - changes = [].tap do |changes| - next unless @result['success'] + changes = [] - unless @result['success'].empty? - change_stream.take_while do |change| - changes << change - changes.length < @result['success'].length - end - end + # attempt first next call (catch NonResumableChangeStreamError errors) + begin + change = change_stream.to_enum.next + changes << change + rescue Mongo::Error::OperationFailure => e + return { + result: { 'error' => { 'code' => e.code, 'errorLabels' => e.labels} }, + events: events + } + end + + # continue until changeStream has received as many changes as there + # are in result.success + if @result['success'] && changes.length < @result['success'].length + change_stream.take_while do |change| + changes << change + changes.length < @result['success'].length + end + end - change_stream.close - end + change_stream.close + { result: { 'success' => changes },