Skip to content

Commit

Permalink
Reap connections based on owning-thread death
Browse files Browse the repository at this point in the history
.. not a general timeout.

Now, if a thread checks out a connection then dies, we can immediately
recover that connection and re-use it.

This should alleviate the pool exhaustion discussed in rails#12867. More
importantly, it entirely avoids the potential issues of the reaper
attempting to check whether connections are still active: as long as the
owning thread is alive, the connection is its business alone.

As a no-op reap is now trivial (only entails checking a thread status
per connection), we can also perform one in-line any time we decide to
sleep for a connection.
  • Loading branch information
matthewd committed Mar 18, 2014
1 parent cc0d54b commit 9e457a8
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 68 deletions.
8 changes: 8 additions & 0 deletions activerecord/CHANGELOG.md
@@ -1,3 +1,11 @@
* Reap connections that were checked out by now-dead threads, instead
of waiting until they disconnect by themselves. Before this change,
a suitably constructed series of short-lived threads could starve
the connection pool, without ever having more than a couple alive at
the same time.

*Matthew Draper*

* `where.not` adds `references` for `includes` like normal `where` calls do.

Fixes #14406.
Expand Down
Expand Up @@ -58,13 +58,11 @@ module ConnectionAdapters
# * +checkout_timeout+: number of seconds to block and wait for a connection
# before giving up and raising a timeout error (default 5 seconds).
# * +reaping_frequency+: frequency in seconds to periodically run the
# Reaper, which attempts to find and close dead connections, which can
# occur if a programmer forgets to close a connection at the end of a
# thread or a thread dies unexpectedly. (Default nil, which means don't
# run the Reaper).
# * +dead_connection_timeout+: number of seconds from last checkout
# after which the Reaper will consider a connection reapable. (default
# 5 seconds).
# Reaper, which attempts to find and recover connections from dead
# threads, which can occur if a programmer forgets to close a
# connection at the end of a thread or a thread dies unexpectedly.
# Regardless of this setting, the Reaper will be invoked before every
# blocking wait. (Default nil, which means don't schedule the Reaper).
class ConnectionPool
# Threadsafe, fair, FIFO queue. Meant to be used by ConnectionPool
# with which it shares a Monitor. But could be a generic Queue.
Expand Down Expand Up @@ -222,7 +220,7 @@ def run

include MonitorMixin

attr_accessor :automatic_reconnect, :checkout_timeout, :dead_connection_timeout
attr_accessor :automatic_reconnect, :checkout_timeout
attr_reader :spec, :connections, :size, :reaper

# Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification
Expand All @@ -237,7 +235,6 @@ def initialize(spec)
@spec = spec

@checkout_timeout = spec.config[:checkout_timeout] || 5
@dead_connection_timeout = spec.config[:dead_connection_timeout] || 5
@reaper = Reaper.new self, spec.config[:reaping_frequency]
@reaper.run

Expand Down Expand Up @@ -361,11 +358,13 @@ def checkout
# calling +checkout+ on this pool.
def checkin(conn)
synchronize do
owner = conn.owner

conn.run_callbacks :checkin do
conn.expire
end

release conn
release conn, owner

@available.add conn
end
Expand All @@ -378,22 +377,28 @@ def remove(conn)
@connections.delete conn
@available.delete conn

# FIXME: we might want to store the key on the connection so that removing
# from the reserved hash will be a little easier.
release conn
release conn, conn.owner

@available.add checkout_new_connection if @available.any_waiting?
end
end

# Removes dead connections from the pool. A dead connection can occur
# if a programmer forgets to close a connection at the end of a thread
# Recover lost connections for the pool. A lost connection can occur if
# a programmer forgets to checkin a connection at the end of a thread
# or a thread dies unexpectedly.
def reap
synchronize do
stale = Time.now - @dead_connection_timeout
connections.dup.each do |conn|
if conn.in_use? && stale > conn.last_use && !conn.active_threadsafe?
stale_connections = synchronize do
@connections.select do |conn|
conn.in_use? && !conn.owner.alive?
end
end

stale_connections.each do |conn|
synchronize do
if conn.active?
conn.reset!
checkin conn
else
remove conn
end
end
Expand All @@ -415,20 +420,15 @@ def acquire_connection
elsif @connections.size < @size
checkout_new_connection
else
reap
@available.poll(@checkout_timeout)
end
end

