Skip to content

Commit

Permalink
Revert "backport fair connection pool 02b2335 to 3-2-stable"
Browse files Browse the repository at this point in the history
This reverts commit 0693e07.

Revert "Cache columns metadata to avoid extra while testing"

This reverts commit a82f1e3.

Reason: This is causing failures in the postgresql build.
See http://travis-ci.org/#!/rails/rails/builds/2485584

Related with #7675
  • Loading branch information
rafaelfranca committed Sep 20, 2012
1 parent 50a76c1 commit e4018a0
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 311 deletions.
5 changes: 0 additions & 5 deletions activerecord/CHANGELOG.md
@@ -1,10 +1,5 @@
## Rails 3.2.9 (unreleased)

* Make ActiveRecord::ConnectionPool 'fair', first thread waiting is
first thread given newly available connection. Backport of #6492 02b2335563

*jrochkind*

* Fix creation of through association models when using `collection=[]`
on a `has_many :through` association from an unsaved model.
Fix #7661.
Expand Down
Expand Up @@ -57,141 +57,10 @@ module ConnectionAdapters
# * +wait_timeout+: number of seconds to block and wait for a connection
# before giving up and raising a timeout error (default 5 seconds).
class ConnectionPool

# Threadsafe, fair, FIFO queue. Meant to be used by ConnectionPool
# with which it shares a Monitor. But could be a generic Queue.
#
# The Queue in stdlib's 'thread' could replace this class except
# stdlib's doesn't support waiting with a timeout.
class Queue
def initialize(lock = Monitor.new)
@lock = lock
@cond = @lock.new_cond
@num_waiting = 0
@queue = []
end

# Test if any threads are currently waiting on the queue.
def any_waiting?
synchronize do
@num_waiting > 0
end
end

# Return the number of threads currently waiting on this
# queue.
def num_waiting
synchronize do
@num_waiting
end
end

# Add +element+ to the queue. Never blocks.
def add(element)
synchronize do
@queue.push element
@cond.signal
end
end

# If +element+ is in the queue, remove and return it, or nil.
def delete(element)
synchronize do
@queue.delete(element)
end
end

# Remove all elements from the queue.
def clear
synchronize do
@queue.clear
end
end

# Remove the head of the queue.
#
# If +timeout+ is not given, remove and return the head the
# queue if the number of available elements is strictly
# greater than the number of threads currently waiting (that
# is, don't jump ahead in line). Otherwise, return nil.
#
# If +timeout+ is given, block if it there is no element
# available, waiting up to +timeout+ seconds for an element to
# become available.
#
# Raises:
# - ConnectionTimeoutError if +timeout+ is given and no element
# becomes available after +timeout+ seconds,
def poll(timeout = nil)
synchronize do
if timeout
no_wait_poll || wait_poll(timeout)
else
no_wait_poll
end
end
end

private

def synchronize(&block)
@lock.synchronize(&block)
end

# Test if the queue currently contains any elements.
def any?
!@queue.empty?
end

# A thread can remove an element from the queue without
# waiting if an only if the number of currently available
# connections is strictly greater than the number of waiting
# threads.
def can_remove_no_wait?
@queue.size > @num_waiting
end

# Removes and returns the head of the queue if possible, or nil.
def remove
@queue.shift
end

# Remove and return the head the queue if the number of
# available elements is strictly greater than the number of
# threads currently waiting. Otherwise, return nil.
def no_wait_poll
remove if can_remove_no_wait?
end

# Waits on the queue up to +timeout+ seconds, then removes and
# returns the head of the queue.
def wait_poll(timeout)
@num_waiting += 1

t0 = Time.now
elapsed = 0
loop do
@cond.wait(timeout - elapsed)

return remove if any?

elapsed = Time.now - t0

if elapsed >= timeout
msg = 'could not obtain a database connection within %0.3f seconds (waited %0.3f seconds)' %
[timeout, elapsed]
raise ConnectionTimeoutError, msg
end
end
ensure
@num_waiting -= 1
end
end

include MonitorMixin

attr_accessor :automatic_reconnect
attr_reader :spec, :connections, :size
attr_reader :spec, :connections

# Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification
# object which describes database connection information (e.g. adapter,
Expand All @@ -207,23 +76,14 @@ def initialize(spec)
# The cache of reserved connections mapped to threads
@reserved_connections = {}

@queue = new_cond
@timeout = spec.config[:wait_timeout] || 5

# default max pool size to 5
@size = (spec.config[:pool] && spec.config[:pool].to_i) || 5

@connections = []
@automatic_reconnect = true

@available = Queue.new self
end

# Hack for tests to be able to add connections. Do not call outside of tests
def insert_connection_for_test!(c) #:nodoc:
synchronize do
@connections << c
@available.add c
end
end

# Retrieve the connection associated with the current thread, or call
Expand Down Expand Up @@ -279,7 +139,6 @@ def disconnect!
conn.disconnect!
end
@connections = []
@available.clear
end
end

Expand All @@ -291,15 +150,9 @@ def clear_reloadable_connections!
checkin conn
conn.disconnect! if conn.requires_reloading?
end

