Skip to content

Commit

Permalink
Merge pull request #45564 from eileencodes/replace-owner-and-spec-wit…
Browse files Browse the repository at this point in the history
…h-connection_name

Ensure connection_name is used everywhere
  • Loading branch information
eileencodes committed Jul 12, 2022
2 parents ce9d2a1 + 3dffb0d commit 3cbc727
Show file tree
Hide file tree
Showing 13 changed files with 72 additions and 72 deletions.
Expand Up @@ -56,7 +56,7 @@ class ConnectionHandler
FINALIZER = lambda { |_| ActiveSupport::ForkTracker.check! }
private_constant :FINALIZER

class StringConnectionOwner # :nodoc:
class StringConnectionName # :nodoc:
attr_reader :name

def initialize(name)
Expand All @@ -73,8 +73,8 @@ def current_preventing_writes
end

def initialize
# These caches are keyed by pool_config.connection_specification_name (PoolConfig#connection_specification_name).
@owner_to_pool_manager = Concurrent::Map.new(initial_capacity: 2)
# These caches are keyed by pool_config.connection_name (PoolConfig#connection_name).
@connection_name_to_pool_manager = Concurrent::Map.new(initial_capacity: 2)

# Backup finalizer: if the forked child skipped Kernel#fork the early discard has not occurred
ObjectSpace.define_finalizer self, FINALIZER
Expand All @@ -89,24 +89,25 @@ def prevent_writes=(prevent_writes) # :nodoc:
end

def connection_pool_names # :nodoc:
owner_to_pool_manager.keys
connection_name_to_pool_manager.keys
end

def all_connection_pools
owner_to_pool_manager.values.flat_map { |m| m.pool_configs.map(&:pool) }
connection_name_to_pool_manager.values.flat_map { |m| m.pool_configs.map(&:pool) }
end

def connection_pool_list(role = ActiveRecord::Base.current_role)
owner_to_pool_manager.values.flat_map { |m| m.pool_configs(role).map(&:pool) }
connection_name_to_pool_manager.values.flat_map { |m| m.pool_configs(role).map(&:pool) }
end
alias :connection_pools :connection_pool_list

def establish_connection(config, owner_name: Base, role: ActiveRecord::Base.current_role, shard: Base.current_shard)
owner_name = StringConnectionOwner.new(config.to_s) if config.is_a?(Symbol)
owner_name = StringConnectionName.new(config.to_s) if config.is_a?(Symbol)

pool_config = resolve_pool_config(config, owner_name, role, shard)
db_config = pool_config.db_config

pool_manager = set_pool_manager(pool_config.connection_specification_name)
pool_manager = set_pool_manager(pool_config.connection_name)

