diff --git a/lib/mongo/mongo_replica_set_client.rb b/lib/mongo/mongo_replica_set_client.rb old mode 100644 new mode 100755 index 371f4a764d..b589300fbb --- a/lib/mongo/mongo_replica_set_client.rb +++ b/lib/mongo/mongo_replica_set_client.rb @@ -188,8 +188,8 @@ def connect seeds = @manager.nil? ? @seeds : @manager.seeds @manager = PoolManager.new(self, seeds) - Thread.current[:managers] ||= Hash.new - Thread.current[:managers][self] = @manager + Thread.current[:mongo_pool_managers] ||= Hash.new + Thread.current[:mongo_pool_managers][self] = @manager @manager.connect @refresh_version += 1 @@ -239,7 +239,7 @@ def hard_refresh! new_manager = PoolManager.new(self, discovered_seeds | @seeds) new_manager.connect - Thread.current[:managers][self] = new_manager + Thread.current[:mongo_pool_managers][self] = new_manager # TODO: make sure that connect has succeeded @old_managers << @manager @@ -297,8 +297,8 @@ def close(opts={}) end # Clear the reference to this object. - if Thread.current[:managers] - Thread.current[:managers].delete(self) + if Thread.current[:mongo_pool_managers] + Thread.current[:mongo_pool_managers].delete(self) end @connected = false @@ -377,19 +377,19 @@ def checkin(socket) end def ensure_manager - Thread.current[:managers] ||= Hash.new + Thread.current[:mongo_pool_managers] ||= Hash.new - if Thread.current[:managers][self] != @manager - Thread.current[:managers][self] = @manager + if Thread.current[:mongo_pool_managers][self] != @manager + Thread.current[:mongo_pool_managers][self] = @manager end end def pin_pool(pool) - @manager.pinned_pools[Thread.current] = pool if @manager + Thread.current["mongo_pinned_pool_#{@manager.object_id}"] = pool if @manager end def unpin_pool(pool) - @manager.pinned_pools[Thread.current] = nil if @manager + Thread.current["mongo_pinned_pool_#{@manager.object_id}"] = nil if @manager end def get_socket_from_pool(pool) @@ -401,7 +401,7 @@ def get_socket_from_pool(pool) end def local_manager - Thread.current[:managers][self] if Thread.current[:managers] + Thread.current[:mongo_pool_managers][self] if Thread.current[:mongo_pool_managers] end def arbiters diff --git a/lib/mongo/mongo_sharded_client.rb b/lib/mongo/mongo_sharded_client.rb old mode 100644 new mode 100755 index d24fee7c42..6322e4d1d9 --- a/lib/mongo/mongo_sharded_client.rb +++ b/lib/mongo/mongo_sharded_client.rb @@ -96,8 +96,8 @@ def connect(force = !@connected) @old_managers << @manager if @manager @manager = ShardingPoolManager.new(self, discovered_seeds | @seeds) - Thread.current[:managers] ||= Hash.new - Thread.current[:managers][self] = @manager + Thread.current[:mongo_pool_managers] ||= Hash.new + Thread.current[:mongo_pool_managers][self] = @manager @manager.connect @refresh_version += 1 diff --git a/lib/mongo/util/pool.rb b/lib/mongo/util/pool.rb old mode 100644 new mode 100755 index 667cdf4a27..26530c7a6f --- a/lib/mongo/util/pool.rb +++ b/lib/mongo/util/pool.rb @@ -19,7 +19,6 @@ module Mongo class Pool PING_ATTEMPTS = 6 MAX_PING_TIME = 1_000_000 - PRUNE_INTERVAL = 10_000 attr_accessor :host, :port, @@ -58,12 +57,10 @@ def initialize(client, host, port, opts={}) @socket_ops = Hash.new { |h, k| h[k] = [] } @sockets = [] - @pids = {} @checked_out = [] @ping_time = nil @last_ping = nil @closed = false - @threads_to_sockets = {} @checkout_counter = 0 end @@ -205,9 +202,8 @@ def checkout_new_socket @client.apply_saved_authentication(:socket => socket) @sockets << socket - @pids[socket] = Process.pid @checked_out << socket - @threads_to_sockets[Thread.current] = socket + Thread.current["mongo_affiliated_socket_#{self.object_id}"] = socket socket end @@ -249,8 +245,7 @@ def checkout_existing_socket(socket=nil) socket = (@sockets - @checked_out).first end - if @pids[socket] != Process.pid - @pids[socket] = nil + if socket.pid != Process.pid @sockets.delete(socket) if socket socket.close unless socket.closed? @@ -258,19 +253,11 @@ def checkout_existing_socket(socket=nil) checkout_new_socket else @checked_out << socket - @threads_to_sockets[Thread.current] = socket + Thread.current["mongo_affiliated_socket_#{self.object_id}"] = socket socket end end - def prune_thread_socket_hash - current_threads = Set[*Thread.list] - - @threads_to_sockets.delete_if do |thread, socket| - !current_threads.include?(thread) - end - end - # Check out an existing socket or create a new socket if the maximum # pool size has not been exceeded. Otherwise, wait for the next # available socket. @@ -285,22 +272,12 @@ def checkout end @connection_mutex.synchronize do - if @checkout_counter > PRUNE_INTERVAL - @checkout_counter = 0 - prune_thread_socket_hash - else - @checkout_counter += 1 - end - - if socket_for_thread = @threads_to_sockets[Thread.current] + if socket_for_thread = Thread.current["mongo_affiliated_socket_#{self.object_id}"] if !@checked_out.include?(socket_for_thread) socket = checkout_existing_socket(socket_for_thread) end else # First checkout for this thread - thread_length = @threads_to_sockets.keys.length - if (thread_length <= @sockets.size) && (@sockets.size < @size) - socket = checkout_new_socket - elsif @checked_out.size < @sockets.size + if @checked_out.size < @sockets.size socket = checkout_existing_socket elsif @sockets.size < @size socket = checkout_new_socket @@ -318,12 +295,7 @@ def checkout if socket.closed? @checked_out.delete(socket) @sockets.delete(socket) - @threads_to_sockets.each do |k,v| - if v == socket - @threads_to_sockets.delete(k) - end - end - + Thread.current["mongo_affiliated_socket_#{self.object_id}"] = nil socket = checkout_new_socket end diff --git a/lib/mongo/util/pool_manager.rb b/lib/mongo/util/pool_manager.rb old mode 100644 new mode 100755 index 4fabcfca7a..fe5a04e96c --- a/lib/mongo/util/pool_manager.rb +++ b/lib/mongo/util/pool_manager.rb @@ -6,8 +6,6 @@ class PoolManager :secondary_pool, :secondary_pools, :hosts, :nodes, :members, :seeds, :max_bson_size - attr_accessor :pinned_pools - # Create a new set of connection pools. # # The pool manager will by default use the original seed list passed @@ -16,7 +14,6 @@ class PoolManager # time. The union of these lists will be used when attempting to connect, # with the newly-discovered nodes being used first. def initialize(client, seeds=[]) - @pinned_pools = {} @client = client @seeds = seeds @previously_connected = false @@ -104,7 +101,7 @@ def read_pool(mode=@client.read, tags=@client.tag_sets, acceptable_latency=@client.acceptable_latency) - pinned = pinned_pools[Thread.current] + pinned = Thread.current["mongo_pinned_pool_#{self.object_id}"] if pinned && pinned.matches_mode(mode) && pinned.matches_tag_sets(tags) && pinned.up? pool = pinned @@ -158,7 +155,6 @@ def initialize_data @hosts = Set.new @members = Set.new @refresh_required = false - @pinned_pools = {} end # Connect to each member of the replica set diff --git a/lib/mongo/util/ssl_socket.rb b/lib/mongo/util/ssl_socket.rb old mode 100644 new mode 100755 index ef8d18c667..f068b4088c --- a/lib/mongo/util/ssl_socket.rb +++ b/lib/mongo/util/ssl_socket.rb @@ -9,11 +9,12 @@ module Mongo # mirroring Ruby's TCPSocket, vis., TCPSocket#send and TCPSocket#read. class SSLSocket - attr_accessor :pool + attr_accessor :pool, :pid def initialize(host, port, op_timeout=nil, connect_timeout=nil) @op_timeout = op_timeout @connect_timeout = connect_timeout + @pid = Process.pid @socket = ::TCPSocket.new(host, port) @socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) diff --git a/lib/mongo/util/tcp_socket.rb b/lib/mongo/util/tcp_socket.rb old mode 100644 new mode 100755 index 476b873531..8d8d9a9021 --- a/lib/mongo/util/tcp_socket.rb +++ b/lib/mongo/util/tcp_socket.rb @@ -8,11 +8,12 @@ module Mongo # sans Timeout::timeout # class TCPSocket - attr_accessor :pool + attr_accessor :pool, :pid def initialize(host, port, op_timeout=nil, connect_timeout=nil) - @op_timeout = op_timeout + @op_timeout = op_timeout @connect_timeout = connect_timeout + @pid = Process.pid # TODO: Prefer ipv6 if server is ipv6 enabled @address = Socket.getaddrinfo(host, nil, Socket::AF_INET).first[3] diff --git a/test/functional/connection_test.rb b/test/functional/connection_test.rb old mode 100644 new mode 100755 index 260acc8404..984c426c3a --- a/test/functional/connection_test.rb +++ b/test/functional/connection_test.rb @@ -415,10 +415,7 @@ def test_connection_activity end should "show a proper exception message if an IOError is raised while closing a socket" do - fake_socket = mock('fake_socket') - fake_socket.stubs(:close).raises(IOError.new) - fake_socket.stub_everything - TCPSocket.stubs(:new).returns(fake_socket) + TCPSocket.any_instance.stubs(:close).raises(IOError.new) @con.primary_pool.checkout_new_socket @con.primary_pool.expects(:warn) diff --git a/test/tools/mongo_config.rb b/test/tools/mongo_config.rb old mode 100644 new mode 100755 index a58843a6ec..382c8d83a5 --- a/test/tools/mongo_config.rb +++ b/test/tools/mongo_config.rb @@ -357,6 +357,7 @@ def repl_set_initiate( cfg = nil ) end def repl_set_startup + states = nil 60.times do states = repl_set_get_status.zip(repl_set_is_master) healthy = states.all? do |status, is_master|