Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add comprehensive locking around DB transactions #28726

Merged
merged 1 commit into from
Apr 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,57 +149,67 @@ def initialize(connection)
end

def begin_transaction(options = {})
run_commit_callbacks = !current_transaction.joinable?
transaction =
if @stack.empty?
RealTransaction.new(@connection, options, run_commit_callbacks: run_commit_callbacks)
else
SavepointTransaction.new(@connection, "active_record_#{@stack.size}", options,
run_commit_callbacks: run_commit_callbacks)
end
@connection.lock.synchronize do
run_commit_callbacks = !current_transaction.joinable?
transaction =
if @stack.empty?
RealTransaction.new(@connection, options, run_commit_callbacks: run_commit_callbacks)
else
SavepointTransaction.new(@connection, "active_record_#{@stack.size}", options,
run_commit_callbacks: run_commit_callbacks)
end

@stack.push(transaction)
transaction
@stack.push(transaction)
transaction
end
end

def commit_transaction
transaction = @stack.last
@connection.lock.synchronize do
transaction = @stack.last

begin
transaction.before_commit_records
ensure
@stack.pop
end
begin
transaction.before_commit_records
ensure
@stack.pop
end

transaction.commit
transaction.commit_records
transaction.commit
transaction.commit_records
end
end

def rollback_transaction(transaction = nil)
transaction ||= @stack.pop
transaction.rollback
transaction.rollback_records
@connection.lock.synchronize do
transaction ||= @stack.pop
transaction.rollback
transaction.rollback_records
end
end

def within_new_transaction(options = {})
transaction = begin_transaction options
yield
rescue Exception => error
if transaction
rollback_transaction
after_failure_actions(transaction, error)
end
raise
ensure
unless error
if Thread.current.status == "aborting"
rollback_transaction if transaction
else
begin
commit_transaction
rescue Exception
rollback_transaction(transaction) unless transaction.state.completed?
raise
@connection.lock.synchronize do
begin
transaction = begin_transaction options
yield
rescue Exception => error
if transaction
rollback_transaction
after_failure_actions(transaction, error)
end
raise
ensure
unless error
if Thread.current.status == "aborting"
rollback_transaction if transaction
else
begin
commit_transaction
rescue Exception
rollback_transaction(transaction) unless transaction.state.completed?
raise
end
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class AbstractAdapter
SIMPLE_INT = /\A\d+\z/

attr_accessor :visitor, :pool
attr_reader :schema_cache, :owner, :logger, :prepared_statements
attr_reader :schema_cache, :owner, :logger, :prepared_statements, :lock
alias :in_use? :owner

def self.type_cast_config_to_integer(config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,34 +239,42 @@ def truncate(table_name, name = nil)

# Is this connection alive and ready for queries?
def active?
@connection.query "SELECT 1"
@lock.synchronize do
@connection.query "SELECT 1"
end
true
rescue PG::Error
false
end

# Close then reopen the connection.
def reconnect!
super
@connection.reset
configure_connection
@lock.synchronize do
super
@connection.reset
configure_connection
end
end

def reset!
clear_cache!
reset_transaction
unless @connection.transaction_status == ::PG::PQTRANS_IDLE
@connection.query "ROLLBACK"
@lock.synchronize do
clear_cache!
reset_transaction
unless @connection.transaction_status == ::PG::PQTRANS_IDLE
@connection.query "ROLLBACK"
end
@connection.query "DISCARD ALL"
configure_connection
end
@connection.query "DISCARD ALL"
configure_connection
end

# Disconnects from the database if already connected. Otherwise, this
# method does nothing.
def disconnect!
super
@connection.close rescue nil
@lock.synchronize do
super
@connection.close rescue nil
end
end

def native_database_types #:nodoc:
Expand Down