Skip to content

Commit

Permalink
Communicate enqueue failures to callers of perform_later
Browse files Browse the repository at this point in the history
There is presently no clean way of telling a caller of `perform_later`
the reason why a job failed to enqueue. When the job is enqueued
successfully, the job object itself is returned, but when the job can
not be enqueued, only `false` is returned. This does not allow callers
to distinguish between classes of failures.

One important class of failures is when the job backend experiences a
network partition when communicating with its underlying datastore. It
is entirely possible for that network partition to recover and as such,
code attempting to enqueue a job may wish to take action to reenqueue
that job after a brief delay. This is distinguished from the class of
failures where due a business rule defined in a callback in the
application, a job fails to enqueue and should not be retried.

This PR changes the following:

- Allows a block to be passed to the `perform_later` method. After the
  `enqueue` method is executed, but before the result is returned, the
  job will be yielded to the block. This allows the code invoking the
  `perform_later` method to inspect the job object, even in failure
  scenarios.

- Adds an exception `EnqueueError` which job adapters can raise if they
  detect a problem specific to their underlying implementation or
  infrastructure during the enqueue process.

- Adds two properties to the job base class: `successfully_enqueued` and
  `enqueue_error`. `enqueue_error` will be populated by the `enqueue`
  method if it rescues an `EnqueueError` raised by the job backend.
  `successfully_enqueued` will be true if the job is not rejected by
  callbacks and does not cause the job backend to raise an
  `EnqueueError` and will be `false` otherwise.

This will allow developers to do something like the following:

    MyJob.perform_later do |job|
      unless job.successfully_enqueued?
        if job.enqueue_error&.message == "Redis was unavailable"
          # invoke some code that will retry the job after a delay
        end
      end
    end
  • Loading branch information
djmortonShopify committed Feb 5, 2021
1 parent c8a86d3 commit ee60ce5
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 5 deletions.
20 changes: 20 additions & 0 deletions activejob/CHANGELOG.md
@@ -1,3 +1,23 @@
* Communicate enqueue failures to callers of `perform_later`.

`perform_later` can now optionally take a block which will execute after
the adapter attempts to enqueue the job. The block will receive the job
instance as an argument even if the enqueue was not successful.
Additionally, `ActiveJob` adapaters now have the ability to raise an
`ActiveJob::EnqueueError` which will be caught and stored in the job
instance so code attempting to enqueue jobs can inspect any raised
`EnqueueError` using the block.

MyJob.perform_later do |job|
unless job.successfully_enqueued?
if job.enqueue_error&.message == "Redis was unavailable"
# invoke some code that will retry the job after a delay
end
end
end

*Daniel Morton*

* Don't log rescuable exceptions defined with `rescue_from`.

*Hu Hailin*
Expand Down
10 changes: 10 additions & 0 deletions activejob/lib/active_job/core.rb
Expand Up @@ -43,6 +43,16 @@ module Core
# Track when a job was enqueued
attr_accessor :enqueued_at

# Track whether the adapter received the job successfully.
attr_writer :successfully_enqueued # :nodoc:

def successfully_enqueued?
@successfully_enqueued
end

# Track any exceptions raised by the backend so callers can inspect the errors.
attr_accessor :enqueue_error

# These methods will be included into any Active Job object, adding
# helpers for de/serialization and creation of job instances.
module ClassMethods
Expand Down
24 changes: 19 additions & 5 deletions activejob/lib/active_job/enqueuing.rb
Expand Up @@ -4,6 +4,11 @@

module ActiveJob
# Provides behavior for enqueuing jobs.

# Can be raised by adapters if they wish to communicate to the caller a reason
# why the adapter was unexpectedly unable to enqueue a job.
class EnqueueError < StandardError; end

module Enqueuing
extend ActiveSupport::Concern

Expand All @@ -17,9 +22,16 @@ module ClassMethods
# custom serializers.
#
# Returns an instance of the job class queued with arguments available in
# Job#arguments.
# Job#arguments or false if the enqueue did not succeed.
#
# After the attempted enqueue, the job will be yielded to an optional block.
def perform_later(*args)
job_or_instantiate(*args).enqueue
job = job_or_instantiate(*args)
enqueue_result = job.enqueue

yield job if block_given?

enqueue_result
end
ruby2_keywords(:perform_later) if respond_to?(:ruby2_keywords, true)

Expand Down Expand Up @@ -50,7 +62,7 @@ def enqueue(options = {})
self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue]
self.priority = options[:priority].to_i if options[:priority]
successfully_enqueued = false
self.successfully_enqueued = false

run_callbacks :enqueue do
if scheduled_at
Expand All @@ -59,10 +71,12 @@ def enqueue(options = {})
queue_adapter.enqueue self
end

successfully_enqueued = true
self.successfully_enqueued = true
rescue EnqueueError => e
self.enqueue_error = e
end

if successfully_enqueued
if successfully_enqueued?
self
else
false
Expand Down
17 changes: 17 additions & 0 deletions activejob/test/cases/queuing_test.rb
Expand Up @@ -2,6 +2,7 @@

require "helper"
require "jobs/hello_job"
require "jobs/enqueue_error_job"
require "active_support/core_ext/numeric/time"

class QueuingTest < ActiveSupport::TestCase
Expand Down Expand Up @@ -37,4 +38,20 @@ class QueuingTest < ActiveSupport::TestCase
rescue NotImplementedError
skip
end

test "job is yielded to block after enqueue with successfully_enqueued property set" do
HelloJob.perform_later "John" do |job|
assert_equal "John says hello", JobBuffer.last_value
assert_equal [ "John" ], job.arguments
assert_equal true, job.successfully_enqueued?
assert_nil job.enqueue_error
end
end

test "when enqueuing raises an EnqueueError job is yielded to block with error set on job" do
EnqueueErrorJob.perform_later do |job|
assert_equal false, job.successfully_enqueued?
assert_equal ActiveJob::EnqueueError, job.enqueue_error.class
end
end
end
21 changes: 21 additions & 0 deletions activejob/test/jobs/enqueue_error_job.rb
@@ -0,0 +1,21 @@
# frozen_string_literal: true

class EnqueueErrorJob < ActiveJob::Base
class EnqueueErrorAdapter
class << self
def enqueue(*)
raise ActiveJob::EnqueueError, "There was an error enqueuing the job"
end

def enqueue_at(*)
raise ActiveJob::EnqueueError, "There was an error enqueuing the job"
end
end
end

self.queue_adapter = EnqueueErrorAdapter

def perform
raise "This should never be called"
end
end

0 comments on commit ee60ce5

Please sign in to comment.