Permalink
Browse files

Merge pull request #6416 from pmahoney/threadsafe-connection-pool

Make connection pool fair with respect to waiting threads.
  • Loading branch information...
2 parents 525839f + d06674d commit d2901f0fc4270a765717ad572d559dc49a56b3a8 @tenderlove tenderlove committed May 21, 2012
@@ -2,7 +2,6 @@
require 'monitor'
require 'set'
require 'active_support/core_ext/module/deprecation'
-require 'timeout'
module ActiveRecord
# Raised when a connection could not be obtained within the connection
@@ -92,21 +91,6 @@ def run
attr_accessor :automatic_reconnect, :timeout
attr_reader :spec, :connections, :size, :reaper
- class Latch # :nodoc:
- def initialize
- @mutex = Mutex.new
- @cond = ConditionVariable.new
- end
-
- def release
- @mutex.synchronize { @cond.broadcast }
- end
-
- def await
- @mutex.synchronize { @cond.wait @mutex }
- end
- end
-
# Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification
# object which describes database connection information (e.g. adapter,
# host name, username, password, etc), as well as the maximum size for
@@ -128,9 +112,25 @@ def initialize(spec)
# default max pool size to 5
@size = (spec.config[:pool] && spec.config[:pool].to_i) || 5
- @latch = Latch.new
@connections = []
@automatic_reconnect = true
+
+ # connections available to be checked out
+ @available = []
+
+ # number of threads waiting to check out a connection
+ @num_waiting = 0
+
+ # signal threads waiting
+ @cond = new_cond
+ end
+
+ # Hack for tests to be able to add connections. Do not call outside of tests
+ def insert_connection_for_test!(c)
+ synchronize do
+ @connections << c
+ @available << c
+ end
end
# Retrieve the connection associated with the current thread, or call
@@ -188,6 +188,7 @@ def disconnect!
conn.disconnect!
end
@connections = []
+ @available = []
end
end
@@ -202,6 +203,9 @@ def clear_reloadable_connections!
@connections.delete_if do |conn|
conn.requires_reloading?
end
+ @available.delete_if do |conn|
+ conn.requires_reloading?
+ end
end
end
@@ -225,23 +229,19 @@ def clear_stale_cached_connections! # :nodoc:
# Raises:
# - PoolFullError: no connection can be obtained from the pool.
def checkout
- loop do
- # Checkout an available connection
- synchronize do
- # Try to find a connection that hasn't been leased, and lease it
- conn = connections.find { |c| c.lease }
-
- # If all connections were leased, and we have room to expand,
- # create a new connection and lease it.
- if !conn && connections.size < size
- conn = checkout_new_connection
- conn.lease
- end
+ synchronize do
+ conn = nil
+
+ if @num_waiting == 0
+ conn = acquire_connection
+ end
- return checkout_and_verify(conn) if conn
+ unless conn
+ conn = wait_until(@timeout) { acquire_connection }
end
- Timeout.timeout(@timeout, PoolFullError) { @latch.await }
+ conn.lease
+ checkout_and_verify(conn)
end
end
@@ -257,21 +257,25 @@ def checkin(conn)
end
release conn
+
+ @available.unshift conn
+ @cond.signal
end
- @latch.release
end
# Remove a connection from the connection pool. The connection will
# remain open and active but will no longer be managed by this pool.
def remove(conn)
synchronize do
@connections.delete conn
+ @available.delete conn
# FIXME: we might want to store the key on the connection so that removing
# from the reserved hash will be a little easier.
release conn
+
+ @cond.signal # can make a new connection now
end
- @latch.release
end
# Removes dead connections from the pool. A dead connection can occur
@@ -283,12 +287,60 @@ def reap
connections.dup.each do |conn|
remove conn if conn.in_use? && stale > conn.last_use && !conn.active?
end
+ @cond.broadcast # may violate fairness
end
- @latch.release
end
private
+ # Take an available connection or, if possible, create a new
+ # one, or nil.
+ #
+ # Monitor must be held while calling this method.
+ #
+ # Returns: a newly acquired connection.
+ def acquire_connection
+ if @available.any?
+ @available.pop
+ elsif connections.size < size
+ checkout_new_connection
+ end
+ end
+
+ # Wait on +@cond+ until the block returns non-nil. Note that
+ # unlike MonitorMixin::ConditionVariable#wait_until, this method
+ # does not test the block before the first wait period.
+ #
+ # Monitor must be held when calling this method.
+ #
+ # +timeout+: Integer timeout in seconds
+ #
+ # Returns: the result of the block
+ #
+ # Raises:
+ # - PoolFullError: timeout elapsed before +&block+ returned a connection
+ def wait_until(timeout, &block)
+ @num_waiting += 1
+ begin
+ t0 = Time.now
+ loop do
+ elapsed = Time.now - t0
+ if elapsed >= timeout
+ msg = 'could not obtain a database connection within %0.3f seconds (waited %0.3f seconds)' %
+ [timeout, elapsed]
+ raise PoolFullError, msg
+ end
+
+ @cond.wait(timeout - elapsed)
+
+ conn = yield
+ return conn if conn
+ end
+ ensure
+ @num_waiting -= 1
+ end
+ end
+
def release(conn)
thread_id = if @reserved_connections[current_connection_id] == conn
current_connection_id
@@ -36,7 +36,7 @@ def test_expire_mutates_in_use
def test_close
pool = ConnectionPool.new(ConnectionSpecification.new({}, nil))
- pool.connections << adapter
+ pool.insert_connection_for_test! adapter
adapter.pool = pool
# Make sure the pool marks the connection in use
@@ -200,6 +200,121 @@ def test_checkout_behaviour
end.join
end
+ # The connection pool is "fair" if threads waiting for
+ # connections receive them the order in which they began
+ # waiting. This ensures that we don't timeout one HTTP request
+ # even while well under capacity in a multi-threaded environment
+ # such as a Java servlet container.
+ #
+ # We don't need strict fairness: if two connections become
+ # available at the same time, it's fine of two threads that were
+ # waiting acquire the connections out of order.
+ #
+ # Thus this test prepares waiting threads and then trickles in
+ # available connections slowly, ensuring the wakeup order is
+ # correct in this case.
+ #
+ # Try a few times since it might work out just by chance.
+ def test_checkout_fairness
+ 4.times { setup; do_checkout_fairness }
+ end
+
+ def do_checkout_fairness
+ expected = (1..@pool.size).to_a.freeze
+ # check out all connections so our threads start out waiting
+ conns = expected.map { @pool.checkout }
+ mutex = Mutex.new
+ order = []
+ errors = []
+
+ threads = expected.map do |i|
+ t = Thread.new {
+ begin
+ conn = @pool.checkout # never checked back in
+ mutex.synchronize { order << i }
+ rescue => e
+ mutex.synchronize { errors << e }
+ end
+ }
+ Thread.pass until t.status == "sleep"
+ t
+ end
+
+ # this should wake up the waiting threads one by one in order
+ conns.each { |conn| @pool.checkin(conn); sleep 0.1 }
+
+ threads.each(&:join)
+
+ raise errors.first if errors.any?
+
+ assert_equal(expected, order)
+ end
+
+ # As mentioned in #test_checkout_fairness, we don't care about
+ # strict fairness. This test creates two groups of threads:
+ # group1 whose members all start waiting before any thread in
+ # group2. Enough connections are checked in to wakeup all
+ # group1 threads, and the fact that only group1 and no group2
+ # threads acquired a connection is enforced.
+ #
+ # Try a few times since it might work out just by chance.
+ def test_checkout_fairness_by_group
+ 4.times { setup; do_checkout_fairness_by_group }
+ end
+
+ def do_checkout_fairness_by_group
+ @pool.instance_variable_set(:@size, 10)
+ # take all the connections
+ conns = (1..10).map { @pool.checkout }
+ mutex = Mutex.new
+ successes = [] # threads that successfully got a connection
+ errors = []
+
+ make_thread = proc do |i|
+ t = Thread.new {
+ begin
+ conn = @pool.checkout # never checked back in
+ mutex.synchronize { successes << i }
+ rescue => e
+ mutex.synchronize { errors << e }
+ end
+ }
+ Thread.pass until t.status == "sleep"
+ t
+ end
+
+ # all group1 threads start waiting before any in group2
+ group1 = (1..5).map(&make_thread)
+ group2 = (6..10).map(&make_thread)
+
+ # checkin n connections back to the pool
+ checkin = proc do |n|
+ n.times do
+ c = conns.pop
+ @pool.checkin(c)
+ end
+ end
+
+ checkin.call(group1.size) # should wake up all group1
+
+ loop do
+ sleep 0.1
+ break if mutex.synchronize { (successes.size + errors.size) == group1.size }
+ end
+
+ winners = mutex.synchronize { successes.dup }
+ checkin.call(group2.size) # should wake up everyone remaining
+
+ group1.each(&:join)
+ group2.each(&:join)
+
+ assert_equal((1..group1.size).to_a, winners.sort)
+
+ if errors.any?
+ raise errors.first
+ end
+ end
+
def test_automatic_reconnect=
pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec
assert pool.automatic_reconnect

0 comments on commit d2901f0

Please sign in to comment.