Skip to content

Commit

Permalink
Make both threaded connection pools avoid disconnecting connections w…
Browse files Browse the repository at this point in the history
…hile holding the connection pool mutex

This fixes a deadlock issue in the shared_threaded connection pool when using
the connection_validator or connection_expiration extensions, and fixes
deadlock issues when using :connection_handling=>:disconnect Database option
and either extension in both threaded connection pools.

To implement this, in the sharded threaded pool, add a
@connections_to_disconnect array and store the connections
to disconnect in it, then check that in the hold method's
ensure block.  This requires an additional grab of the mutex.

In the regular threaded pool, the only time where the disconnect
occurs while holding the pool mutex is when using
:connection_handling=>:disconnect, so just move the related
code in the hold method's ensure block.

In both pools, if the maximum number of connection is reached, and we are
checking for dead threads, assume connections allocated to the dead
threads are bad.  There's no guarantee what the state of those
connections is, and returning them to the pool is probably a bad
choice.  This also allows immediately creating a new connection in
such a case, where previously that case may block until the
connection pool timeout if the pool is otherwise idle.
  • Loading branch information
jeremyevans committed May 10, 2018
1 parent a984cb7 commit 0db9e00
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 17 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG
@@ -1,5 +1,11 @@
=== master

* Disconnect connections left allocated by dead threads instead of returning the connections to the pool (jeremyevans)

* Make both threaded connection pools avoid disconnecting connections while holding the connection pool mutex (jeremyevans)

* Don't deadlock when disconnecting connections in the sharded_threaded connection pool when using connection_validator or connection_expiration extensions (jeremyevans)

* Don't modify hash argument passed in Model.nested_attributes in the nested_attributes plugin (jeremyevans)

* Avoid unnecessary hash creation in many places (jeremyevans)
Expand Down
3 changes: 2 additions & 1 deletion lib/sequel/connection_pool.rb
Expand Up @@ -109,7 +109,8 @@ def servers

private

# Remove the connection from the pool.
# Remove the connection from the pool. For threaded connections, this should be
# called without the mutex, because the disconnection may block.
def disconnect_connection(conn)
db.disconnect_connection(conn)
end
Expand Down
16 changes: 12 additions & 4 deletions lib/sequel/connection_pool/sharded_threaded.rb
Expand Up @@ -19,6 +19,7 @@ def initialize(db, opts = OPTS)
super
@available_connections = {}
@connections_to_remove = []
@connections_to_disconnect = []
@servers = opts.fetch(:servers_hash, Hash.new(:default))
remove_instance_variable(:@waiter)
@waiters = {}
Expand Down Expand Up @@ -130,6 +131,9 @@ def hold(server=:default)
raise
ensure
sync{release(t, conn, server)} if conn
while dconn = sync{@connections_to_disconnect.shift}
disconnect_connection(dconn)
end
end
end

Expand Down Expand Up @@ -201,7 +205,7 @@ def acquire(thread, server)

# :nocov:
# It's difficult to get to this point, it can only happen if there is a race condition
# where a connection cannot be acquired even after the thread is signalled by the condition
# where a connection cannot be acquired even after the thread is signalled by the condition variable
sync do
@waiters[server].wait(@mutex, timeout - elapsed)
if conn = next_available(server)
Expand All @@ -227,7 +231,11 @@ def assign_connection(thread, server)
end

if (n = _size(server)) >= (max = @max_size)
alloc.to_a.each{|t,c| release(t, c, server) unless t.alive?}
alloc.to_a.each do |t,c|
unless t.alive?
remove(t, c, server)
end
end
n = nil
end

Expand Down Expand Up @@ -339,7 +347,7 @@ def release(thread, conn, server)
conn = allocated(server).delete(thread)

if @connection_handling == :disconnect
disconnect_connection(conn)
@connections_to_disconnect << conn
else
checkin_connection(server, conn)
end
Expand All @@ -355,6 +363,6 @@ def release(thread, conn, server)
def remove(thread, conn, server)
@connections_to_remove.delete(conn)
allocated(server).delete(thread) if @servers.include?(server)
disconnect_connection(conn)
@connections_to_disconnect << conn
end
end
26 changes: 19 additions & 7 deletions lib/sequel/connection_pool/threaded.rb
Expand Up @@ -101,7 +101,12 @@ def hold(server=nil)
end
raise
ensure
sync{release(t)} if conn
if conn
sync{release(t)}
if @connection_handling == :disconnect
disconnect_connection(conn)
end
end
end
end

Expand Down Expand Up @@ -150,7 +155,7 @@ def acquire(thread)

# :nocov:
# It's difficult to get to this point, it can only happen if there is a race condition
# where a connection cannot be acquired even after the thread is signalled by the condition
# where a connection cannot be acquired even after the thread is signalled by the condition variable
sync do
@waiter.wait(@mutex, timeout - elapsed)
if conn = next_available
Expand All @@ -167,15 +172,20 @@ def acquire(thread)
# The caller should NOT have the mutex before calling this.
def assign_connection(thread)
allocated = @allocated

