diff --git a/activejob/CHANGELOG.md b/activejob/CHANGELOG.md index 5e72c67aea155..39503caeeadfb 100644 --- a/activejob/CHANGELOG.md +++ b/activejob/CHANGELOG.md @@ -1,2 +1,23 @@ +## Rails 5.1.0.alpha ## + +* Added declarative exception handling via ActiveJob::Base.retry_on and ActiveJob::Base.discard_on. + + Examples: + + class RemoteServiceJob < ActiveJob::Base + retry_on CustomAppException # defaults to 3s wait, 5 attempts + retry_on AnotherCustomAppException, wait: ->(executions) { executions * 2 } + retry_on ActiveRecord::StatementInvalid, wait: 5.seconds, attempts: 3 + retry_on Net::OpenTimeout, wait: :exponentially_longer, attempts: 10 + discard_on ActiveJob::DeserializationError + + def perform(*args) + # Might raise CustomAppException or AnotherCustomAppException for something domain specific + # Might raise ActiveRecord::StatementInvalid when a local db deadlock is detected + # Might raise Net::OpenTimeout when the remote service is down + end + end + + *DHH* Please check [5-0-stable](https://github.com/rails/rails/blob/5-0-stable/activejob/CHANGELOG.md) for previous changes. diff --git a/activejob/lib/active_job/base.rb b/activejob/lib/active_job/base.rb index ff5c69ddc6f1f..94a1faba3f3b6 100644 --- a/activejob/lib/active_job/base.rb +++ b/activejob/lib/active_job/base.rb @@ -5,6 +5,7 @@ require 'active_job/enqueuing' require 'active_job/execution' require 'active_job/callbacks' +require 'active_job/exceptions' require 'active_job/logging' require 'active_job/translation' @@ -62,6 +63,7 @@ class Base include Enqueuing include Execution include Callbacks + include Exceptions include Logging include Translation diff --git a/activejob/lib/active_job/core.rb b/activejob/lib/active_job/core.rb index f0606b3834d44..1b5ddff17b521 100644 --- a/activejob/lib/active_job/core.rb +++ b/activejob/lib/active_job/core.rb @@ -24,6 +24,9 @@ module Core # ID optionally provided by adapter attr_accessor :provider_job_id + # Number of times this job has been executed (which increments on every retry, like after an exception). + attr_accessor :executions + # I18n.locale to be used during the job. attr_accessor :locale end @@ -68,6 +71,7 @@ def initialize(*arguments) @job_id = SecureRandom.uuid @queue_name = self.class.queue_name @priority = self.class.priority + @executions = 0 end # Returns a hash with the job data that can safely be passed to the @@ -79,6 +83,7 @@ def serialize 'queue_name' => queue_name, 'priority' => priority, 'arguments' => serialize_arguments(arguments), + 'executions' => executions, 'locale' => I18n.locale.to_s } end @@ -109,6 +114,7 @@ def deserialize(job_data) self.queue_name = job_data['queue_name'] self.priority = job_data['priority'] self.serialized_arguments = job_data['arguments'] + self.executions = job_data['executions'] self.locale = job_data['locale'] || I18n.locale.to_s end diff --git a/activejob/lib/active_job/enqueuing.rb b/activejob/lib/active_job/enqueuing.rb index 9dc3c0fa57644..7bc0fe6c3a09f 100644 --- a/activejob/lib/active_job/enqueuing.rb +++ b/activejob/lib/active_job/enqueuing.rb @@ -1,7 +1,7 @@ require 'active_job/arguments' module ActiveJob - # Provides behavior for enqueuing and retrying jobs. + # Provides behavior for enqueuing jobs. module Enqueuing extend ActiveSupport::Concern @@ -24,31 +24,6 @@ def job_or_instantiate(*args) end end - # Reschedules the job to be re-executed. This is useful in combination - # with the +rescue_from+ option. When you rescue an exception from your job - # you can ask Active Job to retry performing your job. - # - # ==== Options - # * :wait - Enqueues the job with the specified delay - # * :wait_until - Enqueues the job at the time specified - # * :queue - Enqueues the job on the specified queue - # * :priority - Enqueues the job with the specified priority - # - # ==== Examples - # - # class SiteScraperJob < ActiveJob::Base - # rescue_from(ErrorLoadingSite) do - # retry_job queue: :low_priority - # end - # - # def perform(*args) - # # raise ErrorLoadingSite if cannot scrape - # end - # end - def retry_job(options={}) - enqueue options - end - # Enqueues the job to be performed by the queue adapter. # # ==== Options diff --git a/activejob/lib/active_job/exceptions.rb b/activejob/lib/active_job/exceptions.rb new file mode 100644 index 0000000000000..acbe0c40570a0 --- /dev/null +++ b/activejob/lib/active_job/exceptions.rb @@ -0,0 +1,122 @@ +require 'active_support/core_ext/numeric/time' + +module ActiveJob + # Provides behavior for retrying and discarding jobs on exceptions. + module Exceptions + extend ActiveSupport::Concern + + module ClassMethods + # Catch the exception and reschedule job for re-execution after so many seconds, for a specific number of attempts. + # If the exception keeps getting raised beyond the specified number of attempts, the exception is allowed to + # bubble up to the underlying queuing system, which may have its own retry mechanism or place it in a + # holding queue for inspection. + # + # You can also pass a block that'll be invoked if the retry attempts fail for custom logic rather than letting + # the exception bubble up. + # + # ==== Options + # * :wait - Re-enqueues the job with a delay specified either in seconds (default: 3 seconds), + # as a computing proc that the number of executions so far as an argument, or as a symbol reference of + # :exponentially_longer<>, which applies the wait algorithm of (executions ** 4) + 2 + # (first wait 3s, then 18s, then 83s, etc) + # * :attempts - Re-enqueues the job the specified number of times (default: 5 attempts) + # * :queue - Re-enqueues the job on a different queue + # * :priority - Re-enqueues the job with a different priority + # + # ==== Examples + # + # class RemoteServiceJob < ActiveJob::Base + # retry_on CustomAppException # defaults to 3s wait, 5 attempts + # retry_on AnotherCustomAppException, wait: ->(executions) { executions * 2 } + # retry_on(YetAnotherCustomAppException) do |exception| + # ExceptionNotifier.caught(exception) + # end + # retry_on ActiveRecord::StatementInvalid, wait: 5.seconds, attempts: 3 + # retry_on Net::OpenTimeout, wait: :exponentially_longer, attempts: 10 + # + # def perform(*args) + # # Might raise CustomAppException, AnotherCustomAppException, or YetAnotherCustomAppException for something domain specific + # # Might raise ActiveRecord::StatementInvalid when a local db deadlock is detected + # # Might raise Net::OpenTimeout when the remote service is down + # end + # end + def retry_on(exception, wait: 3.seconds, attempts: 5, queue: nil, priority: nil) + rescue_from exception do |error| + if executions < attempts + logger.error "Retrying #{self.class} in #{wait} seconds, due to a #{exception}. The original exception was #{error.cause.inspect}." + retry_job wait: determine_delay(wait), queue: queue, priority: priority + else + if block_given? + yield exception + else + logger.error "Stopped retrying #{self.class} due to a #{exception}, which reoccurred on #{executions} attempts. The original exception was #{error.cause.inspect}." + raise error + end + end + end + end + + # Discard the job with no attempts to retry, if the exception is raised. This is useful when the subject of the job, + # like an Active Record, is no longer available, and the job is thus no longer relevant. + # + # ==== Example + # + # class SearchIndexingJob < ActiveJob::Base + # discard_on ActiveJob::DeserializationError + # + # def perform(record) + # # Will raise ActiveJob::DeserializationError if the record can't be deserialized + # end + # end + def discard_on(exception) + rescue_from exception do |error| + logger.error "Discarded #{self.class} due to a #{exception}. The original exception was #{error.cause.inspect}." + end + end + end + + # Reschedules the job to be re-executed. This is useful in combination + # with the +rescue_from+ option. When you rescue an exception from your job + # you can ask Active Job to retry performing your job. + # + # ==== Options + # * :wait - Enqueues the job with the specified delay in seconds + # * :wait_until - Enqueues the job at the time specified + # * :queue - Enqueues the job on the specified queue + # * :priority - Enqueues the job with the specified priority + # + # ==== Examples + # + # class SiteScraperJob < ActiveJob::Base + # rescue_from(ErrorLoadingSite) do + # retry_job queue: :low_priority + # end + # + # def perform(*args) + # # raise ErrorLoadingSite if cannot scrape + # end + # end + def retry_job(options = {}) + enqueue options + end + + private + def determine_delay(seconds_or_duration_or_algorithm) + case seconds_or_duration_or_algorithm + when :exponentially_longer + (executions ** 4) + 2 + when ActiveSupport::Duration + duration = seconds_or_duration_or_algorithm + duration.to_i + when Integer + seconds = seconds_or_duration_or_algorithm + seconds + when Proc + algorithm = seconds_or_duration_or_algorithm + algorithm.call(executions) + else + raise "Couldn't determine a delay based on #{seconds_or_duration_or_algorithm.inspect}" + end + end + end +end diff --git a/activejob/lib/active_job/execution.rb b/activejob/lib/active_job/execution.rb index 4e4acfc2c2adc..0c047cd4e14f7 100644 --- a/activejob/lib/active_job/execution.rb +++ b/activejob/lib/active_job/execution.rb @@ -31,6 +31,7 @@ def execute(job_data) #:nodoc: def perform_now deserialize_arguments_if_needed run_callbacks :perform do + self.executions = executions + 1 perform(*arguments) end rescue => exception diff --git a/activejob/test/cases/exceptions_test.rb b/activejob/test/cases/exceptions_test.rb new file mode 100644 index 0000000000000..58d87a339ac3a --- /dev/null +++ b/activejob/test/cases/exceptions_test.rb @@ -0,0 +1,107 @@ +require 'helper' +require 'jobs/retry_job' + +class ExceptionsTest < ActiveJob::TestCase + setup do + JobBuffer.clear + skip if ActiveJob::Base.queue_adapter.is_a?(ActiveJob::QueueAdapters::InlineAdapter) + end + + test "successfully retry job throwing exception against defaults" do + perform_enqueued_jobs do + RetryJob.perform_later 'DefaultsError', 5 + + assert_equal [ + "Raised DefaultsError for the 1st time", + "Raised DefaultsError for the 2nd time", + "Raised DefaultsError for the 3rd time", + "Raised DefaultsError for the 4th time", + "Successfully completed job" ], JobBuffer.values + end + end + + test "successfully retry job throwing exception against higher limit" do + perform_enqueued_jobs do + RetryJob.perform_later 'ShortWaitTenAttemptsError', 9 + assert_equal 9, JobBuffer.values.count + end + end + + test "failed retry job when exception kept occurring against defaults" do + perform_enqueued_jobs do + begin + RetryJob.perform_later 'DefaultsError', 6 + assert_equal "Raised DefaultsError for the 5th time", JobBuffer.last_value + rescue DefaultsError + pass + end + end + end + + test "failed retry job when exception kept occurring against higher limit" do + perform_enqueued_jobs do + begin + RetryJob.perform_later 'ShortWaitTenAttemptsError', 11 + assert_equal "Raised ShortWaitTenAttemptsError for the 10th time", JobBuffer.last_value + rescue ShortWaitTenAttemptsError + pass + end + end + end + + test "discard job" do + perform_enqueued_jobs do + RetryJob.perform_later 'DiscardableError', 2 + assert_equal "Raised DiscardableError for the 1st time", JobBuffer.last_value + end + end + + test "custom handling of job that exceeds retry attempts" do + perform_enqueued_jobs do + RetryJob.perform_later 'CustomCatchError', 6 + assert_equal "Dealt with a job that failed to retry in a custom way", JobBuffer.last_value + end + end + + test "long wait job" do + travel_to Time.now + + perform_enqueued_jobs do + assert_performed_with at: (Time.now + 3600.seconds).to_i do + RetryJob.perform_later 'LongWaitError', 5 + end + end + end + + test "exponentially retrying job" do + travel_to Time.now + + perform_enqueued_jobs do + assert_performed_with at: (Time.now + 3.seconds).to_i do + assert_performed_with at: (Time.now + 18.seconds).to_i do + assert_performed_with at: (Time.now + 83.seconds).to_i do + assert_performed_with at: (Time.now + 258.seconds).to_i do + RetryJob.perform_later 'ExponentialWaitTenAttemptsError', 5 + end + end + end + end + end + end + + test "custom wait retrying job" do + travel_to Time.now + + perform_enqueued_jobs do + assert_performed_with at: (Time.now + 2.seconds).to_i do + assert_performed_with at: (Time.now + 4.seconds).to_i do + assert_performed_with at: (Time.now + 6.seconds).to_i do + assert_performed_with at: (Time.now + 8.seconds).to_i do + RetryJob.perform_later 'CustomWaitTenAttemptsError', 5 + end + end + end + end + end + end +end diff --git a/activejob/test/jobs/retry_job.rb b/activejob/test/jobs/retry_job.rb new file mode 100644 index 0000000000000..350edee61c9bf --- /dev/null +++ b/activejob/test/jobs/retry_job.rb @@ -0,0 +1,29 @@ +require_relative '../support/job_buffer' +require 'active_support/core_ext/integer/inflections' + +class DefaultsError < StandardError; end +class LongWaitError < StandardError; end +class ShortWaitTenAttemptsError < StandardError; end +class ExponentialWaitTenAttemptsError < StandardError; end +class CustomWaitTenAttemptsError < StandardError; end +class CustomCatchError < StandardError; end +class DiscardableError < StandardError; end + +class RetryJob < ActiveJob::Base + retry_on DefaultsError + retry_on LongWaitError, wait: 1.hour, attempts: 10 + retry_on ShortWaitTenAttemptsError, wait: 1.second, attempts: 10 + retry_on ExponentialWaitTenAttemptsError, wait: :exponentially_longer, attempts: 10 + retry_on CustomWaitTenAttemptsError, wait: ->(executions) { executions * 2 }, attempts: 10 + retry_on(CustomCatchError) { |exception| JobBuffer.add("Dealt with a job that failed to retry in a custom way") } + discard_on DiscardableError + + def perform(raising, attempts) + if executions < attempts + JobBuffer.add("Raised #{raising} for the #{executions.ordinalize} time") + raise raising.constantize + else + JobBuffer.add("Successfully completed job") + end + end +end