Skip to content

Commit

Permalink
Merge pull request #25991 from rails/retry-and-discard-jobs
Browse files Browse the repository at this point in the history
Add retry_on/discard_on for better exception handling
  • Loading branch information
dhh committed Aug 2, 2016
2 parents 7f00b8f + b53da9e commit d46d61e
Show file tree
Hide file tree
Showing 8 changed files with 289 additions and 26 deletions.
21 changes: 21 additions & 0 deletions activejob/CHANGELOG.md
@@ -1,2 +1,23 @@
## Rails 5.1.0.alpha ##

* Added declarative exception handling via ActiveJob::Base.retry_on and ActiveJob::Base.discard_on.

Examples:

class RemoteServiceJob < ActiveJob::Base
retry_on CustomAppException # defaults to 3s wait, 5 attempts
retry_on AnotherCustomAppException, wait: ->(executions) { executions * 2 }
retry_on ActiveRecord::StatementInvalid, wait: 5.seconds, attempts: 3
retry_on Net::OpenTimeout, wait: :exponentially_longer, attempts: 10
discard_on ActiveJob::DeserializationError

def perform(*args)
# Might raise CustomAppException or AnotherCustomAppException for something domain specific
# Might raise ActiveRecord::StatementInvalid when a local db deadlock is detected
# Might raise Net::OpenTimeout when the remote service is down
end
end

*DHH*

Please check [5-0-stable](https://github.com/rails/rails/blob/5-0-stable/activejob/CHANGELOG.md) for previous changes.
2 changes: 2 additions & 0 deletions activejob/lib/active_job/base.rb
Expand Up @@ -5,6 +5,7 @@
require 'active_job/enqueuing'
require 'active_job/execution'
require 'active_job/callbacks'
require 'active_job/exceptions'
require 'active_job/logging'
require 'active_job/translation'

Expand Down Expand Up @@ -62,6 +63,7 @@ class Base
include Enqueuing
include Execution
include Callbacks
include Exceptions
include Logging
include Translation

Expand Down
6 changes: 6 additions & 0 deletions activejob/lib/active_job/core.rb
Expand Up @@ -24,6 +24,9 @@ module Core
# ID optionally provided by adapter
attr_accessor :provider_job_id

# Number of times this job has been executed (which increments on every retry, like after an exception).
attr_accessor :executions

# I18n.locale to be used during the job.
attr_accessor :locale
end
Expand Down Expand Up @@ -68,6 +71,7 @@ def initialize(*arguments)
@job_id = SecureRandom.uuid
@queue_name = self.class.queue_name
@priority = self.class.priority
@executions = 0
end

# Returns a hash with the job data that can safely be passed to the
Expand All @@ -79,6 +83,7 @@ def serialize
'queue_name' => queue_name,
'priority' => priority,
'arguments' => serialize_arguments(arguments),
'executions' => executions,
'locale' => I18n.locale.to_s
}
end
Expand Down Expand Up @@ -109,6 +114,7 @@ def deserialize(job_data)
self.queue_name = job_data['queue_name']
self.priority = job_data['priority']
self.serialized_arguments = job_data['arguments']
self.executions = job_data['executions']
self.locale = job_data['locale'] || I18n.locale.to_s
end

Expand Down
27 changes: 1 addition & 26 deletions activejob/lib/active_job/enqueuing.rb
@@ -1,7 +1,7 @@
require 'active_job/arguments'

module ActiveJob
# Provides behavior for enqueuing and retrying jobs.
# Provides behavior for enqueuing jobs.
module Enqueuing
extend ActiveSupport::Concern

Expand All @@ -24,31 +24,6 @@ def job_or_instantiate(*args)
end
end

# Reschedules the job to be re-executed. This is useful in combination
# with the +rescue_from+ option. When you rescue an exception from your job
# you can ask Active Job to retry performing your job.
#
# ==== Options
# * <tt>:wait</tt> - Enqueues the job with the specified delay
# * <tt>:wait_until</tt> - Enqueues the job at the time specified
# * <tt>:queue</tt> - Enqueues the job on the specified queue
# * <tt>:priority</tt> - Enqueues the job with the specified priority
#
# ==== Examples
#
# class SiteScraperJob < ActiveJob::Base
# rescue_from(ErrorLoadingSite) do
# retry_job queue: :low_priority
# end
#
# def perform(*args)
# # raise ErrorLoadingSite if cannot scrape
# end
# end
def retry_job(options={})
enqueue options
end

# Enqueues the job to be performed by the queue adapter.
#
# ==== Options
Expand Down
122 changes: 122 additions & 0 deletions activejob/lib/active_job/exceptions.rb
@@ -0,0 +1,122 @@
require 'active_support/core_ext/numeric/time'

