From d06674d5dfcecb6355890020ee2245b6e87b07e4 Mon Sep 17 00:00:00 2001 From: Patrick Mahoney Date: Sun, 20 May 2012 21:46:08 -0500 Subject: [PATCH] Make connection pool fair with respect to waiting threads. --- .../abstract/connection_pool.rb | 120 +++++++++++++----- .../abstract_adapter_test.rb | 2 +- .../test/cases/connection_pool_test.rb | 115 +++++++++++++++++ 3 files changed, 202 insertions(+), 35 deletions(-) diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb index c6699737b41e6..450ef69744a74 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb @@ -2,7 +2,6 @@ require 'monitor' require 'set' require 'active_support/core_ext/module/deprecation' -require 'timeout' module ActiveRecord # Raised when a connection could not be obtained within the connection @@ -92,21 +91,6 @@ def run attr_accessor :automatic_reconnect, :timeout attr_reader :spec, :connections, :size, :reaper - class Latch # :nodoc: - def initialize - @mutex = Mutex.new - @cond = ConditionVariable.new - end - - def release - @mutex.synchronize { @cond.broadcast } - end - - def await - @mutex.synchronize { @cond.wait @mutex } - end - end - # Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification # object which describes database connection information (e.g. adapter, # host name, username, password, etc), as well as the maximum size for @@ -128,9 +112,25 @@ def initialize(spec) # default max pool size to 5 @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5 - @latch = Latch.new @connections = [] @automatic_reconnect = true + + # connections available to be checked out + @available = [] + + # number of threads waiting to check out a connection + @num_waiting = 0 + + # signal threads waiting + @cond = new_cond + end + + # Hack for tests to be able to add connections. Do not call outside of tests + def insert_connection_for_test!(c) + synchronize do + @connections << c + @available << c + end end # Retrieve the connection associated with the current thread, or call @@ -188,6 +188,7 @@ def disconnect! conn.disconnect! end @connections = [] + @available = [] end end @@ -202,6 +203,9 @@ def clear_reloadable_connections! @connections.delete_if do |conn| conn.requires_reloading? end + @available.delete_if do |conn| + conn.requires_reloading? + end end end @@ -225,23 +229,19 @@ def clear_stale_cached_connections! # :nodoc: # Raises: # - PoolFullError: no connection can be obtained from the pool. def checkout - loop do - # Checkout an available connection - synchronize do - # Try to find a connection that hasn't been leased, and lease it - conn = connections.find { |c| c.lease } - - # If all connections were leased, and we have room to expand, - # create a new connection and lease it. - if !conn && connections.size < size - conn = checkout_new_connection - conn.lease - end + synchronize do + conn = nil + + if @num_waiting == 0 + conn = acquire_connection + end - return checkout_and_verify(conn) if conn + unless conn + conn = wait_until(@timeout) { acquire_connection } end - Timeout.timeout(@timeout, PoolFullError) { @latch.await } + conn.lease + checkout_and_verify(conn) end end @@ -257,8 +257,10 @@ def checkin(conn) end release conn + + @available.unshift conn + @cond.signal end - @latch.release end # Remove a connection from the connection pool. The connection will @@ -266,12 +268,14 @@ def checkin(conn) def remove(conn) synchronize do @connections.delete conn + @available.delete conn # FIXME: we might want to store the key on the connection so that removing # from the reserved hash will be a little easier. release conn + + @cond.signal # can make a new connection now end - @latch.release end # Removes dead connections from the pool. A dead connection can occur @@ -283,12 +287,60 @@ def reap connections.dup.each do |conn| remove conn if conn.in_use? && stale > conn.last_use && !conn.active? end + @cond.broadcast # may violate fairness end - @latch.release end private + # Take an available connection or, if possible, create a new + # one, or nil. + # + # Monitor must be held while calling this method. + # + # Returns: a newly acquired connection. + def acquire_connection + if @available.any? + @available.pop + elsif connections.size < size + checkout_new_connection + end + end + + # Wait on +@cond+ until the block returns non-nil. Note that + # unlike MonitorMixin::ConditionVariable#wait_until, this method + # does not test the block before the first wait period. + # + # Monitor must be held when calling this method. + # + # +timeout+: Integer timeout in seconds + # + # Returns: the result of the block + # + # Raises: + # - PoolFullError: timeout elapsed before +&block+ returned a connection + def wait_until(timeout, &block) + @num_waiting += 1 + begin + t0 = Time.now + loop do + elapsed = Time.now - t0 + if elapsed >= timeout + msg = 'could not obtain a database connection within %0.3f seconds (waited %0.3f seconds)' % + [timeout, elapsed] + raise PoolFullError, msg + end + + @cond.wait(timeout - elapsed) + + conn = yield + return conn if conn + end + ensure + @num_waiting -= 1 + end + end + def release(conn) thread_id = if @reserved_connections[current_connection_id] == conn current_connection_id diff --git a/activerecord/test/cases/connection_adapters/abstract_adapter_test.rb b/activerecord/test/cases/connection_adapters/abstract_adapter_test.rb index 7dc6e8afcb3fe..3e3d6e27697e3 100644 --- a/activerecord/test/cases/connection_adapters/abstract_adapter_test.rb +++ b/activerecord/test/cases/connection_adapters/abstract_adapter_test.rb @@ -36,7 +36,7 @@ def test_expire_mutates_in_use def test_close pool = ConnectionPool.new(ConnectionSpecification.new({}, nil)) - pool.connections << adapter + pool.insert_connection_for_test! adapter adapter.pool = pool # Make sure the pool marks the connection in use diff --git a/activerecord/test/cases/connection_pool_test.rb b/activerecord/test/cases/connection_pool_test.rb index 8dc9f761c218e..6871e628aacaa 100644 --- a/activerecord/test/cases/connection_pool_test.rb +++ b/activerecord/test/cases/connection_pool_test.rb @@ -200,6 +200,121 @@ def test_checkout_behaviour end.join end + # The connection pool is "fair" if threads waiting for + # connections receive them the order in which they began + # waiting. This ensures that we don't timeout one HTTP request + # even while well under capacity in a multi-threaded environment + # such as a Java servlet container. + # + # We don't need strict fairness: if two connections become + # available at the same time, it's fine of two threads that were + # waiting acquire the connections out of order. + # + # Thus this test prepares waiting threads and then trickles in + # available connections slowly, ensuring the wakeup order is + # correct in this case. + # + # Try a few times since it might work out just by chance. + def test_checkout_fairness + 4.times { setup; do_checkout_fairness } + end + + def do_checkout_fairness + expected = (1..@pool.size).to_a.freeze + # check out all connections so our threads start out waiting + conns = expected.map { @pool.checkout } + mutex = Mutex.new + order = [] + errors = [] + + threads = expected.map do |i| + t = Thread.new { + begin + conn = @pool.checkout # never checked back in + mutex.synchronize { order << i } + rescue => e + mutex.synchronize { errors << e } + end + } + Thread.pass until t.status == "sleep" + t + end + + # this should wake up the waiting threads one by one in order + conns.each { |conn| @pool.checkin(conn); sleep 0.1 } + + threads.each(&:join) + + raise errors.first if errors.any? + + assert_equal(expected, order) + end + + # As mentioned in #test_checkout_fairness, we don't care about + # strict fairness. This test creates two groups of threads: + # group1 whose members all start waiting before any thread in + # group2. Enough connections are checked in to wakeup all + # group1 threads, and the fact that only group1 and no group2 + # threads acquired a connection is enforced. + # + # Try a few times since it might work out just by chance. + def test_checkout_fairness_by_group + 4.times { setup; do_checkout_fairness_by_group } + end + + def do_checkout_fairness_by_group + @pool.instance_variable_set(:@size, 10) + # take all the connections + conns = (1..10).map { @pool.checkout } + mutex = Mutex.new + successes = [] # threads that successfully got a connection + errors = [] + + make_thread = proc do |i| + t = Thread.new { + begin + conn = @pool.checkout # never checked back in + mutex.synchronize { successes << i } + rescue => e + mutex.synchronize { errors << e } + end + } + Thread.pass until t.status == "sleep" + t + end + + # all group1 threads start waiting before any in group2 + group1 = (1..5).map(&make_thread) + group2 = (6..10).map(&make_thread) + + # checkin n connections back to the pool + checkin = proc do |n| + n.times do + c = conns.pop + @pool.checkin(c) + end + end + + checkin.call(group1.size) # should wake up all group1 + + loop do + sleep 0.1 + break if mutex.synchronize { (successes.size + errors.size) == group1.size } + end + + winners = mutex.synchronize { successes.dup } + checkin.call(group2.size) # should wake up everyone remaining + + group1.each(&:join) + group2.each(&:join) + + assert_equal((1..group1.size).to_a, winners.sort) + + if errors.any? + raise errors.first + end + end + def test_automatic_reconnect= pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec assert pool.automatic_reconnect