Skip to content

Commit

Permalink
Merge pull request #142 from cheald/thread_leaks
Browse files Browse the repository at this point in the history
Thread leak fixes
  • Loading branch information
TylerBrock committed Dec 21, 2012
2 parents 8b4c8cb + a393557 commit 8829003
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 59 deletions.
22 changes: 11 additions & 11 deletions lib/mongo/mongo_replica_set_client.rb 100644 → 100755
Expand Up @@ -188,8 +188,8 @@ def connect
seeds = @manager.nil? ? @seeds : @manager.seeds
@manager = PoolManager.new(self, seeds)

Thread.current[:managers] ||= Hash.new
Thread.current[:managers][self] = @manager
Thread.current[:mongo_pool_managers] ||= Hash.new
Thread.current[:mongo_pool_managers][self] = @manager

@manager.connect
@refresh_version += 1
Expand Down Expand Up @@ -239,7 +239,7 @@ def hard_refresh!
new_manager = PoolManager.new(self, discovered_seeds | @seeds)
new_manager.connect

Thread.current[:managers][self] = new_manager
Thread.current[:mongo_pool_managers][self] = new_manager

# TODO: make sure that connect has succeeded
@old_managers << @manager
Expand Down Expand Up @@ -297,8 +297,8 @@ def close(opts={})
end

# Clear the reference to this object.
if Thread.current[:managers]
Thread.current[:managers].delete(self)
if Thread.current[:mongo_pool_managers]
Thread.current[:mongo_pool_managers].delete(self)
end

@connected = false
Expand Down Expand Up @@ -377,19 +377,19 @@ def checkin(socket)
end

def ensure_manager
Thread.current[:managers] ||= Hash.new
Thread.current[:mongo_pool_managers] ||= Hash.new

if Thread.current[:managers][self] != @manager
Thread.current[:managers][self] = @manager
if Thread.current[:mongo_pool_managers][self] != @manager
Thread.current[:mongo_pool_managers][self] = @manager
end
end

def pin_pool(pool)
@manager.pinned_pools[Thread.current] = pool if @manager
Thread.current["mongo_pinned_pool_#{@manager.object_id}"] = pool if @manager
end

def unpin_pool(pool)
@manager.pinned_pools[Thread.current] = nil if @manager
Thread.current["mongo_pinned_pool_#{@manager.object_id}"] = nil if @manager
end

def get_socket_from_pool(pool)
Expand All @@ -401,7 +401,7 @@ def get_socket_from_pool(pool)
end

def local_manager
Thread.current[:managers][self] if Thread.current[:managers]
Thread.current[:mongo_pool_managers][self] if Thread.current[:mongo_pool_managers]
end

def arbiters
Expand Down
4 changes: 2 additions & 2 deletions lib/mongo/mongo_sharded_client.rb 100644 → 100755
Expand Up @@ -96,8 +96,8 @@ def connect(force = !@connected)
@old_managers << @manager if @manager
@manager = ShardingPoolManager.new(self, discovered_seeds | @seeds)

Thread.current[:managers] ||= Hash.new
Thread.current[:managers][self] = @manager
Thread.current[:mongo_pool_managers] ||= Hash.new
Thread.current[:mongo_pool_managers][self] = @manager

@manager.connect
@refresh_version += 1
Expand Down
40 changes: 6 additions & 34 deletions lib/mongo/util/pool.rb 100644 → 100755
Expand Up @@ -19,7 +19,6 @@ module Mongo
class Pool
PING_ATTEMPTS = 6
MAX_PING_TIME = 1_000_000
PRUNE_INTERVAL = 10_000

attr_accessor :host,
:port,
Expand Down Expand Up @@ -58,12 +57,10 @@ def initialize(client, host, port, opts={})
@socket_ops = Hash.new { |h, k| h[k] = [] }

@sockets = []
@pids = {}
@checked_out = []
@ping_time = nil
@last_ping = nil
@closed = false
@threads_to_sockets = {}
@checkout_counter = 0
end

Expand Down Expand Up @@ -205,9 +202,8 @@ def checkout_new_socket
@client.apply_saved_authentication(:socket => socket)

@sockets << socket
@pids[socket] = Process.pid
@checked_out << socket
@threads_to_sockets[Thread.current] = socket
Thread.current["mongo_affiliated_socket_#{self.object_id}"] = socket
socket
end

Expand Down Expand Up @@ -249,28 +245,19 @@ def checkout_existing_socket(socket=nil)
socket = (@sockets - @checked_out).first
end

if @pids[socket] != Process.pid
@pids[socket] = nil
if socket.pid != Process.pid
@sockets.delete(socket)
if socket
socket.close unless socket.closed?
end
checkout_new_socket
else
@checked_out << socket
@threads_to_sockets[Thread.current] = socket
Thread.current["mongo_affiliated_socket_#{self.object_id}"] = socket
socket
end
end

def prune_thread_socket_hash
current_threads = Set[*Thread.list]

@threads_to_sockets.delete_if do |thread, socket|
!current_threads.include?(thread)
end
end