module ActiveJob
# Provides behavior for retrying and discarding jobs on exceptions.
module Exceptions
extend ActiveSupport::Concern

module ClassMethods
# Catch the exception and reschedule job for re-execution after so many seconds, for a specific number of attempts.
# If the exception keeps getting raised beyond the specified number of attempts, the exception is allowed to
# bubble up to the underlying queuing system, which may have its own retry mechanism or place it in a
# holding queue for inspection.
#
# You can also pass a block that'll be invoked if the retry attempts fail for custom logic rather than letting
# the exception bubble up.
#
# ==== Options
# * <tt>:wait</tt> - Re-enqueues the job with a delay specified either in seconds (default: 3 seconds),
# as a computing proc that the number of executions so far as an argument, or as a symbol reference of
# <tt>:exponentially_longer<>, which applies the wait algorithm of <tt>(executions ** 4) + 2</tt>
# (first wait 3s, then 18s, then 83s, etc)
# * <tt>:attempts</tt> - Re-enqueues the job the specified number of times (default: 5 attempts)
# * <tt>:queue</tt> - Re-enqueues the job on a different queue
# * <tt>:priority</tt> - Re-enqueues the job with a different priority
#
# ==== Examples
#
# class RemoteServiceJob < ActiveJob::Base
# retry_on CustomAppException # defaults to 3s wait, 5 attempts
# retry_on AnotherCustomAppException, wait: ->(executions) { executions * 2 }
# retry_on(YetAnotherCustomAppException) do |exception|
# ExceptionNotifier.caught(exception)
# end
# retry_on ActiveRecord::StatementInvalid, wait: 5.seconds, attempts: 3
# retry_on Net::OpenTimeout, wait: :exponentially_longer, attempts: 10
#
# def perform(*args)
# # Might raise CustomAppException, AnotherCustomAppException, or YetAnotherCustomAppException for something domain specific
# # Might raise ActiveRecord::StatementInvalid when a local db deadlock is detected
# # Might raise Net::OpenTimeout when the remote service is down
# end
# end
def retry_on(exception, wait: 3.seconds, attempts: 5, queue: nil, priority: nil)
rescue_from exception do |error|
if executions < attempts
logger.error "Retrying #{self.class} in #{wait} seconds, due to a #{exception}. The original exception was #{error.cause.inspect}."
retry_job wait: determine_delay(wait), queue: queue, priority: priority
else
if block_given?
yield exception
else
logger.error "Stopped retrying #{self.class} due to a #{exception}, which reoccurred on #{executions} attempts. The original exception was #{error.cause.inspect}."
raise error
end
end
end
end

# Discard the job with no attempts to retry, if the exception is raised. This is useful when the subject of the job,
# like an Active Record, is no longer available, and the job is thus no longer relevant.
#
# ==== Example
#
# class SearchIndexingJob < ActiveJob::Base
# discard_on ActiveJob::DeserializationError
#
# def perform(record)
# # Will raise ActiveJob::DeserializationError if the record can't be deserialized
# end
# end
def discard_on(exception)
rescue_from exception do |error|
logger.error "Discarded #{self.class} due to a #{exception}. The original exception was #{error.cause.inspect}."
end
end
end

# Reschedules the job to be re-executed. This is useful in combination
# with the +rescue_from+ option. When you rescue an exception from your job
# you can ask Active Job to retry performing your job.
#
# ==== Options
# * <tt>:wait</tt> - Enqueues the job with the specified delay in seconds
# * <tt>:wait_until</tt> - Enqueues the job at the time specified
# * <tt>:queue</tt> - Enqueues the job on the specified queue
# * <tt>:priority</tt> - Enqueues the job with the specified priority
#
# ==== Examples
#
# class SiteScraperJob < ActiveJob::Base
# rescue_from(ErrorLoadingSite) do
# retry_job queue: :low_priority
# end
#
# def perform(*args)
# # raise ErrorLoadingSite if cannot scrape
# end
# end
def retry_job(options = {})
enqueue options
end

private
def determine_delay(seconds_or_duration_or_algorithm)
case seconds_or_duration_or_algorithm
when :exponentially_longer
(executions ** 4) + 2
when ActiveSupport::Duration
duration = seconds_or_duration_or_algorithm
duration.to_i
when Integer
seconds = seconds_or_duration_or_algorithm
seconds
when Proc
algorithm = seconds_or_duration_or_algorithm
algorithm.call(executions)
else
raise "Couldn't determine a delay based on #{seconds_or_duration_or_algorithm.inspect}"
end
end
end
end
1 change: 1 addition & 0 deletions activejob/lib/active_job/execution.rb
Expand Up @@ -31,6 +31,7 @@ def execute(job_data) #:nodoc:
def perform_now
deserialize_arguments_if_needed
run_callbacks :perform do
self.executions = executions + 1
perform(*arguments)
end
rescue => exception
Expand Down
107 changes: 107 additions & 0 deletions activejob/test/cases/exceptions_test.rb
@@ -0,0 +1,107 @@
require 'helper'
require 'jobs/retry_job'

