Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RUBY-1811 ChangeStream spec's Resumable Error definition is too broad #1375

Merged
merged 2 commits into from Jun 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 13 additions & 2 deletions lib/mongo/error/operation_failure.rb
Expand Up @@ -141,8 +141,9 @@ def write_retryable_code?
# @since 2.6.0
def change_stream_resumable?
if @result && @result.is_a?(Mongo::Operation::GetMore::Result)
change_stream_resumable_message? ||
change_stream_resumable_code?
!change_stream_not_resumable_label? &&
(change_stream_resumable_message? ||
change_stream_resumable_code?)
else
false
end
Expand All @@ -162,6 +163,16 @@ def change_stream_resumable_code?
end
private :change_stream_resumable_code?

def change_stream_not_resumable_label?
if labels
labels.include? 'NonResumableChangeStreamError'
else
false
end
end
private :change_stream_not_resumable_label?


# Create the operation failure.
#
# @example Create the error object
Expand Down
46 changes: 46 additions & 0 deletions spec/mongo/error/operation_failure_spec.rb
Expand Up @@ -148,6 +148,52 @@
end
end
end

context 'when there is a non-resumable label' do
context 'getMore response' do
let(:error) { Mongo::Error::OperationFailure.new('no message',
Mongo::Operation::GetMore::Result.new([]),
:code => 91, :code_name => 'ShutdownInProgress',
:labels => ['NonResumableChangeStreamError']) }

it 'returns false' do
expect(error.change_stream_resumable?).to eql(false)
end
end

context 'not a getMore response' do
let(:error) { Mongo::Error::OperationFailure.new('no message', nil,
:code => 91, :code_name => 'ShutdownInProgress',
:labels => ['NonResumableChangeStreamError']) }

it 'returns false' do
expect(error.change_stream_resumable?).to eql(false)
end
end
end

context 'when there is another label' do
context 'getMore response' do
let(:error) { Mongo::Error::OperationFailure.new('no message',
Mongo::Operation::GetMore::Result.new([]),
:code => 91, :code_name => 'ShutdownInProgress',
:labels => [ Mongo::Error::TRANSIENT_TRANSACTION_ERROR_LABEL ]) }

it 'returns true' do
expect(error.change_stream_resumable?).to eql(true)
end
end

context 'not a getMore response' do
let(:error) { Mongo::Error::OperationFailure.new('no message', nil,
:code => 91, :code_name => 'ShutdownInProgress',
:labels => [ Mongo::Error::TRANSIENT_TRANSACTION_ERROR_LABEL ]) }

it 'returns false' do
expect(error.change_stream_resumable?).to eql(false)
end
end
end
p-mongo marked this conversation as resolved.
Show resolved Hide resolved
end

describe '#labels' do
Expand Down
2 changes: 1 addition & 1 deletion spec/spec_tests/change_streams_spec.rb
Expand Up @@ -29,7 +29,7 @@
expect(result[:result]).to match_result(test)
end

it 'has the correct command_started events' do
it 'has the correct command_started events', if: test.expectations do
expect(result[:events]).to match_commands(test)
end
end
Expand Down
25 changes: 24 additions & 1 deletion spec/spec_tests/data/change_streams/change-streams-errors.yml
Expand Up @@ -50,4 +50,27 @@ tests:
database_name: *database_name
result:
error:
code: 40324
code: 40324
-
description: Change Stream should error when _id is projected out
minServerVersion: "4.1.11"
target: collection
topology:
- replicaset
- sharded
changeStreamPipeline:
-
$project: { _id: 0 }
changeStreamOptions: {}
operations:
-
database: *database_name
collection: *collection_name
name: insertOne
arguments:
document:
z: 3
result:
error:
code: 280
errorLabels: [ "NonResumableChangeStreamError" ]
37 changes: 27 additions & 10 deletions spec/support/change_streams.rb
Expand Up @@ -77,6 +77,11 @@ class ChangeStreamsTest
# @since 2.0.0
attr_reader :description

# Optional list of command-started events in Extended JSON format
#
# @return [ Array<Hash> ] The list of command-started events
attr_reader :expectations

def initialize(test, coll1, coll2, db1, db2)
@description = test['description']
@min_server_version = test['minServerVersion']
Expand Down Expand Up @@ -136,18 +141,30 @@ def run
op.execute(@db1, @db2)
end

changes = [].tap do |changes|
next unless @result['success']
changes = []

unless @result['success'].empty?
change_stream.take_while do |change|
changes << change
changes.length < @result['success'].length
end
end
# attempt first next call (catch NonResumableChangeStreamError errors)
begin
change = change_stream.to_enum.next
changes << change
rescue Mongo::Error::OperationFailure => e
return {
result: { 'error' => { 'code' => e.code, 'errorLabels' => e.labels} },
events: events
}
end

# continue until changeStream has received as many changes as there
# are in result.success
if @result['success'] && changes.length < @result['success'].length
change_stream.take_while do |change|
changes << change
changes.length < @result['success'].length
end
end

change_stream.close
end
change_stream.close


{
result: { 'success' => changes },
Expand Down