Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to register transaction callbacks outside of a record #51474

Merged
merged 1 commit into from Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
48 changes: 48 additions & 0 deletions activerecord/CHANGELOG.md
@@ -1,3 +1,51 @@
* `ActiveRecord::Base.transaction` now yields an `ActiveRecord::Transation` object.
casperisfine marked this conversation as resolved.
Show resolved Hide resolved

This allows to register callbacks on it.

```ruby
Article.transaction do |transaction|
casperisfine marked this conversation as resolved.
Show resolved Hide resolved
article.update(published: true)
transaction.after_commit do
PublishNotificationMailer.with(article: article).deliver_later
end
end
```

*Jean Boussier*

* Add `ActiveRecord::Base.current_transaction`.

Returns the current transaction, to allow registering callbacks on it.

```ruby
Article.current_transaction.after_commit do
PublishNotificationMailer.with(article: article).deliver_later
end
```

*Jean Boussier*

* Add `ActiveRecord.after_all_transactions_commit` callback.

Useful for code that may run either inside or outside a transaction and need
to perform works after the state changes have been properly peristed.

```ruby
def publish_article(article)
article.update(published: true)
ActiveRecord.after_all_transactions_commit do
PublishNotificationMailer.with(article: article).deliver_later
end
end
```

In the above example, the block is either executed immediately if called outside
of a transaction, or called after the open transaction is committed.

If the transaction is rolled back, the block isn't called.

*Jean Boussier*

* Add the ability to ignore counter cache columns until they are backfilled

Starting to use counter caches on existing large tables can be troublesome, because the column
Expand Down
46 changes: 46 additions & 0 deletions activerecord/lib/active_record.rb
Expand Up @@ -86,6 +86,7 @@ module ActiveRecord
autoload :Timestamp
autoload :TokenFor
autoload :TouchLater
autoload :Transaction
autoload :Transactions
casperisfine marked this conversation as resolved.
Show resolved Hide resolved
autoload :Translation
autoload :Validations
Expand Down Expand Up @@ -540,6 +541,51 @@ def self.eager_load!
def self.disconnect_all!
ConnectionAdapters::PoolConfig.disconnect_all!
end

# Registers a block to be called after all the current transactions have been
# committed.
#
# If there is no currently open transaction, the block is called immediately.
#
# If there are multiple nested transactions, the block is called after the outermost one
# has been committed,
#
# If any of the currently open transactions is rolled back, the block is never called.
#
# If multiple transactions are open across multiple databases, the block will be invoked
# if and once all of them have been committed. But note that nesting transactions across
# two distinct databases is a sharding anti-pattern that comes with a world of hurts.
def self.after_all_transactions_commit(&block)
open_transactions = all_open_transactions

if open_transactions.empty?
yield
elsif open_transactions.size == 1
open_transactions.first.after_commit(&block)
else
count = open_transactions.size
callback = -> do
count -= 1
block.call if count.zero?
end
open_transactions.each do |t|
t.after_commit(&callback)
end
open_transactions = nil # rubocop:disable Lint/UselessAssignment avoid holding it in the closure
end
end

def self.all_open_transactions # :nodoc:
open_transactions = []
Base.connection_handler.each_connection_pool do |pool|
if active_connection = pool.active_connection
if active_connection.current_transaction.open? && active_connection.current_transaction.joinable?
open_transactions << active_connection.current_transaction
end
end
end
open_transactions
end
end

