From 85742ce529b028812066cd6070e7de2ff3d44bc8 Mon Sep 17 00:00:00 2001 From: Jean Boussier Date: Thu, 25 Jan 2024 13:49:48 +0100 Subject: [PATCH 1/2] Refactor QueryCache to be owned by the pool Ref: https://github.com/rails/rails/pull/50793 If we want to stop caching the checked out connections, then we must persist the cache in the pool, and assign it to the connection when it's checked out. The pool becomes responsible for managing the cache lifecycle. This also opens the door to sharing the cache between multiple connections, which is valuable for read replicas, etc. This change only really makes sense if we go through with no longer caching checked out connections. Otherwise it's just extra complexity. --- .../abstract/connection_pool.rb | 2 +- .../abstract/query_cache.rb | 197 ++++++++++++------ activerecord/lib/active_record/query_cache.rb | 8 +- activerecord/test/cases/query_cache_test.rb | 13 +- 4 files changed, 143 insertions(+), 77 deletions(-) diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb index 7de0d12d08b50..9747f0cc21b36 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb @@ -113,7 +113,7 @@ def db_config # are now explicitly documented class ConnectionPool include MonitorMixin - include QueryCache::ConnectionPoolConfiguration + prepend QueryCache::ConnectionPoolConfiguration include ConnectionAdapters::AbstractPool attr_accessor :automatic_reconnect, :checkout_timeout diff --git a/activerecord/lib/active_record/connection_adapters/abstract/query_cache.rb b/activerecord/lib/active_record/connection_adapters/abstract/query_cache.rb index 88c3240f8607d..077bd1089d165 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/query_cache.rb +++ 
b/activerecord/lib/active_record/connection_adapters/abstract/query_cache.rb @@ -13,8 +13,7 @@ def included(base) # :nodoc: :truncate_tables, :rollback_to_savepoint, :rollback_db_transaction, :restart_db_transaction, :exec_insert_all - base.set_callback :checkout, :after, :configure_query_cache! - base.set_callback :checkin, :after, :disable_query_cache! + base.set_callback :checkin, :after, :unset_query_cache! end def dirties_query_cache(base, *method_names) @@ -29,60 +28,146 @@ def #{method_name}(...) end end - module ConnectionPoolConfiguration - def initialize(*) + class Store # :nodoc: + attr_reader :enabled + alias_method :enabled?, :enabled + + def initialize(max_size) + @map = {} + @max_size = max_size + @enabled = false + end + + def enabled=(enabled) + clear if @enabled && !enabled + @enabled = enabled + end + + def size + @map.size + end + + def empty? + @map.empty? + end + + def [](key) + return unless @enabled + + if entry = @map.delete(key) + @map[key] = entry + end + end + + def compute_if_absent(key) + return yield unless @enabled + + if entry = @map.delete(key) + return @map[key] = entry + end + + if @max_size && @map.size >= @max_size + @map.shift # evict the oldest entry + end + + @map[key] ||= yield + end + + def clear + @map.clear + self + end + end + + module ConnectionPoolConfiguration # :nodoc: + def initialize(...) super - @query_cache_enabled = Concurrent::Map.new { false } + @thread_query_caches = Concurrent::Map.new(initial_capacity: @size) + @query_cache_max_size = \ + case query_cache = db_config&.query_cache + when 0, false + nil + when Integer + query_cache + when nil + DEFAULT_SIZE + end + end + + def checkout(...) + connection = super + connection.query_cache ||= query_cache + connection + end + + # Disable the query cache within the block. 
+ def disable_query_cache + cache = query_cache + old, cache.enabled = cache.enabled, false + begin + yield + ensure + cache.enabled = old + end + end + + def enable_query_cache + cache = query_cache + old, cache.enabled = cache.enabled, true + begin + yield + ensure + cache.enabled = old + end end def enable_query_cache! - @query_cache_enabled[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] = true - connection.enable_query_cache! if active_connection? + query_cache.enabled = true end def disable_query_cache! - @query_cache_enabled.delete connection_cache_key(ActiveSupport::IsolatedExecutionState.context) - connection.disable_query_cache! if active_connection? + query_cache.enabled = false end def query_cache_enabled - @query_cache_enabled[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] + query_cache.enabled end + + private + def query_cache + @thread_query_caches.compute_if_absent(connection_cache_key(ActiveSupport::IsolatedExecutionState.context)) do + Store.new(@query_cache_max_size) + end + end end - attr_reader :query_cache, :query_cache_enabled + attr_accessor :query_cache def initialize(*) super - @query_cache = {} - @query_cache_enabled = false - @query_cache_max_size = nil + @query_cache = nil + end + + def query_cache_enabled + @query_cache&.enabled? end # Enable the query cache within the block. - def cache - old, @query_cache_enabled = @query_cache_enabled, true - yield - ensure - @query_cache_enabled = old - clear_query_cache unless @query_cache_enabled + def cache(&) + pool.enable_query_cache(&) end def enable_query_cache! - @query_cache_enabled = true + pool.enable_query_cache! end - def disable_query_cache! - @query_cache_enabled = false - clear_query_cache + # Disable the query cache within the block. + def uncached(&) + pool.disable_query_cache(&) end - # Disable the query cache within the block. 
- def uncached - old, @query_cache_enabled = @query_cache_enabled, false - yield - ensure - @query_cache_enabled = old + def disable_query_cache! + pool.disable_query_cache! end # Clears the query cache. @@ -93,7 +178,7 @@ def uncached # undermining the randomness you were expecting. def clear_query_cache @lock.synchronize do - @query_cache.clear + @query_cache&.clear end end @@ -102,7 +187,7 @@ def select_all(arel, name = nil, binds = [], preparable: nil, async: false) # :n # If arel is locked this is a SELECT ... FOR UPDATE or somesuch. # Such queries should not be cached. - if @query_cache_enabled && !(arel.respond_to?(:locked) && arel.locked) + if @query_cache&.enabled? && !(arel.respond_to?(:locked) && arel.locked) sql, binds, preparable = to_sql_and_binds(arel, binds, preparable) if async @@ -117,42 +202,37 @@ def select_all(arel, name = nil, binds = [], preparable: nil, async: false) # :n end private + def unset_query_cache! + @query_cache = nil + end + def lookup_sql_cache(sql, name, binds) key = binds.empty? ? sql : [sql, binds] - hit = false - result = nil + result = nil @lock.synchronize do - if (result = @query_cache.delete(key)) - hit = true - @query_cache[key] = result - end + result = @query_cache[key] end - if hit + if result ActiveSupport::Notifications.instrument( "sql.active_record", cache_notification_info(sql, name, binds) ) - - result end + + result end def cache_sql(sql, name, binds) key = binds.empty? ? sql : [sql, binds] result = nil - hit = false + hit = true @lock.synchronize do - if (result = @query_cache.delete(key)) - hit = true - @query_cache[key] = result - else - result = @query_cache[key] = yield - if @query_cache_max_size && @query_cache.size > @query_cache_max_size - @query_cache.shift - end + result = @query_cache.compute_if_absent(key) do + hit = false + yield end end @@ -178,23 +258,6 @@ def cache_notification_info(sql, name, binds) cached: true } end - - def configure_query_cache! 
- case query_cache = pool.db_config.query_cache - when 0, false - return - when Integer - @query_cache_max_size = query_cache - when nil - @query_cache_max_size = DEFAULT_SIZE - else - @query_cache_max_size = nil # no limit - end - - if pool.query_cache_enabled - enable_query_cache! - end - end end end end diff --git a/activerecord/lib/active_record/query_cache.rb b/activerecord/lib/active_record/query_cache.rb index 08db278880fe1..18b0a7c901f53 100644 --- a/activerecord/lib/active_record/query_cache.rb +++ b/activerecord/lib/active_record/query_cache.rb @@ -8,7 +8,7 @@ module ClassMethods # If it's not, it will execute the given block. def cache(&block) if connected? || !configurations.empty? - connection.cache(&block) + connection_pool.enable_query_cache(&block) else yield end @@ -18,7 +18,7 @@ def cache(&block) # If it's not, it will execute the given block. def uncached(&block) if connected? || !configurations.empty? - connection.uncached(&block) + connection_pool.disable_query_cache(&block) else yield end @@ -26,11 +26,11 @@ def uncached(&block) end def self.run - ActiveRecord::Base.connection_handler.each_connection_pool.reject { |p| p.query_cache_enabled }.each { |p| p.enable_query_cache! } + ActiveRecord::Base.connection_handler.each_connection_pool.reject(&:query_cache_enabled).each(&:enable_query_cache!) end def self.complete(pools) - pools.each { |pool| pool.disable_query_cache! } + pools.each(&:disable_query_cache!) ActiveRecord::Base.connection_handler.each_connection_pool do |pool| pool.release_connection if pool.active_connection? && !pool.connection.transaction_open? 
diff --git a/activerecord/test/cases/query_cache_test.rb b/activerecord/test/cases/query_cache_test.rb index c8c01d1635d35..333b2f1ea02fb 100644 --- a/activerecord/test/cases/query_cache_test.rb +++ b/activerecord/test/cases/query_cache_test.rb @@ -808,7 +808,7 @@ def test_cache_gets_cleared_after_migration end def test_find - assert_called(Task.connection.query_cache, :clear) do + assert_called(Task.connection.query_cache, :clear, times: 2) do assert_not Task.connection.query_cache_enabled Task.cache do assert Task.connection.query_cache_enabled @@ -922,9 +922,12 @@ def test_cache_is_expired_by_habtm_delete end def test_query_cache_lru_eviction + store = ActiveRecord::ConnectionAdapters::QueryCache::Store.new(2) + store.enabled = true + connection = Post.connection - connection.pool.db_config.stub(:query_cache, 2) do - connection.send(:configure_query_cache!) + old_store, connection.query_cache = connection.query_cache, store + begin Post.cache do assert_queries_count(2) do connection.select_all("SELECT 1") @@ -945,9 +948,9 @@ def test_query_cache_lru_eviction connection.select_all("SELECT 2") end end + ensure + connection.query_cache = old_store end - ensure - connection.send(:configure_query_cache!) end test "threads use the same connection" do From 94fc536007aaa6c687480c22af3c1ed9ef953534 Mon Sep 17 00:00:00 2001 From: Jean Boussier Date: Mon, 12 Feb 2024 12:53:33 +0100 Subject: [PATCH 2/2] Prune dead thread from connection pool caches on reap Otherwise they could linger around and leak memory if a user checks out connections from short-lived fibers or threads. The undocumented `connection_cache_key` hook point is eliminated because it was essentially added to allow the connection pool to be fiber-based rather than thread-based, which is now supported out of the box. 
--- .../abstract/connection_pool.rb | 30 +++++++++---------- .../abstract/query_cache.rb | 9 +++++- 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb index 9747f0cc21b36..552e9e52eacfc 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb @@ -176,13 +176,13 @@ def initialize(pool_config) # #connection can be called any number of times; the connection is # held in a cache keyed by a thread. def connection - @thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] ||= checkout + @thread_cached_conns[ActiveSupport::IsolatedExecutionState.context] ||= checkout end def pin_connection!(lock_thread) # :nodoc: raise "There is already a pinned connection" if @pinned_connection - @pinned_connection = (@thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] || checkout) + @pinned_connection = (@thread_cached_conns[ActiveSupport::IsolatedExecutionState.context] || checkout) # Any leased connection must be in @connections otherwise # some methods like #connected? won't behave correctly unless @connections.include?(@pinned_connection) @@ -226,7 +226,7 @@ def connection_class # :nodoc: # #connection or #with_connection methods. Connections obtained through # #checkout will not be detected by #active_connection? def active_connection? - @thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] + @thread_cached_conns[ActiveSupport::IsolatedExecutionState.context] end # Signal that the thread is finished with the current connection. @@ -237,7 +237,7 @@ def active_connection? # #connection or #with_connection methods, connections obtained through # #checkout will not be automatically released. 
def release_connection(owner_thread = ActiveSupport::IsolatedExecutionState.context) - if conn = @thread_cached_conns.delete(connection_cache_key(owner_thread)) + if conn = @thread_cached_conns.delete(owner_thread) checkin conn end end @@ -252,7 +252,7 @@ def release_connection(owner_thread = ActiveSupport::IsolatedExecutionState.cont # connection will be properly returned to the pool by the code that checked # it out. def with_connection - unless conn = @thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] + unless conn = @thread_cached_conns[ActiveSupport::IsolatedExecutionState.context] conn = connection fresh_connection = true end @@ -471,6 +471,8 @@ def reap remove conn end end + + prune_thread_cache end # Disconnect all connections that have been idle for at least @@ -558,15 +560,6 @@ def bulk_make_new_connections(num_new_conns_needed) end end - #-- - # From the discussion on GitHub: - # https://github.com/rails/rails/pull/14938#commitcomment-6601951 - # This hook-in method allows for easier monkey-patching fixes needed by - # JRuby users that use Fibers. - def connection_cache_key(thread) - thread - end - # Take control of all existing connections so a "group" action such as # reload/disconnect can be performed safely. It is no longer enough to # wrap it in +synchronize+ because some pool's actions are allowed @@ -710,10 +703,17 @@ def acquire_connection(checkout_timeout) #-- # if owner_thread param is omitted, this must be called in synchronize block def remove_connection_from_thread_cache(conn, owner_thread = conn.owner) - @thread_cached_conns.delete_pair(connection_cache_key(owner_thread), conn) + @thread_cached_conns.delete_pair(owner_thread, conn) end alias_method :release, :remove_connection_from_thread_cache + def prune_thread_cache + dead_threads = @thread_cached_conns.keys.reject(&:alive?) 
+ dead_threads.each do |dead_thread| + @thread_cached_conns.delete(dead_thread) + end + end + def new_connection connection = db_config.new_connection connection.pool = self diff --git a/activerecord/lib/active_record/connection_adapters/abstract/query_cache.rb b/activerecord/lib/active_record/connection_adapters/abstract/query_cache.rb index 077bd1089d165..5544f81cc02a6 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/query_cache.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/query_cache.rb @@ -134,8 +134,15 @@ def query_cache_enabled end private + def prune_thread_cache + dead_threads = @thread_query_caches.keys.reject(&:alive?) + dead_threads.each do |dead_thread| + @thread_query_caches.delete(dead_thread) + end + end + def query_cache - @thread_query_caches.compute_if_absent(connection_cache_key(ActiveSupport::IsolatedExecutionState.context)) do + @thread_query_caches.compute_if_absent(ActiveSupport::IsolatedExecutionState.context) do Store.new(@query_cache_max_size) end end