Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions lib/mongo/error/operation_failure.rb
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@ def write_retryable_code?
# @since 2.6.0
def change_stream_resumable?
if @result && @result.is_a?(Mongo::Operation::GetMore::Result)
# CursorNotFound exceptions are always resumable because the server
# is not aware of the cursor id, and thus cannot determine if
# the cursor is a change stream and cannot add the
# ResumableChangeStreamError label.
return true if code == 43

# Connection description is not populated for unacknowledged writes.
if connection_description.max_wire_version >= 9
label?('ResumableChangeStreamError')
Expand Down
12 changes: 6 additions & 6 deletions spec/mongo/collection/view/change_stream_resume_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -196,17 +196,17 @@
change_stream.to_enum
end

it 'propagates cursor not found error' do
it 'resumes on a cursor not found error' do
original_cursor_id = cursor.id

client.use(:admin).command({
killCursors: collection.name,
cursors: [cursor.id]
})

lambda do
expect do
enum.next
end.should raise_error(Mongo::Error::OperationFailure, /cursor.*not found/)
end.not_to raise_error
end
end

Expand All @@ -218,17 +218,17 @@
collection.insert_one(a:2)
end

it 'propagates cursor not found error' do
it 'resumes on a cursor not found error' do
original_cursor_id = cursor.id

client.use(:admin).command({
killCursors: collection.name,
cursors: [cursor.id]
})

lambda do
expect do
change_stream.try_next
end.should raise_error(Mongo::Error::OperationFailure, /cursor.*not found/)
end.not_to raise_error
end
end
end
Expand Down
40 changes: 40 additions & 0 deletions spec/mongo/error/operation_failure_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,46 @@
end
end

context 'when the error code is 43 (CursorNotFound)' do
let(:error) { Mongo::Error::OperationFailure.new(nil, result, code: 43, code_name: 'CursorNotFound') }
let(:result) do
Mongo::Operation::GetMore::Result.new(
Mongo::Protocol::Message.new, description)
end

context 'wire protocol < 9' do
let(:description) do
Mongo::Server::Description.new('',
'minWireVersion' => 0,
'maxWireVersion' => 8,
)
end

it 'returns true' do
# CursorNotFound exceptions are resumable even if they don't have
# a ResumableChangeStreamError label because the server is not aware
# of the cursor id, and thus cannot determine if it is a change stream.
expect(error.change_stream_resumable?).to be true
end
end

context 'wire protocol >= 9' do
let(:description) do
Mongo::Server::Description.new('',
'minWireVersion' => 0,
'maxWireVersion' => 9,
)
end

it 'returns true' do
# CursorNotFound exceptions are resumable even if they don't have
# a ResumableChangeStreamError label because the server is not aware
# of the cursor id, and thus cannot determine if it is a change stream.
expect(error.change_stream_resumable?).to be true
end
end
end

context 'not a getMore response' do
let(:result) do
Mongo::Operation::Result.new(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1105,3 +1105,69 @@ tests:
fullDocument:
x:
$numberInt: "1"
-
# CursorNotFound is special-cased to be resumable regardless of server versions or error labels, so this test has
# no maxWireVersion.
description: "change stream resumes after CursorNotFound"
minServerVersion: "4.2"
failPoint:
configureFailPoint: failCommand
mode: { times: 1 }
data:
failCommands: ["getMore"]
errorCode: 43
closeConnection: false
target: collection
topology:
- replicaset
- sharded
changeStreamPipeline: []
changeStreamOptions: {}
operations:
-
database: *database_name
collection: *collection_name
name: insertOne
arguments:
document:
x: 1
expectations:
-
command_started_event:
command:
aggregate: *collection_name
cursor: {}
pipeline:
-
$changeStream: {}
command_name: aggregate
database_name: *database_name
-
command_started_event:
command:
getMore: 42
collection: *collection_name
command_name: getMore
database_name: *database_name
-
command_started_event:
command:
aggregate: *collection_name
cursor: {}
pipeline:
-
$changeStream: {}
command_name: aggregate
database_name: *database_name
result:
success:
-
_id: "42"
documentKey: "42"
operationType: insert
ns:
db: *database_name
coll: *collection_name
fullDocument:
x:
$numberInt: "1"