Skip to content

Commit

Permalink
Merge pull request #48295 from luanzeba/connection_attr_reader
Browse files Browse the repository at this point in the history
Store `connection_pool` in database-related exceptions
  • Loading branch information
matthewd committed Jun 6, 2023
2 parents 54ce634 + 7d4c88d commit 3e01b26
Show file tree
Hide file tree
Showing 15 changed files with 191 additions and 80 deletions.
7 changes: 7 additions & 0 deletions activerecord/CHANGELOG.md
@@ -1,3 +1,10 @@
* Include the `connection_pool` with exceptions raised from an adapter.

The `connection_pool` provides added context such as the connection used
that led to the exception as well as which role and shard.

*Luan Vieira*

* Support multiple column ordering for `find_each`, `find_in_batches` and `in_batches`.

When find_each/find_in_batches/in_batches are performed on a table with composite primary keys, ascending or descending order can be selected for each key.
Expand Down
Expand Up @@ -690,6 +690,8 @@ def new_connection
connection.pool = self
connection.check_version
connection
rescue ConnectionNotEstablished => ex
raise ex.set_pool(self)
end

# If the pool is not at a <tt>@size</tt> limit, establish new connection. Connecting
Expand Down
Expand Up @@ -1154,7 +1154,7 @@ def translate_exception(exception, message:, sql:, binds:)
when RuntimeError, ActiveRecord::ActiveRecordError
exception
else
ActiveRecord::StatementInvalid.new(message, sql: sql, binds: binds)
ActiveRecord::StatementInvalid.new(message, sql: sql, binds: binds, connection_pool: @pool)
end
end

Expand Down
Expand Up @@ -720,7 +720,7 @@ def handle_warnings(sql)
@affected_rows_before_warnings = @raw_connection.affected_rows
result = @raw_connection.query("SHOW WARNINGS")
result.each do |level, code, message|
warning = SQLWarning.new(message, code, level, sql)
warning = SQLWarning.new(message, code, level, sql, @pool)
next if warning_ignored?(warning)

ActiveRecord.db_warnings_action.call(warning)
Expand Down Expand Up @@ -764,40 +764,40 @@ def translate_exception(exception, message:, sql:, binds:)
case error_number(exception)
when nil
if exception.message.match?(/MySQL client is not connected/i)
ConnectionNotEstablished.new(exception)
ConnectionNotEstablished.new(exception, connection_pool: @pool)
else
super
end
when ER_CONNECTION_KILLED, CR_SERVER_GONE_ERROR, CR_SERVER_LOST, ER_CLIENT_INTERACTION_TIMEOUT
ConnectionFailed.new(message, sql: sql, binds: binds)
ConnectionFailed.new(message, sql: sql, binds: binds, connection_pool: @pool)
when ER_DB_CREATE_EXISTS
DatabaseAlreadyExists.new(message, sql: sql, binds: binds)
DatabaseAlreadyExists.new(message, sql: sql, binds: binds, connection_pool: @pool)
when ER_DUP_ENTRY
RecordNotUnique.new(message, sql: sql, binds: binds)
RecordNotUnique.new(message, sql: sql, binds: binds, connection_pool: @pool)
when ER_NO_REFERENCED_ROW, ER_ROW_IS_REFERENCED, ER_ROW_IS_REFERENCED_2, ER_NO_REFERENCED_ROW_2
InvalidForeignKey.new(message, sql: sql, binds: binds)
InvalidForeignKey.new(message, sql: sql, binds: binds, connection_pool: @pool)
when ER_CANNOT_ADD_FOREIGN, ER_FK_INCOMPATIBLE_COLUMNS
mismatched_foreign_key(message, sql: sql, binds: binds)
mismatched_foreign_key(message, sql: sql, binds: binds, connection_pool: @pool)
when ER_CANNOT_CREATE_TABLE
if message.include?("errno: 150")
mismatched_foreign_key(message, sql: sql, binds: binds)
mismatched_foreign_key(message, sql: sql, binds: binds, connection_pool: @pool)
else
super
end
when ER_DATA_TOO_LONG
ValueTooLong.new(message, sql: sql, binds: binds)
ValueTooLong.new(message, sql: sql, binds: binds, connection_pool: @pool)
when ER_OUT_OF_RANGE
RangeError.new(message, sql: sql, binds: binds)
RangeError.new(message, sql: sql, binds: binds, connection_pool: @pool)
when ER_NOT_NULL_VIOLATION, ER_DO_NOT_HAVE_DEFAULT
NotNullViolation.new(message, sql: sql, binds: binds)
NotNullViolation.new(message, sql: sql, binds: binds, connection_pool: @pool)
when ER_LOCK_DEADLOCK
Deadlocked.new(message, sql: sql, binds: binds)
Deadlocked.new(message, sql: sql, binds: binds, connection_pool: @pool)
when ER_LOCK_WAIT_TIMEOUT
LockWaitTimeout.new(message, sql: sql, binds: binds)
LockWaitTimeout.new(message, sql: sql, binds: binds, connection_pool: @pool)
when ER_QUERY_TIMEOUT, ER_FILSORT_ABORT
StatementTimeout.new(message, sql: sql, binds: binds)
StatementTimeout.new(message, sql: sql, binds: binds, connection_pool: @pool)
when ER_QUERY_INTERRUPTED
QueryCanceled.new(message, sql: sql, binds: binds)
QueryCanceled.new(message, sql: sql, binds: binds, connection_pool: @pool)
else
super
end
Expand Down Expand Up @@ -943,11 +943,12 @@ def mismatched_foreign_key_details(message:, sql:)
options
end