@connections.delete_if do |conn|
conn.requires_reloading?
end
@available.clear
@connections.each do |conn|
@available.add conn
end

end
end

Expand Down Expand Up @@ -363,22 +216,58 @@ def clear_stale_cached_connections!
# Check-out a database connection from the pool, indicating that you want
# to use it. You should call #checkin when you no longer need this.
#
# This is done by either returning and leasing existing connection, or by
# creating a new connection and leasing it.
#
# If all connections are leased and the pool is at capacity (meaning the
# number of currently leased connections is greater than or equal to the
# size limit set), an ActiveRecord::PoolFullError exception will be raised.
# This is done by either returning an existing connection, or by creating
# a new connection. If the maximum number of connections for this pool has
# already been reached, but the pool is empty (i.e. they're all being used),
# then this method will wait until a thread has checked in a connection.
# The wait time is bounded however: if no connection can be checked out
# within the timeout specified for this pool, then a ConnectionTimeoutError
# exception will be raised.
#
# Returns: an AbstractAdapter object.
#
# Raises:
# - PoolFullError: no connection can be obtained from the pool.
# - ConnectionTimeoutError: no connection can be obtained from the pool
# within the timeout period.
def checkout
synchronize do
conn = acquire_connection
conn.lease
checkout_and_verify(conn)
waited_time = 0

loop do
conn = @connections.find { |c| c.lease }

unless conn
if @connections.size < @size
conn = checkout_new_connection
conn.lease
end
end

if conn
checkout_and_verify conn
return conn
end

if waited_time >= @timeout
raise ConnectionTimeoutError, "could not obtain a database connection#{" within #{@timeout} seconds" if @timeout} (waited #{waited_time} seconds). The max pool size is currently #{@size}; consider increasing it."
end

# Sometimes our wait can end because a connection is available,
# but another thread can snatch it up first. If timeout hasn't
# passed but no connection is avail, looks like that happened --
# loop and wait again, for the time remaining on our timeout.
before_wait = Time.now
@queue.wait( [@timeout - waited_time, 0].max )
waited_time += (Time.now - before_wait)

# Will go away in Rails 4, when we don't clean up
# after leaked connections automatically anymore. Right now, clean
# up after we've returned from a 'wait' if it looks like it's
# needed, then loop and try again.
if(active_connections.size >= @connections.size)
clear_stale_cached_connections!
end
end
end
end

Expand All @@ -391,40 +280,10 @@ def checkin(conn)
synchronize do
conn.run_callbacks :checkin do
conn.expire
@queue.signal
end

release conn

@available.add conn
end
end

# Acquire a connection by one of 1) immediately removing one
# from the queue of available connections, 2) creating a new
# connection if the pool is not at capacity, 3) waiting on the
# queue for a connection to become available (first calling
# clear_stale_cached_connections! to clean up leaked connections,
# this cleanup will prob be going away in Rails4).
#
# Raises:
# - ConnectionTimeoutError if a connection could not be acquired
def acquire_connection
if conn = @available.poll
conn
elsif @connections.size < @size
checkout_new_connection
else
# this conditional clear_stale will go away in Rails 4, when we don't
# clean up after leaked connections automatically anymore. Right now,
# clean up after we've returned from a 'wait' if it looks like it's
# needed before trying to wait for a connection.
synchronize do
if(active_connections.size >= @connections.size)
clear_stale_cached_connections!
end
end

@available.poll(@timeout)
end
end

Expand Down
6 changes: 0 additions & 6 deletions activerecord/test/cases/associations/eager_test.rb
Expand Up @@ -929,10 +929,6 @@ def test_eager_loading_with_order_on_joined_table_preloads
end

def test_eager_loading_with_conditions_on_joined_table_preloads
# cache metadata in advance to avoid extra sql statements executed while testing
Tagging.first
Tag.first

posts = assert_queries(2) do
Post.find(:all, :select => 'distinct posts.*', :include => :author, :joins => [:comments], :conditions => "comments.body like 'Thank you%'", :order => 'posts.id')
end
Expand Down Expand Up @@ -981,9 +977,7 @@ def test_eager_loading_with_select_on_joined_table_preloads
end

def test_eager_loading_with_conditions_on_join_model_preloads
# cache metadata in advance to avoid extra sql statements executed while testing
Author.columns
AuthorAddress.first

authors = assert_queries(2) do
Author.find(:all, :include => :author_address, :joins => :comments, :conditions => "posts.title like 'Welcome%'")
Expand Down
Expand Up @@ -1470,8 +1470,6 @@ def test_calling_first_or_last_on_new_record_should_not_run_queries
end

def test_custom_primary_key_on_new_record_should_fetch_with_query
Essay.first # cache metadata in advance to avoid extra sql statements executed while testing

author = Author.new(:name => "David")
assert !author.essays.loaded?

Expand Down
Expand Up @@ -36,7 +36,7 @@ def test_expire_mutates_in_use

def test_close
pool = ConnectionPool.new(Base::ConnectionSpecification.new({}, nil))
pool.insert_connection_for_test! adapter
pool.connections << adapter
adapter.pool = pool

# Make sure the pool marks the connection in use
Expand Down

0 comments on commit e4018a0

Please sign in to comment.