# If there is an existing pool with the same values as the pool_config
# don't remove the connection. Connections should only be removed if we are
Expand All @@ -121,7 +122,7 @@ def establish_connection(config, owner_name: Base, role: ActiveRecord::Base.curr
pool_manager.set_pool_config(role, shard, pool_config)

payload = {
spec_name: pool_config.connection_specification_name,
connection_name: pool_config.connection_name,
shard: shard,
config: db_config.configuration_hash
}
Expand Down Expand Up @@ -167,18 +168,18 @@ def flush_idle_connections!(role = ActiveRecord::Base.current_role)
# active or defined connection: if it is the latter, it will be
# opened and set as the active connection for the class it was defined
# for (not necessarily the current class).
def retrieve_connection(spec_name, role: ActiveRecord::Base.current_role, shard: ActiveRecord::Base.current_shard) # :nodoc:
pool = retrieve_connection_pool(spec_name, role: role, shard: shard)
def retrieve_connection(connection_name, role: ActiveRecord::Base.current_role, shard: ActiveRecord::Base.current_shard) # :nodoc:
pool = retrieve_connection_pool(connection_name, role: role, shard: shard)

unless pool
if shard != ActiveRecord::Base.default_shard
message = "No connection pool for '#{spec_name}' found for the '#{shard}' shard."
message = "No connection pool for '#{connection_name}' found for the '#{shard}' shard."
elsif ActiveRecord::Base.connection_handler != ActiveRecord::Base.default_connection_handler
message = "No connection pool for '#{spec_name}' found for the '#{ActiveRecord::Base.current_role}' role."
message = "No connection pool for '#{connection_name}' found for the '#{ActiveRecord::Base.current_role}' role."
elsif role != ActiveRecord::Base.default_role
message = "No connection pool for '#{spec_name}' found for the '#{role}' role."
message = "No connection pool for '#{connection_name}' found for the '#{role}' role."
else
message = "No connection pool for '#{spec_name}' found."
message = "No connection pool for '#{connection_name}' found."
end

raise ConnectionNotEstablished, message
Expand All @@ -189,36 +190,36 @@ def retrieve_connection(spec_name, role: ActiveRecord::Base.current_role, shard:

# Returns true if a connection that's accessible to this class has
# already been opened.
def connected?(spec_name, role: ActiveRecord::Base.current_role, shard: ActiveRecord::Base.current_shard)
pool = retrieve_connection_pool(spec_name, role: role, shard: shard)
def connected?(connection_name, role: ActiveRecord::Base.current_role, shard: ActiveRecord::Base.current_shard)
pool = retrieve_connection_pool(connection_name, role: role, shard: shard)
pool && pool.connected?
end

def remove_connection_pool(owner, role: ActiveRecord::Base.current_role, shard: ActiveRecord::Base.current_shard)
if pool_manager = get_pool_manager(owner)
def remove_connection_pool(connection_name, role: ActiveRecord::Base.current_role, shard: ActiveRecord::Base.current_shard)
if pool_manager = get_pool_manager(connection_name)
disconnect_pool_from_pool_manager(pool_manager, role, shard)
end
end

# Retrieving the connection pool happens a lot, so we cache it in @owner_to_pool_manager.
# Retrieving the connection pool happens a lot, so we cache it in @connection_name_to_pool_manager.
# This makes retrieving the connection pool O(1) once the process is warm.
# When a connection is established or removed, we invalidate the cache.
def retrieve_connection_pool(owner, role: ActiveRecord::Base.current_role, shard: ActiveRecord::Base.current_shard)
pool_config = get_pool_manager(owner)&.get_pool_config(role, shard)
def retrieve_connection_pool(connection_name, role: ActiveRecord::Base.current_role, shard: ActiveRecord::Base.current_shard)
pool_config = get_pool_manager(connection_name)&.get_pool_config(role, shard)
pool_config&.pool
end

private
attr_reader :owner_to_pool_manager
attr_reader :connection_name_to_pool_manager

# Returns the pool manager for an owner.
def get_pool_manager(owner)
owner_to_pool_manager[owner]
# Returns the pool manager for a connection name / identifier.
def get_pool_manager(connection_name)
connection_name_to_pool_manager[connection_name]
end

# Get the existing pool manager or initialize and assign a new one.
def set_pool_manager(owner)
owner_to_pool_manager[owner] ||= PoolManager.new
def set_pool_manager(connection_name)
connection_name_to_pool_manager[connection_name] ||= PoolManager.new
end

def disconnect_pool_from_pool_manager(pool_manager, role, shard)
Expand All @@ -240,7 +241,7 @@ def disconnect_pool_from_pool_manager(pool_manager, role, shard)
# pool_config.db_config.configuration_hash
# # => { host: "localhost", database: "foo", adapter: "sqlite3" }
#
def resolve_pool_config(config, owner_name, role, shard)
def resolve_pool_config(config, connection_name, role, shard)
db_config = Base.configurations.resolve(config)

raise(AdapterNotSpecified, "database configuration does not specify adapter") unless db_config.adapter
Expand Down Expand Up @@ -270,7 +271,7 @@ def resolve_pool_config(config, owner_name, role, shard)
raise AdapterNotFound, "database configuration specifies nonexistent #{db_config.adapter} adapter"
end

ConnectionAdapters::PoolConfig.new(owner_name, db_config, role, shard)
ConnectionAdapters::PoolConfig.new(connection_name, db_config, role, shard)
end
end
end
Expand Down
Expand Up @@ -171,17 +171,17 @@ def migration_context # :nodoc:
def schema_migration # :nodoc:
@schema_migration ||= begin
conn = self
spec_name = conn.pool.pool_config.connection_specification_name
connection_name = conn.pool.pool_config.connection_name

return ActiveRecord::SchemaMigration if spec_name == "ActiveRecord::Base"
return ActiveRecord::SchemaMigration if connection_name == "ActiveRecord::Base"

schema_migration_name = "#{spec_name}::SchemaMigration"
schema_migration_name = "#{connection_name}::SchemaMigration"

Class.new(ActiveRecord::SchemaMigration) do
define_singleton_method(:name) { schema_migration_name }
define_singleton_method(:to_s) { schema_migration_name }

self.connection_specification_name = spec_name
self.connection_specification_name = connection_name
end
end
end
Expand Down
Expand Up @@ -27,7 +27,7 @@ def initialize(connection_class, db_config, role, shard)
INSTANCES[self] = self
end

def connection_specification_name
def connection_name
if connection_class.primary_class?
"ActiveRecord::Base"
else
Expand Down
16 changes: 8 additions & 8 deletions activerecord/lib/active_record/connection_handling.rb
Expand Up @@ -48,8 +48,8 @@ module ConnectionHandling
# may be returned on an error.
def establish_connection(config_or_env = nil)
config_or_env ||= DEFAULT_ENV.call.to_sym
db_config, owner_name = resolve_config_for_connection(config_or_env)
connection_handler.establish_connection(db_config, owner_name: owner_name, role: current_role, shard: current_shard)
db_config, connection_class = resolve_config_for_connection(config_or_env)
connection_handler.establish_connection(db_config, owner_name: connection_class, role: current_role, shard: current_shard)
end

# Connects a model to the databases specified. The +database+ keyword
Expand Down Expand Up @@ -87,18 +87,18 @@ def connects_to(database: {}, shards: {})
connections = []

database.each do |role, database_key|
db_config, owner_name = resolve_config_for_connection(database_key)
db_config, connection_class = resolve_config_for_connection(database_key)

self.connection_class = true
connections << connection_handler.establish_connection(db_config, owner_name: owner_name, role: role)
connections << connection_handler.establish_connection(db_config, owner_name: connection_class, role: role)
end

shards.each do |shard, database_keys|
database_keys.each do |role, database_key|
db_config, owner_name = resolve_config_for_connection(database_key)
db_config, connection_class = resolve_config_for_connection(database_key)

self.connection_class = true
connections << connection_handler.establish_connection(db_config, owner_name: owner_name, role: role, shard: shard.to_sym)
connections << connection_handler.establish_connection(db_config, owner_name: connection_class, role: role, shard: shard.to_sym)
end
end

Expand Down Expand Up @@ -315,8 +315,8 @@ def clear_cache! # :nodoc:
def resolve_config_for_connection(config_or_env)
raise "Anonymous class is not allowed." unless name

owner_name = primary_class? ? Base.name : name
self.connection_specification_name = owner_name
connection_name = primary_class? ? Base.name : name
self.connection_specification_name = connection_name

db_config = Base.configurations.resolve(config_or_env)
[db_config, self]
Expand Down
10 changes: 5 additions & 5 deletions activerecord/lib/active_record/test_fixtures.rb
Expand Up @@ -115,13 +115,13 @@ def setup_fixtures(config = ActiveRecord::Base)

# When connections are established in the future, begin a transaction too
@connection_subscriber = ActiveSupport::Notifications.subscribe("!connection.active_record") do |_, _, _, _, payload|
spec_name = payload[:spec_name] if payload.key?(:spec_name)
connection_name = payload[:connection_name] if payload.key?(:connection_name)
shard = payload[:shard] if payload.key?(:shard)
setup_shared_connection_pool

if spec_name
if connection_name
begin
connection = ActiveRecord::Base.connection_handler.retrieve_connection(spec_name, shard: shard)
connection = ActiveRecord::Base.connection_handler.retrieve_connection(connection_name, shard: shard)
rescue ConnectionNotEstablished
connection = nil
end
Expand Down Expand Up @@ -179,7 +179,7 @@ def setup_shared_connection_pool
handler = ActiveRecord::Base.connection_handler

handler.connection_pool_names.each do |name|
pool_manager = handler.send(:owner_to_pool_manager)[name]
pool_manager = handler.send(:connection_name_to_pool_manager)[name]
pool_manager.shard_names.each do |shard_name|
writing_pool_config = pool_manager.get_pool_config(ActiveRecord.writing_role, shard_name)
@saved_pool_configs[name][shard_name] ||= {}
Expand All @@ -198,7 +198,7 @@ def teardown_shared_connection_pool
handler = ActiveRecord::Base.connection_handler

@saved_pool_configs.each_pair do |name, shards|
pool_manager = handler.send(:owner_to_pool_manager)[name]
pool_manager = handler.send(:connection_name_to_pool_manager)[name]
shards.each_pair do |shard_name, roles|
roles.each_pair do |role, pool_config|
next unless pool_manager.get_pool_config(role, shard_name)
Expand Down
Expand Up @@ -12,7 +12,7 @@ class ConnectionHandlerTest < ActiveRecord::TestCase

def setup
@handler = ConnectionHandler.new
@owner_name = "ActiveRecord::Base"
@connection_name = "ActiveRecord::Base"
db_config = ActiveRecord::Base.configurations.configs_for(env_name: "arunit", name: "primary")
@pool = @handler.establish_connection(db_config)
end
Expand Down Expand Up @@ -210,19 +210,19 @@ def test_symbolized_configurations_assignment
end

def test_retrieve_connection
assert @handler.retrieve_connection(@owner_name)
assert @handler.retrieve_connection(@connection_name)
end

def test_active_connections?
assert_not_predicate @handler, :active_connections?
assert @handler.retrieve_connection(@owner_name)
assert @handler.retrieve_connection(@connection_name)
assert_predicate @handler, :active_connections?
@handler.clear_active_connections!
assert_not_predicate @handler, :active_connections?
end

def test_retrieve_connection_pool
assert_not_nil @handler.retrieve_connection_pool(@owner_name)
assert_not_nil @handler.retrieve_connection_pool(@connection_name)
end

def test_retrieve_connection_pool_with_invalid_id
Expand Down Expand Up @@ -391,7 +391,7 @@ def test_retrieve_connection_pool_copies_schema_cache_from_ancestor_pool

pid = fork {
rd.close
pool = @handler.retrieve_connection_pool(@owner_name)
pool = @handler.retrieve_connection_pool(@connection_name)
wr.write Marshal.dump pool.schema_cache.size
wr.close
exit!
Expand Down
Expand Up @@ -12,7 +12,7 @@ class ConnectionHandlersMultiDbTest < ActiveRecord::TestCase

def setup
@handler = ConnectionHandler.new
@owner_name = "ActiveRecord::Base"
@connection_name = "ActiveRecord::Base"
db_config = ActiveRecord::Base.configurations.configs_for(env_name: "arunit", name: "primary")
@rw_pool = @handler.establish_connection(db_config)
@ro_pool = @handler.establish_connection(db_config, role: :reading)
Expand Down Expand Up @@ -325,15 +325,15 @@ def test_connection_pools
end

def test_retrieve_connection
assert @handler.retrieve_connection(@owner_name)
assert @handler.retrieve_connection(@owner_name, role: :reading)
assert @handler.retrieve_connection(@connection_name)
assert @handler.retrieve_connection(@connection_name, role: :reading)
end

def test_active_connections?
assert_not_predicate @handler, :active_connections?

assert @handler.retrieve_connection(@owner_name)
assert @handler.retrieve_connection(@owner_name, role: :reading)
assert @handler.retrieve_connection(@connection_name)
assert @handler.retrieve_connection(@connection_name, role: :reading)

assert_predicate @handler, :active_connections?

Expand All @@ -342,8 +342,8 @@ def test_active_connections?
end

def test_retrieve_connection_pool
assert_not_nil @handler.retrieve_connection_pool(@owner_name)
assert_not_nil @handler.retrieve_connection_pool(@owner_name, role: :reading)
assert_not_nil @handler.retrieve_connection_pool(@connection_name)
assert_not_nil @handler.retrieve_connection_pool(@connection_name, role: :reading)
end

def test_retrieve_connection_pool_with_invalid_id
Expand Down
Expand Up @@ -12,7 +12,6 @@ class ConnectionHandlersShardingDbTest < ActiveRecord::TestCase

def setup
@handler = ConnectionHandler.new
@owner_name = "ActiveRecord::Base"
db_config = ActiveRecord::Base.configurations.configs_for(env_name: "arunit", name: "primary")
@rw_pool = @handler.establish_connection(db_config)
@ro_pool = @handler.establish_connection(db_config, role: :reading)
Expand All @@ -29,7 +28,7 @@ def test_establishing_a_connection_in_connected_to_block_uses_current_role_and_s
ActiveRecord::Base.establish_connection(db_config)
assert_nothing_raised { Person.first }

assert_equal [:default, :shard_one], ActiveRecord::Base.connection_handler.send(:owner_to_pool_manager).fetch("ActiveRecord::Base").instance_variable_get(:@name_to_role_mapping).values.flat_map(&:keys).uniq
assert_equal [:default, :shard_one], ActiveRecord::Base.connection_handler.send(:connection_name_to_pool_manager).fetch("ActiveRecord::Base").instance_variable_get(:@name_to_role_mapping).values.flat_map(&:keys).uniq
end
end

Expand Down
8 changes: 4 additions & 4 deletions activerecord/test/cases/connection_pool_test.rb
Expand Up @@ -508,8 +508,8 @@ def test_connection_notification_is_called

@connection_test_model_class.establish_connection :arunit

assert_equal [:config, :shard, :spec_name], payloads[0].keys.sort
assert_equal @connection_test_model_class.name, payloads[0][:spec_name]
assert_equal [:config, :connection_name, :shard], payloads[0].keys.sort
assert_equal @connection_test_model_class.name, payloads[0][:connection_name]
assert_equal ActiveRecord::Base.default_shard, payloads[0][:shard]
ensure
ActiveSupport::Notifications.unsubscribe(subscription) if subscription
Expand All @@ -522,8 +522,8 @@ def test_connection_notification_is_called_for_shard
end
@connection_test_model_class.connects_to shards: { shard_two: { writing: :arunit } }

assert_equal [:config, :shard, :spec_name], payloads[0].keys.sort
assert_equal @connection_test_model_class.name, payloads[0][:spec_name]
assert_equal [:config, :connection_name, :shard], payloads[0].keys.sort
assert_equal @connection_test_model_class.name, payloads[0][:connection_name]
assert_equal :shard_two, payloads[0][:shard]
ensure
ActiveSupport::Notifications.unsubscribe(subscription) if subscription
Expand Down

0 comments on commit 3cbc727

Please sign in to comment.