Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/mongo/error.rb
Original file line number Diff line number Diff line change
Expand Up @@ -99,5 +99,6 @@ class Error < StandardError
require 'mongo/error/socket_timeout_error'
require 'mongo/error/unchangeable_collection_option'
require 'mongo/error/unexpected_chunk_length'
require 'mongo/error/unexpected_response'
require 'mongo/error/missing_file_chunk'
require 'mongo/error/unsupported_features'
38 changes: 38 additions & 0 deletions lib/mongo/error/unexpected_response.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Copyright (C) 2014-2015 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo
class Error

# Raised if the response read from the socket does not match the latest query.
#
# @since 2.2.6
class UnexpectedResponse < Error

# Create the new exception.
#
# @example Create the new exception.
# Mongo::Error::UnexpectedResponse.new(expected_response_to, response_to)
#
# @param [ Integer ] expected_response_to The last request id sent.
# @param [ Integer ] response_to The actual response_to of the reply.
#
# @since 2.2.6
def initialize(expected_response_to, response_to)
super("Unexpected response. Got response for request ID #{response_to} " +
"but expected response for request ID #{expected_response_to}")
end
end
end
end
13 changes: 10 additions & 3 deletions lib/mongo/protocol/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -109,17 +109,24 @@ def serialize(buffer = BSON::ByteBuffer.new, max_bson_size = nil)
# @param [ IO ] io Stream containing a message
#
# @return [ Message ] Instance of a Message class
def self.deserialize(io, max_message_size = MAX_MESSAGE_SIZE)
length = deserialize_header(BSON::ByteBuffer.new(io.read(16))).first
def self.deserialize(io, max_message_size = MAX_MESSAGE_SIZE, expected_response_to = nil)
length, request_id, response_to, op_code = deserialize_header(BSON::ByteBuffer.new(io.read(16)))

# Protection from potential DOS man-in-the-middle attacks. See
# DRIVERS-276.
if length > (max_message_size || MAX_MESSAGE_SIZE)
raise Error::MaxMessageSize.new(max_message_size)
end

# Protection against returning the response to a previous request. See
# RUBY-1117
if expected_response_to && response_to != expected_response_to
raise Error::UnexpectedResponse.new(expected_response_to, response_to)
end

buffer = BSON::ByteBuffer.new(io.read(length - 16))
message = allocate

fields.each do |field|
if field[:multi]
deserialize_array(message, buffer, field)
Expand Down Expand Up @@ -228,7 +235,7 @@ def serialize_header(buffer)
# @param io [IO] Stream containing the header.
# @return [Array<Fixnum>] Deserialized header.
def self.deserialize_header(io)
@length, @request_id, @response_to, @op_code = Header.deserialize(io)
Header.deserialize(io)
end

# A method for declaring a message field
Expand Down
13 changes: 7 additions & 6 deletions lib/mongo/server/connectable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,11 @@ def ensure_connected
ensure_same_process!
connect!
begin
yield socket
rescue Exception => e
disconnect!
raise e
result = yield socket
success = true
result
ensure
success or disconnect!
Copy link
Contributor

@estolfo estolfo Jul 5, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we just check result here, do we really need the success variable?

I've tested with this code:

begin
  result = yield socket
ensure
  result || disconnect!
end

which works as well. Let me know what you think.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can only use result if it can't be nil and I don't have any certainty about that, do you?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If an exception is raised when the socket is yielded, it won't return a value. If I'm not mistaken, it's never the case that result has a value and the block to which the socket is yielded throws an exception. What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, that's not the issue, the issue if if the yield doesn't raise but returns nil, we would have no way to know if result is nil because there was an exception, or because it is supposed to be nil.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you're saying. That's a good point so using the success variable is safer.

end
end

Expand All @@ -102,9 +103,9 @@ def ensure_same_process!
end
end

def read
def read(request_id = nil)
ensure_connected do |socket|
Protocol::Reply.deserialize(socket, max_message_size)
Protocol::Reply.deserialize(socket, max_message_size, request_id)
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/mongo/server/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def ping

def deliver(messages)
write(messages)
messages.last.replyable? ? read : nil
messages.last.replyable? ? read(messages.last.request_id) : nil
end

def authenticate!
Expand Down
10 changes: 4 additions & 6 deletions lib/mongo/server/connection_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,10 @@ def inspect
#
# @since 2.0.0
def with_connection
begin
connection = checkout
yield(connection)
ensure
checkin(connection) if connection
end
connection = checkout
yield(connection)
ensure
checkin(connection) if connection
end

private
Expand Down
83 changes: 83 additions & 0 deletions spec/mongo/server/connection_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,89 @@
end
end

context 'when the response_to does not match the request_id' do

let(:documents) do
[{ 'name' => 'bob' }, { 'name' => 'alice' }]
end

let(:insert) do
Mongo::Protocol::Insert.new(TEST_DB, TEST_COLL, documents)
end

let(:query_bob) do
Mongo::Protocol::Query.new(TEST_DB, TEST_COLL, { 'name' => 'bob' })
end

let(:query_alice) do
Mongo::Protocol::Query.new(TEST_DB, TEST_COLL, { 'name' => 'alice' })
end

after do
authorized_collection.delete_many
end

before do
# Fake a query for which we did not read the response. See RUBY-1117
allow(query_bob).to receive(:replyable?) { false }
connection.dispatch([ insert, query_bob ])
end

it 'raises an UnexpectedResponse' do
expect {
connection.dispatch([ query_alice ])
}.to raise_error(Mongo::Error::UnexpectedResponse,
/Got response for request ID \d+ but expected response for request ID \d+/)
end

it "doesn't break subsequent requests" do
expect {
connection.dispatch([ query_alice ])
}.to raise_error(Mongo::Error::UnexpectedResponse)

expect(connection.dispatch([ query_alice ]).documents.first['name']).to eq('alice')
end
end

context 'when a request is brutaly interrupted (Thread.kill)' do

let(:documents) do
[{ 'name' => 'bob' }, { 'name' => 'alice' }]
end

let(:insert) do
Mongo::Protocol::Insert.new(TEST_DB, TEST_COLL, documents)
end

let(:query_bob) do
Mongo::Protocol::Query.new(TEST_DB, TEST_COLL, { 'name' => 'bob' })
end

let(:query_alice) do
Mongo::Protocol::Query.new(TEST_DB, TEST_COLL, { 'name' => 'alice' })
end

before do
connection.dispatch([ insert ])
end

after do
authorized_collection.delete_many
end

it "closes the socket and does not use it for subsequent requests" do
t = Thread.new {
# Kill the thread just before the reply is read
allow(Mongo::Protocol::Reply).to receive(:deserialize_header) { t.kill }
connection.dispatch([ query_bob ])
}
t.join
allow(Mongo::Protocol::Reply).to receive(:deserialize_header).and_call_original
expect(connection.dispatch([ query_alice ]).documents.first['name']).to eq('alice')
end
end


context 'when the message exceeds the max size' do

context 'when the message is an insert' do
Expand Down
2 changes: 1 addition & 1 deletion spec/mongo/server/monitor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@
described_class.new(address, listeners)
end

it 'defaults to 5' do
it 'defaults to 10' do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks = )

expect(monitor.heartbeat_frequency).to eq(10)
end
end
Expand Down