ActiveSupport.on_load(:active_record) do
Expand Down
Expand Up @@ -235,6 +235,17 @@ def truncate_tables(*table_names) # :nodoc:
# Runs the given block in a database transaction, and returns the result
# of the block.
#
# == Transaction callbacks
#
# #transaction yields an ActiveRecord::Transaction object on which it is
# possible to register callback:
#
# ActiveRecord::Base.transaction do |transaction|
# transaction.before_commit { puts "before commit!" }
# transaction.after_commit { puts "after commit!" }
# transaction.after_rollback { puts "after rollback!" }
# end
#
# == Nested transactions support
#
# #transaction calls can be nested. By default, this makes all database
Expand Down Expand Up @@ -345,7 +356,7 @@ def transaction(requires_new: nil, isolation: nil, joinable: true, &block)
if isolation
raise ActiveRecord::TransactionIsolationError, "cannot set isolation when joining a transaction"
end
yield
yield current_transaction
else
transaction_manager.within_new_transaction(isolation: isolation, joinable: joinable, &block)
end
Expand Down
Expand Up @@ -116,15 +116,19 @@ def dirty!; end
def invalidated?; false; end
def invalidate!; end
def materialized?; false; end
def before_commit; yield; end
def after_commit; yield; end
def after_rollback; end # noop
end

class Transaction # :nodoc:
class Transaction < ActiveRecord::Transaction # :nodoc:
attr_reader :connection, :state, :savepoint_name, :isolation_level
attr_accessor :written

delegate :invalidate!, :invalidated?, to: :@state

def initialize(connection, isolation: nil, joinable: true, run_commit_callbacks: false)
super()
@connection = connection
@state = TransactionState.new
@records = nil
Expand Down Expand Up @@ -191,60 +195,76 @@ def restore!
end

def rollback_records
return unless records

ite = unique_records
if records
begin
ite = unique_records

instances_to_run_callbacks_on = prepare_instances_to_run_callbacks_on(ite)
instances_to_run_callbacks_on = prepare_instances_to_run_callbacks_on(ite)

run_action_on_records(ite, instances_to_run_callbacks_on) do |record, should_run_callbacks|
record.rolledback!(force_restore_state: full_rollback?, should_run_callbacks: should_run_callbacks)
end
ensure
ite&.each do |i|
i.rolledback!(force_restore_state: full_rollback?, should_run_callbacks: false)
run_action_on_records(ite, instances_to_run_callbacks_on) do |record, should_run_callbacks|
record.rolledback!(force_restore_state: full_rollback?, should_run_callbacks: should_run_callbacks)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're not exposing full_rollback? to our new callbacks, but I'm not sure that's a generally useful piece of information... without digging in to history, this feels like it might be facilitating a historical-compatibility quirk rather than a first-principles desired function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hum, I'm not even sure what it means. That the outer transaction is rolled back too?

I don't think we need it.

end
ensure
ite&.each do |i|
i.rolledback!(force_restore_state: full_rollback?, should_run_callbacks: false)
end
end
end

@callbacks&.each(&:after_rollback)
end

def before_commit_records
return unless records

if @run_commit_callbacks
if ActiveRecord.before_committed_on_all_records
ite = unique_records
if records
if ActiveRecord.before_committed_on_all_records
ite = unique_records

instances_to_run_callbacks_on = records.each_with_object({}) do |record, candidates|
candidates[record] = record
end
instances_to_run_callbacks_on = records.each_with_object({}) do |record, candidates|
candidates[record] = record
end

run_action_on_records(ite, instances_to_run_callbacks_on) do |record, should_run_callbacks|
record.before_committed! if should_run_callbacks
run_action_on_records(ite, instances_to_run_callbacks_on) do |record, should_run_callbacks|
record.before_committed! if should_run_callbacks
end
else
records.uniq.each(&:before_committed!)
end
else
records.uniq.each(&:before_committed!)
end

@callbacks&.each(&:before_commit)
end
# Note: When @run_commit_callbacks is false #commit_records takes care of appending
# remaining callbacks to the parent transaction
end

def commit_records
return unless records

ite = unique_records
if records
begin
ite = unique_records

if @run_commit_callbacks
instances_to_run_callbacks_on = prepare_instances_to_run_callbacks_on(ite)
if @run_commit_callbacks
instances_to_run_callbacks_on = prepare_instances_to_run_callbacks_on(ite)