# Check out an existing socket or create a new socket if the maximum
# pool size has not been exceeded. Otherwise, wait for the next
# available socket.
Expand All @@ -285,22 +272,12 @@ def checkout
end

@connection_mutex.synchronize do
if @checkout_counter > PRUNE_INTERVAL
@checkout_counter = 0
prune_thread_socket_hash
else
@checkout_counter += 1
end

if socket_for_thread = @threads_to_sockets[Thread.current]
if socket_for_thread = Thread.current["mongo_affiliated_socket_#{self.object_id}"]
if !@checked_out.include?(socket_for_thread)
socket = checkout_existing_socket(socket_for_thread)
end
else # First checkout for this thread
thread_length = @threads_to_sockets.keys.length
if (thread_length <= @sockets.size) && (@sockets.size < @size)
socket = checkout_new_socket
elsif @checked_out.size < @sockets.size
if @checked_out.size < @sockets.size
socket = checkout_existing_socket
elsif @sockets.size < @size
socket = checkout_new_socket
Expand All @@ -318,12 +295,7 @@ def checkout
if socket.closed?
@checked_out.delete(socket)
@sockets.delete(socket)
@threads_to_sockets.each do |k,v|
if v == socket
@threads_to_sockets.delete(k)
end
end

Thread.current["mongo_affiliated_socket_#{self.object_id}"] = nil
socket = checkout_new_socket
end

Expand Down
6 changes: 1 addition & 5 deletions lib/mongo/util/pool_manager.rb 100644 → 100755
Expand Up @@ -6,8 +6,6 @@ class PoolManager
:secondary_pool, :secondary_pools, :hosts, :nodes, :members, :seeds,
:max_bson_size

attr_accessor :pinned_pools

# Create a new set of connection pools.
#
# The pool manager will by default use the original seed list passed
Expand All @@ -16,7 +14,6 @@ class PoolManager
# time. The union of these lists will be used when attempting to connect,
# with the newly-discovered nodes being used first.
def initialize(client, seeds=[])
@pinned_pools = {}
@client = client
@seeds = seeds
@previously_connected = false
Expand Down Expand Up @@ -104,7 +101,7 @@ def read_pool(mode=@client.read,
tags=@client.tag_sets,
acceptable_latency=@client.acceptable_latency)

pinned = pinned_pools[Thread.current]
pinned = Thread.current["mongo_pinned_pool_#{self.object_id}"]

if pinned && pinned.matches_mode(mode) && pinned.matches_tag_sets(tags) && pinned.up?
pool = pinned
Expand Down Expand Up @@ -158,7 +155,6 @@ def initialize_data
@hosts = Set.new
@members = Set.new
@refresh_required = false
@pinned_pools = {}
end

# Connect to each member of the replica set
Expand Down
3 changes: 2 additions & 1 deletion lib/mongo/util/ssl_socket.rb 100644 → 100755
Expand Up @@ -9,11 +9,12 @@ module Mongo
# mirroring Ruby's TCPSocket, vis., TCPSocket#send and TCPSocket#read.
class SSLSocket

attr_accessor :pool
attr_accessor :pool, :pid

def initialize(host, port, op_timeout=nil, connect_timeout=nil)
@op_timeout = op_timeout
@connect_timeout = connect_timeout
@pid = Process.pid

@socket = ::TCPSocket.new(host, port)
@socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
Expand Down
5 changes: 3 additions & 2 deletions lib/mongo/util/tcp_socket.rb 100644 → 100755
Expand Up @@ -8,11 +8,12 @@ module Mongo
# sans Timeout::timeout
#
class TCPSocket
attr_accessor :pool
attr_accessor :pool, :pid

def initialize(host, port, op_timeout=nil, connect_timeout=nil)
@op_timeout = op_timeout
@op_timeout = op_timeout
@connect_timeout = connect_timeout
@pid = Process.pid

# TODO: Prefer ipv6 if server is ipv6 enabled
@address = Socket.getaddrinfo(host, nil, Socket::AF_INET).first[3]
Expand Down
5 changes: 1 addition & 4 deletions test/functional/connection_test.rb 100644 → 100755
Expand Up @@ -415,10 +415,7 @@ def test_connection_activity
end

should "show a proper exception message if an IOError is raised while closing a socket" do
fake_socket = mock('fake_socket')
fake_socket.stubs(:close).raises(IOError.new)
fake_socket.stub_everything
TCPSocket.stubs(:new).returns(fake_socket)
TCPSocket.any_instance.stubs(:close).raises(IOError.new)

@con.primary_pool.checkout_new_socket
@con.primary_pool.expects(:warn)
Expand Down
1 change: 1 addition & 0 deletions test/tools/mongo_config.rb 100644 → 100755
Expand Up @@ -357,6 +357,7 @@ def repl_set_initiate( cfg = nil )
end

def repl_set_startup
states = nil
60.times do
states = repl_set_get_status.zip(repl_set_is_master)
healthy = states.all? do |status, is_master|
Expand Down

0 comments on commit 8829003

Please sign in to comment.