Skip to content

Conversation

@djmb
Copy link
Contributor

@djmb djmb commented May 29, 2025

Continuations provide a mechanism for interrupting and resuming jobs. This allows long running jobs to make progress across application restarts.

Jobs should include the ActiveJob::Continuable module to enable continuations. Continuable jobs are automatically retried when interrupted.

Use the step method to define the steps in your job. Steps can use an optional cursor to track progress in the step.

Steps are executed as soon as they are encountered. If a job is interrupted, previously completed steps will be skipped. If a step is in progress, it will be resumed with the last recorded cursor.

Code that is not part of a step will be executed on each job execution.

You can pass a block or a method name to the step method. The block will be called with the step object as an argument. Methods can either take no arguments or a single argument for the step object.

class ProcessImportJob < ApplicationJob
  include ActiveJob::Continuable

  def perform(import_id)
    # This always runs, even if the job is resumed.
    @import = Import.find(import_id)

    step :validate do
      @import.validate!
    end

    step :process_records do |step|
      @import.records.find_each(start: step.cursor)
        record.process
        step.advance! from: record.id
      end
    end

    step :reprocess_records
    step :finalize
  end

  def reprocess_records(step)
    @import.records.find_each(start: step.cursor)
      record.reprocess
      step.advance! from: record.id
    end
  end

  def finalize
    @import.finalize!
  end
end

Cursors

Cursors are used to track progress within a step. The cursor can be any object that is serializable as an argument to
ActiveJob::Base.serialize. It defaults to nil.

When a step is resumed, the last cursor value is restored. The code in the step is responsible for using the cursor to continue from the right point.

set! sets the cursor to a specific value.

step :iterate_items do |step|
  items[step.cursor..].each do |item|
    process(item)
    step.set! (step.cursor || 0) + 1
  end
end

A starting value for the cursor can be set when defining the step:

step :iterate_items, start: 0 do |step|
  items[step.cursor..].each do |item|
    process(item)
    step.set! step.cursor + 1
  end
end

The cursor can be advanced with advance!. This calls succ on the current cursor value. It raises an
ActiveJob::Continuation::UnadvanceableCursorError if the cursor does not implement succ.

step :iterate_items, start: 0 do |step|
  items[step.cursor..].each do |item|
    process(item)
    step.advance!
  end
end

You can optionally pass a from argument to advance!. This is useful when iterating over a collection of records where IDs may not be contiguous.

step :process_records do |step|
  import.records.find_each(start: step.cursor)
    record.process
    step.advance! from: record.id
  end
end

You can use an array to iterate over nested records:

step :process_nested_records, start: [ 0, 0 ] do |step|
  Account.find_each(start: step.cursor[0]) do |account|
    account.records.find_each(start: step.cursor[1]) do |record|
      record.process
      step.set! [ account.id, record.id + 1 ]
    end
    step.set! [ account.id + 1, 0 ]
  end
end

Setting or advancing the cursor creates a checkpoint. You can also create a checkpoint manually by calling the checkpoint! method on the step. This is useful if you want to allow interruptions, but don't need to update the cursor.

step :destroy_records do |step|
  import.records.find_each do |record|
    record.destroy!
    step.checkpoint!
  end
end

Checkpoints

A checkpoint is where a job can be interrupted. At a checkpoint the job will call queue_adapter.stopping?. If it returns true, the job will raise an ActiveJob::Continuation::Interrupt exception.

There is an automatic checkpoint at the end of each step. Within a step calling one is created when calling set!, advance! or checkpoint!.

Jobs are not automatically interrupted when the queue adapter is marked as stopping - they will continue to run either until the next checkpoint, or when the process is stopped.

This is to allow jobs to be interrupted at a safe point, but it also means that the jobs should checkpoint more frequently than the shutdown timeout to ensure a graceful restart.

When interrupted, the job will automatically retry with the progress serialized in the job data under the continuation key.

The serialized progress contains:

  • a list of the completed steps
  • the current step and its cursor value (if one is in progress)

Errors

If a job raises an error and is not retried via ActiveJob, it will be passed back to the queue adapter and any progress in this execution will be lost.

To mitigate this, the job will automatically retried if it raises an error after it has made progress. Making progress is defined as having completed a step or advanced the cursor within the current step.

Queue Adapter support

Active Job Continuations call the stopping? method on the queue adapter to check if we are in the shutdown phase. By default this will return false, so the adapters will need to be updated to implement this method.

This implements the stopping? method in the Test and Sidekiq adapters.

It would be possible to add support to Delayed Job via a plugin, but it would probably be better to add a new lifecycle callback to DJ for when it is shutting down.