run_action_on_records(ite, instances_to_run_callbacks_on) do |record, should_run_callbacks|
record.committed!(should_run_callbacks: should_run_callbacks)
end
else
while record = ite.shift
# if not running callbacks, only adds the record to the parent transaction
connection.add_transaction_record(record)
run_action_on_records(ite, instances_to_run_callbacks_on) do |record, should_run_callbacks|
record.committed!(should_run_callbacks: should_run_callbacks)
end
else
while record = ite.shift
# if not running callbacks, only adds the record to the parent transaction
connection.add_transaction_record(record)
end
end
ensure
ite&.each { |i| i.committed!(should_run_callbacks: false) }
end
end
ensure
ite&.each { |i| i.committed!(should_run_callbacks: false) }

if @run_commit_callbacks
@callbacks&.each(&:after_commit)
elsif @callbacks
connection.current_transaction.append_callbacks(@callbacks)
end
end

def full_rollback?; true; end
Expand Down Expand Up @@ -533,7 +553,7 @@ def within_new_transaction(isolation: nil, joinable: true)
@connection.lock.synchronize do
transaction = begin_transaction(isolation: isolation, joinable: joinable)
begin
yield
yield transaction
rescue Exception => error
rollback_transaction
after_failure_actions(transaction, error)
Expand Down Expand Up @@ -573,7 +593,7 @@ def current_transaction
end

private
NULL_TRANSACTION = NullTransaction.new
NULL_TRANSACTION = NullTransaction.new.freeze

# Deallocate invalidated prepared statements outside of the transaction
def after_failure_actions(transaction, error)
Expand Down
68 changes: 68 additions & 0 deletions activerecord/lib/active_record/transaction.rb
@@ -0,0 +1,68 @@
# frozen_string_literal: true

module ActiveRecord
class Transaction
class Callback # :nodoc:
def initialize(event, callback)
@event = event
@callback = callback
end

def before_commit
@callback.call if @event == :before_commit
end

def after_commit
@callback.call if @event == :after_commit
end

def after_rollback
@callback.call if @event == :after_rollback
end
end

def initialize # :nodoc:
@callbacks = nil
end

# Registers a block to be called before the current transaction is fully committed.
#
# If there is no currently open transactions, the block is called immediately.
#
# If the current transaction has a parent transaction, the callback is transfered to
# the parent when the current transaction commits, or dropped when the current transaction
# is rolled back. This operation is repeated until the outermost transaction is reached.
def before_commit(&block)
(@callbacks ||= []) << Callback.new(:before_commit, block)
end

# Registers a block to be called after the current transaction is fully committed.
#
# If there is no currently open transactions, the block is called immediately.
#
# If the current transaction has a parent transaction, the callback is transfered to
# the parent when the current transaction commits, or dropped when the current transaction
# is rolled back. This operation is repeated until the outermost transaction is reached.
def after_commit(&block)
(@callbacks ||= []) << Callback.new(:after_commit, block)
end

# Registers a block to be called after the current transaction is rolled back.
#
# If there is no currently open transactions, the block is never called.
#
# If the current transaction is successfully committed but has a parent
# transaction, the callback is automatically added to the parent transaction.
#
# If the entire chain of nested transactions are all successfully committed,
# the block is never called.
casperisfine marked this conversation as resolved.
Show resolved Hide resolved
def after_rollback(&block)
(@callbacks ||= []) << Callback.new(:after_rollback, block)
end

protected
def append_callbacks(callbacks)
(@callbacks ||= []).concat(callbacks)
end
end
end
5 changes: 5 additions & 0 deletions activerecord/lib/active_record/transactions.rb
Expand Up @@ -224,6 +224,11 @@ def transaction(**options, &block)
end
end

# Returns the current transaction. See ActiveRecord::Transactions API docs.
def current_transaction
connection_pool.active_connection&.current_transaction || ConnectionAdapters::NULL_TRANSACTION
end

def before_commit(*args, &block) # :nodoc:
set_options_for_callbacks!(args)
set_callback(:before_commit, :before, *args, &block)
Expand Down