Skip to content

Commit

Permalink
Merge pull request #44576 from rails/defer-db-verify
Browse files Browse the repository at this point in the history
Defer verification of database connections
  • Loading branch information
matthewd committed Jul 29, 2022
2 parents b623494 + 57bc28f commit 7fe221d
Show file tree
Hide file tree
Showing 23 changed files with 732 additions and 354 deletions.
Expand Up @@ -705,7 +705,7 @@ def checkout_new_connection

def checkout_and_verify(c)
c._run_checkout_callbacks do
c.verify!
c.clean!
end
c
rescue
Expand Down
Expand Up @@ -389,6 +389,8 @@ def commit_db_transaction() end
# done if the transaction block raises an exception or returns false.
def rollback_db_transaction
exec_rollback_db_transaction
rescue ActiveRecord::ConnectionNotEstablished, ActiveRecord::ConnectionFailed
reconnect!
end

def exec_rollback_db_transaction() end # :nodoc:
Expand Down Expand Up @@ -478,6 +480,10 @@ def high_precision_current_timestamp
end

private
def internal_execute(sql, name = "SCHEMA")
execute(sql, name)
end

def execute_batch(statements, name = nil)
statements.each do |statement|
execute(statement, name)
Expand Down
Expand Up @@ -8,15 +8,15 @@ def current_savepoint_name
end

def create_savepoint(name = current_savepoint_name)
execute("SAVEPOINT #{name}", "TRANSACTION")
internal_execute("SAVEPOINT #{name}", "TRANSACTION")
end

def exec_rollback_to_savepoint(name = current_savepoint_name)
execute("ROLLBACK TO SAVEPOINT #{name}", "TRANSACTION")
internal_execute("ROLLBACK TO SAVEPOINT #{name}", "TRANSACTION")
end

def release_savepoint(name = current_savepoint_name)
execute("RELEASE SAVEPOINT #{name}", "TRANSACTION")
internal_execute("RELEASE SAVEPOINT #{name}", "TRANSACTION")
end
end
end
Expand Down
Expand Up @@ -142,7 +142,10 @@ def materialized?
end

def restore!
@materialized = false
if materialized?
@materialized = false
materialize!
end
end

def rollback_records
Expand Down Expand Up @@ -390,8 +393,6 @@ def restore_transactions

@stack.each(&:restore!)

materialize_transactions unless @lazy_transactions_enabled

true
end

Expand All @@ -402,24 +403,24 @@ def restorable?
def materialize_transactions
return if @materializing_transactions

if @has_unmaterialized_transactions
@connection.lock.synchronize do
begin
@materializing_transactions = true
@stack.each { |t| t.materialize! unless t.materialized? }
ensure
@materializing_transactions = false
end
@has_unmaterialized_transactions = false
end
end

# As a logical simplification for now, we assume anything that requests
# materialization is about to dirty the transaction. Note this is just
# an assumption about the caller, not a direct property of this method.
# It can go away later when callers are able to handle dirtiness for
# themselves.
dirty_current_transaction

return unless @has_unmaterialized_transactions

@connection.lock.synchronize do
begin
@materializing_transactions = true
@stack.each { |t| t.materialize! unless t.materialized? }
ensure
@materializing_transactions = false
end
@has_unmaterialized_transactions = false
end
end

def commit_transaction
Expand Down
196 changes: 179 additions & 17 deletions activerecord/lib/active_record/connection_adapters/abstract_adapter.rb
Expand Up @@ -114,6 +114,7 @@ def initialize(connection, logger = nil, config = {}) # :nodoc:
@default_timezone = self.class.validate_default_timezone(config[:default_timezone])

@raw_connection_dirty = false
@verified = false

configure_connection
end
Expand Down Expand Up @@ -145,6 +146,10 @@ def use_metadata_table?
@config.fetch(:use_metadata_table, true)
end

def connection_retries
(@config[:connection_retries] || 3).to_i
end

def default_timezone
@default_timezone || ActiveRecord.default_timezone
end
Expand Down Expand Up @@ -552,17 +557,42 @@ def all_foreign_keys_valid?
def active?
end

# Disconnects from the database if already connected, and establishes a
# new connection with the database. Implementors should call super
# immediately after establishing the new connection (and while still
# holding @lock).
# Disconnects from the database if already connected, and establishes a new
# connection with the database. Implementors should define private #reconnect
# instead.
def reconnect!(restore_transactions: false)
reset_transaction(restore: restore_transactions) do
clear_cache!(new_connection: true)
configure_connection
retries_available = connection_retries

@lock.synchronize do
reconnect

enable_lazy_transactions!
@raw_connection_dirty = false
@verified = true

reset_transaction(restore: restore_transactions) do
clear_cache!(new_connection: true)
configure_connection
end
rescue => original_exception
translated_exception = translate_exception_class(original_exception, nil, nil)

if retries_available > 0
retries_available -= 1

if retryable_connection_error?(translated_exception)
backoff(connection_retries - retries_available)
retry
end
end

@verified = false

raise translated_exception
end
end


# Disconnects from the database if already connected. Otherwise, this
# method does nothing.
def disconnect!
Expand Down Expand Up @@ -628,7 +658,13 @@ def requires_reloading?
# This is done under the hood by calling #active?. If the connection
# is no longer active, then this method will reconnect to the database.
def verify!
reconnect! unless active?
reconnect!(restore_transactions: true) unless active?
@verified = true
end

