diff --git a/lib/mongo/error/operation_failure.rb b/lib/mongo/error/operation_failure.rb index 361538d028..674be95f14 100644 --- a/lib/mongo/error/operation_failure.rb +++ b/lib/mongo/error/operation_failure.rb @@ -166,6 +166,12 @@ def write_retryable_code? # @since 2.6.0 def change_stream_resumable? if @result && @result.is_a?(Mongo::Operation::GetMore::Result) + # CursorNotFound exceptions are always resumable because the server + # is not aware of the cursor id, and thus cannot determine if + # the cursor is a change stream and cannot add the + # ResumableChangeStreamError label. + return true if code == 43 + # Connection description is not populated for unacknowledged writes. if connection_description.max_wire_version >= 9 label?('ResumableChangeStreamError') diff --git a/spec/mongo/collection/view/change_stream_resume_spec.rb b/spec/mongo/collection/view/change_stream_resume_spec.rb index dfea6b5e78..2dd538f3a1 100644 --- a/spec/mongo/collection/view/change_stream_resume_spec.rb +++ b/spec/mongo/collection/view/change_stream_resume_spec.rb @@ -196,7 +196,7 @@ change_stream.to_enum end - it 'propagates cursor not found error' do + it 'resumes on a cursor not found error' do original_cursor_id = cursor.id client.use(:admin).command({ @@ -204,9 +204,9 @@ cursors: [cursor.id] }) - lambda do + expect do enum.next - end.should raise_error(Mongo::Error::OperationFailure, /cursor.*not found/) + end.not_to raise_error end end @@ -218,7 +218,7 @@ collection.insert_one(a:2) end - it 'propagates cursor not found error' do + it 'resumes on a cursor not found error' do original_cursor_id = cursor.id client.use(:admin).command({ @@ -226,9 +226,9 @@ cursors: [cursor.id] }) - lambda do + expect do change_stream.try_next - end.should raise_error(Mongo::Error::OperationFailure, /cursor.*not found/) + end.not_to raise_error end end end diff --git a/spec/mongo/error/operation_failure_spec.rb b/spec/mongo/error/operation_failure_spec.rb index 51dfc45730..ef2f441921 100644 --- a/spec/mongo/error/operation_failure_spec.rb +++ b/spec/mongo/error/operation_failure_spec.rb @@ -227,6 +227,46 @@ end end + context 'when the error code is 43 (CursorNotFound)' do + let(:error) { Mongo::Error::OperationFailure.new(nil, result, code: 43, code_name: 'CursorNotFound') } + let(:result) do + Mongo::Operation::GetMore::Result.new( + Mongo::Protocol::Message.new, description) + end + + context 'wire protocol < 9' do + let(:description) do + Mongo::Server::Description.new('', + 'minWireVersion' => 0, + 'maxWireVersion' => 8, + ) + end + + it 'returns true' do + # CursorNotFound exceptions are resumable even if they don't have + # a ResumableChangeStreamError label because the server is not aware + # of the cursor id, and thus cannot determine if it is a change stream. + expect(error.change_stream_resumable?).to be true + end + end + + context 'wire protocol >= 9' do + let(:description) do + Mongo::Server::Description.new('', + 'minWireVersion' => 0, + 'maxWireVersion' => 9, + ) + end + + it 'returns true' do + # CursorNotFound exceptions are resumable even if they don't have + # a ResumableChangeStreamError label because the server is not aware + # of the cursor id, and thus cannot determine if it is a change stream. + expect(error.change_stream_resumable?).to be true + end + end + end + context 'not a getMore response' do let(:result) do Mongo::Operation::Result.new( diff --git a/spec/spec_tests/data/change_streams/change-streams-resume-whitelist.yml b/spec/spec_tests/data/change_streams/change-streams-resume-whitelist.yml index 40d0ac4386..2d92f29591 100644 --- a/spec/spec_tests/data/change_streams/change-streams-resume-whitelist.yml +++ b/spec/spec_tests/data/change_streams/change-streams-resume-whitelist.yml @@ -1105,3 +1105,69 @@ tests: fullDocument: x: $numberInt: "1" + - + # CursorNotFound is special-cased to be resumable regardless of server versions or error labels, so this test has + # no maxWireVersion. + description: "change stream resumes after CursorNotFound" + minServerVersion: "4.2" + failPoint: + configureFailPoint: failCommand + mode: { times: 1 } + data: + failCommands: ["getMore"] + errorCode: 43 + closeConnection: false + target: collection + topology: + - replicaset + - sharded + changeStreamPipeline: [] + changeStreamOptions: {} + operations: + - + database: *database_name + collection: *collection_name + name: insertOne + arguments: + document: + x: 1 + expectations: + - + command_started_event: + command: + aggregate: *collection_name + cursor: {} + pipeline: + - + $changeStream: {} + command_name: aggregate + database_name: *database_name + - + command_started_event: + command: + getMore: 42 + collection: *collection_name + command_name: getMore + database_name: *database_name + - + command_started_event: + command: + aggregate: *collection_name + cursor: {} + pipeline: + - + $changeStream: {} + command_name: aggregate + database_name: *database_name + result: + success: + - + _id: "42" + documentKey: "42" + operationType: insert + ns: + db: *database_name + coll: *collection_name + fullDocument: + x: + $numberInt: "1"