Skip to content

Commit

Permalink
Prune dead thread from connection pool caches on reap
Browse files Browse the repository at this point in the history
Otherwise they could linger around and leak memory if a user
checkout connections from short lived fibers or threads.

The undocumented `connection_cache_key` hook point is eliminated
because it was essentially add to allow the connection pool to
be fiber based rather than thread based, which is now supported
out of the box.
  • Loading branch information
byroot committed Feb 12, 2024
1 parent f771d9b commit 9a8d06d
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 16 deletions.
Expand Up @@ -176,13 +176,13 @@ def initialize(pool_config)
# #connection can be called any number of times; the connection is
# held in a cache keyed by a thread.
def connection
@thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] ||= checkout
@thread_cached_conns[ActiveSupport::IsolatedExecutionState.context] ||= checkout
end

def pin_connection!(lock_thread) # :nodoc:
raise "There is already a pinned connection" if @pinned_connection

@pinned_connection = (@thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] || checkout)
@pinned_connection = (@thread_cached_conns[ActiveSupport::IsolatedExecutionState.context] || checkout)
# Any leased connection must be in @connections otherwise
# some methods like #connected? won't behave correctly
unless @connections.include?(@pinned_connection)
Expand Down Expand Up @@ -226,7 +226,7 @@ def connection_class # :nodoc:
# #connection or #with_connection methods. Connections obtained through
# #checkout will not be detected by #active_connection?
def active_connection?
@thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)]
@thread_cached_conns[ActiveSupport::IsolatedExecutionState.context]
end

# Signal that the thread is finished with the current connection.
Expand All @@ -237,7 +237,7 @@ def active_connection?
# #connection or #with_connection methods, connections obtained through
# #checkout will not be automatically released.
def release_connection(owner_thread = ActiveSupport::IsolatedExecutionState.context)
if conn = @thread_cached_conns.delete(connection_cache_key(owner_thread))
if conn = @thread_cached_conns.delete(owner_thread)
checkin conn
end
end
Expand All @@ -252,7 +252,7 @@ def release_connection(owner_thread = ActiveSupport::IsolatedExecutionState.cont
# connection will be properly returned to the pool by the code that checked
# it out.
def with_connection
unless conn = @thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)]
unless conn = @thread_cached_conns[ActiveSupport::IsolatedExecutionState.context]
conn = connection
fresh_connection = true
end
Expand Down Expand Up @@ -471,6 +471,8 @@ def reap
remove conn
end
end

prune_thread_cache
end

# Disconnect all connections that have been idle for at least
Expand All @@ -489,6 +491,8 @@ def flush(minimum_idle = @idle_timeout)
@available.delete conn
@connections.delete conn
end

prune_thread_cache
end

idle_connections.each do |conn|
Expand Down Expand Up @@ -558,15 +562,6 @@ def bulk_make_new_connections(num_new_conns_needed)
end
end

#--
# From the discussion on GitHub:
# https://github.com/rails/rails/pull/14938#commitcomment-6601951
# This hook-in method allows for easier monkey-patching fixes needed by
# JRuby users that use Fibers.
def connection_cache_key(thread)
thread
end

# Take control of all existing connections so a "group" action such as
# reload/disconnect can be performed safely. It is no longer enough to
# wrap it in +synchronize+ because some pool's actions are allowed
Expand Down Expand Up @@ -710,10 +705,17 @@ def acquire_connection(checkout_timeout)
#--
# if owner_thread param is omitted, this must be called in synchronize block
def remove_connection_from_thread_cache(conn, owner_thread = conn.owner)
@thread_cached_conns.delete_pair(connection_cache_key(owner_thread), conn)
@thread_cached_conns.delete_pair(owner_thread, conn)
end
alias_method :release, :remove_connection_from_thread_cache

def prune_thread_cache
dead_threads = @thread_cached_conns.keys.reject(&:alive?)
dead_threads.each do |dead_thread|
@thread_cached_conns.delete(dead_thread)
end
end

def new_connection
connection = db_config.new_connection
connection.pool = self
Expand Down
Expand Up @@ -134,8 +134,15 @@ def query_cache_enabled
end

private
def prune_thread_cache
dead_threads = @thread_query_caches.keys.reject(&:alive?)
dead_threads.each do |dead_thread|
@thread_query_caches.delete(dead_thread)
end
end

def query_cache
@thread_query_caches.compute_if_absent(connection_cache_key(ActiveSupport::IsolatedExecutionState.context)) do
@thread_query_caches.compute_if_absent(ActiveSupport::IsolatedExecutionState.context) do
Store.new(@query_cache_max_size)
end
end
Expand Down

0 comments on commit 9a8d06d

Please sign in to comment.