diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb index 7de0d12d08b50..552e9e52eacfc 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb @@ -113,7 +113,7 @@ def db_config # are now explicitly documented class ConnectionPool include MonitorMixin - include QueryCache::ConnectionPoolConfiguration + prepend QueryCache::ConnectionPoolConfiguration include ConnectionAdapters::AbstractPool attr_accessor :automatic_reconnect, :checkout_timeout @@ -176,13 +176,13 @@ def initialize(pool_config) # #connection can be called any number of times; the connection is # held in a cache keyed by a thread. def connection - @thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] ||= checkout + @thread_cached_conns[ActiveSupport::IsolatedExecutionState.context] ||= checkout end def pin_connection!(lock_thread) # :nodoc: raise "There is already a pinned connection" if @pinned_connection - @pinned_connection = (@thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] || checkout) + @pinned_connection = (@thread_cached_conns[ActiveSupport::IsolatedExecutionState.context] || checkout) # Any leased connection must be in @connections otherwise # some methods like #connected? won't behave correctly unless @connections.include?(@pinned_connection) @@ -226,7 +226,7 @@ def connection_class # :nodoc: # #connection or #with_connection methods. Connections obtained through # #checkout will not be detected by #active_connection? def active_connection? - @thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] + @thread_cached_conns[ActiveSupport::IsolatedExecutionState.context] end # Signal that the thread is finished with the current connection. @@ -237,7 +237,7 @@ def active_connection? # #connection or #with_connection methods, connections obtained through # #checkout will not be automatically released. def release_connection(owner_thread = ActiveSupport::IsolatedExecutionState.context) - if conn = @thread_cached_conns.delete(connection_cache_key(owner_thread)) + if conn = @thread_cached_conns.delete(owner_thread) checkin conn end end @@ -252,7 +252,7 @@ def release_connection(owner_thread = ActiveSupport::IsolatedExecutionState.cont # connection will be properly returned to the pool by the code that checked # it out. def with_connection - unless conn = @thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] + unless conn = @thread_cached_conns[ActiveSupport::IsolatedExecutionState.context] conn = connection fresh_connection = true end @@ -471,6 +471,8 @@ def reap remove conn end end + + prune_thread_cache end # Disconnect all connections that have been idle for at least @@ -558,15 +560,6 @@ def bulk_make_new_connections(num_new_conns_needed) end end - #-- - # From the discussion on GitHub: - # https://github.com/rails/rails/pull/14938#commitcomment-6601951 - # This hook-in method allows for easier monkey-patching fixes needed by - # JRuby users that use Fibers. - def connection_cache_key(thread) - thread - end - # Take control of all existing connections so a "group" action such as # reload/disconnect can be performed safely. It is no longer enough to # wrap it in +synchronize+ because some pool's actions are allowed @@ -710,10 +703,17 @@ def acquire_connection(checkout_timeout) #-- # if owner_thread param is omitted, this must be called in synchronize block def remove_connection_from_thread_cache(conn, owner_thread = conn.owner) - @thread_cached_conns.delete_pair(connection_cache_key(owner_thread), conn) + @thread_cached_conns.delete_pair(owner_thread, conn) end alias_method :release, :remove_connection_from_thread_cache + def prune_thread_cache + dead_threads = @thread_cached_conns.keys.reject(&:alive?) + dead_threads.each do |dead_thread| + @thread_cached_conns.delete(dead_thread) + end + end + def new_connection connection = db_config.new_connection connection.pool = self diff --git a/activerecord/lib/active_record/connection_adapters/abstract/query_cache.rb b/activerecord/lib/active_record/connection_adapters/abstract/query_cache.rb index 88c3240f8607d..5544f81cc02a6 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/query_cache.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/query_cache.rb @@ -13,8 +13,7 @@ def included(base) # :nodoc: :truncate_tables, :rollback_to_savepoint, :rollback_db_transaction, :restart_db_transaction, :exec_insert_all - base.set_callback :checkout, :after, :configure_query_cache! - base.set_callback :checkin, :after, :disable_query_cache! + base.set_callback :checkin, :after, :unset_query_cache! end def dirties_query_cache(base, *method_names) @@ -29,60 +28,153 @@ def #{method_name}(...) end end - module ConnectionPoolConfiguration - def initialize(*) + class Store # :nodoc: + attr_reader :enabled + alias_method :enabled?, :enabled + + def initialize(max_size) + @map = {} + @max_size = max_size + @enabled = false + end + + def enabled=(enabled) + clear if @enabled && !enabled + @enabled = enabled + end + + def size + @map.size + end + + def empty? + @map.empty? + end + + def [](key) + return unless @enabled + + if entry = @map.delete(key) + @map[key] = entry + end + end + + def compute_if_absent(key) + return yield unless @enabled + + if entry = @map.delete(key) + return @map[key] = entry + end + + if @max_size && @map.size >= @max_size + @map.shift # evict the oldest entry + end + + @map[key] ||= yield + end + + def clear + @map.clear + self + end + end + + module ConnectionPoolConfiguration # :nodoc: + def initialize(...) super - @query_cache_enabled = Concurrent::Map.new { false } + @thread_query_caches = Concurrent::Map.new(initial_capacity: @size) + @query_cache_max_size = \ + case query_cache = db_config&.query_cache + when 0, false + nil + when Integer + query_cache + when nil + DEFAULT_SIZE + end + end + + def checkout(...) + connection = super + connection.query_cache ||= query_cache + connection + end + + # Disable the query cache within the block. + def disable_query_cache + cache = query_cache + old, cache.enabled = cache.enabled, false + begin + yield + ensure + cache.enabled = old + end + end + + def enable_query_cache + cache = query_cache + old, cache.enabled = cache.enabled, true + begin + yield + ensure + cache.enabled = old + end end def enable_query_cache! - @query_cache_enabled[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] = true - connection.enable_query_cache! if active_connection? + query_cache.enabled = true end def disable_query_cache! - @query_cache_enabled.delete connection_cache_key(ActiveSupport::IsolatedExecutionState.context) - connection.disable_query_cache! if active_connection? + query_cache.enabled = false end def query_cache_enabled - @query_cache_enabled[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] + query_cache.enabled end + + private + def prune_thread_cache + dead_threads = @thread_query_caches.keys.reject(&:alive?) + dead_threads.each do |dead_thread| + @thread_query_caches.delete(dead_thread) + end + end + + def query_cache + @thread_query_caches.compute_if_absent(ActiveSupport::IsolatedExecutionState.context) do + Store.new(@query_cache_max_size) + end + end end - attr_reader :query_cache, :query_cache_enabled + attr_accessor :query_cache def initialize(*) super - @query_cache = {} - @query_cache_enabled = false - @query_cache_max_size = nil + @query_cache = nil + end + + def query_cache_enabled + @query_cache&.enabled? end # Enable the query cache within the block. - def cache - old, @query_cache_enabled = @query_cache_enabled, true - yield - ensure - @query_cache_enabled = old - clear_query_cache unless @query_cache_enabled + def cache(&) + pool.enable_query_cache(&) end def enable_query_cache! - @query_cache_enabled = true + pool.enable_query_cache! end - def disable_query_cache! - @query_cache_enabled = false - clear_query_cache + # Disable the query cache within the block. + def uncached(&) + pool.disable_query_cache(&) end - # Disable the query cache within the block. - def uncached - old, @query_cache_enabled = @query_cache_enabled, false - yield - ensure - @query_cache_enabled = old + def disable_query_cache! + pool.disable_query_cache! end # Clears the query cache. @@ -93,7 +185,7 @@ def uncached # undermining the randomness you were expecting. def clear_query_cache @lock.synchronize do - @query_cache.clear + @query_cache&.clear end end @@ -102,7 +194,7 @@ def select_all(arel, name = nil, binds = [], preparable: nil, async: false) # :n # If arel is locked this is a SELECT ... FOR UPDATE or somesuch. # Such queries should not be cached. - if @query_cache_enabled && !(arel.respond_to?(:locked) && arel.locked) + if @query_cache&.enabled? && !(arel.respond_to?(:locked) && arel.locked) sql, binds, preparable = to_sql_and_binds(arel, binds, preparable) if async @@ -117,42 +209,37 @@ def select_all(arel, name = nil, binds = [], preparable: nil, async: false) # :n end private + def unset_query_cache! + @query_cache = nil + end + def lookup_sql_cache(sql, name, binds) key = binds.empty? ? sql : [sql, binds] - hit = false - result = nil + result = nil @lock.synchronize do - if (result = @query_cache.delete(key)) - hit = true - @query_cache[key] = result - end + result = @query_cache[key] end - if hit + if result ActiveSupport::Notifications.instrument( "sql.active_record", cache_notification_info(sql, name, binds) ) - - result end + + result end def cache_sql(sql, name, binds) key = binds.empty? ? sql : [sql, binds] result = nil - hit = false + hit = true @lock.synchronize do - if (result = @query_cache.delete(key)) - hit = true - @query_cache[key] = result - else - result = @query_cache[key] = yield - if @query_cache_max_size && @query_cache.size > @query_cache_max_size - @query_cache.shift - end + result = @query_cache.compute_if_absent(key) do + hit = false + yield end end @@ -178,23 +265,6 @@ def cache_notification_info(sql, name, binds) cached: true } end - - def configure_query_cache! - case query_cache = pool.db_config.query_cache - when 0, false - return - when Integer - @query_cache_max_size = query_cache - when nil - @query_cache_max_size = DEFAULT_SIZE - else - @query_cache_max_size = nil # no limit - end - - if pool.query_cache_enabled - enable_query_cache! - end - end end end end diff --git a/activerecord/lib/active_record/query_cache.rb b/activerecord/lib/active_record/query_cache.rb index 08db278880fe1..18b0a7c901f53 100644 --- a/activerecord/lib/active_record/query_cache.rb +++ b/activerecord/lib/active_record/query_cache.rb @@ -8,7 +8,7 @@ module ClassMethods # If it's not, it will execute the given block. def cache(&block) if connected? || !configurations.empty? - connection.cache(&block) + connection_pool.enable_query_cache(&block) else yield end @@ -18,7 +18,7 @@ def cache(&block) # If it's not, it will execute the given block. def uncached(&block) if connected? || !configurations.empty? - connection.uncached(&block) + connection_pool.disable_query_cache(&block) else yield end @@ -26,11 +26,11 @@ def uncached(&block) end def self.run - ActiveRecord::Base.connection_handler.each_connection_pool.reject { |p| p.query_cache_enabled }.each { |p| p.enable_query_cache! } + ActiveRecord::Base.connection_handler.each_connection_pool.reject(&:query_cache_enabled).each(&:enable_query_cache!) end def self.complete(pools) - pools.each { |pool| pool.disable_query_cache! } + pools.each(&:disable_query_cache!) ActiveRecord::Base.connection_handler.each_connection_pool do |pool| pool.release_connection if pool.active_connection? && !pool.connection.transaction_open? diff --git a/activerecord/test/cases/query_cache_test.rb b/activerecord/test/cases/query_cache_test.rb index c8c01d1635d35..333b2f1ea02fb 100644 --- a/activerecord/test/cases/query_cache_test.rb +++ b/activerecord/test/cases/query_cache_test.rb @@ -808,7 +808,7 @@ def test_cache_gets_cleared_after_migration end def test_find - assert_called(Task.connection.query_cache, :clear) do + assert_called(Task.connection.query_cache, :clear, times: 2) do assert_not Task.connection.query_cache_enabled Task.cache do assert Task.connection.query_cache_enabled @@ -922,9 +922,12 @@ def test_cache_is_expired_by_habtm_delete end def test_query_cache_lru_eviction + store = ActiveRecord::ConnectionAdapters::QueryCache::Store.new(2) + store.enabled = true + connection = Post.connection - connection.pool.db_config.stub(:query_cache, 2) do - connection.send(:configure_query_cache!) + old_store, connection.query_cache = connection.query_cache, store + begin Post.cache do assert_queries_count(2) do connection.select_all("SELECT 1") @@ -945,9 +948,9 @@ def test_query_cache_lru_eviction connection.select_all("SELECT 2") end end + ensure + connection.query_cache = old_store end - ensure - connection.send(:configure_query_cache!) end test "threads use the same connection" do