Skip to content

Commit

Permalink
Add new tests for deferred connection verification and auto-reconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewd committed Jul 26, 2022
1 parent 6693e5f commit 57bc28f
Show file tree
Hide file tree
Showing 14 changed files with 304 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,8 @@ def commit_db_transaction() end
# done if the transaction block raises an exception or returns false.
def rollback_db_transaction
exec_rollback_db_transaction
rescue ActiveRecord::ConnectionNotEstablished, ActiveRecord::ConnectionFailed
reconnect!
end

def exec_rollback_db_transaction() end # :nodoc:
Expand Down Expand Up @@ -478,6 +480,10 @@ def high_precision_current_timestamp
end

private
def internal_execute(sql, name = "SCHEMA")
execute(sql, name)
end

def execute_batch(statements, name = nil)
statements.each do |statement|
execute(statement, name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ def current_savepoint_name
end

def create_savepoint(name = current_savepoint_name)
execute("SAVEPOINT #{name}", "TRANSACTION")
internal_execute("SAVEPOINT #{name}", "TRANSACTION")
end

def exec_rollback_to_savepoint(name = current_savepoint_name)
execute("ROLLBACK TO SAVEPOINT #{name}", "TRANSACTION")
internal_execute("ROLLBACK TO SAVEPOINT #{name}", "TRANSACTION")
end

def release_savepoint(name = current_savepoint_name)
execute("RELEASE SAVEPOINT #{name}", "TRANSACTION")
internal_execute("RELEASE SAVEPOINT #{name}", "TRANSACTION")
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,24 +403,24 @@ def restorable?
def materialize_transactions
return if @materializing_transactions

if @has_unmaterialized_transactions
@connection.lock.synchronize do
begin
@materializing_transactions = true
@stack.each { |t| t.materialize! unless t.materialized? }
ensure
@materializing_transactions = false
end
@has_unmaterialized_transactions = false
end
end

# As a logical simplification for now, we assume anything that requests
# materialization is about to dirty the transaction. Note this is just
# an assumption about the caller, not a direct property of this method.
# It can go away later when callers are able to handle dirtiness for
# themselves.
dirty_current_transaction

return unless @has_unmaterialized_transactions

@connection.lock.synchronize do
begin
@materializing_transactions = true
@stack.each { |t| t.materialize! unless t.materialized? }
ensure
@materializing_transactions = false
end
@has_unmaterialized_transactions = false
end
end

def commit_transaction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,7 @@ def with_raw_connection(allow_retry: false, uses_transaction: true)
end

def retryable_connection_error?(exception)
exception.is_a?(ConnectionNotEstablished)
exception.is_a?(ConnectionNotEstablished) || exception.is_a?(ConnectionFailed)
end

def retryable_query_error?(exception)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,15 +224,15 @@ def begin_isolated_db_transaction(isolation) # :nodoc:
end

def commit_db_transaction # :nodoc:
internal_execute("COMMIT", "TRANSACTION")
internal_execute("COMMIT", "TRANSACTION", allow_retry: false, uses_transaction: true)
end

def exec_rollback_db_transaction # :nodoc:
internal_execute("ROLLBACK", "TRANSACTION")
internal_execute("ROLLBACK", "TRANSACTION", allow_retry: false, uses_transaction: true)
end

def exec_restart_db_transaction # :nodoc:
internal_execute("ROLLBACK AND CHAIN", "TRANSACTION")
internal_execute("ROLLBACK AND CHAIN", "TRANSACTION", allow_retry: false, uses_transaction: true)
end

def empty_insert_statement_value(primary_key = nil) # :nodoc:
Expand Down Expand Up @@ -679,8 +679,12 @@ def internal_execute(sql, name = "SCHEMA", allow_retry: true, uses_transaction:
ER_CANNOT_CREATE_TABLE = 1005
ER_LOCK_WAIT_TIMEOUT = 1205
ER_QUERY_INTERRUPTED = 1317
ER_CONNECTION_KILLED = 1927
CR_SERVER_GONE_ERROR = 2006
CR_SERVER_LOST = 2013
ER_QUERY_TIMEOUT = 3024
ER_FK_INCOMPATIBLE_COLUMNS = 3780
ER_CLIENT_INTERACTION_TIMEOUT = 4031

def translate_exception(exception, message:, sql:, binds:)
case error_number(exception)
Expand All @@ -690,6 +694,8 @@ def translate_exception(exception, message:, sql:, binds:)
else
super
end
when ER_CONNECTION_KILLED, CR_SERVER_GONE_ERROR, CR_SERVER_LOST, ER_CLIENT_INTERACTION_TIMEOUT
ConnectionFailed.new(message, sql: sql, binds: binds)
when ER_DB_CREATE_EXISTS
DatabaseAlreadyExists.new(message, sql: sql, binds: binds)
when ER_DUP_ENTRY
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,12 @@ def get_full_version
def translate_exception(exception, message:, sql:, binds:)
if exception.is_a?(Mysql2::Error::TimeoutError) && !exception.error_number
ActiveRecord::AdapterTimeout.new(message, sql: sql, binds: binds)
elsif exception.is_a?(Mysql2::Error::ConnectionError)
if exception.message.match?(/MySQL client is not connected/i)
ActiveRecord::ConnectionNotEstablished.new(exception)
else
ActiveRecord::ConnectionFailed.new(message, sql: sql, binds: binds)
end
else
super
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,24 +125,18 @@ def begin_isolated_db_transaction(isolation) # :nodoc:

# Commits a transaction.
def commit_db_transaction # :nodoc:
internal_execute("COMMIT", "TRANSACTION")
internal_execute("COMMIT", "TRANSACTION", allow_retry: false, uses_transaction: true)
end

# Aborts a transaction.
def exec_rollback_db_transaction # :nodoc:
if @raw_connection
@raw_connection.cancel unless @raw_connection.transaction_status == PG::PQTRANS_IDLE
@raw_connection.block
end
internal_execute("ROLLBACK", "TRANSACTION")
cancel_any_running_query
internal_execute("ROLLBACK", "TRANSACTION", allow_retry: false, uses_transaction: true)
end

def exec_restart_db_transaction # :nodoc:
if @raw_connection
@raw_connection.cancel unless @raw_connection.transaction_status == PG::PQTRANS_IDLE
@raw_connection.block
end
internal_execute("ROLLBACK AND CHAIN", "TRANSACTION")
cancel_any_running_query
internal_execute("ROLLBACK AND CHAIN", "TRANSACTION", allow_retry: false, uses_transaction: true)
end

# From https://www.postgresql.org/docs/current/functions-datetime.html#FUNCTIONS-DATETIME-CURRENT
Expand All @@ -154,6 +148,13 @@ def high_precision_current_timestamp
end

private
def cancel_any_running_query
return unless @raw_connection && @raw_connection.transaction_status != PG::PQTRANS_IDLE
@raw_connection.cancel
@raw_connection.block
rescue PG::Error
end

def execute_batch(statements, name = nil)
execute(combine_multi_statements(statements))
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -680,8 +680,17 @@ def translate_exception(exception, message:, sql:, binds:)
when nil
if exception.message.match?(/connection is closed/i)
ConnectionNotEstablished.new(exception)
elsif exception.is_a?(PG::ConnectionBad) && !exception.message.end_with?("\n")
ConnectionNotEstablished.new(exception)
elsif exception.is_a?(PG::ConnectionBad)
# libpq message style always ends with a newline; the pg gem's internal
# errors do not. We separate these cases because a pg-internal
# ConnectionBad means it failed before it managed to send the query,
# whereas a libpq failure could have occurred at any time (meaning the
# server may have already executed part or all of the query).
if exception.message.end_with?("\n")
ConnectionFailed.new(exception)
else
ConnectionNotEstablished.new(exception)
end
else
super
end
Expand Down
5 changes: 5 additions & 0 deletions activerecord/lib/active_record/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,11 @@ class QueryCanceled < QueryAborted
class AdapterTimeout < QueryAborted
end

# ConnectionFailed will be raised when the network connection to the
# database fails while sending a query or waiting for its result.
class ConnectionFailed < QueryAborted
end

# UnknownAttributeReference is raised when an unknown and potentially unsafe
# value is passed to a query method. For example, passing a non column name
# value to a relation's #order method might cause this exception.
Expand Down

0 comments on commit 57bc28f

Please sign in to comment.