Skip to content

Commit

Permalink
Automatically drop all connections after fork (#166)
Browse files Browse the repository at this point in the history
* Automatically drop all connections after fork

Fix: #165

Using connections inherited from a parent process can have
very nasty consequences and it's pretty much never desired.

This patch use the `Process._fork` API added in Ruby 3.1 to
automatically detect when a fork is happening, and when it
does, drop all existing connections.

* after_fork callback: keep instances in a weakref set

This is much faster than relying on `ObjectSpace.each_object`

Co-authored-by: Jean Boussier <jean.boussier@gmail.com>
  • Loading branch information
casperisfine and byroot committed Oct 4, 2022
1 parent a8bc713 commit 428c06f
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 3 deletions.
48 changes: 45 additions & 3 deletions lib/connection_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,47 @@ def self.wrap(options, &block)
Wrapper.new(options, &block)
end

if Process.respond_to?(:fork)
INSTANCES = ObjectSpace::WeakMap.new
private_constant :INSTANCES

def self.after_fork
INSTANCES.values.each do |pool|
# We're on after fork, so we know all other threads are dead.
# All we need to do is to ensure the main thread doesn't have a
# checked out connection
pool.checkin(force: true)

pool.reload do |connection|
# Unfortunately we don't know what method to call to close the connection,
# so we try the most common one.
connection.close if connection.respond_to?(:close)
end
end
nil
end

if ::Process.respond_to?(:_fork) # MRI 3.1+
module ForkTracker
def _fork
pid = super
if pid == 0
ConnectionPool.after_fork
end
pid
end
end
Process.singleton_class.prepend(ForkTracker)
end
else
INSTANCES = nil
private_constant :INSTANCES

def self.after_fork
# noop
end
end

def initialize(options = {}, &block)
raise ArgumentError, "Connection pool requires a block" unless block

Expand All @@ -55,6 +96,7 @@ def initialize(options = {}, &block)
@available = TimedStack.new(@size, &block)
@key = :"pool-#{@available.object_id}"
@key_count = :"pool-#{@available.object_id}-count"
INSTANCES[self] = self if INSTANCES
end

def with(options = {})
Expand All @@ -81,16 +123,16 @@ def checkout(options = {})
end
end

def checkin
def checkin(force: false)
if ::Thread.current[@key]
if ::Thread.current[@key_count] == 1
if ::Thread.current[@key_count] == 1 || force
@available.push(::Thread.current[@key])
::Thread.current[@key] = nil
::Thread.current[@key_count] = nil
else
::Thread.current[@key_count] -= 1
end
else
elsif !force
raise ConnectionPool::Error, "no connections are checked out"
end

Expand Down
38 changes: 38 additions & 0 deletions test/test_connection_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -547,4 +547,42 @@ def test_stats_with_string_size
assert_equal(1, pool.available)
end
end

def test_after_fork_callback
skip("MRI feature") unless Process.respond_to?(:fork)
GC.start # cleanup instances created by other tests

pool = ConnectionPool.new(size: 2) { NetworkConnection.new }
prefork_connection = pool.with { |c| c }
assert_equal(prefork_connection, pool.with { |c| c })
ConnectionPool.after_fork
refute_equal(prefork_connection, pool.with { |c| c })
end

def test_after_fork_callback_checkin
skip("MRI feature") unless Process.respond_to?(:fork)
GC.start # cleanup instances created by other tests

pool = ConnectionPool.new(size: 2) { NetworkConnection.new }
prefork_connection = pool.checkout
assert_equal(prefork_connection, pool.checkout)
ConnectionPool.after_fork
refute_equal(prefork_connection, pool.checkout)
end

def test_automatic_after_fork_callback
skip("MRI 3.1 feature") unless Process.respond_to?(:_fork)
GC.start # cleanup instances created by other tests

pool = ConnectionPool.new(size: 2) { NetworkConnection.new }
prefork_connection = pool.with { |c| c }
assert_equal(prefork_connection, pool.with { |c| c })
pid = fork do
refute_equal(prefork_connection, pool.with { |c| c })
exit!(0)
end
assert_equal(prefork_connection, pool.with { |c| c })
_, status = Process.waitpid2(pid)
assert_predicate(status, :success?)
end
end

0 comments on commit 428c06f

Please sign in to comment.