def release(conn)
thread_id = if @reserved_connections[current_connection_id] == conn
current_connection_id
else
@reserved_connections.keys.find { |k|
@reserved_connections[k] == conn
}
end
def release(conn, owner)
thread_id = owner.object_id

@reserved_connections.delete thread_id if thread_id
@reserved_connections.delete thread_id
end

def new_connection
Expand Down
Expand Up @@ -71,8 +71,8 @@ class AbstractAdapter
define_callbacks :checkout, :checkin

attr_accessor :visitor, :pool
attr_reader :schema_cache, :last_use, :in_use, :logger
alias :in_use? :in_use
attr_reader :schema_cache, :owner, :logger
alias :in_use? :owner

def self.type_cast_config_to_integer(config)
if config =~ SIMPLE_INT
Expand All @@ -94,9 +94,8 @@ def initialize(connection, logger = nil, pool = nil) #:nodoc:
super()

@connection = connection
@in_use = false
@owner = nil
@instrumenter = ActiveSupport::Notifications.instrumenter
@last_use = false
@logger = logger
@pool = pool
@schema_cache = SchemaCache.new self
Expand All @@ -114,9 +113,8 @@ def schema_creation

def lease
synchronize do
unless in_use
@in_use = true
@last_use = Time.now
unless in_use?
@owner = Thread.current
end
end
end
Expand All @@ -127,7 +125,7 @@ def schema_cache=(cache)
end

def expire
@in_use = false
@owner = nil
end

def unprepared_visitor
Expand Down Expand Up @@ -262,12 +260,6 @@ def disable_referential_integrity
def active?
end

# Adapter should redefine this if it needs a threadsafe way to approximate
# if the connection is active
def active_threadsafe?
active?
end

# Disconnects from the database if already connected, and establishes a
# new connection with the database. Implementors should call super if they
# override the default implementation.
Expand Down
Expand Up @@ -603,10 +603,6 @@ def active?
false
end

def active_threadsafe?
@connection.connect_poll != PG::PGRES_POLLING_FAILED
end

# Close then reopen the connection.
def reconnect!
super
Expand Down
Expand Up @@ -29,12 +29,6 @@ def test_lease_twice
assert_not adapter.lease, 'should not lease adapter'
end

def test_last_use
assert_not adapter.last_use
adapter.lease
assert adapter.last_use
end

def test_expire_mutates_in_use
assert adapter.lease, 'lease adapter'
assert adapter.in_use?, 'adapter is in use'
Expand Down
23 changes: 13 additions & 10 deletions activerecord/test/cases/connection_pool_test.rb
Expand Up @@ -124,7 +124,6 @@ def test_reap_and_active
@pool.checkout
@pool.checkout
@pool.checkout
@pool.dead_connection_timeout = 0

connections = @pool.connections.dup

Expand All @@ -134,21 +133,25 @@ def test_reap_and_active
end

def test_reap_inactive
ready = false
@pool.checkout
@pool.checkout
@pool.checkout
@pool.dead_connection_timeout = 0

connections = @pool.connections.dup
connections.each do |conn|
conn.extend(Module.new { def active_threadsafe?; false; end; })
child = Thread.new do
@pool.checkout
@pool.checkout
ready = true
Thread.stop
end
Thread.pass until ready

assert_equal 3, active_connections(@pool).size

child.terminate
child.join
@pool.reap

assert_equal 0, @pool.connections.length
assert_equal 1, active_connections(@pool).size
ensure
connections.each(&:close)
@pool.connections.each(&:close)
end

def test_remove_connection
Expand Down
17 changes: 11 additions & 6 deletions activerecord/test/cases/reaper_test.rb
Expand Up @@ -63,17 +63,22 @@ def test_connection_pool_starts_reaper
spec.config[:reaping_frequency] = 0.0001

pool = ConnectionPool.new spec
pool.dead_connection_timeout = 0

conn = pool.checkout
count = pool.connections.length
conn = nil
child = Thread.new do
conn = pool.checkout
Thread.stop
end
Thread.pass while conn.nil?

assert conn.in_use?

conn.extend(Module.new { def active_threadsafe?; false; end; })
child.terminate

while count == pool.connections.length
while conn.in_use?
Thread.pass
end
assert_equal(count - 1, pool.connections.length)
assert !conn.in_use?
end
end
end
Expand Down

0 comments on commit 9e457a8

Please sign in to comment.