Permalink
Browse files

Merge pull request #7675 from jrochkind/fair_conn_pool_backport

backport fair connection pool 02b2335 to 3-2-stable
  • Loading branch information...
2 parents dd76b3b + 0693e07 commit 24a77743b69b2a3452848f7d78afd2692b209a03 @rafaelfranca rafaelfranca committed Sep 17, 2012
@@ -1,5 +1,10 @@
## Rails 3.2.9 (unreleased)
+* Make ActiveRecord::ConnectionPool 'fair', first thread waiting is
+ first thread given newly available connection. Backport of #6492 02b2335563
+
+ *jrochkind*
+
* Fix creation of through association models when using `collection=[]`
on a `has_many :through` association from an unsaved model.
Fix #7661.
@@ -57,10 +57,141 @@ module ConnectionAdapters
# * +wait_timeout+: number of seconds to block and wait for a connection
# before giving up and raising a timeout error (default 5 seconds).
class ConnectionPool
+
+ # Threadsafe, fair, FIFO queue. Meant to be used by ConnectionPool
+ # with which it shares a Monitor. But could be a generic Queue.
+ #
+ # The Queue in stdlib's 'thread' could replace this class except
+ # stdlib's doesn't support waiting with a timeout.
+ class Queue
+ def initialize(lock = Monitor.new)
+ @lock = lock
+ @cond = @lock.new_cond
+ @num_waiting = 0
+ @queue = []
+ end
+
+ # Test if any threads are currently waiting on the queue.
+ def any_waiting?
+ synchronize do
+ @num_waiting > 0
+ end
+ end
+
+ # Return the number of threads currently waiting on this
+ # queue.
+ def num_waiting
+ synchronize do
+ @num_waiting
+ end
+ end
+
+ # Add +element+ to the queue. Never blocks.
+ def add(element)
+ synchronize do
+ @queue.push element
+ @cond.signal
+ end
+ end
+
+ # If +element+ is in the queue, remove and return it, or nil.
+ def delete(element)
+ synchronize do
+ @queue.delete(element)
+ end
+ end
+
+ # Remove all elements from the queue.
+ def clear
+ synchronize do
+ @queue.clear
+ end
+ end
+
+ # Remove the head of the queue.
+ #
+ # If +timeout+ is not given, remove and return the head the
+ # queue if the number of available elements is strictly
+ # greater than the number of threads currently waiting (that
+ # is, don't jump ahead in line). Otherwise, return nil.
+ #
+ # If +timeout+ is given, block if it there is no element
+ # available, waiting up to +timeout+ seconds for an element to
+ # become available.
+ #
+ # Raises:
+ # - ConnectionTimeoutError if +timeout+ is given and no element
+ # becomes available after +timeout+ seconds,
+ def poll(timeout = nil)
+ synchronize do
+ if timeout
+ no_wait_poll || wait_poll(timeout)
+ else
+ no_wait_poll
+ end
+ end
+ end
+
+ private
+
+ def synchronize(&block)
+ @lock.synchronize(&block)
+ end
+
+ # Test if the queue currently contains any elements.
+ def any?
+ !@queue.empty?
+ end
+
+ # A thread can remove an element from the queue without
+ # waiting if an only if the number of currently available
+ # connections is strictly greater than the number of waiting
+ # threads.
+ def can_remove_no_wait?
+ @queue.size > @num_waiting
+ end
+
+ # Removes and returns the head of the queue if possible, or nil.
+ def remove
+ @queue.shift
+ end
+
+ # Remove and return the head the queue if the number of
+ # available elements is strictly greater than the number of
+ # threads currently waiting. Otherwise, return nil.
+ def no_wait_poll
+ remove if can_remove_no_wait?
+ end
+
+ # Waits on the queue up to +timeout+ seconds, then removes and
+ # returns the head of the queue.
+ def wait_poll(timeout)
+ @num_waiting += 1
+
+ t0 = Time.now
+ elapsed = 0
+ loop do
+ @cond.wait(timeout - elapsed)
+
+ return remove if any?
+
+ elapsed = Time.now - t0
+
+ if elapsed >= timeout
+ msg = 'could not obtain a database connection within %0.3f seconds (waited %0.3f seconds)' %
+ [timeout, elapsed]
+ raise ConnectionTimeoutError, msg
+ end
+ end
+ ensure
+ @num_waiting -= 1
+ end
+ end
+
include MonitorMixin
attr_accessor :automatic_reconnect
- attr_reader :spec, :connections
+ attr_reader :spec, :connections, :size
# Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification
# object which describes database connection information (e.g. adapter,
@@ -76,14 +207,23 @@ def initialize(spec)
# The cache of reserved connections mapped to threads
@reserved_connections = {}
- @queue = new_cond
@timeout = spec.config[:wait_timeout] || 5
# default max pool size to 5
@size = (spec.config[:pool] && spec.config[:pool].to_i) || 5
@connections = []
@automatic_reconnect = true
+
+ @available = Queue.new self
+ end
+
+ # Hack for tests to be able to add connections. Do not call outside of tests
+ def insert_connection_for_test!(c) #:nodoc:
+ synchronize do
+ @connections << c
+ @available.add c
+ end
end
# Retrieve the connection associated with the current thread, or call
@@ -139,6 +279,7 @@ def disconnect!
conn.disconnect!
end
@connections = []
+ @available.clear
end
end
@@ -150,9 +291,15 @@ def clear_reloadable_connections!
checkin conn
conn.disconnect! if conn.requires_reloading?
end
+
@connections.delete_if do |conn|
conn.requires_reloading?
end
+ @available.clear
+ @connections.each do |conn|
+ @available.add conn
+ end
+
end
end
@@ -216,58 +363,22 @@ def clear_stale_cached_connections!
# Check-out a database connection from the pool, indicating that you want
# to use it. You should call #checkin when you no longer need this.
#
- # This is done by either returning an existing connection, or by creating
- # a new connection. If the maximum number of connections for this pool has
- # already been reached, but the pool is empty (i.e. they're all being used),
- # then this method will wait until a thread has checked in a connection.
- # The wait time is bounded however: if no connection can be checked out
- # within the timeout specified for this pool, then a ConnectionTimeoutError
- # exception will be raised.
+ # This is done by either returning and leasing existing connection, or by
+ # creating a new connection and leasing it.
+ #
+ # If all connections are leased and the pool is at capacity (meaning the
+ # number of currently leased connections is greater than or equal to the
+ # size limit set), an ActiveRecord::PoolFullError exception will be raised.
#
# Returns: an AbstractAdapter object.
#
# Raises:
- # - ConnectionTimeoutError: no connection can be obtained from the pool
- # within the timeout period.
+ # - PoolFullError: no connection can be obtained from the pool.
def checkout
synchronize do
- waited_time = 0
-
- loop do
- conn = @connections.find { |c| c.lease }
-
- unless conn
- if @connections.size < @size
- conn = checkout_new_connection
- conn.lease
- end
- end
-
- if conn
- checkout_and_verify conn
- return conn
- end
-
- if waited_time >= @timeout
- raise ConnectionTimeoutError, "could not obtain a database connection#{" within #{@timeout} seconds" if @timeout} (waited #{waited_time} seconds). The max pool size is currently #{@size}; consider increasing it."
- end
-
- # Sometimes our wait can end because a connection is available,
- # but another thread can snatch it up first. If timeout hasn't
- # passed but no connection is avail, looks like that happened --
- # loop and wait again, for the time remaining on our timeout.
- before_wait = Time.now
- @queue.wait( [@timeout - waited_time, 0].max )
- waited_time += (Time.now - before_wait)
-
- # Will go away in Rails 4, when we don't clean up
- # after leaked connections automatically anymore. Right now, clean
- # up after we've returned from a 'wait' if it looks like it's
- # needed, then loop and try again.
- if(active_connections.size >= @connections.size)
- clear_stale_cached_connections!
- end
- end
+ conn = acquire_connection
+ conn.lease
+ checkout_and_verify(conn)
end
end
@@ -280,10 +391,40 @@ def checkin(conn)
synchronize do
conn.run_callbacks :checkin do
conn.expire
- @queue.signal
end
release conn
+
+ @available.add conn
+ end
+ end
+
+ # Acquire a connection by one of 1) immediately removing one
+ # from the queue of available connections, 2) creating a new
+ # connection if the pool is not at capacity, 3) waiting on the
+ # queue for a connection to become available (first calling
+ # clear_stale_cached_connections! to clean up leaked connections,
+ # this cleanup will prob be going away in Rails4).
+ #
+ # Raises:
+ # - ConnectionTimeoutError if a connection could not be acquired
+ def acquire_connection
+ if conn = @available.poll
+ conn
+ elsif @connections.size < @size
+ checkout_new_connection
+ else
+ # this conditional clear_stale will go away in Rails 4, when we don't
+ # clean up after leaked connections automatically anymore. Right now,
+ # clean up after we've returned from a 'wait' if it looks like it's
+ # needed before trying to wait for a connection.
+ synchronize do
+ if(active_connections.size >= @connections.size)
+ clear_stale_cached_connections!
+ end
+ end
+
+ @available.poll(@timeout)
end
end
@@ -36,7 +36,7 @@ def test_expire_mutates_in_use
def test_close
pool = ConnectionPool.new(Base::ConnectionSpecification.new({}, nil))
- pool.connections << adapter
+ pool.insert_connection_for_test! adapter
adapter.pool = pool
# Make sure the pool marks the connection in use
Oops, something went wrong.

0 comments on commit 24a7774

Please sign in to comment.