
Make connection pool fair #6488

Closed
wants to merge 8 commits

4 participants

@pmahoney

This is a second attempt at #6416

It makes the connection pool "fair" with respect to waiting threads. I've done some more measurements here: http://polycrystal.org/2012/05/24/activerecord_connection_pool_fairness.html The patch is also cleaned up compared to the first attempt; the code is much more readable.

It includes some test fixes from @yahonda that this patch triggered (though the failures seem unrelated to the code)

I am still getting test failures, but I see the same failures against master: https://gist.github.com/2788538 And none of these seem related to the connection pool.

pmahoney and others added some commits
@jrochkind

Awesome. This definitely deals with some troubles i've been having.

  1. We don't need strict fairness: if two connections become
    available at the same time, it's fine if two threads that were
    waiting acquire the connections out of order.

    What keeps you from being strictly fair? Strict fairness (or close to it barring edge cases) would work out even better for me, although this will still be a huge improvement. Per our previous conversation where I think you said that if multiple threads were waiting, #signal would always wake up the one that was waiting the longest (verified in both jruby and mri?) -- what prevents strict fairness from being implemented? (The kind of fairness enforced here is still much better, and possibly good enough even for my use case, just curious if it can be improved yet further).

  • I notice even though your comments say you don't care about strict fairness -- your test actually does verify strict order with the order test, no? Are the comments outdated, and is strict fairness really being guaranteed by the test?
  2. What's the @cond.broadcast needed for? What was failing without this that required this? Related to first point above? I ask because previous implementations (3-2-stable as well as master) did not use a broadcast, but it didn't seem to cause any problems -- the threads that ended up waiting indefinitely in master previously were not caused by a lack of broadcast, they were caused by the situation you fixed with @num_wait and your semi-fair guarantee, as well as code that didn't keep track of total time waiting so threads would keep loop-waiting indefinitely when other threads 'stole' connections.

@pmahoney

I think you said that if multiple threads were waiting, #signal would always wake up the one that was waiting the longest (verified in both jruby and mri?) -- what prevents strict fairness from being implemented?

Yes, that is true. By "not strict" I mean that if two connections become available at the same time, and thread1 and thread2 are waiting in line, the order in which they re-acquire the monitor is not guaranteed (but thread3 will not be able to "steal" because @num_waiting check forces it to wait).
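That guarantee can be sketched as a tiny standalone queue (a simplified illustration of the patch's Queue#no_wait_poll/#wait_poll behavior; the class name is made up for this sketch and it is not the actual pool code):

```ruby
require 'monitor'

# Simplified sketch of the patch's anti-"stealing" rule: a thread may
# take an element *without* waiting only if there are strictly more
# elements than threads already waiting in line.
class FairishQueue
  def initialize
    @lock = Monitor.new
    @cond = @lock.new_cond
    @num_waiting = 0
    @queue = []
  end

  def add(element)
    @lock.synchronize do
      @queue.push element
      @cond.signal # wake one waiter, if any
    end
  end

  # Non-blocking poll: returns nil rather than jumping ahead of a
  # thread that is already waiting in line.
  def poll
    @lock.synchronize { @queue.shift if @queue.size > @num_waiting }
  end

  # Blocking poll: waiters are counted so that newcomers calling
  # #poll cannot steal elements out from under them.
  def poll_with_timeout(timeout)
    @lock.synchronize do
      begin
        @num_waiting += 1
        deadline = Time.now + timeout
        loop do
          return @queue.shift unless @queue.empty?
          remaining = deadline - Time.now
          raise 'timed out' if remaining <= 0
          @cond.wait(remaining)
        end
      ensure
        @num_waiting -= 1
      end
    end
  end
end
```

With one thread already waiting, a newcomer's non-blocking poll comes up empty even though an element was just added: exactly the "thread3 cannot steal" behavior described above.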

What's the @cond.broadcast needed for?

I don't see this in the diff. There was a broadcast in the original patch, but this new one should have removed it, unless I missed one.

@jrochkind

Aha, so if two connections become available more or less at once, it's not guaranteed whether thread1 or thread2 goes first, but they are both guaranteed to get a connection ahead of thread3? If that's so, that's plenty good enough.

I don't see this in the diff. There was a broadcast in the original patch,

here is where I see it. I see now it's actually only in the #reap implementation. I don't trust the semantics of the reap implementation already, and lack of fairness when reaping (which if it works right ought to only apply to code that violates AR's contract in the first place) is not too much of a concern.

But I think it could be replaced by counting up how many times the reaper reaped, and doing that many signals instead of a broadcast, would that be better?
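The counted-signal idea can be sketched standalone (a bare Monitor and a plain array standing in for the pool -- hypothetical, not the PR's code; the 0.5 s sleep just gives the woken threads time to run):

```ruby
require 'monitor'

lock  = Monitor.new
cond  = lock.new_cond
queue = []
woken = []

# Three threads wait in line for a "connection"
threads = 3.times.map do |i|
  t = Thread.new do
    lock.synchronize do
      cond.wait while queue.empty?
      queue.shift
      woken << i
    end
  end
  Thread.pass until t.status == "sleep"
  t
end

# The "reaper" frees two connections: instead of cond.broadcast (which
# would also wake the third thread only to send it straight back to
# sleep), signal exactly as many times as connections were freed.
reaped = 2
lock.synchronize do
  reaped.times { queue.push :conn }
  reaped.times { cond.signal }
end

sleep 0.5 # let the two signaled threads run to completion
threads.each { |t| t.kill; t.join }
# exactly two threads were woken; the third never left cond.wait
```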

@jrochkind

I'm actually still confused about the fairness.

  • If thread1, thread2, and thread3 are all waiting (yep, thread3 is already waiting too)
  • and then two connections become avail at more or less the same time
  • are both thread1 and thread2 guaranteed to get a connection before thread3 (which was also waiting?)

your order check in the test seems to guarantee this in fact is true, I think?

@pmahoney

Ah. It was removed here. The combined diff for the pull request is better: https://github.com/rails/rails/pull/6488/files

... counting up how many times the reaper reaped ...

That's what @available.add checkout_new_connection if @available.any_waiting? (in #remove which is called by #reap) is supposed to do, though I admit I have not done any testing of the reaper. The reaper attempts to remove stale connections, so I attempt to then create new ones to replace those that have been removed. But what happens if someone checks in a presumed-leaked connection that has been removed? Ugh.

@jrochkind

ugh, sorry, ignore on broadcast, I see I wasn't looking at the final version, which has no broadcast at all in the reap. okay then.

still curious about nature of guarantees, but this is a good patch regardless, I think.

I actually run into this problem in MRI not jruby -- I'll try to run your test in MRI 1.9.3 next week, cause i'm curious -- i fully expect based on my experiences, it will show similar benefit in MRI.

@jrochkind

I am suspicious of the reaper in general, personally, although @tenderlove may disagree.

But I personally don't think the reaper does anything particularly useful atm, so making sure it does what it does properly for fairness... I dunno.

The reaper right now will reap only if a connection is still checked out, was last checked out more than @dead_connection_timeout seconds ago (default 5), and has been closed by the actual rdbms (I think that's what active? checks?)

Most rdbms have a very long timeout for idleness; MySQL by default (with AR mysql2) will wait hours before closing an idle connection. Which is in fact ordinarily what you want, I think?

So I'm not sure how the reaper does anything useful -- it won't reap a 'leaked' connection, under normal conditions, for many minutes or even hours after it was leaked.

I may be missing something? Maybe you're expected to significantly reduce the rdbm's idle timeout to make use of the reaper?

There are of course times when a connection may be closed because of network or server problems or rdbms restart, unrelated to leaked connections. But the reaper's not meant to deal with that, i don't think, and probably isn't the right way to anyway. (There's already an automatic_reconnect key for some of AR's adapters, although its semantics aren't entirely clear).

Anyhow, this is really a different ticket, I just mention it before you dive into making things 'right' with the reaper, and because you seem to understand this stuff well and I hadn't gotten anyone else to consider or confirm or deny my suspicions about current reaper func yet. :)

@pmahoney

I'm actually still confused about the fairness.

If thread1, thread2, and thread3 are all waiting (yep, thread3 is already waiting too)
and then two connections become avail at more or less the same time
are both thread1 and thread2 guaranteed to get a connection before thread3 (which was also waiting?)

your order check in the test seems to guarantee this in fact is true, I think?

The test_checkout_fairness_by_group is a better test of this. What happens (I think) is that a ConditionVariable does guarantee that the longest waiting thread is the first to wake up. But the first thing a thread does after being woken up is re-acquire the monitor. It's this second action that is a free-for-all. So, yes, thread1 and thread2 will get the connection ahead of thread3 in your example, because thread3 will not be woken up.
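That two-step wakeup (signal first, monitor re-acquisition second) can be demonstrated with stdlib Monitor alone (a standalone sketch, not pool code):

```ruby
require 'monitor'

lock = Monitor.new
cond = lock.new_cond
signaled = false
woke_at = nil
released_at = nil

waiter = Thread.new do
  lock.synchronize do
    cond.wait until signaled # wait releases the monitor while blocked
    woke_at = Time.now       # runs only once the monitor is re-acquired
  end
end
Thread.pass until waiter.status == "sleep"

lock.synchronize do
  signaled = true
  cond.signal   # the waiter is woken here...
  sleep 0.2     # ...but cannot proceed while we still hold the monitor
  released_at = Time.now
end
waiter.join
# woke_at is always >= released_at: being signaled is not the same as
# getting the monitor back
```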

@pmahoney

I just mention it before you dive into making things 'right' with the reaper

I was planning on just ignoring that :-P

@pmahoney

@jrochkind Oh, and thanks a bunch for taking a look at this. I greatly appreciate the second set of eyes.

@jrochkind

Okay, I think i understand the fairness issue, and it seems pretty damn good. Def understand the issue where it's unpredictable which thread will get the lock first -- that's what requires the @num_waiting in the first place. And I understand how that guards against a thread that wasn't waiting at all 'stealing' a connection from one that was. (Yes, I have had this problem too).

I think I'm understanding right that your code will be pretty close to fair -- if there are multiple threads waiting, there's no way the oldest thread waiting will get continually bumped in favor of newer waiters. The issue is only when N>1 threads are checked in at very close to the same time, and even then the first N waiters will all get connections before the N+1st and subsequent waiters. That seems totally good enough.

On the reaper.... man, looking at the mysql2 adapter code specifically, I don't think the reaper will ever reap anything; i can't figure out how a connection could ever not be active?, even if the rdbms has closed it for idleness -- active? is implemented per adapter, but in mysql2 it seems to me a connection will always be active? unless manually disconnected.

That's really a different issue and for @tenderlove to consider I guess, since he added the code. Personally, I would not ever use the reaper at all, which fortunately is easy to do by making sure reap_frequency is unset.

@jrochkind

@pmahoney thanks a lot for doing it, man! I've been struggling with this stuff for a while, and not managing to solve it, and not managing to find anyone else to review my ideas/code for AR either! (I am not a committer, in case that was not clear, def not).

Concurrency is def confusing.

@rafaelfranca

@pmahoney I think you will need to squash your commits.

@tenderlove could you review this one?

@pmahoney

Here's a mostly squashed version: #6492

@pmahoney pmahoney closed this
@pmahoney pmahoney referenced this pull request
Merged

Fair connection pool2 #6492

Commits on May 25, 2012
  1. @pmahoney

    Make connection pool fair with respect to waiting threads.

    pmahoney authored
    Conflicts:
    
    	activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
  2. @pmahoney
  3. @pmahoney
  4. @pmahoney
  5. @yahonda @pmahoney

    Cache metadata in advance to avoid extra sql statements while testing.

    yahonda authored pmahoney committed
    Reason: If metadata is not cached, extra sql statements
    will be executed, which causes test failures with assert_queries().
  6. @pmahoney

    Cache metadata in advance to avoid extra sql statements while testing.

    pmahoney authored
    Reason: If metadata is not cached, extra sql statements
    will be executed, which causes test failures with assert_queries().
  7. @pmahoney
  8. @pmahoney
211 activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
@@ -2,7 +2,6 @@
require 'monitor'
require 'set'
require 'active_support/core_ext/module/deprecation'
-require 'timeout'
module ActiveRecord
# Raised when a connection could not be obtained within the connection
@@ -70,6 +69,131 @@ module ConnectionAdapters
# after which the Reaper will consider a connection reapable. (default
# 5 seconds).
class ConnectionPool
+ # Threadsafe, fair, FIFO queue. Meant to be used by ConnectionPool
+ # with which it shares a Monitor. But could be a generic Queue.
+ #
+ # The Queue in stdlib's 'thread' could replace this class except
+ # stdlib's doesn't support waiting with a timeout.
+ class Queue
+ def initialize(lock = Monitor.new)
+ @lock = lock
+ @cond = @lock.new_cond
+ @num_waiting = 0
+ @queue = []
+ end
+
+ # Test if any threads are currently waiting on the queue.
+ def any_waiting?
+ synchronize do
+ @num_waiting > 0
+ end
+ end
+
+ # Return the number of threads currently waiting on this
+ # queue.
+ def num_waiting
+ synchronize do
+ @num_waiting
+ end
+ end
+
+ # Add +element+ to the queue. Never blocks.
+ def add(element)
+ synchronize do
+ @queue.push element
+ @cond.signal
+ end
+ end
+
+ # If +element+ is in the queue, remove and return it, or nil.
+ def delete(element)
+ synchronize do
+ @queue.delete(element)
+ end
+ end
+
+ # Remove all elements from the queue.
+ def clear
+ synchronize do
+ @queue.clear
+ end
+ end
+
+ # Remove the head of the queue.
+ #
+ # If +timeout+ is not given, remove and return the head of the
+ # queue if the number of available elements is strictly
+ # greater than the number of threads currently waiting (that
+ # is, don't jump ahead in line). Otherwise, return nil.
+ #
+ # If +timeout+ is given, block if there is no element
+ # available, waiting up to +timeout+ seconds for an element to
+ # become available.
+ #
+ # Raises:
+ # - ConnectionTimeoutError if +timeout+ is given and no element
+ # becomes available after +timeout+ seconds.
+ def poll(timeout = nil)
+ synchronize do
+ if timeout
+ no_wait_poll || wait_poll(timeout)
+ else
+ no_wait_poll
+ end
+ end
+ end
+
+ private
+
+ def synchronize(&block)
+ @lock.synchronize(&block)
+ end
+
+ # Test if the queue currently contains any elements.
+ def any?
+ !@queue.empty?
+ end
+
+ # A thread can remove an element from the queue without
+ # waiting if and only if the number of currently available
+ # connections is strictly greater than the number of waiting
+ # threads.
+ def can_remove_no_wait?
+ @queue.size > @num_waiting
+ end
+
+ # Removes and returns the head of the queue if possible, or nil.
+ def remove
+ @queue.shift
+ end
+
+ # Remove and return the head of the queue if the number of
+ # available elements is strictly greater than the number of
+ # threads currently waiting. Otherwise, return nil.
+ def no_wait_poll
+ remove if can_remove_no_wait?
+ end
+
+ # Waits on the queue up to +timeout+ seconds, then removes and
+ # returns the head of the queue.
+ def wait_poll(timeout)
+ @num_waiting += 1
+
+ t0 = Time.now
+ elapsed = 0
+ loop do
+ @cond.wait(timeout - elapsed)
+
+ return remove if any?
+
+ elapsed = Time.now - t0
+ raise ConnectionTimeoutError if elapsed >= timeout
+ end
+ ensure
+ @num_waiting -= 1
+ end
+ end
+
# Every +frequency+ seconds, the reaper will call +reap+ on +pool+.
# A reaper instantiated with a nil frequency will never reap the
# connection pool.
@@ -100,21 +224,6 @@ def run
attr_accessor :automatic_reconnect, :checkout_timeout, :dead_connection_timeout
attr_reader :spec, :connections, :size, :reaper
- class Latch # :nodoc:
- def initialize
- @mutex = Mutex.new
- @cond = ConditionVariable.new
- end
-
- def release
- @mutex.synchronize { @cond.broadcast }
- end
-
- def await
- @mutex.synchronize { @cond.wait @mutex }
- end
- end
-
# Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification
# object which describes database connection information (e.g. adapter,
# host name, username, password, etc), as well as the maximum size for
@@ -137,9 +246,18 @@ def initialize(spec)
# default max pool size to 5
@size = (spec.config[:pool] && spec.config[:pool].to_i) || 5
- @latch = Latch.new
@connections = []
@automatic_reconnect = true
+
+ @available = Queue.new self
+ end
+
+ # Hack for tests to be able to add connections. Do not call outside of tests
+ def insert_connection_for_test!(c) #:nodoc:
+ synchronize do
+ @connections << c
+ @available.add c
+ end
end
# Retrieve the connection associated with the current thread, or call
@@ -197,6 +315,7 @@ def disconnect!
conn.disconnect!
end
@connections = []
+ @available.clear
end
end
@@ -211,6 +330,10 @@ def clear_reloadable_connections!
@connections.delete_if do |conn|
conn.requires_reloading?
end
+ @available.clear
+ @connections.each do |conn|
+ @available.add conn
+ end
end
end
@@ -234,23 +357,10 @@ def clear_stale_cached_connections! # :nodoc:
# Raises:
# - PoolFullError: no connection can be obtained from the pool.
def checkout
- loop do
- # Checkout an available connection
- synchronize do
- # Try to find a connection that hasn't been leased, and lease it
- conn = connections.find { |c| c.lease }
-
- # If all connections were leased, and we have room to expand,
- # create a new connection and lease it.
- if !conn && connections.size < size
- conn = checkout_new_connection
- conn.lease
- end
-
- return checkout_and_verify(conn) if conn
- end
-
- Timeout.timeout(@checkout_timeout, PoolFullError) { @latch.await }
+ synchronize do
+ conn = acquire_connection
+ conn.lease
+ checkout_and_verify(conn)
end
end
@@ -266,8 +376,9 @@ def checkin(conn)
end
release conn
+
+ @available.add conn
end
- @latch.release
end
# Remove a connection from the connection pool. The connection will
@@ -275,12 +386,14 @@ def checkin(conn)
def remove(conn)
synchronize do
@connections.delete conn
+ @available.delete conn
# FIXME: we might want to store the key on the connection so that removing
# from the reserved hash will be a little easier.
release conn
+
+ @available.add checkout_new_connection if @available.any_waiting?
end
- @latch.release
end
# Removes dead connections from the pool. A dead connection can occur
@@ -293,11 +406,35 @@ def reap
remove conn if conn.in_use? && stale > conn.last_use && !conn.active?
end
end
- @latch.release
end
private
+ # Acquire a connection by one of 1) immediately removing one
+ # from the queue of available connections, 2) creating a new
+ # connection if the pool is not at capacity, 3) waiting on the
+ # queue for a connection to become available.
+ #
+ # Raises:
+ # - PoolFullError if a connection could not be acquired (FIXME:
+ # why not ConnectionTimeoutError?)
+ def acquire_connection
+ if conn = @available.poll
+ conn
+ elsif @connections.size < @size
+ checkout_new_connection
+ else
+ t0 = Time.now
+ begin
+ @available.poll(@checkout_timeout)
+ rescue ConnectionTimeoutError
+ msg = 'could not obtain a database connection within %0.3f seconds (waited %0.3f seconds)' %
+ [@checkout_timeout, Time.now - t0]
+ raise PoolFullError, msg
+ end
+ end
+ end
+
def release(conn)
thread_id = if @reserved_connections[current_connection_id] == conn
current_connection_id
7 activerecord/test/cases/associations/eager_test.rb
@@ -962,6 +962,10 @@ def test_eager_loading_with_order_on_joined_table_preloads
end
def test_eager_loading_with_conditions_on_joined_table_preloads
+ # cache metadata in advance to avoid extra sql statements executed while testing
+ Tagging.first
+ Tag.first
+
posts = assert_queries(2) do
Post.scoped(:select => 'distinct posts.*', :includes => :author, :joins => [:comments], :where => "comments.body like 'Thank you%'", :order => 'posts.id').all
end
@@ -1011,6 +1015,9 @@ def test_eager_loading_with_select_on_joined_table_preloads
def test_eager_loading_with_conditions_on_join_model_preloads
Author.columns
+
+ # cache metadata in advance to avoid extra sql statements executed while testing
+ AuthorAddress.first
authors = assert_queries(2) do
Author.scoped(:includes => :author_address, :joins => :comments, :where => "posts.title like 'Welcome%'").all
3  activerecord/test/cases/associations/has_many_associations_test.rb
@@ -1339,6 +1339,9 @@ def test_custom_primary_key_on_new_record_should_fetch_with_query
author = Author.new(:name => "David")
assert !author.essays.loaded?
+ # cache metadata in advance to avoid extra sql statements executed while testing
+ Essay.first
+
assert_queries 1 do
assert_equal 1, author.essays.size
end
2  activerecord/test/cases/connection_adapters/abstract_adapter_test.rb
@@ -36,7 +36,7 @@ def test_expire_mutates_in_use
def test_close
pool = ConnectionPool.new(ConnectionSpecification.new({}, nil))
- pool.connections << adapter
+ pool.insert_connection_for_test! adapter
adapter.pool = pool
# Make sure the pool marks the connection in use
104 activerecord/test/cases/connection_pool_test.rb
@@ -200,6 +200,110 @@ def test_checkout_behaviour
end.join
end
+ # The connection pool is "fair" if threads waiting for
+ # connections receive them in the order in which they began
+ # waiting. This ensures that we don't time out one HTTP request
+ # even while well under capacity in a multi-threaded environment
+ # such as a Java servlet container.
+ #
+ # We don't need strict fairness: if two connections become
+ # available at the same time, it's fine if two threads that were
+ # waiting acquire the connections out of order.
+ #
+ # Thus this test prepares waiting threads and then trickles in
+ # available connections slowly, ensuring the wakeup order is
+ # correct in this case.
+ def test_checkout_fairness
+ @pool.instance_variable_set(:@size, 10)
+ expected = (1..@pool.size).to_a.freeze
+ # check out all connections so our threads start out waiting
+ conns = expected.map { @pool.checkout }
+ mutex = Mutex.new
+ order = []
+ errors = []
+
+ threads = expected.map do |i|
+ t = Thread.new {
+ begin
+ conn = @pool.checkout # never checked back in
+ mutex.synchronize { order << i }
+ rescue => e
+ mutex.synchronize { errors << e }
+ end
+ }
+ Thread.pass until t.status == "sleep"
+ t
+ end
+
+ # this should wake up the waiting threads one by one in order
+ conns.each { |conn| @pool.checkin(conn); sleep 0.1 }
+
+ threads.each(&:join)
+
+ raise errors.first if errors.any?
+
+ assert_equal(expected, order)
+ end
+
+ # As mentioned in #test_checkout_fairness, we don't care about
+ # strict fairness. This test creates two groups of threads:
+ # group1, whose members all start waiting before any thread in
+ # group2. Enough connections are checked in to wake up all
+ # group1 threads, and we then verify that only group1 threads,
+ # and no group2 threads, acquired a connection.
+ def test_checkout_fairness_by_group
+ @pool.instance_variable_set(:@size, 10)
+ # take all the connections
+ conns = (1..10).map { @pool.checkout }
+ mutex = Mutex.new
+ successes = [] # threads that successfully got a connection
+ errors = []
+
+ make_thread = proc do |i|
+ t = Thread.new {
+ begin
+ conn = @pool.checkout # never checked back in
+ mutex.synchronize { successes << i }
+ rescue => e
+ mutex.synchronize { errors << e }
+ end
+ }
+ Thread.pass until t.status == "sleep"
+ t
+ end
+
+ # all group1 threads start waiting before any in group2
+ group1 = (1..5).map(&make_thread)
+ group2 = (6..10).map(&make_thread)
+
+ # checkin n connections back to the pool
+ checkin = proc do |n|
+ n.times do
+ c = conns.pop
+ @pool.checkin(c)
+ end
+ end
+
+ checkin.call(group1.size) # should wake up all group1
+
+ loop do
+ sleep 0.1
+ break if mutex.synchronize { (successes.size + errors.size) == group1.size }
+ end
+
+ winners = mutex.synchronize { successes.dup }
+ checkin.call(group2.size) # should wake up everyone remaining
+
+ group1.each(&:join)
+ group2.each(&:join)
+
+ assert_equal((1..group1.size).to_a, winners.sort)
+
+ if errors.any?
+ raise errors.first
+ end
+ end
+
def test_automatic_reconnect=
pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec
assert pool.automatic_reconnect