def mismatched_foreign_key(message, sql:, binds:)
def mismatched_foreign_key(message, sql:, binds:, connection_pool:)
options = {
message: message,
sql: sql,
binds: binds,
connection_pool: connection_pool
}

if sql
Expand Down
Expand Up @@ -155,6 +155,8 @@ def text_type?(type)

def connect
@raw_connection = self.class.new_client(@connection_parameters)
rescue ConnectionNotEstablished => ex
raise ex.set_pool(@pool)
end

def reconnect
Expand All @@ -179,12 +181,12 @@ def get_full_version

def translate_exception(exception, message:, sql:, binds:)
if exception.is_a?(::Mysql2::Error::TimeoutError) && !exception.error_number
ActiveRecord::AdapterTimeout.new(message, sql: sql, binds: binds)
ActiveRecord::AdapterTimeout.new(message, sql: sql, binds: binds, connection_pool: @pool)
elsif exception.is_a?(::Mysql2::Error::ConnectionError)
if exception.message.match?(/MySQL client is not connected/i)
ActiveRecord::ConnectionNotEstablished.new(exception)
ActiveRecord::ConnectionNotEstablished.new(exception, connection_pool: @pool)
else
ActiveRecord::ConnectionFailed.new(message, sql: sql, binds: binds)
ActiveRecord::ConnectionFailed.new(message, sql: sql, binds: binds, connection_pool: @pool)
end
else
super
Expand Down
Expand Up @@ -742,41 +742,41 @@ def translate_exception(exception, message:, sql:, binds:)
case exception.result.try(:error_field, PG::PG_DIAG_SQLSTATE)
when nil
if exception.message.match?(/connection is closed/i)
ConnectionNotEstablished.new(exception)
ConnectionNotEstablished.new(exception, connection_pool: @pool)
elsif exception.is_a?(PG::ConnectionBad)
# libpq message style always ends with a newline; the pg gem's internal
# errors do not. We separate these cases because a pg-internal
# ConnectionBad means it failed before it managed to send the query,
# whereas a libpq failure could have occurred at any time (meaning the
# server may have already executed part or all of the query).
if exception.message.end_with?("\n")
ConnectionFailed.new(exception)
ConnectionFailed.new(exception, connection_pool: @pool)
else
ConnectionNotEstablished.new(exception)
ConnectionNotEstablished.new(exception, connection_pool: @pool)
end
else
super
end
when UNIQUE_VIOLATION
RecordNotUnique.new(message, sql: sql, binds: binds)
RecordNotUnique.new(message, sql: sql, binds: binds, connection_pool: @pool)
when FOREIGN_KEY_VIOLATION
InvalidForeignKey.new(message, sql: sql, binds: binds)
InvalidForeignKey.new(message, sql: sql, binds: binds, connection_pool: @pool)
when VALUE_LIMIT_VIOLATION
ValueTooLong.new(message, sql: sql, binds: binds)
ValueTooLong.new(message, sql: sql, binds: binds, connection_pool: @pool)
when NUMERIC_VALUE_OUT_OF_RANGE
RangeError.new(message, sql: sql, binds: binds)
RangeError.new(message, sql: sql, binds: binds, connection_pool: @pool)
when NOT_NULL_VIOLATION
NotNullViolation.new(message, sql: sql, binds: binds)
NotNullViolation.new(message, sql: sql, binds: binds, connection_pool: @pool)
when SERIALIZATION_FAILURE
SerializationFailure.new(message, sql: sql, binds: binds)
SerializationFailure.new(message, sql: sql, binds: binds, connection_pool: @pool)
when DEADLOCK_DETECTED
Deadlocked.new(message, sql: sql, binds: binds)
Deadlocked.new(message, sql: sql, binds: binds, connection_pool: @pool)
when DUPLICATE_DATABASE
DatabaseAlreadyExists.new(message, sql: sql, binds: binds)
DatabaseAlreadyExists.new(message, sql: sql, binds: binds, connection_pool: @pool)
when LOCK_NOT_AVAILABLE
LockWaitTimeout.new(message, sql: sql, binds: binds)
LockWaitTimeout.new(message, sql: sql, binds: binds, connection_pool: @pool)
when QUERY_CANCELED
QueryCanceled.new(message, sql: sql, binds: binds)
QueryCanceled.new(message, sql: sql, binds: binds, connection_pool: @pool)
else
super
end
Expand Down Expand Up @@ -940,6 +940,8 @@ def prepare_statement(sql, binds)
# connected server's characteristics.
def connect
@raw_connection = self.class.new_client(@connection_parameters)
rescue ConnectionNotEstablished => ex
raise ex.set_pool(@pool)
end