Resque also will require a new hook before it can be supported.

Solid Queue's adapter is not part of Rails, but support can be added there via the on_worker_stop hook.

Inspiration

This took a lot inspiration from Shopify's job-iteration gem.

The main differences are:

  • Continuations are Active Job only, so they don't provide the custom enumerators that job-iteration does.
  • They allow multi-step flows
  • They don't intercept the perform method
  • Continuations are a sharp knife - you need to manually checkpoint and update the cursor. But you could build a job-iteration-like API on top of them.

Future work

It would be a good exercise to see if the job-iteration gem could be adapted to run on top of Active Job Continuations to highlight any missing features - we'd want to add things like max iteration time, max job runtime and forcing a job to stop.

Another thing to consider is a mechanism for checking whether it is safe to call a checkpoint. Ideally you wouldn't allow them within database transactions as they'll cause a rollback. We can maybe inject checkpoint safety handlers and add a default one that checks whether we are in any active transactions.

Continuations provide a mechanism for interrupting and resuming jobs.
This allows long running jobs to make progress across application
restarts.

Jobs should include the `ActiveJob::Continuable` module to enable
continuations. Continuable jobs are automatically retried when
interrupted.

Use the `step` method to define the steps in your job. Steps can use an
optional cursor to track progress in the step.

Steps are executed as soon as they are encountered. If a job is
interrupted, previously completed steps will be skipped. If a step is in
progress, it will be resumed with the last recorded cursor.

Code that is not part of a step will be executed on each job execution.

You can pass a block or a method name to the step method. The block
will be called with the step object as an argument. Methods can either
take no arguments or a single argument for the step object.

```ruby
class ProcessImportJob < ApplicationJob
  include ActiveJob::Continuable

  def perform(import_id)
    # This always runs, even if the job is resumed.
    @import = Import.find(import_id)

    step :validate do
      @import.validate!
    end

    step :process_records do |step|
      @import.records.find_each(start: step.cursor)
        record.process
        step.advance! from: record.id
      end
    end

    step :reprocess_records
    step :finalize
  end

  def reprocess_records(step)
    @import.records.find_each(start: step.cursor)
      record.reprocess
      step.advance! from: record.id
    end
  end

  def finalize
    @import.finalize!
  end
end
```

**Cursors**

Cursors are used to track progress within a step. The cursor can be any
object that is serializable as an argument to
`ActiveJob::Base.serialize`. It defaults to `nil`.

When a step is resumed, the last cursor value is restored. The code in
the step is responsible for using the cursor to continue from the right
point.

`set!` sets the cursor to a specific value.

```ruby
step :iterate_items do |step|
  items[step.cursor..].each do |item|
    process(item)
    step.set! (step.cursor || 0) + 1
  end
end
```

A starting value for the cursor can be set when defining the step:

```ruby
step :iterate_items, start: 0 do |step|
  items[step.cursor..].each do |item|
    process(item)
    step.set! step.cursor + 1
  end
end
```

The cursor can be advanced with `advance!`. This calls `succ` on
the current cursor value. It raises an
`ActiveJob::Continuation::UnadvanceableCursorError` if the cursor does
not implement `succ`.

```ruby
step :iterate_items, start: 0 do |step|
  items[step.cursor..].each do |item|
    process(item)
    step.advance!
  end
end
```

You can optionally pass a `from` argument to `advance!`. This is
useful when iterating over a collection of records where IDs may not be
contiguous.

```ruby
step :process_records do |step|
  import.records.find_each(start: step.cursor)
    record.process
    step.advance! from: record.id
  end
end
```

You can use an array to iterate over nested records:

```ruby
step :process_nested_records, start: [ 0, 0 ] do |step|
  Account.find_each(start: step.cursor[0]) do |account|
    account.records.find_each(start: step.cursor[1]) do |record|
      record.process
      step.set! [ account.id, record.id + 1 ]
    end
    step.set! [ account.id + 1, 0 ]
  end
end
```

Setting or advancing the cursor creates a checkpoint. You can also
create a checkpoint manually by calling the `checkpoint!` method on the
step. This is useful if you want to allow interruptions, but don't need
to update the cursor.

```ruby
step :destroy_records do |step|
  import.records.find_each do |record|
    record.destroy!
    step.checkpoint!
  end
end
```

**Checkpoints**

A checkpoint is where a job can be interrupted. At a checkpoint the job
will call `queue_adapter.stopping?`. If it returns true, the job will
raise an `ActiveJob::Continuation::Interrupt` exception.

There is an automatic checkpoint at the end of each step. Within a step
calling one is created when calling `set!`, `advance!` or
`checkpoint!`.

