Skip to content

Commit

Permalink
TypeMap (PG): Store Typemap in Schema Cache and use another cache cla…
Browse files Browse the repository at this point in the history
…ss to keep it in memory
  • Loading branch information
mochnatiy committed Mar 14, 2023
1 parent 95fe1f3 commit b37a453
Show file tree
Hide file tree
Showing 10 changed files with 280 additions and 16 deletions.
Expand Up @@ -11,7 +11,7 @@ module ActiveRecord
module ConnectionAdapters
module AbstractPool # :nodoc:
def get_schema_cache(connection)
self.schema_cache ||= SchemaCache.new(connection)
self.schema_cache ||= connection.init_schema_cache
schema_cache.connection = connection
schema_cache
end
Expand All @@ -24,6 +24,8 @@ def lazily_set_schema_cache
return unless ActiveRecord.lazily_load_schema_cache

cache = SchemaCache.load_from(db_config.lazy_schema_cache_path)
PostgreSQL::TypeMapCache.init(cache) if connection.adapter_name == "PostgreSQL"

set_schema_cache(cache)
end
end
Expand Down
Expand Up @@ -203,6 +203,10 @@ def with_instrumenter(instrumenter, &block) # :nodoc:
end
end

def init_schema_cache
SchemaCache.new(self)
end

def check_if_write_query(sql) # :nodoc:
if preventing_writes? && write_query?(sql)
raise ActiveRecord::ReadOnlyError, "Write query attempted while in readonly mode: #{sql}"
Expand Down
@@ -0,0 +1,45 @@
# frozen_string_literal: true

module ActiveRecord
module ConnectionAdapters
module PostgreSQL
class SchemaCache < ActiveRecord::ConnectionAdapters::SchemaCache
attr_accessor :additional_type_records, :known_coder_type_records

def initialize(conn)
super(conn)

@additional_type_records = PostgreSQL::TypeMapCache.instance.additional_type_records || []
@known_coder_type_records = PostgreSQL::TypeMapCache.instance.known_coder_type_records || []
end

def encode_with(coder)
super

coder["additional_type_records"] = @additional_type_records
coder["known_coder_type_records"] = @known_coder_type_records
end

def init_with(coder)
@additional_type_records = coder["additional_type_records"]
@known_coder_type_records = coder["known_coder_type_records"]

super
end

def marshal_dump
reset_version!

[@version, @columns, {}, @primary_keys, @data_sources, @indexes, database_version, @known_coder_type_records, @additional_type_records]
end

def marshal_load(array)
@version, @columns, _columns_hash, @primary_keys, @data_sources, @indexes, @database_version, @known_coder_type_records, @additional_type_records = array
@indexes ||= {}

derive_columns_hash_and_deduplicate_values
end
end
end
end
end
@@ -0,0 +1,34 @@
# frozen_string_literal: true

module ActiveRecord
# :stopdoc:
module ConnectionAdapters
module PostgreSQL
class TypeMapCache
include Singleton

attr_accessor :additional_type_records
attr_accessor :known_coder_type_records

def initialize
@additional_type_records = []
@known_coder_type_records = []
end

class << self
def init(schema_cache)
return if schema_cache.nil? || !schema_cache.is_a?(PostgreSQL::SchemaCache)

self.instance.known_coder_type_records = schema_cache.known_coder_type_records
self.instance.additional_type_records = schema_cache.additional_type_records
end

def clear
self.instance.additional_type_records = []
self.instance.known_coder_type_records = []
end
end
end
end
end
end
Expand Up @@ -12,10 +12,12 @@
require "active_record/connection_adapters/postgresql/oid"
require "active_record/connection_adapters/postgresql/quoting"
require "active_record/connection_adapters/postgresql/referential_integrity"
require "active_record/connection_adapters/postgresql/schema_cache"
require "active_record/connection_adapters/postgresql/schema_creation"
require "active_record/connection_adapters/postgresql/schema_definitions"
require "active_record/connection_adapters/postgresql/schema_dumper"
require "active_record/connection_adapters/postgresql/schema_statements"
require "active_record/connection_adapters/postgresql/type_map_cache"
require "active_record/connection_adapters/postgresql/type_metadata"
require "active_record/connection_adapters/postgresql/utils"

Expand Down Expand Up @@ -346,6 +348,7 @@ def reload_type_map # :nodoc:
@type_map = Type::HashLookupTypeMap.new
end