def reconnect
Expand All @@ -966,7 +968,7 @@ def configure_connection
message = result.error_field(PG::Result::PG_DIAG_MESSAGE_PRIMARY)
code = result.error_field(PG::Result::PG_DIAG_SQLSTATE)
level = result.error_field(PG::Result::PG_DIAG_SEVERITY)
@notice_receiver_sql_warnings << SQLWarning.new(message, code, level)
@notice_receiver_sql_warnings << SQLWarning.new(message, code, level, nil, @pool)
end
end

Expand Down
Expand Up @@ -116,7 +116,7 @@ def initialize(...)
Dir.mkdir(dirname)
rescue Errno::ENOENT => error
if error.message.include?("No such file or directory")
raise ActiveRecord::NoDatabaseError
raise ActiveRecord::NoDatabaseError.new(connection_pool: @pool)
else
raise
end
Expand Down Expand Up @@ -609,13 +609,13 @@ def translate_exception(exception, message:, sql:, binds:)
# Older versions of SQLite return:
# column *column_name* is not unique
if exception.message.match?(/(column(s)? .* (is|are) not unique|UNIQUE constraint failed: .*)/i)
RecordNotUnique.new(message, sql: sql, binds: binds)
RecordNotUnique.new(message, sql: sql, binds: binds, connection_pool: @pool)
elsif exception.message.match?(/(.* may not be NULL|NOT NULL constraint failed: .*)/i)
NotNullViolation.new(message, sql: sql, binds: binds)
NotNullViolation.new(message, sql: sql, binds: binds, connection_pool: @pool)
elsif exception.message.match?(/FOREIGN KEY constraint failed/i)
InvalidForeignKey.new(message, sql: sql, binds: binds)
InvalidForeignKey.new(message, sql: sql, binds: binds, connection_pool: @pool)
elsif exception.message.match?(/called on a closed database/i)
ConnectionNotEstablished.new(exception)
ConnectionNotEstablished.new(exception, connection_pool: @pool)
else
super
end
Expand Down Expand Up @@ -679,6 +679,8 @@ def build_statement_pool

