From c5833c9b675aa497c6566ce3c917c8d3c2482737 Mon Sep 17 00:00:00 2001 From: Vincent Boisard Date: Fri, 24 Jun 2016 18:38:18 +0200 Subject: [PATCH 1/5] fix: Multiple thread safety issue that could result in hung process and/or wrong response being returned. #RUBY-1117 --- lib/mongo/protocol/message.rb | 6 +++-- lib/mongo/protocol/reply.rb | 5 ++++ lib/mongo/server/connection.rb | 10 +++++++- spec/mongo/protocol/reply_spec.rb | 10 +++++++- spec/mongo/server/connection_spec.rb | 37 ++++++++++++++++++++++++++++ spec/mongo/server/monitor_spec.rb | 2 +- 6 files changed, 65 insertions(+), 5 deletions(-) diff --git a/lib/mongo/protocol/message.rb b/lib/mongo/protocol/message.rb index 9965f2425c..a7521bba9d 100644 --- a/lib/mongo/protocol/message.rb +++ b/lib/mongo/protocol/message.rb @@ -110,7 +110,7 @@ def serialize(buffer = BSON::ByteBuffer.new, max_bson_size = nil) # # @return [ Message ] Instance of a Message class def self.deserialize(io, max_message_size = MAX_MESSAGE_SIZE) - length = deserialize_header(BSON::ByteBuffer.new(io.read(16))).first + length, request_id, response_to, op_code = deserialize_header(BSON::ByteBuffer.new(io.read(16))) # Protection from potential DOS man-in-the-middle attacks. See # DRIVERS-276. @@ -120,6 +120,8 @@ def self.deserialize(io, max_message_size = MAX_MESSAGE_SIZE) buffer = BSON::ByteBuffer.new(io.read(length - 16)) message = allocate + message.response_to = response_to + fields.each do |field| if field[:multi] deserialize_array(message, buffer, field) @@ -228,7 +230,7 @@ def serialize_header(buffer) # @param io [IO] Stream containing the header. # @return [Array] Deserialized header. def self.deserialize_header(io) - @length, @request_id, @response_to, @op_code = Header.deserialize(io) + Header.deserialize(io) end # A method for declaring a message field diff --git a/lib/mongo/protocol/reply.rb b/lib/mongo/protocol/reply.rb index 271f075a1e..d6d3023f58 100644 --- a/lib/mongo/protocol/reply.rb +++ b/lib/mongo/protocol/reply.rb @@ -26,6 +26,11 @@ module Protocol # @api semipublic class Reply < Message + # Returns the response_to for the reply + # + # @return [Fixnum] The response_to for this reply + attr_accessor :response_to + # Determine if the reply had a query failure flag. # # @example Did the reply have a query failure. diff --git a/lib/mongo/server/connection.rb b/lib/mongo/server/connection.rb index 092f54678c..01ce4f04e0 100644 --- a/lib/mongo/server/connection.rb +++ b/lib/mongo/server/connection.rb @@ -159,7 +159,15 @@ def ping def deliver(messages) write(messages) - messages.last.replyable? ? read : nil + + if messages.last.replyable? + # Protection against returning the response to a previous request. See + # RUBY-1117 + reply = read until reply && reply.response_to == messages.last.request_id + reply + else + nil + end end def authenticate! diff --git a/spec/mongo/protocol/reply_spec.rb b/spec/mongo/protocol/reply_spec.rb index 7e19128fbb..0ddae3b631 100644 --- a/spec/mongo/protocol/reply_spec.rb +++ b/spec/mongo/protocol/reply_spec.rb @@ -4,7 +4,7 @@ let(:length) { 78 } let(:request_id) { 0 } - let(:response_to) { 0 } + let(:response_to) { 42 } let(:op_code) { 1 } let(:flags) { 0 } let(:start) { 0 } @@ -110,6 +110,14 @@ end end + describe '#response_to' do + let(:reply) { described_class.deserialize(io) } + + it 'sets it to the correct value' do + expect(reply.response_to).to eq(42) + end + end + describe 'response flags' do context 'no flags' do diff --git a/spec/mongo/server/connection_spec.rb b/spec/mongo/server/connection_spec.rb index c9cd98be75..e61e8216db 100644 --- a/spec/mongo/server/connection_spec.rb +++ b/spec/mongo/server/connection_spec.rb @@ -250,6 +250,43 @@ end end + context 'when the response_to does not match the request_id' do + + let(:documents) do + [{ 'name' => 'bob' }, { 'name' => 'alice' }] + end + + let(:insert) do + Mongo::Protocol::Insert.new(TEST_DB, TEST_COLL, documents) + end + + let(:query_bob) do + Mongo::Protocol::Query.new(TEST_DB, TEST_COLL, { 'name' => 'bob' }) + end + + let(:query_alice) do + Mongo::Protocol::Query.new(TEST_DB, TEST_COLL, { 'name' => 'alice' }) + end + + let(:reply) do + connection.dispatch([ query_alice ]) + end + + after do + authorized_collection.delete_many + end + + it 'it dispatchs the message to the socket' do + # Fake a query for which we did not read the response. See RUBY-1117 + allow(query_bob).to receive(:replyable?) { false } + connection.dispatch([ insert, query_bob ]) + + expect(reply.documents.first['name']).to eq('alice') + expect(reply.response_to).to eq(query_alice.request_id) + end + end + + context 'when the message exceeds the max size' do context 'when the message is an insert' do diff --git a/spec/mongo/server/monitor_spec.rb b/spec/mongo/server/monitor_spec.rb index d86b784b11..a4845913bf 100644 --- a/spec/mongo/server/monitor_spec.rb +++ b/spec/mongo/server/monitor_spec.rb @@ -150,7 +150,7 @@ described_class.new(address, listeners) end - it 'defaults to 5' do + it 'defaults to 10' do expect(monitor.heartbeat_frequency).to eq(10) end end From 16fd2170db8f4ba2c78e3fd0f2403a74441bfcb5 Mon Sep 17 00:00:00 2001 From: Vincent Boisard Date: Mon, 27 Jun 2016 15:05:59 +0200 Subject: [PATCH 2/5] fix: spec title more explicit --- spec/mongo/server/connection_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/mongo/server/connection_spec.rb b/spec/mongo/server/connection_spec.rb index e61e8216db..5d194142ff 100644 --- a/spec/mongo/server/connection_spec.rb +++ b/spec/mongo/server/connection_spec.rb @@ -276,7 +276,7 @@ authorized_collection.delete_many end - it 'it dispatchs the message to the socket' do + it 'skips the orphan response and reads the next one' do # Fake a query for which we did not read the response. See RUBY-1117 allow(query_bob).to receive(:replyable?) { false } connection.dispatch([ insert, query_bob ]) From 7e2e94d9967c72409ff53da4d04860b3a1493992 Mon Sep 17 00:00:00 2001 From: Adrien Jarthon Date: Thu, 30 Jun 2016 17:15:29 +0200 Subject: [PATCH 3/5] new fix for the thread kill issue --- lib/mongo/error.rb | 1 + lib/mongo/error/unexpected_response.rb | 37 ++++++++++++++++ lib/mongo/protocol/message.rb | 9 +++- lib/mongo/protocol/reply.rb | 5 --- lib/mongo/server/connectable.rb | 13 +++--- lib/mongo/server/connection.rb | 10 +---- lib/mongo/server/connection_pool.rb | 10 ++--- spec/mongo/protocol/reply_spec.rb | 10 +---- spec/mongo/server/connection_spec.rb | 59 +++++++++++++++++++++++--- 9 files changed, 110 insertions(+), 44 deletions(-) create mode 100644 lib/mongo/error/unexpected_response.rb diff --git a/lib/mongo/error.rb b/lib/mongo/error.rb index 028f506d4b..21950fb5d6 100644 --- a/lib/mongo/error.rb +++ b/lib/mongo/error.rb @@ -99,5 +99,6 @@ class Error < StandardError require 'mongo/error/socket_timeout_error' require 'mongo/error/unchangeable_collection_option' require 'mongo/error/unexpected_chunk_length' +require 'mongo/error/unexpected_response' require 'mongo/error/missing_file_chunk' require 'mongo/error/unsupported_features' diff --git a/lib/mongo/error/unexpected_response.rb b/lib/mongo/error/unexpected_response.rb new file mode 100644 index 0000000000..3dbb09ecbb --- /dev/null +++ b/lib/mongo/error/unexpected_response.rb @@ -0,0 +1,37 @@ +# Copyright (C) 2014-2015 MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + class Error + + # Raised if the response read from the socket does not match the latest query. + # + # @since 2.2.6 + class UnexpectedResponse < Error + + # Create the new exception. + # + # @example Create the new exception. + # Mongo::Error::UnexpectedResponse.new(expected_response_to, response_to) + # + # @param [ Integer ] expected_response_to The last request id sent. + # @param [ Integer ] response_to The actual response_to of the reply. + # + # @since 2.1.0 + def initialize(expected_response_to, response_to) + super("Unexpected response. Got response for request ID #{response_to} but expected response for request ID #{expected_response_to}") + end + end + end +end diff --git a/lib/mongo/protocol/message.rb b/lib/mongo/protocol/message.rb index a7521bba9d..7ca7dd1985 100644 --- a/lib/mongo/protocol/message.rb +++ b/lib/mongo/protocol/message.rb @@ -109,7 +109,7 @@ def serialize(buffer = BSON::ByteBuffer.new, max_bson_size = nil) # @param [ IO ] io Stream containing a message # # @return [ Message ] Instance of a Message class - def self.deserialize(io, max_message_size = MAX_MESSAGE_SIZE) + def self.deserialize(io, max_message_size = MAX_MESSAGE_SIZE, expected_response_to = nil) length, request_id, response_to, op_code = deserialize_header(BSON::ByteBuffer.new(io.read(16))) # Protection from potential DOS man-in-the-middle attacks. See @@ -118,9 +118,14 @@ def self.deserialize(io, max_message_size = MAX_MESSAGE_SIZE) raise Error::MaxMessageSize.new(max_message_size) end + # Protection against returning the response to a previous request. See + # RUBY-1117 + if expected_response_to && response_to != expected_response_to + raise Error::UnexpectedResponse.new(expected_response_to, response_to) + end + buffer = BSON::ByteBuffer.new(io.read(length - 16)) message = allocate - message.response_to = response_to fields.each do |field| if field[:multi] diff --git a/lib/mongo/protocol/reply.rb b/lib/mongo/protocol/reply.rb index d6d3023f58..271f075a1e 100644 --- a/lib/mongo/protocol/reply.rb +++ b/lib/mongo/protocol/reply.rb @@ -26,11 +26,6 @@ module Protocol # @api semipublic class Reply < Message - # Returns the response_to for the reply - # - # @return [Fixnum] The response_to for this reply - attr_accessor :response_to - # Determine if the reply had a query failure flag. # # @example Did the reply have a query failure. diff --git a/lib/mongo/server/connectable.rb b/lib/mongo/server/connectable.rb index 3cb05ed59c..1b19e4877d 100644 --- a/lib/mongo/server/connectable.rb +++ b/lib/mongo/server/connectable.rb @@ -88,10 +88,11 @@ def ensure_connected ensure_same_process! connect! begin - yield socket - rescue Exception => e - disconnect! - raise e + res = yield socket + success = true + res + ensure + success or disconnect! end end @@ -102,9 +103,9 @@ def ensure_same_process! end end - def read + def read(request_id = nil) ensure_connected do |socket| - Protocol::Reply.deserialize(socket, max_message_size) + Protocol::Reply.deserialize(socket, max_message_size, request_id) end end end diff --git a/lib/mongo/server/connection.rb b/lib/mongo/server/connection.rb index 01ce4f04e0..7faff876f9 100644 --- a/lib/mongo/server/connection.rb +++ b/lib/mongo/server/connection.rb @@ -159,15 +159,7 @@ def ping def deliver(messages) write(messages) - - if messages.last.replyable? - # Protection against returning the response to a previous request. See - # RUBY-1117 - reply = read until reply && reply.response_to == messages.last.request_id - reply - else - nil - end + messages.last.replyable? ? read(messages.last.request_id) : nil end def authenticate! diff --git a/lib/mongo/server/connection_pool.rb b/lib/mongo/server/connection_pool.rb index b7c0414678..994a37ca37 100644 --- a/lib/mongo/server/connection_pool.rb +++ b/lib/mongo/server/connection_pool.rb @@ -103,12 +103,10 @@ def inspect # # @since 2.0.0 def with_connection - begin - connection = checkout - yield(connection) - ensure - checkin(connection) if connection - end + connection = checkout + yield(connection) + ensure + checkin(connection) if connection end private diff --git a/spec/mongo/protocol/reply_spec.rb b/spec/mongo/protocol/reply_spec.rb index 0ddae3b631..7e19128fbb 100644 --- a/spec/mongo/protocol/reply_spec.rb +++ b/spec/mongo/protocol/reply_spec.rb @@ -4,7 +4,7 @@ let(:length) { 78 } let(:request_id) { 0 } - let(:response_to) { 42 } + let(:response_to) { 0 } let(:op_code) { 1 } let(:flags) { 0 } let(:start) { 0 } @@ -110,14 +110,6 @@ end end - describe '#response_to' do - let(:reply) { described_class.deserialize(io) } - - it 'sets it to the correct value' do - expect(reply.response_to).to eq(42) - end - end - describe 'response flags' do context 'no flags' do diff --git a/spec/mongo/server/connection_spec.rb b/spec/mongo/server/connection_spec.rb index 5d194142ff..f1ba9bc493 100644 --- a/spec/mongo/server/connection_spec.rb +++ b/spec/mongo/server/connection_spec.rb @@ -268,21 +268,66 @@ Mongo::Protocol::Query.new(TEST_DB, TEST_COLL, { 'name' => 'alice' }) end - let(:reply) do - connection.dispatch([ query_alice ]) - end - after do authorized_collection.delete_many end - it 'skips the orphan response and reads the next one' do + before do # Fake a query for which we did not read the response. See RUBY-1117 allow(query_bob).to receive(:replyable?) { false } connection.dispatch([ insert, query_bob ]) + end + + it 'raises an UnexpectedResponse' do + expect { + connection.dispatch([ query_alice ]) + }.to raise_error(Mongo::Error::UnexpectedResponse, /Unexpected response. Got response for request ID \d+ but expected response for request ID \d+/) + end + + it "doesn't break subsequent requests" do + expect { + connection.dispatch([ query_alice ]) + }.to raise_error(Mongo::Error::UnexpectedResponse) - expect(reply.documents.first['name']).to eq('alice') - expect(reply.response_to).to eq(query_alice.request_id) + expect(connection.dispatch([ query_alice ]).documents.first['name']).to eq('alice') + end + end + + context 'when a request is brutaly interrupted (Thread.kill)' do + + let(:documents) do + [{ 'name' => 'bob' }, { 'name' => 'alice' }] + end + + let(:insert) do + Mongo::Protocol::Insert.new(TEST_DB, TEST_COLL, documents) + end + + let(:query_bob) do + Mongo::Protocol::Query.new(TEST_DB, TEST_COLL, { 'name' => 'bob' }) + end + + let(:query_alice) do + Mongo::Protocol::Query.new(TEST_DB, TEST_COLL, { 'name' => 'alice' }) + end + + before do + connection.dispatch([ insert ]) + end + + after do + authorized_collection.delete_many + end + + it "closes the socket and doesn't leak into subsequent requests" do + t = Thread.new { + # Kill the thread just before the reply is read + expect(Mongo::Protocol::Reply).to receive(:deserialize) { t.kill } + connection.dispatch([ query_bob ]) + } + t.join(1) + allow(Mongo::Protocol::Reply).to receive(:deserialize).and_call_original + expect(connection.dispatch([ query_alice ]).documents.first['name']).to eq('alice') end end From eb9895d4c3065558b002af5c44c63f5f649421ad Mon Sep 17 00:00:00 2001 From: Adrien Jarthon Date: Tue, 5 Jul 2016 11:58:18 +0200 Subject: [PATCH 4/5] various cosmetic fixes + new spec robustness --- lib/mongo/error/unexpected_response.rb | 5 +++-- lib/mongo/server/connectable.rb | 4 ++-- spec/mongo/server/connection_spec.rb | 9 +++++---- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/lib/mongo/error/unexpected_response.rb b/lib/mongo/error/unexpected_response.rb index 3dbb09ecbb..fb960d5cbb 100644 --- a/lib/mongo/error/unexpected_response.rb +++ b/lib/mongo/error/unexpected_response.rb @@ -28,9 +28,10 @@ class UnexpectedResponse < Error # @param [ Integer ] expected_response_to The last request id sent. # @param [ Integer ] response_to The actual response_to of the reply. # - # @since 2.1.0 + # @since 2.2.6 def initialize(expected_response_to, response_to) - super("Unexpected response. Got response for request ID #{response_to} but expected response for request ID #{expected_response_to}") + super("Unexpected response. Got response for request ID #{response_to} " + + "but expected response for request ID #{expected_response_to}") end end end diff --git a/lib/mongo/server/connectable.rb b/lib/mongo/server/connectable.rb index 1b19e4877d..03e5be478c 100644 --- a/lib/mongo/server/connectable.rb +++ b/lib/mongo/server/connectable.rb @@ -88,9 +88,9 @@ def ensure_connected ensure_same_process! connect! begin - res = yield socket + result = yield socket success = true - res + result ensure success or disconnect! end diff --git a/spec/mongo/server/connection_spec.rb b/spec/mongo/server/connection_spec.rb index f1ba9bc493..dcdc7d0927 100644 --- a/spec/mongo/server/connection_spec.rb +++ b/spec/mongo/server/connection_spec.rb @@ -281,7 +281,8 @@ it 'raises an UnexpectedResponse' do expect { connection.dispatch([ query_alice ]) - }.to raise_error(Mongo::Error::UnexpectedResponse, /Unexpected response. Got response for request ID \d+ but expected response for request ID \d+/) + }.to raise_error(Mongo::Error::UnexpectedResponse, + /Got response for request ID \d+ but expected response for request ID \d+/) end it "doesn't break subsequent requests" do @@ -319,13 +320,13 @@ authorized_collection.delete_many end - it "closes the socket and doesn't leak into subsequent requests" do + it "closes the socket and does not use it for subsequent requests" do t = Thread.new { # Kill the thread just before the reply is read - expect(Mongo::Protocol::Reply).to receive(:deserialize) { t.kill } + allow(Mongo::Protocol::Reply).to receive(:deserialize) { t.kill } connection.dispatch([ query_bob ]) } - t.join(1) + t.join(2) allow(Mongo::Protocol::Reply).to receive(:deserialize).and_call_original expect(connection.dispatch([ query_alice ]).documents.first['name']).to eq('alice') end From fa255496f96583ac36c41434861900545682f458 Mon Sep 17 00:00:00 2001 From: Adrien Jarthon Date: Tue, 5 Jul 2016 15:25:00 +0200 Subject: [PATCH 5/5] attempt at more robust spec --- spec/mongo/server/connection_spec.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/spec/mongo/server/connection_spec.rb b/spec/mongo/server/connection_spec.rb index dcdc7d0927..8e16ba384d 100644 --- a/spec/mongo/server/connection_spec.rb +++ b/spec/mongo/server/connection_spec.rb @@ -323,11 +323,11 @@ it "closes the socket and does not use it for subsequent requests" do t = Thread.new { # Kill the thread just before the reply is read - allow(Mongo::Protocol::Reply).to receive(:deserialize) { t.kill } + allow(Mongo::Protocol::Reply).to receive(:deserialize_header) { t.kill } connection.dispatch([ query_bob ]) } - t.join(2) - allow(Mongo::Protocol::Reply).to receive(:deserialize).and_call_original + t.join + allow(Mongo::Protocol::Reply).to receive(:deserialize_header).and_call_original expect(connection.dispatch([ query_alice ]).documents.first['name']).to eq('alice') end end