PostgreSQL::TypeMapCache.clear
initialize_type_map
end
end
Expand Down Expand Up @@ -667,6 +670,10 @@ def initialize_type_map(m) # :nodoc:
end
end

def init_schema_cache
PostgreSQL::SchemaCache.new(self)
end

private
def type_map
@type_map ||= Type::HashLookupTypeMap.new
Expand Down Expand Up @@ -797,9 +804,17 @@ def get_oid_type(oid, fmod, column_name, sql_type = "")

def load_additional_types(oids = nil)
initializer = OID::TypeMapInitializer.new(type_map)
load_types_queries(initializer, oids) do |query|
execute_and_clear(query, "SCHEMA", [], allow_retry: true, uses_transaction: false) do |records|
initializer.run(records)
# Will not work when dumping, a dump file should be recreated on each
# schema_cache:dump
if should_load_types_from_cache?(oids)
records = PostgreSQL::TypeMapCache.instance.additional_type_records
initializer.run(records)
else
load_types_queries(initializer, oids) do |query|
execute_and_clear(query, "SCHEMA", [], allow_retry: true, uses_transaction: false) do |records|
PostgreSQL::TypeMapCache.instance.additional_type_records |= records.to_a
initializer.run(records)
end
end
end
end
Expand Down Expand Up @@ -1123,14 +1138,24 @@ def add_pg_decoders
"timestamptz" => PG::TextDecoder::TimestampWithTimeZone,
}

known_coder_types = coders_by_name.keys.map { |n| quote(n) }
query = <<~SQL % known_coder_types.join(", ")
SELECT t.oid, t.typname
FROM pg_type as t
WHERE t.typname IN (%s)
SQL
coders = execute_and_clear(query, "SCHEMA", [], allow_retry: true, uses_transaction: false) do |result|
result.filter_map { |row| construct_coder(row, coders_by_name[row["typname"]]) }
if PostgreSQL::TypeMapCache.instance.known_coder_type_records.present?
coders = PostgreSQL::TypeMapCache.instance
.known_coder_type_records
.filter_map { |row| construct_coder(row, coders_by_name[row["typname"]]) }
else
known_coder_types = coders_by_name.keys.map { |n| quote(n) }

query = <<~SQL % known_coder_types.join(", ")
SELECT t.oid, t.typname
FROM pg_type as t
WHERE t.typname IN (%s)
SQL

coders = execute_and_clear(query, "SCHEMA", [], allow_retry: true, uses_transaction: false) do |result|
PostgreSQL::TypeMapCache.instance.known_coder_type_records |= result.to_a

result.filter_map { |row| construct_coder(row, coders_by_name[row["typname"]]) }
end
end

map = PG::TypeMapByOid.new
Expand All @@ -1152,6 +1177,15 @@ def construct_coder(row, coder_class)
coder_class.new(oid: row["oid"].to_i, name: row["typname"])
end

def should_load_types_from_cache?(oids)
if oids.blank?
PostgreSQL::TypeMapCache.instance.additional_type_records.present?
else
cached_oids = PostgreSQL::TypeMapCache.instance.additional_type_records.map { |oid| oid["oid"] }
(oids - cached_oids).empty?
end
end

class MoneyDecoder < PG::SimpleDecoder # :nodoc:
TYPE = OID::Money.new

Expand Down
11 changes: 10 additions & 1 deletion activerecord/lib/active_record/railtie.rb
Expand Up @@ -145,9 +145,18 @@ class Railtie < Rails::Railtie # :nodoc:
schema_cache_path: db_config.schema_cache_path
)

cache = ActiveRecord::ConnectionAdapters::SchemaCache.load_from(filename)
cache = if connection_pool.db_config.configuration_hash[:adapter] == "postgresql"
ActiveRecord::ConnectionAdapters::PostgreSQL::SchemaCache.load_from(filename)
else
ActiveRecord::ConnectionAdapters::SchemaCache.load_from(filename)
end

next if cache.nil?

if connection_pool.db_config.configuration_hash[:adapter] == "postgresql"
ActiveRecord::ConnectionAdapters::PostgreSQL::TypeMapCache.init(cache)
end

if check_schema_cache_dump_version
current_version = begin
ActiveRecord::Migrator.current_version
Expand Down
Expand Up @@ -26,16 +26,15 @@ def test_connection_error

def test_reconnection_error
fake_connection = Class.new do
attr_accessor :type_map_for_results

def async_exec(*)
[{}]
end

def type_map_for_queries=(_)
end

def type_map_for_results=(_)
end