class ExceptionsTest < ActiveJob::TestCase
setup do
JobBuffer.clear
skip if ActiveJob::Base.queue_adapter.is_a?(ActiveJob::QueueAdapters::InlineAdapter)
end

test "successfully retry job throwing exception against defaults" do
perform_enqueued_jobs do
RetryJob.perform_later 'DefaultsError', 5

assert_equal [
"Raised DefaultsError for the 1st time",
"Raised DefaultsError for the 2nd time",
"Raised DefaultsError for the 3rd time",
"Raised DefaultsError for the 4th time",
"Successfully completed job" ], JobBuffer.values
end
end

test "successfully retry job throwing exception against higher limit" do
perform_enqueued_jobs do
RetryJob.perform_later 'ShortWaitTenAttemptsError', 9
assert_equal 9, JobBuffer.values.count
end
end

test "failed retry job when exception kept occurring against defaults" do
perform_enqueued_jobs do
begin
RetryJob.perform_later 'DefaultsError', 6
assert_equal "Raised DefaultsError for the 5th time", JobBuffer.last_value
rescue DefaultsError
pass
end
end
end

test "failed retry job when exception kept occurring against higher limit" do
perform_enqueued_jobs do
begin
RetryJob.perform_later 'ShortWaitTenAttemptsError', 11
assert_equal "Raised ShortWaitTenAttemptsError for the 10th time", JobBuffer.last_value
rescue ShortWaitTenAttemptsError
pass
end
end
end

test "discard job" do
perform_enqueued_jobs do
RetryJob.perform_later 'DiscardableError', 2
assert_equal "Raised DiscardableError for the 1st time", JobBuffer.last_value
end
end

test "custom handling of job that exceeds retry attempts" do
perform_enqueued_jobs do
RetryJob.perform_later 'CustomCatchError', 6
assert_equal "Dealt with a job that failed to retry in a custom way", JobBuffer.last_value
end
end

test "long wait job" do
travel_to Time.now

perform_enqueued_jobs do
assert_performed_with at: (Time.now + 3600.seconds).to_i do
RetryJob.perform_later 'LongWaitError', 5
end
end
end

test "exponentially retrying job" do
travel_to Time.now

perform_enqueued_jobs do
assert_performed_with at: (Time.now + 3.seconds).to_i do
assert_performed_with at: (Time.now + 18.seconds).to_i do
assert_performed_with at: (Time.now + 83.seconds).to_i do
assert_performed_with at: (Time.now + 258.seconds).to_i do
RetryJob.perform_later 'ExponentialWaitTenAttemptsError', 5
end
end
end
end
end
end

test "custom wait retrying job" do
travel_to Time.now

perform_enqueued_jobs do
assert_performed_with at: (Time.now + 2.seconds).to_i do
assert_performed_with at: (Time.now + 4.seconds).to_i do
assert_performed_with at: (Time.now + 6.seconds).to_i do
assert_performed_with at: (Time.now + 8.seconds).to_i do
RetryJob.perform_later 'CustomWaitTenAttemptsError', 5
end
end
end
end
end
end
end
29 changes: 29 additions & 0 deletions activejob/test/jobs/retry_job.rb
@@ -0,0 +1,29 @@
require_relative '../support/job_buffer'
require 'active_support/core_ext/integer/inflections'

class DefaultsError < StandardError; end
class LongWaitError < StandardError; end
class ShortWaitTenAttemptsError < StandardError; end
class ExponentialWaitTenAttemptsError < StandardError; end
class CustomWaitTenAttemptsError < StandardError; end
class CustomCatchError < StandardError; end
class DiscardableError < StandardError; end

class RetryJob < ActiveJob::Base
retry_on DefaultsError
retry_on LongWaitError, wait: 1.hour, attempts: 10
retry_on ShortWaitTenAttemptsError, wait: 1.second, attempts: 10
retry_on ExponentialWaitTenAttemptsError, wait: :exponentially_longer, attempts: 10
retry_on CustomWaitTenAttemptsError, wait: ->(executions) { executions * 2 }, attempts: 10
retry_on(CustomCatchError) { |exception| JobBuffer.add("Dealt with a job that failed to retry in a custom way") }
discard_on DiscardableError

def perform(raising, attempts)
if executions < attempts
JobBuffer.add("Raised #{raising} for the #{executions.ordinalize} time")
raise raising.constantize
else
JobBuffer.add("Successfully completed job")
end
end
end

0 comments on commit d46d61e

Please sign in to comment.