Jobs are not automatically interrupted when the queue adapter is marked
as stopping - they will continue to run either until the next
checkpoint, or when the process is stopped.

This is to allow jobs to be interrupted at a safe point, but it also
means that the jobs should checkpoint more frequently than the shutdown
timeout to ensure a graceful restart.

When interrupted, the job will automatically retry with the progress
serialized in the job data under the `continuation` key.

The serialized progress contains:
- a list of the completed steps
- the current step and its cursor value (if one is in progress)

**Errors**

If a job raises an error and is not retried via ActiveJob, it will be
passed back to the queue adapter and any progress in this execution will
be lost.

To mitigate this, the job will automatically retried if it raises an
error after it has made progress. Making progress is defined as having
completed a step or advanced the cursor within the current step.

**Queue Adapter support**

Active Job Continuations call the `stopping?` method on the queue
adapter to check if we are in the shutdown phase. By default this will
return false, so the adapters will need to be updated to implement this
method.

This implements the `stopping?` method in the test and Sidekiq adapters.

It would be possible to add support to Delayed Job via a plugin, but it
would probably be better to add a new lifecycle callback to DJ for when
it is shutting down.

Resque also will require a new hook before it can be supported.

Solid Queue's adapter is not part of Rails, but support can be added
there via the `on_worker_stop` hook.

**Inspiration**