def exec_params(*)
{}
end
Expand Down Expand Up @@ -452,6 +451,7 @@ def test_reload_type_map_for_newly_defined_types

def test_only_reload_type_map_once_for_every_unrecognized_type
reset_connection

connection = ActiveRecord::Base.connection

silence_warnings do
Expand Down
@@ -0,0 +1,84 @@
# frozen_string_literal: true

require "cases/helper"

module ActiveRecord
module ConnectionAdapters
module PostgreSQL
class SchemaCacheTest < ActiveRecord::TestCase
if current_adapter?(:PostgreSQLAdapter)
def setup
@connection = ActiveRecord::Base.connection
end

def test_type_map_existence_in_schema_cache
assert_not(@connection.schema_cache.additional_type_records.empty?)
assert_not(@connection.schema_cache.known_coder_type_records.empty?)
end

def test_type_map_queries_when_initialize_connection
db_config = ActiveRecord::Base.configurations.configs_for(
env_name: "arunit",
name: "primary"
)

assert_no_sql("SELECT t.oid, t.typname") do
ActiveRecord::Base.postgresql_connection(db_config.configuration_hash)
end
end

def test_type_map_cache_with_lazy_load_option
PostgreSQL::TypeMapCache.clear
tempfile = Tempfile.new(["schema_cache-", ".yml"])

original_config = ActiveRecord::Base.connection_db_config
new_config = original_config.configuration_hash.merge(schema_cache_path: tempfile.path)

ActiveRecord::Base.establish_connection(new_config)

assert_not_empty(PostgreSQL::TypeMapCache.instance.additional_type_records)
assert_not_empty(PostgreSQL::TypeMapCache.instance.known_coder_type_records)

assert_not_empty(ActiveRecord::Base.connection.schema_cache.instance_variable_get(:@known_coder_type_records))
assert_not_empty(ActiveRecord::Base.connection.schema_cache.instance_variable_get(:@additional_type_records))

cache = PostgreSQL::SchemaCache.new(ActiveRecord::Base.connection)

cache.dump_to(tempfile.path)
ActiveRecord::Base.connection.schema_cache = cache

assert(File.exist?(tempfile))

ActiveRecord.lazily_load_schema_cache = true

PostgreSQL::TypeMapCache.clear

assert_sql(/SELECT t.oid, t.typname/) do
ActiveRecord::Base.establish_connection(new_config)
end
end

def test_type_map_queries_with_custom_types
cache = SchemaCache.new(@connection)
tempfile = Tempfile.new(["schema_cache-", ".yml"])

assert_no_sql("SELECT t.oid, t.typname") do
cache.dump_to(tempfile.path)
end

cache = SchemaCache.load_from(tempfile.path)
cache.connection = @connection

assert_sql(/SELECT t.oid, t.typname, t.typelem/) do
@connection.execute("CREATE TYPE account_status AS ENUM ('new', 'open', 'closed');")
@connection.execute("ALTER TABLE accounts ADD status account_status NOT NULL DEFAULT 'new';")
cache.dump_to(tempfile.path)
end
ensure
@connection.execute("DELETE FROM accounts; ALTER TABLE accounts DROP COLUMN status;DROP TYPE IF EXISTS account_status;")
end
end
end
end
end
end
@@ -0,0 +1,40 @@
# frozen_string_literal: true

require "cases/helper"

module ActiveRecord
module ConnectionAdapters
module PostgreSQL
class TypeMapCacheTest < ActiveRecord::TestCase
if current_adapter?(:PostgreSQLAdapter)
def setup
@connection = ActiveRecord::Base.connection
@type_map_cache = TypeMapCache.instance
end

def test_type_map_cache_init_with_schema_cache
assert(@type_map_cache.additional_type_records.empty?)
assert(@type_map_cache.known_coder_type_records.empty?)

TypeMapCache.init(@connection.schema_cache)

assert_not(@type_map_cache.additional_type_records.empty?)
assert_not(@type_map_cache.known_coder_type_records.empty?)
end

def test_type_map_cache_clear
TypeMapCache.init(@connection.schema_cache)

assert_not(@type_map_cache.additional_type_records.empty?)
assert_not(@type_map_cache.known_coder_type_records.empty?)

TypeMapCache.clear

assert(@type_map_cache.additional_type_records.empty?)
assert(@type_map_cache.known_coder_type_records.empty?)
end
end
end
end
end
end

0 comments on commit b37a453

Please sign in to comment.