Skip to content

Commit

Permalink
Merge pull request #50938 from Shopify/refactor-query-cache-to-pool
Browse files Browse the repository at this point in the history
Refactor QueryCache to be owned by the pool
  • Loading branch information
byroot committed Feb 14, 2024
2 parents cef567e + 94fc536 commit 13c1dfe
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 92 deletions.
Expand Up @@ -113,7 +113,7 @@ def db_config
# are now explicitly documented
class ConnectionPool
include MonitorMixin
include QueryCache::ConnectionPoolConfiguration
prepend QueryCache::ConnectionPoolConfiguration
include ConnectionAdapters::AbstractPool

attr_accessor :automatic_reconnect, :checkout_timeout
Expand Down Expand Up @@ -176,13 +176,13 @@ def initialize(pool_config)
# #connection can be called any number of times; the connection is
# held in a cache keyed by a thread.
def connection
@thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] ||= checkout
@thread_cached_conns[ActiveSupport::IsolatedExecutionState.context] ||= checkout
end

def pin_connection!(lock_thread) # :nodoc:
raise "There is already a pinned connection" if @pinned_connection

@pinned_connection = (@thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] || checkout)
@pinned_connection = (@thread_cached_conns[ActiveSupport::IsolatedExecutionState.context] || checkout)
# Any leased connection must be in @connections otherwise
# some methods like #connected? won't behave correctly
unless @connections.include?(@pinned_connection)
Expand Down Expand Up @@ -226,7 +226,7 @@ def connection_class # :nodoc:
# #connection or #with_connection methods. Connections obtained through
# #checkout will not be detected by #active_connection?
def active_connection?
@thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)]
@thread_cached_conns[ActiveSupport::IsolatedExecutionState.context]
end

# Signal that the thread is finished with the current connection.
Expand All @@ -237,7 +237,7 @@ def active_connection?
# #connection or #with_connection methods, connections obtained through
# #checkout will not be automatically released.
def release_connection(owner_thread = ActiveSupport::IsolatedExecutionState.context)
if conn = @thread_cached_conns.delete(connection_cache_key(owner_thread))
if conn = @thread_cached_conns.delete(owner_thread)
checkin conn
end
end
Expand All @@ -252,7 +252,7 @@ def release_connection(owner_thread = ActiveSupport::IsolatedExecutionState.cont
# connection will be properly returned to the pool by the code that checked
# it out.
def with_connection
unless conn = @thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)]
unless conn = @thread_cached_conns[ActiveSupport::IsolatedExecutionState.context]
conn = connection
fresh_connection = true
end
Expand Down Expand Up @@ -471,6 +471,8 @@ def reap
remove conn
end
end

prune_thread_cache
end

# Disconnect all connections that have been idle for at least
Expand Down Expand Up @@ -558,15 +560,6 @@ def bulk_make_new_connections(num_new_conns_needed)
end
end

#--
# From the discussion on GitHub:
# https://github.com/rails/rails/pull/14938#commitcomment-6601951
# This hook-in method allows for easier monkey-patching fixes needed by
# JRuby users that use Fibers.
#
# By default the owner itself is used as the cache key (identity hook).
def connection_cache_key(owner)
  owner
end

# Take control of all existing connections so a "group" action such as
# reload/disconnect can be performed safely. It is no longer enough to
# wrap it in +synchronize+ because some pool's actions are allowed
Expand Down Expand Up @@ -710,10 +703,17 @@ def acquire_connection(checkout_timeout)
#--
# if owner_thread param is omitted, this must be called in synchronize block
def remove_connection_from_thread_cache(conn, owner_thread = conn.owner)
@thread_cached_conns.delete_pair(connection_cache_key(owner_thread), conn)
@thread_cached_conns.delete_pair(owner_thread, conn)
end
alias_method :release, :remove_connection_from_thread_cache

# Removes cached connections whose owning thread is no longer alive,
# so the per-thread map does not grow without bound as threads die.
def prune_thread_cache
  @thread_cached_conns.keys.each do |thread|
    @thread_cached_conns.delete(thread) unless thread.alive?
  end
end

def new_connection
connection = db_config.new_connection
connection.pool = self
Expand Down
Expand Up @@ -13,8 +13,7 @@ def included(base) # :nodoc:
:truncate_tables, :rollback_to_savepoint, :rollback_db_transaction, :restart_db_transaction,
:exec_insert_all

base.set_callback :checkout, :after, :configure_query_cache!
base.set_callback :checkin, :after, :disable_query_cache!
base.set_callback :checkin, :after, :unset_query_cache!
end

def dirties_query_cache(base, *method_names)
Expand All @@ -29,60 +28,153 @@ def #{method_name}(...)
end
end

module ConnectionPoolConfiguration
def initialize(*)
class Store # :nodoc:
  attr_reader :enabled
  alias_method :enabled?, :enabled

  # Bounded, insertion-ordered cache of query results. Behaves as an
  # LRU: reads move an entry to the back of the underlying Hash, and
  # inserts evict from the front once the maximum size is reached.
  def initialize(max_size)
    @map = {}             # insertion order doubles as recency order
    @max_size = max_size  # nil means unbounded
    @enabled = false
  end

  # Turning the cache off flushes it, so stale results cannot be
  # served after it is re-enabled.
  def enabled=(enabled)
    clear if @enabled && !enabled
    @enabled = enabled
  end

  def size
    @map.size
  end

  def empty?
    @map.empty?
  end

  # Fetches +key+, refreshing its recency on a hit. Returns nil when
  # the cache is disabled or the key is absent.
  def [](key)
    return unless @enabled

    entry = @map.delete(key)
    @map[key] = entry if entry
  end

  # Returns the cached entry for +key+, computing and storing it via
  # the block on a miss. When disabled, simply yields without caching.
  def compute_if_absent(key)
    return yield unless @enabled

    entry = @map.delete(key)
    return @map[key] = entry if entry

    # Evict the least recently used entry once at capacity.
    @map.shift if @max_size && @map.size >= @max_size

    @map[key] ||= yield
  end

  def clear
    @map.clear
    self
  end
end

module ConnectionPoolConfiguration # :nodoc:
def initialize(...)
super
@query_cache_enabled = Concurrent::Map.new { false }
@thread_query_caches = Concurrent::Map.new(initial_capacity: @size)
@query_cache_max_size = \
case query_cache = db_config&.query_cache
when 0, false
nil
when Integer
query_cache
when nil
DEFAULT_SIZE
end
end

# Checks a connection out of the pool and makes sure it carries the
# per-thread query cache Store owned by this pool.
def checkout(...)
  conn = super
  conn.query_cache ||= query_cache
  conn
end

# Disable the query cache within the block, restoring the previous
# setting afterwards — even if the block raises.
def disable_query_cache
  store = query_cache
  previous = store.enabled
  store.enabled = false
  begin
    yield
  ensure
    store.enabled = previous
  end
end

# Enable the query cache within the block, restoring the previous
# setting afterwards — even if the block raises.
def enable_query_cache
  store = query_cache
  previous = store.enabled
  store.enabled = true
  begin
    yield
  ensure
    store.enabled = previous
  end
end

def enable_query_cache!
@query_cache_enabled[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] = true
connection.enable_query_cache! if active_connection?
query_cache.enabled = true
end

def disable_query_cache!
@query_cache_enabled.delete connection_cache_key(ActiveSupport::IsolatedExecutionState.context)
connection.disable_query_cache! if active_connection?
query_cache.enabled = false
end

def query_cache_enabled
@query_cache_enabled[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)]
query_cache.enabled
end

private
# Drops per-thread query cache Stores whose owning thread has died.
def prune_thread_cache
  @thread_query_caches.keys.reject(&:alive?).each do |thread|
    @thread_query_caches.delete(thread)
  end
end

# Returns the query cache Store for the current thread/fiber context,
# lazily creating one sized by the pool's configured maximum.
def query_cache
  context = ActiveSupport::IsolatedExecutionState.context
  @thread_query_caches.compute_if_absent(context) { Store.new(@query_cache_max_size) }
end
end

attr_reader :query_cache, :query_cache_enabled
attr_accessor :query_cache

def initialize(*)
super
@query_cache = {}
@query_cache_enabled = false
@query_cache_max_size = nil
@query_cache = nil
end

# True when a query cache has been assigned to this connection and it
# is enabled; nil when no cache is attached.
def query_cache_enabled
  cache = @query_cache
  cache ? cache.enabled? : nil
end

# Enable the query cache within the block.
def cache
old, @query_cache_enabled = @query_cache_enabled, true
yield
ensure
@query_cache_enabled = old
clear_query_cache unless @query_cache_enabled
def cache(&)
pool.enable_query_cache(&)
end

def enable_query_cache!
@query_cache_enabled = true
pool.enable_query_cache!
end

def disable_query_cache!
@query_cache_enabled = false
clear_query_cache
# Disable the query cache within the block.
def uncached(&)
pool.disable_query_cache(&)
end

# Disable the query cache within the block.
def uncached
old, @query_cache_enabled = @query_cache_enabled, false
yield
ensure
@query_cache_enabled = old
def disable_query_cache!
pool.disable_query_cache!
end

# Clears the query cache.
Expand All @@ -93,7 +185,7 @@ def uncached
# undermining the randomness you were expecting.
def clear_query_cache
@lock.synchronize do
@query_cache.clear
@query_cache&.clear
end
end

Expand All @@ -102,7 +194,7 @@ def select_all(arel, name = nil, binds = [], preparable: nil, async: false) # :n

# If arel is locked this is a SELECT ... FOR UPDATE or somesuch.
# Such queries should not be cached.
if @query_cache_enabled && !(arel.respond_to?(:locked) && arel.locked)
if @query_cache&.enabled? && !(arel.respond_to?(:locked) && arel.locked)
sql, binds, preparable = to_sql_and_binds(arel, binds, preparable)

if async
Expand All @@ -117,42 +209,37 @@ def select_all(arel, name = nil, binds = [], preparable: nil, async: false) # :n
end

private
# +checkin+ callback: detaches the query cache from this connection.
# The Store itself is owned by the pool, which reattaches one on the
# next checkout (see ConnectionPoolConfiguration#checkout).
def unset_query_cache!
  @query_cache = nil
end

def lookup_sql_cache(sql, name, binds)
key = binds.empty? ? sql : [sql, binds]
hit = false
result = nil

result = nil
@lock.synchronize do
if (result = @query_cache.delete(key))
hit = true
@query_cache[key] = result
end
result = @query_cache[key]
end

if hit
if result
ActiveSupport::Notifications.instrument(
"sql.active_record",
cache_notification_info(sql, name, binds)
)

result
end

result
end

def cache_sql(sql, name, binds)
key = binds.empty? ? sql : [sql, binds]
result = nil
hit = false
hit = true

@lock.synchronize do
if (result = @query_cache.delete(key))
hit = true
@query_cache[key] = result
else
result = @query_cache[key] = yield
if @query_cache_max_size && @query_cache.size > @query_cache_max_size
@query_cache.shift
end
result = @query_cache.compute_if_absent(key) do
hit = false
yield
end
end

Expand All @@ -178,23 +265,6 @@ def cache_notification_info(sql, name, binds)
cached: true
}
end

# Sets the connection's query cache size limit from the pool's
# db_config and enables the cache if the pool has it turned on.
# A configured value of 0 or false leaves the cache unconfigured.
def configure_query_cache!
  @query_cache_max_size =
    case limit = pool.db_config.query_cache
    when 0, false then return # caching explicitly disabled
    when Integer  then limit
    when nil      then DEFAULT_SIZE
    else               nil # any other value means no limit
    end

  enable_query_cache! if pool.query_cache_enabled
end
end
end
end

0 comments on commit 13c1dfe

Please sign in to comment.