do_make_new = false
to_disconnect = nil

sync do
if conn = next_available
return(allocated[thread] = conn)
end

if (n = _size) >= (max = @max_size)
allocated.keys.each{|t| release(t) unless t.alive?}
allocated.keys.each do |t|
unless t.alive?
(to_disconnect ||= []) << @allocated.delete(t)
end
end
n = nil
end

Expand All @@ -184,6 +194,10 @@ def assign_connection(thread)
end
end

if to_disconnect
to_disconnect.each{|dconn| disconnect_connection(dconn)}
end

# Connect to the database outside of the connection pool mutex,
# as that can take a long time and the connection pool mutex
# shouldn't be locked while the connection takes place.
Expand Down Expand Up @@ -252,9 +266,7 @@ def raise_pool_timeout(elapsed)
def release(thread)
conn = @allocated.delete(thread)

if @connection_handling == :disconnect
disconnect_connection(conn)
else
unless @connection_handling == :disconnect
checkin_connection(conn)
end

Expand Down
22 changes: 22 additions & 0 deletions spec/core/connection_pool_spec.rb
Expand Up @@ -549,6 +549,28 @@ def @pool.assign_connection(*) nil end
@pool.size.must_equal 0
d.must_equal [2, 1, 3, 4]
end


it "should handle dead threads with checked out connections" do
pool = Sequel::ConnectionPool.get_pool(mock_db.call(&@icpp), @cp_opts.merge(:max_connections=>1))

skip = true
# Leave allocated connection to emulate dead thread with checked out connection
pool.define_singleton_method(:release){|*a| return if skip; super(*a)}
Thread.new{pool.hold{Thread.current.kill}}.join
skip = false

pool.allocated.wont_be :empty?
pool.available_connections.must_be :empty?

pool.hold{|c1| c1}
pool.allocated.must_be :empty?
pool.available_connections.wont_be :empty?

pool.disconnect
pool.allocated.must_be :empty?
pool.available_connections.must_be :empty?
end
end

describe "Threaded Unsharded Connection Pool" do
Expand Down
22 changes: 20 additions & 2 deletions spec/extensions/connection_expiration_spec.rb
Expand Up @@ -3,11 +3,12 @@
connection_expiration_specs = shared_description do
describe "connection expiration" do
before do
@db.extend(Module.new do
@m = Module.new do
def disconnect_connection(conn)
@sqls << 'disconnect'
end
end)
end
@db.extend @m
@db.extension(:connection_expiration)
@db.pool.connection_expiration_timeout = 2
end
Expand All @@ -21,6 +22,23 @@ def disconnect_connection(conn)
@db.pool.connection_expiration_timeout.must_equal 2
end

it "should handle Database#disconnect calls while the connection is checked out" do
@db.synchronize{|c| @db.disconnect}
end

it "should handle disconnected connections" do
proc{@db.synchronize{|c| raise Sequel::DatabaseDisconnectError}}.must_raise Sequel::DatabaseDisconnectError
@db.sqls.must_equal ['disconnect']
end

it "should handle :connection_handling => :disconnect setting" do
@db = Sequel.mock(@db.opts.merge(:connection_handling => :disconnect))
@db.extend @m
@db.extension(:connection_expiration)
@db.synchronize{}
@db.sqls.must_equal ['disconnect']
end

it "should only expire if older than timeout" do
c1 = @db.synchronize{|c| c}
@db.sqls.must_equal []
Expand Down
23 changes: 20 additions & 3 deletions spec/extensions/connection_validator_spec.rb
Expand Up @@ -3,7 +3,7 @@
connection_validator_specs = shared_description do
describe "connection validator" do
before do
@db.extend(Module.new do
@m = Module.new do
def disconnect_connection(conn)
@sqls << 'disconnect'
end
Expand All @@ -19,7 +19,8 @@ def connect(server)
conn.valid = true
conn
end
end)
end
@db.extend @m
@db.extension(:connection_validator)
end

Expand Down Expand Up @@ -52,6 +53,23 @@ def connect(server)
c2.wont_be_same_as(c1)
end

it "should handle Database#disconnect calls while the connection is checked out" do
@db.synchronize{|c| @db.disconnect}
end

it "should handle disconnected connections" do
proc{@db.synchronize{|c| raise Sequel::DatabaseDisconnectError}}.must_raise Sequel::DatabaseDisconnectError
@db.sqls.must_equal ['disconnect']
end

it "should handle :connection_handling => :disconnect setting" do
@db = Sequel.mock(@db.opts.merge(:connection_handling => :disconnect))
@db.extend @m
@db.extension(:connection_validator)
@db.synchronize{}
@db.sqls.must_equal ['disconnect']
end

it "should disconnect multiple connections repeatedly if they are not valid" do
q, q1 = Queue.new, Queue.new
c1 = nil
Expand Down Expand Up @@ -124,4 +142,3 @@ def @db.valid_connection?(c) synchronize{}; true end
end
include connection_validator_specs
end

0 comments on commit 0db9e00

Please sign in to comment.