def clean! # :nodoc:
@raw_connection_dirty = false
@verified = nil
end

# Provides access to the underlying database driver for this adapter. For
Expand All @@ -638,9 +674,11 @@ def verify!
# This is useful for when you need to call a proprietary method such as
# PostgreSQL's lo_* methods.
def raw_connection
disable_lazy_transactions!
@raw_connection_dirty = true
@raw_connection
with_raw_connection do |conn|
disable_lazy_transactions!
@raw_connection_dirty = true
conn
end
end

def default_uniqueness_comparison(attribute, value) # :nodoc:
Expand Down Expand Up @@ -796,6 +834,130 @@ def reconnect_can_restore_state?
transaction_manager.restorable? && !@raw_connection_dirty
end

# Lock the monitor, ensure we're properly connected and
# transactions are materialized, and then yield the underlying
# raw connection object.
#
# If +allow_retry+ is true, a connection-related exception will
# cause an automatic reconnect and re-run of the block, up to
# the connection's configured +connection_retries+ setting.
#
# If +uses_transaction+ is false, the block will be run without
# ensuring virtual transactions have been materialized in the DB
# server's state. The active transaction will also remain clean
# (if it is not already dirty), meaning it's able to be restored
# by reconnecting and opening an equivalent-depth set of new
# transactions. This should only be used by transaction control
# methods, and internal transaction-agnostic queries.
#
###
#
# It's not the primary use case, so not something to optimize
# for, but note that this method does need to be re-entrant:
# +materialize_transactions+ will re-enter if it has work to do,
# and the yield block can also do so under some circumstances.
#
# In the latter case, we really ought to guarantee the inner
# call will not reconnect (which would interfere with the
# still-yielded connection in the outer block), but we currently
# provide no special enforcement there.
#
def with_raw_connection(allow_retry: false, uses_transaction: true)
@lock.synchronize do
materialize_transactions if uses_transaction

retries_available = allow_retry ? connection_retries : 0
reconnectable = reconnect_can_restore_state?

if @verified
# Cool, we're confident the connection's ready to use. (Note this might have
# become true during the above #materialize_transactions.)
elsif reconnectable
if allow_retry
# Not sure about the connection yet, but if anything goes wrong we can
# just reconnect and re-run our query
else
# We can reconnect if needed, but we don't trust the upcoming query to be
# safely re-runnable: let's verify the connection to be sure
verify!
end
else
# We don't know whether the connection is okay, but it also doesn't matter:
# we wouldn't be able to reconnect anyway. We're just going to run our query
# and hope for the best.
end

begin
result = yield @raw_connection
@verified = true
result
rescue => original_exception
translated_exception = translate_exception_class(original_exception, nil, nil)

if retries_available > 0
retries_available -= 1

if retryable_query_error?(translated_exception)
backoff(connection_retries - retries_available)
retry
elsif reconnectable && retryable_connection_error?(translated_exception)
reconnect!(restore_transactions: true)
# Only allowed to reconnect once, because reconnect! has its own retry
# loop
reconnectable = false
retry
end
end

raise translated_exception
ensure
dirty_current_transaction if uses_transaction
end
end
end

def retryable_connection_error?(exception)
exception.is_a?(ConnectionNotEstablished) || exception.is_a?(ConnectionFailed)
end

def retryable_query_error?(exception)
# We definitely can't retry if we were inside a transaction that was instantly
# rolled back by this error
if exception.is_a?(TransactionRollbackError) && savepoint_errors_invalidate_transactions? && open_transactions > 0
false
else
exception.is_a?(Deadlocked) || exception.is_a?(LockWaitTimeout)
end
end

def backoff(counter)
sleep 0.1 * counter
end

def reconnect
raise NotImplementedError
end

# Returns a raw connection for internal use with methods that are known
# to both be thread-safe and not rely upon actual server communication.
# This is useful for e.g. string escaping methods.
def any_raw_connection
@raw_connection
end

# Similar to any_raw_connection, but ensures it is validated and
# connected. Any method called on this result still needs to be
# independently thread-safe, so it probably shouldn't talk to the
# server... but some drivers fail if they know the connection has gone
# away.
def valid_raw_connection
(@verified && @raw_connection) ||
# `allow_retry: false`, to force verification: the block won't
# raise, so a retry wouldn't help us get the valid connection we
# need.
with_raw_connection(allow_retry: false, uses_transaction: false) { |conn| conn }
end

def extended_type_map_key
if @default_timezone
{ default_timezone: @default_timezone }
Expand Down Expand Up @@ -831,11 +993,11 @@ def log(sql, name = "SQL", binds = [], type_casted_binds = [], statement_name =
type_casted_binds: type_casted_binds,
statement_name: statement_name,
async: async,
connection: self) do
@lock.synchronize(&block)
rescue => e
raise translate_exception_class(e, sql, binds)
end
connection: self,
&block
)
rescue ActiveRecord::StatementInvalid => ex
raise ex.set_query(sql, binds)
end

def transform_query(sql)
Expand All @@ -848,7 +1010,7 @@ def transform_query(sql)
def translate_exception(exception, message:, sql:, binds:)
# override in derived class
case exception
when RuntimeError
when RuntimeError, ActiveRecord::ActiveRecordError
exception
else
ActiveRecord::StatementInvalid.new(message, sql: sql, binds: binds)
Expand Down

0 comments on commit 7fe221d

Please sign in to comment.