This took a lot inspiration from Shopify's
[job-iteration](https://github.com/Shopify/job-iteration) gem.

The main differences are:
- Continuations are Active Job only, so they don't provide the custom
  enumerators that job-iteration does.
- They allow multi-step flows
- They don't intercept the perform method
- Continuations are a sharp knife - you need to manually checkpoint and
  update the cursor. But you could build a job-iteration-like API on top
  of them.

**Future work**

It would be a good exercise to see if the job-iteration gem could be
adapted to run on top of Active Job Continuations to highlight any
missing features - we'd want to add things like max iteration time, max
job runtime and forcing a job to stop.

Another thing to consider is a mechanism for checking whether it is safe
to call a checkpoint. Ideally you wouldn't allow them within database
transactions as they'll cause a rollback. We can maybe inject checkpoint
safety handlers and add a default one that checks whether we are in any
active transactions.
@rails-bot rails-bot bot added the activejob label May 29, 2025
@dhh
Copy link
Member

dhh commented May 30, 2025

Fantastic work, @djmb!

@dhh dhh merged commit 0ac3fba into rails:main May 30, 2025
2 of 3 checks passed
Comment on lines +30 to +36
raise ArgumentError, "Step method '#{step_name}' must accept 0 or 1 arguments" if step_method.arity > 1

if step_method.parameters.any? { |type, name| type == :key || type == :keyreq }
raise ArgumentError, "Step method '#{step_name}' must not accept keyword arguments"
end

step_method.arity == 0 ? step_method.call : step_method.call(step)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is making the argument optional really this necessary? Allocating a Method instance and then its parameters isn't cheap.

It's actually very expensive once you compare it to a simple method call.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that a job that is using continuations would be expected to be long running and to have a reasonably small number of steps, I wouldn't expect the cost of this to be significant.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. This isn't a hill I'm gonna die on, and indeed in this specific case it is probably fine.

I just tend to question more and more how helpful this type of DSL heavy API design really is. Aside from the performance consideration it makes the code harder to grep for / statically analyze for not a huge gain in term of readability:

e.g.

step :finalize

Is only marginally nicer than:

step :finalize { finalize }

But:

  • Will be qualified of "magic".
  • Will defeat JITs.
  • Will dead code analysis harder.
  • Requires you to know the continuation API well to understand which method is called. It's not immediately obvious to the uninitiated that step :foo does call foo.

But again, if you're set on it, I'm not gonna argue you forever on this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I take your points and just allowing the block format will make the implementation and documentation simpler.

There's not much in step :finalize vs step :finalize { finalize }, but when you have say 5 or more steps the repetition of the block format does start to grate a bit.

What do you think @dhh?

module Continuable
extend ActiveSupport::Concern

CONTINUATION_KEY = "continuation"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the past, the keys used by Active Job itself were prefixed with _aj_, e.g.:

      GLOBALID_KEY = "_aj_globalid"
      # :nodoc:
      SYMBOL_KEYS_KEY = "_aj_symbol_keys"
      # :nodoc:
      RUBY2_KEYWORDS_KEY = "_aj_ruby2_keywords"

We probably should do the same here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that this is live is it too late to change it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ghiculescu it wasn't released yet.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The _aj_ prefix is used as part of argument serialization and not for job data keys, so I think the unprefixed name makes more sense here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad. I mistakenly assumed that keys was in the arguments. If it's at the top-level, there shouldn't be risks of conflict indeed.

# Raised when a job is interrupted, allowing Active Job to requeue it.
# This inherits from +Exception+ rather than +StandardError+, so it's not
# caught by normal exception handling.
class Interrupt < Exception; end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you considered using throw/catch instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah that's a much better way to do it 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I've been looking at this and I'm concerned about the fact that throw/catch could do unexpected things. It will execute all the ensure blocks, but skip any attempts at cleanup via rescue blocks.

For example if we were inside a database transaction Rails would commit that transaction because it doesn't see an exception.

Throwing an exception has the drawback that the calling code might rescue and swallow it, but I think that maybe the lesser of two evils.

(We should actually disallow attempting to interrupt jobs inside database transactions and I have a plan to add that soon but that's a separate issue I think).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for taking a look.

end

def instrument(event, payload = {})
job.send(:instrument, event, **payload)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be best to avoid send here, as it not good for JIT and such. The method can be public but marked as # :nodoc:.

# An UnadvanceableCursorError error will be raised if the cursor does not implement +succ+.
def advance!(from: nil)
from = cursor if from.nil?
raise UnadvanceableCursorError, "Cursor class '#{from.class}' does not implement succ, " unless from.respond_to?(:succ)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than to check respond_to? on every single call, it would be preferable to handle NoMethodError.

Comment on lines +3 to +12
class ContinuableDuplicateStepJob < ActiveJob::Base
include ActiveJob::Continuable

def perform
step :duplicate do |step|
end
step :duplicate do |step|
end
end
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All these "fixture" jobs could be defined inside the test classes themselves.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a couple of test jobs defined in their classes, but the pattern here is separate files even though most jobs are only used by one test case. But either way works for me if you'd prefer I moved them.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. IMHO it's much easier to understand tests when the job they rely on is just above.

@ghiculescu
Copy link
Member

@djmb will you be working on a solid queue PR or is that open to the community? It would be good to have this supported in the default Rails setup.

Copy link
Contributor Author

@djmb djmb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review @byroot! I'll raise another PR to address your points.

Comment on lines +30 to +36
raise ArgumentError, "Step method '#{step_name}' must accept 0 or 1 arguments" if step_method.arity > 1

if step_method.parameters.any? { |type, name| type == :key || type == :keyreq }
raise ArgumentError, "Step method '#{step_name}' must not accept keyword arguments"
end

step_method.arity == 0 ? step_method.call : step_method.call(step)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that a job that is using continuations would be expected to be long running and to have a reasonably small number of steps, I wouldn't expect the cost of this to be significant.

# Raised when a job is interrupted, allowing Active Job to requeue it.
# This inherits from +Exception+ rather than +StandardError+, so it's not
# caught by normal exception handling.
class Interrupt < Exception; end
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah that's a much better way to do it 👍

Comment on lines +3 to +12
class ContinuableDuplicateStepJob < ActiveJob::Base
include ActiveJob::Continuable

def perform
step :duplicate do |step|
end
step :duplicate do |step|
end
end
end
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a couple of test jobs defined in their classes, but the pattern here is separate files even though most jobs are only used by one test case. But either way works for me if you'd prefer I moved them.

djmb added a commit to djmb/sidekiq that referenced this pull request Jun 3, 2025
rails/rails#55127 adds a new `stopping?` method
to the `AbstractAdapter`.

`SidekiqAdapter` should implement that at some point, but for now by
subclassing `AbstractAdapter` we ensure that it falls back to the
default implementation.
mperham pushed a commit to sidekiq/sidekiq that referenced this pull request Jun 3, 2025
rails/rails#55127 adds a new `stopping?` method
to the `AbstractAdapter`.

`SidekiqAdapter` should implement that at some point, but for now by
subclassing `AbstractAdapter` we ensure that it falls back to the
default implementation.
@djmb
Copy link
Contributor Author

djmb commented Jun 4, 2025

@ghiculescu - yes I'll be adding a PR for SolidQueue soon. I have some additional changes coming soon to add configuration and safety checks (e.g. ensuring that you don't interrupt database transactions) so I want to get those out first.

@djmb djmb mentioned this pull request Jun 9, 2025
djmb added a commit to djmb/resque that referenced this pull request Jun 9, 2025
Rails is getting a new feature called
[Active Job Continuations](rails/rails#55127),
which will allow jobs to be interrupted and resumed.

For this to be integrated with Resque, we need to know when a worker is
shutting down.

There's a new `stopping?` method on the Rails queue adapter. Add a
`shutdown` hook to Resque to allow us to implement that method.
djmb added a commit to djmb/solid_queue that referenced this pull request Jun 10, 2025
[Active Job Continuations](rails/rails#55127)
will be added in Rails 8.1. Queue adapters need to implement the
`stopping?` method to support them.

Implement the method and set it to true when a worker is stopping.

Also add Rails versions so we can test against Rails main. To prevent an
explosion of combinations, I've switched to only testing against minor
versions of Ruby.
djmb added a commit to djmb/solid_queue that referenced this pull request Jun 10, 2025
[Active Job Continuations](rails/rails#55127)
will be added in Rails 8.1. Queue adapters need to implement the
`stopping?` method to support them.

Implement the method and set it to true when a worker is stopping.

Also add Rails versions so we can test against Rails main. To prevent an
explosion of combinations, I've switched to only testing against minor
versions of Ruby.
djmb added a commit to djmb/solid_queue that referenced this pull request Jun 10, 2025
[Active Job Continuations](rails/rails#55127)
will be added in Rails 8.1. Queue adapters need to implement the
`stopping?` method to support them.

Implement the method and set it to true when a worker is stopping.

Also add Rails versions so we can test against Rails main. To prevent an
explosion of combinations, I've switched to only testing against minor
versions of Ruby.
djmb added a commit to djmb/solid_queue that referenced this pull request Jun 10, 2025
[Active Job Continuations](rails/rails#55127)
will be added in Rails 8.1. Queue adapters need to implement the
`stopping?` method to support them.

Implement the method and set it to true when a worker is stopping.

Also add Rails versions so we can test against Rails main. To prevent an
explosion of combinations, I've switched to only testing against minor
versions of Ruby.
djmb added a commit to djmb/solid_queue that referenced this pull request Jun 10, 2025
[Active Job Continuations](rails/rails#55127)
will be added in Rails 8.1. Queue adapters need to implement the
`stopping?` method to support them.

Implement the method and set it to true when a worker is stopping.

Also add Rails versions so we can test against Rails main. To prevent an
explosion of combinations, I've switched to only testing against minor
versions of Ruby.
djmb added a commit to djmb/solid_queue that referenced this pull request Jun 10, 2025
[Active Job Continuations](rails/rails#55127)
will be added in Rails 8.1. Queue adapters need to implement the
`stopping?` method to support them.

Implement the method and set it to true when a worker is stopping.

Also add Rails versions so we can test against Rails main. To prevent an
explosion of combinations, I've switched to only testing against minor
versions of Ruby.
djmb added a commit to djmb/solid_queue that referenced this pull request Jun 10, 2025
[Active Job Continuations](rails/rails#55127)
will be added in Rails 8.1. Queue adapters need to implement the
`stopping?` method to support them.

Implement the method and set it to true when a worker is stopping.

Also add Rails versions so we can test against Rails main. To prevent an
explosion of combinations, I've switched to only testing against minor
versions of Ruby.
djmb added a commit to djmb/solid_queue that referenced this pull request Jun 10, 2025
[Active Job Continuations](rails/rails#55127)
will be added in Rails 8.1. Queue adapters need to implement the
`stopping?` method to support them.

Implement the method and set it to true when a worker is stopping.

Also add Rails versions so we can test against Rails main. To prevent an
explosion of combinations, I've switched to only testing against minor
versions of Ruby.
djmb added a commit to djmb/solid_queue that referenced this pull request Jun 10, 2025
[Active Job Continuations](rails/rails#55127)
will be added in Rails 8.1. Queue adapters need to implement the
`stopping?` method to support them.

Implement the method and set it to true when a worker is stopping.

Also add Rails versions so we can test against Rails main. To prevent an
explosion of combinations, I've switched to only testing against minor
versions of Ruby.
djmb added a commit to djmb/rails that referenced this pull request Jun 10, 2025
Follow up to rails#55127 and rails#55151

- Instrument steps in a block so that runtime is recorded
  (matching perform/perform_started)
- Allow job resumption configuration - max_resumptions, resume_options,
  resume_errors_after_advancing
- Add Active Record Railtie to prevent checkpoints in database
  transactions
- Checkpoint before each step except the first, rather than after
  each step.
- Error if order of completed steps changes when re-running
djmb added a commit to djmb/solid_queue that referenced this pull request Jun 10, 2025
[Active Job Continuations](rails/rails#55127)
will be added in Rails 8.1. Queue adapters need to implement the
`stopping?` method to support them.

Implement the method and set it to true when a worker is stopping.

Also add Rails versions so we can test against Rails main. To prevent an
explosion of combinations, I've switched to only testing against minor
versions of Ruby.
rosa pushed a commit to rails/solid_queue that referenced this pull request Jun 10, 2025
[Active Job Continuations](rails/rails#55127)
will be added in Rails 8.1. Queue adapters need to implement the
`stopping?` method to support them.

Implement the method and set it to true when a worker is stopping.

Also add Rails versions so we can test against Rails main. To prevent an
explosion of combinations, I've switched to only testing against minor
versions of Ruby.
jeremy pushed a commit that referenced this pull request Jun 10, 2025
Follow up to #55127 and #55151

- Instrument steps in a block so that runtime is recorded
  (matching perform/perform_started)
- Allow job resumption configuration - max_resumptions, resume_options,
  resume_errors_after_advancing
- Add Active Record Railtie to prevent checkpoints in database
  transactions
- Checkpoint before each step except the first, rather than after
  each step.
- Error if order of completed steps changes when re-running
Ridhwana pushed a commit to Ridhwana/rails that referenced this pull request Jun 17, 2025
Continuations provide a mechanism for interrupting and resuming jobs.
This allows long running jobs to make progress across application
restarts.

Jobs should include the `ActiveJob::Continuable` module to enable
continuations. Continuable jobs are automatically retried when
interrupted.

Use the `step` method to define the steps in your job. Steps can use an
optional cursor to track progress in the step.

Steps are executed as soon as they are encountered. If a job is
interrupted, previously completed steps will be skipped. If a step is in
progress, it will be resumed with the last recorded cursor.

Code that is not part of a step will be executed on each job execution.

You can pass a block or a method name to the step method. The block
will be called with the step object as an argument. Methods can either
take no arguments or a single argument for the step object.

```ruby
class ProcessImportJob < ApplicationJob
  include ActiveJob::Continuable

  def perform(import_id)
    # This always runs, even if the job is resumed.
    @import = Import.find(import_id)

    step :validate do
      @import.validate!
    end

    step :process_records do |step|
      @import.records.find_each(start: step.cursor)
        record.process
        step.advance! from: record.id
      end
    end

    step :reprocess_records
    step :finalize
  end

  def reprocess_records(step)
    @import.records.find_each(start: step.cursor)
      record.reprocess
      step.advance! from: record.id
    end
  end

  def finalize
    @import.finalize!
  end
end
```

**Cursors**

Cursors are used to track progress within a step. The cursor can be any
object that is serializable as an argument to
`ActiveJob::Base.serialize`. It defaults to `nil`.

When a step is resumed, the last cursor value is restored. The code in
the step is responsible for using the cursor to continue from the right
point.

`set!` sets the cursor to a specific value.

```ruby
step :iterate_items do |step|
  items[step.cursor..].each do |item|
    process(item)
    step.set! (step.cursor || 0) + 1
  end
end
```

A starting value for the cursor can be set when defining the step:

```ruby
step :iterate_items, start: 0 do |step|
  items[step.cursor..].each do |item|
    process(item)
    step.set! step.cursor + 1
  end
end
```

The cursor can be advanced with `advance!`. This calls `succ` on
the current cursor value. It raises an
`ActiveJob::Continuation::UnadvanceableCursorError` if the cursor does
not implement `succ`.

```ruby
step :iterate_items, start: 0 do |step|
  items[step.cursor..].each do |item|
    process(item)
    step.advance!
  end
end
```

You can optionally pass a `from` argument to `advance!`. This is
useful when iterating over a collection of records where IDs may not be
contiguous.

```ruby
step :process_records do |step|
  import.records.find_each(start: step.cursor)
    record.process
    step.advance! from: record.id
  end
end
```

You can use an array to iterate over nested records:

```ruby
step :process_nested_records, start: [ 0, 0 ] do |step|
  Account.find_each(start: step.cursor[0]) do |account|
    account.records.find_each(start: step.cursor[1]) do |record|
      record.process
      step.set! [ account.id, record.id + 1 ]
    end
    step.set! [ account.id + 1, 0 ]
  end
end
```

Setting or advancing the cursor creates a checkpoint. You can also
create a checkpoint manually by calling the `checkpoint!` method on the
step. This is useful if you want to allow interruptions, but don't need
to update the cursor.

```ruby
step :destroy_records do |step|
  import.records.find_each do |record|
    record.destroy!
    step.checkpoint!
  end
end
```

**Checkpoints**

A checkpoint is where a job can be interrupted. At a checkpoint the job
will call `queue_adapter.stopping?`. If it returns true, the job will
raise an `ActiveJob::Continuation::Interrupt` exception.

There is an automatic checkpoint at the end of each step. Within a step
calling one is created when calling `set!`, `advance!` or
`checkpoint!`.

Jobs are not automatically interrupted when the queue adapter is marked
as stopping - they will continue to run either until the next
checkpoint, or when the process is stopped.

This is to allow jobs to be interrupted at a safe point, but it also
means that the jobs should checkpoint more frequently than the shutdown
timeout to ensure a graceful restart.

When interrupted, the job will automatically retry with the progress
serialized in the job data under the `continuation` key.

The serialized progress contains:
- a list of the completed steps
- the current step and its cursor value (if one is in progress)

**Errors**

If a job raises an error and is not retried via ActiveJob, it will be
passed back to the queue adapter and any progress in this execution will
be lost.

To mitigate this, the job will automatically retried if it raises an
error after it has made progress. Making progress is defined as having
completed a step or advanced the cursor within the current step.

**Queue Adapter support**

Active Job Continuations call the `stopping?` method on the queue
adapter to check if we are in the shutdown phase. By default this will
return false, so the adapters will need to be updated to implement this
method.

This implements the `stopping?` method in the test and Sidekiq adapters.

It would be possible to add support to Delayed Job via a plugin, but it
would probably be better to add a new lifecycle callback to DJ for when
it is shutting down.

Resque also will require a new hook before it can be supported.

Solid Queue's adapter is not part of Rails, but support can be added
there via the `on_worker_stop` hook.

**Inspiration**

This took a lot inspiration from Shopify's
[job-iteration](https://github.com/Shopify/job-iteration) gem.

The main differences are:
- Continuations are Active Job only, so they don't provide the custom
  enumerators that job-iteration does.
- They allow multi-step flows
- They don't intercept the perform method
- Continuations are a sharp knife - you need to manually checkpoint and
  update the cursor. But you could build a job-iteration-like API on top
  of them.

**Future work**

It would be a good exercise to see if the job-iteration gem could be
adapted to run on top of Active Job Continuations to highlight any
missing features - we'd want to add things like max iteration time, max
job runtime and forcing a job to stop.

Another thing to consider is a mechanism for checking whether it is safe
to call a checkpoint. Ideally you wouldn't allow them within database
transactions as they'll cause a rollback. We can maybe inject checkpoint
safety handlers and add a default one that checks whether we are in any
active transactions.
Ridhwana pushed a commit to Ridhwana/rails that referenced this pull request Jun 17, 2025
Follow up to rails#55127 and rails#55151

- Instrument steps in a block so that runtime is recorded
  (matching perform/perform_started)
- Allow job resumption configuration - max_resumptions, resume_options,
  resume_errors_after_advancing
- Add Active Record Railtie to prevent checkpoints in database
  transactions
- Checkpoint before each step except the first, rather than after
  each step.
- Error if order of completed steps changes when re-running
bensheldon pushed a commit to bensheldon/rails that referenced this pull request Jun 17, 2025
Continuations provide a mechanism for interrupting and resuming jobs.
This allows long running jobs to make progress across application
restarts.

Jobs should include the `ActiveJob::Continuable` module to enable
continuations. Continuable jobs are automatically retried when
interrupted.

Use the `step` method to define the steps in your job. Steps can use an
optional cursor to track progress in the step.

Steps are executed as soon as they are encountered. If a job is
interrupted, previously completed steps will be skipped. If a step is in
progress, it will be resumed with the last recorded cursor.

Code that is not part of a step will be executed on each job execution.

You can pass a block or a method name to the step method. The block
will be called with the step object as an argument. Methods can either
take no arguments or a single argument for the step object.

```ruby
class ProcessImportJob < ApplicationJob
  include ActiveJob::Continuable

  def perform(import_id)
    # This always runs, even if the job is resumed.
    @import = Import.find(import_id)

    step :validate do
      @import.validate!
    end

    step :process_records do |step|
      @import.records.find_each(start: step.cursor)
        record.process
        step.advance! from: record.id
      end
    end

    step :reprocess_records
    step :finalize
  end

  def reprocess_records(step)
    @import.records.find_each(start: step.cursor)
      record.reprocess
      step.advance! from: record.id
    end
  end

  def finalize
    @import.finalize!
  end
end
```

**Cursors**

Cursors are used to track progress within a step. The cursor can be any
object that is serializable as an argument to
`ActiveJob::Base.serialize`. It defaults to `nil`.

When a step is resumed, the last cursor value is restored. The code in
the step is responsible for using the cursor to continue from the right
point.

`set!` sets the cursor to a specific value.

```ruby
step :iterate_items do |step|
  items[step.cursor..].each do |item|
    process(item)
    step.set! (step.cursor || 0) + 1
  end
end
```

A starting value for the cursor can be set when defining the step:

```ruby
step :iterate_items, start: 0 do |step|
  items[step.cursor..].each do |item|
    process(item)
    step.set! step.cursor + 1
  end
end
```

The cursor can be advanced with `advance!`. This calls `succ` on
the current cursor value. It raises an
`ActiveJob::Continuation::UnadvanceableCursorError` if the cursor does
not implement `succ`.

```ruby
step :iterate_items, start: 0 do |step|
  items[step.cursor..].each do |item|
    process(item)
    step.advance!
  end
end
```

You can optionally pass a `from` argument to `advance!`. This is
useful when iterating over a collection of records where IDs may not be
contiguous.

```ruby
step :process_records do |step|
  import.records.find_each(start: step.cursor)
    record.process
    step.advance! from: record.id
  end
end
```

You can use an array to iterate over nested records:

```ruby
step :process_nested_records, start: [ 0, 0 ] do |step|
  Account.find_each(start: step.cursor[0]) do |account|
    account.records.find_each(start: step.cursor[1]) do |record|
      record.process
      step.set! [ account.id, record.id + 1 ]
    end
    step.set! [ account.id + 1, 0 ]
  end
end
```

Setting or advancing the cursor creates a checkpoint. You can also
create a checkpoint manually by calling the `checkpoint!` method on the
step. This is useful if you want to allow interruptions, but don't need
to update the cursor.

```ruby
step :destroy_records do |step|
  import.records.find_each do |record|
    record.destroy!
    step.checkpoint!
  end
end
```

**Checkpoints**

A checkpoint is where a job can be interrupted. At a checkpoint the job
will call `queue_adapter.stopping?`. If it returns true, the job will
raise an `ActiveJob::Continuation::Interrupt` exception.

There is an automatic checkpoint at the end of each step. Within a step
calling one is created when calling `set!`, `advance!` or
`checkpoint!`.

Jobs are not automatically interrupted when the queue adapter is marked
as stopping - they will continue to run either until the next
checkpoint, or when the process is stopped.

This is to allow jobs to be interrupted at a safe point, but it also
means that the jobs should checkpoint more frequently than the shutdown
timeout to ensure a graceful restart.

When interrupted, the job will automatically retry with the progress
serialized in the job data under the `continuation` key.

The serialized progress contains:
- a list of the completed steps
- the current step and its cursor value (if one is in progress)

**Errors**

If a job raises an error and is not retried via ActiveJob, it will be
passed back to the queue adapter and any progress in this execution will
be lost.

To mitigate this, the job will automatically retried if it raises an
error after it has made progress. Making progress is defined as having
completed a step or advanced the cursor within the current step.

**Queue Adapter support**

Active Job Continuations call the `stopping?` method on the queue
adapter to check if we are in the shutdown phase. By default this will
return false, so the adapters will need to be updated to implement this
method.

This implements the `stopping?` method in the test and Sidekiq adapters.

It would be possible to add support to Delayed Job via a plugin, but it
would probably be better to add a new lifecycle callback to DJ for when
it is shutting down.

Resque also will require a new hook before it can be supported.

Solid Queue's adapter is not part of Rails, but support can be added
there via the `on_worker_stop` hook.

**Inspiration**

This took a lot inspiration from Shopify's
[job-iteration](https://github.com/Shopify/job-iteration) gem.

The main differences are:
- Continuations are Active Job only, so they don't provide the custom
  enumerators that job-iteration does.
- They allow multi-step flows
- They don't intercept the perform method
- Continuations are a sharp knife - you need to manually checkpoint and
  update the cursor. But you could build a job-iteration-like API on top
  of them.

**Future work**

It would be a good exercise to see if the job-iteration gem could be
adapted to run on top of Active Job Continuations to highlight any
missing features - we'd want to add things like max iteration time, max
job runtime and forcing a job to stop.

Another thing to consider is a mechanism for checking whether it is safe
to call a checkpoint. Ideally you wouldn't allow them within database
transactions as they'll cause a rollback. We can maybe inject checkpoint
safety handlers and add a default one that checks whether we are in any
active transactions.
bensheldon pushed a commit to bensheldon/rails that referenced this pull request Jun 17, 2025
Follow up to rails#55127 and rails#55151

- Instrument steps in a block so that runtime is recorded
  (matching perform/perform_started)
- Allow job resumption configuration - max_resumptions, resume_options,
  resume_errors_after_advancing
- Add Active Record Railtie to prevent checkpoints in database
  transactions
- Checkpoint before each step except the first, rather than after
  each step.
- Error if order of completed steps changes when re-running
@mech
Copy link

mech commented Jul 18, 2025

Is this doing the same thing as what https://github.com/fractaledmind/acidic_job is doing?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants