Skip to content

Commit

Permalink
Refactor QueryCache to be owned by the pool
Browse files Browse the repository at this point in the history
Ref: #50793

If we want to stop caching the checked out connections,
then we must persist the cache in the pool, and assign it
to the connection when it's checked out.

The pool become responsible for managing the cache lifecycle.

This also open the door to sharing the cache between multiple
connections, which is valuable for read replicas, etc.

This change only really make sense if we go through with no
longer caching checked out connections. Otherwise it's just
extra complexity.
  • Loading branch information
byroot committed Feb 1, 2024
1 parent c0be0a2 commit bbf34f3
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 77 deletions.
Expand Up @@ -113,7 +113,7 @@ def db_config
# are now explicitly documented
class ConnectionPool
include MonitorMixin
include QueryCache::ConnectionPoolConfiguration
prepend QueryCache::ConnectionPoolConfiguration
include ConnectionAdapters::AbstractPool

attr_accessor :automatic_reconnect, :checkout_timeout
Expand Down
Expand Up @@ -13,8 +13,7 @@ def included(base) # :nodoc:
:truncate_tables, :rollback_to_savepoint, :rollback_db_transaction, :restart_db_transaction,
:exec_insert_all

base.set_callback :checkout, :after, :configure_query_cache!
base.set_callback :checkin, :after, :disable_query_cache!
base.set_callback :checkin, :after, :unset_query_cache!
end

def dirties_query_cache(base, *method_names)
Expand All @@ -29,60 +28,146 @@ def #{method_name}(...)
end
end

module ConnectionPoolConfiguration
def initialize(*)
class Store # :nodoc:
attr_reader :enabled
alias_method :enabled?, :enabled

def initialize(max_size)
@map = {}
@max_size = max_size
@enabled = false
end

def enabled=(enabled)
clear if @enabled && !enabled
@enabled = enabled
end

def size
@map.size
end

def empty?
@map.empty?
end

def [](key)
return unless @enabled

if entry = @map.delete(key)
@map[key] = entry
end
end

def compute_if_absent(key)
return yield unless @enabled

if entry = @map.delete(key)
return @map[key] = entry
end

if @max_size && @map.size >= @max_size
@map.shift # evict the oldest entry
end

@map[key] ||= yield
end

def clear
@map.clear
self
end
end

module ConnectionPoolConfiguration # :nodoc:
def initialize(...)
super
@query_cache_enabled = Concurrent::Map.new { false }
@thread_query_caches = Concurrent::Map.new(initial_capacity: @size)
@query_cache_max_size = \
case query_cache = db_config&.query_cache
when 0, false
nil
when Integer
query_cache
when nil
DEFAULT_SIZE
end
end

def checkout(...)
connection = super
connection.query_cache ||= query_cache
connection
end

# Disable the query cache within the block.
def disable_query_cache
cache = query_cache
old, cache.enabled = cache.enabled, false
begin
yield
ensure
cache.enabled = old
end
end

def enable_query_cache
cache = query_cache
old, cache.enabled = cache.enabled, true
begin
yield
ensure
cache.enabled = old
end
end

def enable_query_cache!
@query_cache_enabled[connection_cache_key(current_thread)] = true
connection.enable_query_cache! if active_connection?
query_cache.enabled = true
end

def disable_query_cache!
@query_cache_enabled.delete connection_cache_key(current_thread)
connection.disable_query_cache! if active_connection?
query_cache.enabled = false
end

def query_cache_enabled
@query_cache_enabled[connection_cache_key(current_thread)]
query_cache.enabled
end

private
def query_cache
@thread_query_caches.compute_if_absent(connection_cache_key(current_thread)) do
Store.new(@query_cache_max_size)
end
end
end

attr_reader :query_cache, :query_cache_enabled
attr_accessor :query_cache

def initialize(*)
super
@query_cache = {}
@query_cache_enabled = false
@query_cache_max_size = nil
@query_cache = nil
end

def query_cache_enabled
@query_cache&.enabled?
end

# Enable the query cache within the block.
def cache
old, @query_cache_enabled = @query_cache_enabled, true
yield
ensure
@query_cache_enabled = old
clear_query_cache unless @query_cache_enabled
def cache(&)
pool.enable_query_cache(&)
end

def enable_query_cache!
@query_cache_enabled = true
pool.enable_query_cache!
end

def disable_query_cache!
@query_cache_enabled = false
clear_query_cache
# Disable the query cache within the block.
def uncached(&)
pool.disable_query_cache(&)
end

# Disable the query cache within the block.
def uncached
old, @query_cache_enabled = @query_cache_enabled, false
yield
ensure
@query_cache_enabled = old
def disable_query_cache!
pool.disable_query_cache!
end

# Clears the query cache.
Expand All @@ -93,7 +178,7 @@ def uncached
# undermining the randomness you were expecting.
def clear_query_cache
@lock.synchronize do
@query_cache.clear
@query_cache&.clear
end
end

Expand All @@ -102,7 +187,7 @@ def select_all(arel, name = nil, binds = [], preparable: nil, async: false) # :n

# If arel is locked this is a SELECT ... FOR UPDATE or somesuch.
# Such queries should not be cached.
if @query_cache_enabled && !(arel.respond_to?(:locked) && arel.locked)
if @query_cache&.enabled? && !(arel.respond_to?(:locked) && arel.locked)
sql, binds, preparable = to_sql_and_binds(arel, binds, preparable)

if async
Expand All @@ -117,42 +202,37 @@ def select_all(arel, name = nil, binds = [], preparable: nil, async: false) # :n
end

private
def unset_query_cache!
@query_cache = nil
end

def lookup_sql_cache(sql, name, binds)
key = binds.empty? ? sql : [sql, binds]
hit = false
result = nil

result = nil
@lock.synchronize do
if (result = @query_cache.delete(key))
hit = true
@query_cache[key] = result
end
result = @query_cache[key]
end

if hit
if result
ActiveSupport::Notifications.instrument(
"sql.active_record",
cache_notification_info(sql, name, binds)
)

result
end

result
end

def cache_sql(sql, name, binds)
key = binds.empty? ? sql : [sql, binds]
result = nil
hit = false
hit = true

@lock.synchronize do
if (result = @query_cache.delete(key))
hit = true
@query_cache[key] = result
else
result = @query_cache[key] = yield
if @query_cache_max_size && @query_cache.size > @query_cache_max_size
@query_cache.shift
end
result = @query_cache.compute_if_absent(key) do
hit = false
yield
end
end

Expand All @@ -178,23 +258,6 @@ def cache_notification_info(sql, name, binds)
cached: true
}
end

def configure_query_cache!
case query_cache = pool.db_config.query_cache
when 0, false
return
when Integer
@query_cache_max_size = query_cache
when nil
@query_cache_max_size = DEFAULT_SIZE
else
@query_cache_max_size = nil # no limit
end

if pool.query_cache_enabled
enable_query_cache!
end
end
end
end
end
8 changes: 4 additions & 4 deletions activerecord/lib/active_record/query_cache.rb
Expand Up @@ -8,7 +8,7 @@ module ClassMethods
# If it's not, it will execute the given block.
def cache(&block)
if connected? || !configurations.empty?
connection.cache(&block)
connection_pool.enable_query_cache(&block)
else
yield
end
Expand All @@ -18,19 +18,19 @@ def cache(&block)
# If it's not, it will execute the given block.
def uncached(&block)
if connected? || !configurations.empty?
connection.uncached(&block)
connection_pool.disable_query_cache(&block)
else
yield
end
end
end

def self.run
ActiveRecord::Base.connection_handler.each_connection_pool.reject { |p| p.query_cache_enabled }.each { |p| p.enable_query_cache! }
ActiveRecord::Base.connection_handler.each_connection_pool.reject(&:query_cache_enabled).each(&:enable_query_cache!)
end

def self.complete(pools)
pools.each { |pool| pool.disable_query_cache! }
pools.each(&:disable_query_cache!)

ActiveRecord::Base.connection_handler.each_connection_pool do |pool|
pool.release_connection if pool.active_connection? && !pool.connection.transaction_open?
Expand Down
13 changes: 8 additions & 5 deletions activerecord/test/cases/query_cache_test.rb
Expand Up @@ -806,7 +806,7 @@ def test_cache_gets_cleared_after_migration
end

def test_find
assert_called(Task.connection.query_cache, :clear) do
assert_called(Task.connection.query_cache, :clear, times: 2) do
assert_not Task.connection.query_cache_enabled
Task.cache do
assert Task.connection.query_cache_enabled
Expand Down Expand Up @@ -920,9 +920,12 @@ def test_cache_is_expired_by_habtm_delete
end

def test_query_cache_lru_eviction
store = ActiveRecord::ConnectionAdapters::QueryCache::Store.new(2)
store.enabled = true

connection = Post.connection
connection.pool.db_config.stub(:query_cache, 2) do
connection.send(:configure_query_cache!)
old_store, connection.query_cache = connection.query_cache, store
begin
Post.cache do
assert_queries_count(2) do
connection.select_all("SELECT 1")
Expand All @@ -943,9 +946,9 @@ def test_query_cache_lru_eviction
connection.select_all("SELECT 2")
end
end
ensure
connection.query_cache = old_store
end
ensure
connection.send(:configure_query_cache!)
end

test "threads use the same connection" do
Expand Down

0 comments on commit bbf34f3

Please sign in to comment.