def connect
@raw_connection = self.class.new_client(@connection_parameters)
rescue ConnectionNotEstablished => ex
raise ex.set_pool(@pool)
end

def reconnect
Expand Down
Expand Up @@ -6,10 +6,11 @@ module Trilogy
class LostConnectionExceptionTranslator
attr_reader :exception, :message, :error_number

def initialize(exception, message, error_number)
def initialize(exception, message, error_number, connection_pool)
@exception = exception
@message = message
@error_number = error_number
@connection_pool = connection_pool
end

def translate
Expand All @@ -25,23 +26,23 @@ def translate
def translate_database_exception
case error_number
when ER_SERVER_SHUTDOWN
Errors::ServerShutdown.new(message)
Errors::ServerShutdown.new(message, connection_pool: @connection_pool)
when CR_SERVER_LOST, CR_SERVER_LOST_EXTENDED
Errors::ServerLost.new(message)
Errors::ServerLost.new(message, connection_pool: @connection_pool)
when CR_SERVER_GONE_ERROR
Errors::ServerGone.new(message)
Errors::ServerGone.new(message, connection_pool: @connection_pool)
end
end

def translate_ruby_exception
case exception
when Errno::EPIPE
Errors::BrokenPipe.new(message)
Errors::BrokenPipe.new(message, connection_pool: @connection_pool)
when SocketError, IOError
Errors::SocketError.new(message)
Errors::SocketError.new(message, connection_pool: @connection_pool)
when ::Trilogy::ConnectionError
if message.include?("Connection reset by peer")
Errors::ConnectionResetByPeer.new(message)
Errors::ConnectionResetByPeer.new(message, connection_pool: @connection_pool)
end
end
end
Expand All @@ -51,11 +52,11 @@ def translate_trilogy_exception

case message
when /TRILOGY_CLOSED_CONNECTION/
Errors::ClosedConnection.new(message)
Errors::ClosedConnection.new(message, connection_pool: @connection_pool)
when /TRILOGY_INVALID_SEQUENCE_ID/
Errors::InvalidSequenceId.new(message)
Errors::InvalidSequenceId.new(message, connection_pool: @connection_pool)
when /TRILOGY_UNEXPECTED_PACKET/
Errors::UnexpectedPacket.new(message)
Errors::UnexpectedPacket.new(message, connection_pool: @connection_pool)
end
end
end
Expand Down
Expand Up @@ -192,6 +192,8 @@ def connection=(conn)

def connect
self.connection = self.class.new_client(@config)
rescue ConnectionNotEstablished => ex
raise ex.set_pool(@pool)
end

def reconnect
Expand All @@ -212,11 +214,11 @@ def get_full_version

def translate_exception(exception, message:, sql:, binds:)
if exception.is_a?(::Trilogy::TimeoutError) && !exception.error_code
return ActiveRecord::AdapterTimeout.new(message, sql: sql, binds: binds)
return ActiveRecord::AdapterTimeout.new(message, sql: sql, binds: binds, connection_pool: @pool)
end
error_code = exception.error_code if exception.respond_to?(:error_code)

Trilogy::LostConnectionExceptionTranslator.new(exception, message, error_code).translate || super
Trilogy::LostConnectionExceptionTranslator.new(exception, message, error_code, @pool).translate || super
end

def default_prepared_statements
Expand Down

0 comments on commit 3e01b26

Please sign in to comment.