
Add perform_all_later to enqueue multiple jobs at once #46603

Merged 1 commit into rails:main on Feb 13, 2023

Conversation

@Mangara (Contributor) commented Nov 28, 2022

Motivation / Background

Sidekiq has a useful optimisation called push_bulk that enqueues many jobs at once, eliminating the repeated Redis roundtrips. However, this feature is not exposed through Active Job, so it only works for Sidekiq::Worker jobs. This adds a barrier to Active Job adoption for apps that rely on this feature. It also makes it harder for other queue adapters to implement similar functionality, as they then have to take care of serialization, callbacks, etc. themselves.

Detail

This PR adds ActiveJob.perform_all_later([<job1>, <job2>]), backed by Sidekiq's push_bulk and with a fallback to enqueuing serially if the queue adapter does not support bulk enqueue.
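The fallback can be sketched in plain Ruby. This is illustrative, not the actual Rails internals; `bulk_enqueue` and the adapter methods are hypothetical stand-ins:

```ruby
# Rough sketch of the fallback: use the adapter's bulk entry point when
# it exists, otherwise enqueue each job individually. Returns the number
# of jobs handled in either case.
def bulk_enqueue(adapter, jobs)
  if adapter.respond_to?(:enqueue_all)
    adapter.enqueue_all(jobs)
  else
    jobs.each { |job| adapter.enqueue(job) }
    jobs.size
  end
end

# A trivial adapter stub that only supports one-at-a-time enqueue:
serial_log = []
stub = Object.new
stub.define_singleton_method(:enqueue) { |job| serial_log << job }

bulk_enqueue(stub, [:job_a, :job_b])
serial_log # => [:job_a, :job_b]
```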

Arguments

It's hard to support the full diversity of ways a Ruby method can be called with a simple array. In particular, jobs with a mix of positional and keyword arguments are difficult to distinguish from jobs with positional arguments that include a hash. Sidekiq gets around this by not supporting keyword arguments.

It is possible to support everything by treating the last hash in the array of arguments as keywords, and requiring jobs whose last positional argument is a hash to include an additional empty hash of keyword arguments. However, based on feedback on this PR, we chose to pass instantiated jobs instead, letting Ruby handle all of these complexities.
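The ambiguity can be seen in plain Ruby (this example is illustrative, not part of the PR): once arguments are flattened into an array, a trailing hash could be either keyword arguments or a positional hash, and only the method signature can tell.

```ruby
# Two plausible job signatures:
def perform_with_kwargs(id, foo: nil)
  [id, foo]
end

def perform_with_hash(id, options)
  [id, options]
end

# The same serialized argument array...
args = [1, { foo: 42 }]

# ...must be splatted differently depending on the caller's lost intent:
perform_with_kwargs(*args[0...-1], **args[-1]) # => [1, 42]
perform_with_hash(*args)                       # => [1, { foo: 42 }]
```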

Delay

Passing in instantiated jobs also makes it easy for the user to specify delays for each job:

user_jobs = users.map.with_index do |user, user_index|
  UserJob.new(user).set(wait: user_index.seconds)
end
ActiveJob.perform_all_later(user_jobs)

Return value

The return value of perform_all_later is limited by Sidekiq's current behaviour. push_bulk returns the (provider) ids of all jobs that were successfully enqueued. This means that if we try to enqueue two jobs, we may get back a single id with no way of knowing which job it belongs to, so we cannot even map it back to Active Job's job ids. I chose to return the number of successfully enqueued jobs, but it may be better to always return true for now, so that we can change it more easily later. In an ideal world, I think the return value would be an array containing, for each job, either the job itself or false, mirroring the return value of perform_later.
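That "ideal" return value could look like the following sketch (hypothetical: `perform_all_later_sketch` and `try_enqueue` are illustrative stand-ins, not the PR's implementation):

```ruby
# Mirror perform_later by returning, per job, either the job itself or
# false. The block stands in for the real per-job enqueue attempt.
def perform_all_later_sketch(jobs, &try_enqueue)
  jobs.map { |job| try_enqueue.call(job) ? job : false }
end

jobs = ["job_a", "job_b", "job_c"]
# Pretend the middle job fails to enqueue:
result = perform_all_later_sketch(jobs) { |job| job != "job_b" }
result # => ["job_a", false, "job_c"]
```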

Callbacks

Based on feedback in this PR, perform_all_later does not run any callbacks. This is in line with the Active Record bulk methods, so it shouldn't be too surprising to users and it is clearly stated in the documentation.

There are several issues with running callbacks:

  • I don't see a good way to run around_enqueue callbacks for each job in a way that the callback begins before the job is enqueued and ends after. Running them another way breaks the assumptions and gives strange results, for instance for Active Job's own enqueue instrumentation.
  • The callbacks run on individual jobs, so they can't take advantage of the bulk nature of this method. This could lead to N+1 queries and greatly slow down what is meant to be a performance optimization.

Batching

Sidekiq recommends batches of no more than 1000 jobs, and their newer bulk API perform_bulk will automatically break them up into batches of that size if you pass a larger array. As recommended batch sizes will vary between back-ends, I'm not sure if this should be something that Active Job handles as opposed to the adapter, although we could make it configurable. This is also easy to add later.
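If batching were left to the caller, it would amount to a simple `each_slice` loop. A pure-Ruby sketch (the `enqueue_batch` lambda is a hypothetical stand-in for the bulk enqueue call):

```ruby
# Client-side batching: break a large job list into batches of at most
# 1000 before handing each batch to the bulk API.
batch_sizes = []
enqueue_batch = ->(batch) { batch_sizes << batch.size }

jobs = Array.new(2500) { |i| "job_#{i}" } # stand-ins for job instances
jobs.each_slice(1000) { |batch| enqueue_batch.call(batch) }

batch_sizes # => [1000, 1000, 500]
```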

Additional information

The performance benefit for 1000 jobs can be more than an order of magnitude:

| Enqueue type       | Serial time (ms) | Bulk time (ms) | Speedup |
| ------------------ | ---------------- | -------------- | ------- |
| Raw Sidekiq        |             2661 |            119 |     22x |
| Active Job Sidekiq |             2853 |            208 |     14x |

(Measured in a simple test app in our production environment.)

See also #39499 which was a previous stab at this and where I stole the name from (🙇 @vinistock).

Checklist

Before submitting the PR make sure the following are checked:

  • This Pull Request is related to one change. Changes that are unrelated should be opened in separate PRs.
  • Commit message has a detailed description of what changed and why. If this PR fixes a related issue include it in the commit message. Ex: [Fix #issue-number]
  • Tests are added or updated if you fix a bug or add a feature.
  • CHANGELOG files are updated for the changed libraries if there is a behavior change or additional feature. Minor bug fixes and documentation changes should not be included.
  • CI is passing.

@rails-bot rails-bot bot added the activejob label Nov 28, 2022
@Mangara Mangara force-pushed the mangara-multi-enqueue branch from 0f8ae98 to 3f9262c Compare November 28, 2022 20:19
@georgeclaghorn (Contributor) commented:

Thanks for working on this! I deleted two review comments because I realized after posting that you addressed them in the PR description.

I will rephrase one of them here: I think it would make sense to skip *_enqueue callbacks for perform_all_later.

  1. This matches the behavior of Active Record bulk APIs—insert_all, update_all, and upsert_all—which don’t run callbacks.
  2. Active Job itself currently relies on around_enqueue for instrumentation, which will be incorrect for bulk-enqueued jobs without additional work.
  3. Perhaps most importantly, this will make up some of the 3x performance difference between raw Sidekiq and Active Job bulk enqueuing.

You said:

> Not doing this could lead to very subtle bugs where a job serializes successfully on its own, but not in bulk, because it relies on some global state (for example a current user or database connection) being set by an around_enqueue callback.

A similar argument could be made for Active Record’s bulk APIs, but to my knowledge, this hasn’t been a major source of confusion in practice. These APIs exist to accommodate intentional performance optimization and it’s enough that they’re clearly documented not to run callbacks.

@Mangara (Contributor, author) commented Nov 30, 2022

Good points. I'm worried that not running callbacks will make them far less usable for many apps, though. As an example, our main monolith does a lot of validation in before_enqueue callbacks that we'd ideally not bypass even for bulk enqueue. I wonder if running just before_enqueue and after_enqueue callbacks would be worthwhile, or if that'd be even more confusing? We can clearly document that it does not run around_enqueue callbacks, which are the most problematic kind.

@Mangara Mangara force-pushed the mangara-multi-enqueue branch from 3f9262c to 2f56ba8 Compare December 5, 2022 15:25
@simi (Contributor) commented Dec 8, 2022

Hello! Thanks for opening this; a similar contribution has been on my TODO list for some time. I can share our experience with this.

We have implemented something similar (called perform_async_multi) in our app to efficiently schedule ~400k DB-backed jobs in batches in a short time. We do skip callbacks, and we wrap the scheduling of the sub-jobs in one big scheduling job that runs the callbacks manually in a bulk-friendly way. As mentioned, that is the price for the performance, similar to update_all.

Here is pseudocode extracted from our approach.

class UserUpdate < ApplicationJob
  before_enqueue :update_user_state

  def perform(user_id)
    # ... some logic
  end

  def update_user_state
    # `arguments` holds the args passed to perform (here, the user_id)
    User.find(arguments.first).queue_update # state machine change
  end
end

class UserGroupUpdate < ApplicationJob
  before_enqueue :update_users_states

  def perform(user_group_id)
    User.where(group_id: user_group_id).waiting_for_update.find_in_batches do |batch|
      UserUpdate.perform_async_multi(*batch.map(&:id))
    end
  end

  def update_users_states
    # State machine bulk change. We need this because the per-job enqueue
    # callback is skipped when perform_async_multi is used for maximum
    # performance.
    User.where(group_id: arguments.first).idle.queue_update!
  end
end

Thanks to perform_async_multi (backed by the activerecord-import gem, most likely replaceable today with the built-in insert_all), DB entries are inserted in batches. We run a custom DB adapter for Active Job, but it is similar to DelayedJob or GoodJob (which we plan to migrate to soon). These DB-backed adapters could benefit massively from batch scheduling as well.

@Mangara (Contributor, author) commented Dec 8, 2022

@simi Thanks for that example! So it looks like with the current implementation in this PR that does run before_enqueue and after_enqueue callbacks, your bulk job could be written as

class UserGroupUpdate < ApplicationJob
  def perform(user_group_id)
    User.where(group_id: user_group_id).waiting_for_update.find_in_batches do |batch|
      UserUpdate.perform_all_later(*batch.map(&:id))
    end
  end
end

Does that meet your needs, or do you think it would be better not to run the callbacks?

@simi (Contributor) commented Dec 9, 2022

> @simi Thanks for that example! So it looks like with the current implementation in this PR that does run before_enqueue and after_enqueue callbacks, your bulk job could be written as
>
>     class UserGroupUpdate < ApplicationJob
>       def perform(user_group_id)
>         User.where(group_id: user_group_id).waiting_for_update.find_in_batches do |batch|
>           UserUpdate.perform_all_later(*batch.map(&:id))
>         end
>       end
>     end
>
> Does that meet your needs, or do you think it would be better not to run the callbacks?

In our case we would like at least the option to opt out of the callbacks, since we want to run all the callbacks in a "bulk" way in the "group" update wrapper. User.where(group_id: user_group_id).idle.queue_update! in my example is not just running User.find(user_id).queue_update for each user in the group (= 1 SQL update per user); it also benefits from a bulk update (= 1 SQL query in any case).
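The query-count difference behind this point can be sketched in plain Ruby (the lambdas are stand-ins for the real ActiveRecord calls in the comment above):

```ruby
# Per-job callbacks issue one update per user; the wrapper's manual bulk
# callback issues a single query for the whole group.
queries = 0
per_user_update = ->(user_id)  { queries += 1 } # User.find(user_id).queue_update
bulk_update     = ->(user_ids) { queries += 1 } # User.where(...).queue_update!

user_ids = (1..500).to_a
user_ids.each { |id| per_user_update.call(id) } # one query per user: 500
bulk_update.call(user_ids)                      # a single bulk query: 1
queries # => 501
```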

@Mangara (Contributor, author) commented Dec 10, 2022

That makes sense. I'll change the proposed implementation in this PR to not run callbacks. We can always add an option to run callbacks later, but the default will be not to. As @georgeclaghorn pointed out, that is in line with the Active Record bulk methods, so it won't be too unexpected for users.

@Mangara Mangara force-pushed the mangara-multi-enqueue branch 2 times, most recently from 31b4a0a to baaecb9 Compare December 13, 2022 19:09
@casperisfine (Contributor):

I've thought a bit about this last night, and I'm afraid the only way to handle keyword arguments properly would be to expose some helper to process arguments:

MyJob.perform_all_later([
  MyJob.arguments(1, foo: 42),
  MyJob.arguments(1, {hello: "world"}, foo: 42),
])

But then at this stage I wonder if it may make sense to just instantiate the Job objects and pass a list of job instances. That would even allow queuing multiple job types at once?

@simi (Contributor) commented Dec 15, 2022

> But then at this stage I wonder if it may make sense to just instantiate the Job object and pass a list of job instances. That would even allow to queue multiple job types at once?

Something like this (maybe covered with some high level API on job level as well)?

batch = ActiveJob::Batch.new

users.each do |user|
  batch.jobs << UserJob.new(id: user.id)
end

batch.perform_later

@casperisfine (Contributor):

More like:

ActiveJob.perform_all_later(users.map { |u| UserJob.new(id: u.id) })

Also, instantiating the jobs means running the callbacks is less of a worry.

Now of course that means many more allocations, but for most job classes it will be one or two allocations per job, which I think is negligible compared to what is needed to serialize the arguments anyway.

@Mangara (Contributor, author) commented Dec 15, 2022

Another option would be to treat the last hash in the args array (if any) as keywords. Jobs that take a hash as their last positional argument would need to pass an additional empty hash of keyword arguments, but would otherwise work.

MyJob.perform_all_later([[first_arg1, first_arg2], [second_arg1, second_arg2]]) # Positional only
MyJob.perform_all_later([[first_arg, { key: first_key }], [second_arg, { key: second_key }]]) # Positional and keyword
MyJob.perform_all_later([[first_arg, { hello: "world" }, {}], [second_arg, { hello: "world" }, {}]]) # Positional hash
MyJob.perform_all_later([[{ key: first_key }], [{ key: second_key }]]) # Keywords only (could support omitting the wrapping array here)

@bensheldon (Contributor):

I'm the maintainer of GoodJob. I wanted to offer my strong support of this proposal 👍🏻

From my perspective of what's possible with GoodJob, I'd simply recommend to my users that they pass in job instances like @casperisfine suggested:

jobs = users.map { |u| UserJob.new(id: u.id).set(wait: 10.minutes) }
ApplicationJob.perform_all_later(jobs)

That also makes it possible to determine which jobs failed to enqueue by expecting/hoping the Adapter is able to set provider_job_id on the jobs that were enqueued e.g. unenqueued_jobs = jobs.reject(&:provider_job_id). Directionally, I'd like developers to become more comfortable/familiar with the instance interface of ActiveJob::Base.

Re: delays, I think the Adapter should just be expected to grab job.scheduled_at and use that value if present. I'd prefer pushing that logic down to the adapter instead of replicating separate enqueue_bulk/enqueue_bulk_at methods. And it allows bulk enqueuing jobs with different options too; all those attributes live on the job instance.
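That suggestion could be sketched as follows. This is illustrative only: `PendingJob` and `partition_for_enqueue` are hypothetical stand-ins, not ActiveJob::Base or any adapter's actual API.

```ruby
# An adapter reads scheduled_at off each job instance and routes
# immediate vs. scheduled jobs to the appropriate backend call, instead
# of needing separate enqueue_bulk / enqueue_bulk_at entry points.
PendingJob = Struct.new(:name, :scheduled_at)

def partition_for_enqueue(jobs)
  # Jobs without a scheduled_at go straight on the queue; the rest are scheduled.
  jobs.partition { |job| job.scheduled_at.nil? }
end

jobs = [PendingJob.new("a"), PendingJob.new("b", Time.now + 600)]
immediate, scheduled = partition_for_enqueue(jobs)
immediate.map(&:name) # => ["a"]
scheduled.map(&:name) # => ["b"]
```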

@casperisfine (Contributor):

> Another option would be to treat the last hash in the args array (if any) as keywords.

I'd be OK with that, but as an explicitly lower-level API that doesn't instantiate the job instances and doesn't run callbacks. To me that's a more niche feature that can only be used in specific cases, like ActiveRecord::Base.insert_all.

@Mangara (Contributor, author) commented Jan 4, 2023

I updated the API to take an array of jobs, as suggested. I'm not sure the code is in the right place now: I left it in the modules (grouped by functionality), but as the API is now top-level, it may make more sense in a separate file, or just in the top-level active_job.rb?

I made no changes to callbacks (they are not run), as they still suffer from the same problems as before (detailed in the description and earlier discussion).

Review thread on activejob/lib/active_job/enqueuing.rb (outdated, resolved):
private
  def instrument_enqueue_all(queue_adapter, jobs)
    payload = { adapter: queue_adapter, jobs: jobs }
    ActiveSupport::Notifications.instrument("enqueue_all.active_job", payload) do
Member:

Having a different event for mass enqueue might be a bit of a problem, e.g. a tracing framework might not handle both.

I don't have a solution in mind right now, but making a note to put more thoughts into this.

@Mangara (Contributor, author):

The advantage of a separate event is that the semantics of the current event remain the same, and frameworks can add support for the new event over time. If there is a clean way of plugging in the existing monitoring, I'd be all for that, but I'm not sure there is.

Member:

That new event is really the only thing that bothers me here. @rafaelfranca I'd love to hear your thoughts on it?

Contributor:

@Mangara ok, sorry for the delay. I just had a chat with @rafaelfranca, and introducing that new event is fine; we'll just have to make sure it's properly pointed out in the changelog etc.

So no more blocker from me. I'll merge this next week.

Member:

We might want to add this to the instrumentation guide:
https://edgeguides.rubyonrails.org/active_support_instrumentation.html#active-job

I'm happy to do this after this PR lands though, not a blocker!

@Mangara (Contributor, author):

Good point! It should definitely be documented.

Member:

Done in 3748385 🙏

@bensheldon (Contributor):

fyi'ing @mperham for visibility on Sidekiq.

@mperham (Contributor) commented Jan 21, 2023

> The return value of perform_all_later is limited by Sidekiq's current behaviour. push_bulk returns the (provider) ids of all jobs that were successfully enqueued. This means that if we try to enqueue 2 jobs, we may get back a single id with no way of knowing which job it belongs to, so we can't even map it back to Active Job's job ids.

This is a totally fair criticism and poor API design on my part. If someone wants to open up a new issue for Sidekiq, we can discuss changing the return value for 7.1.

@Mangara Mangara force-pushed the mangara-multi-enqueue branch from a047397 to e3fabdc Compare February 2, 2023 19:58
Sidekiq has a useful optimisation called `push_bulk` that enqueues many jobs at
once, eliminating the repeated Redis roundtrips. However, this feature is not
exposed through Active Job, so it only works for `Sidekiq::Worker` jobs. This
adds a barrier to Active Job adoption for apps that rely on this feature. It
also makes it harder for other queue adapters to implement similar
functionality, as they then have to take care of serialization, callbacks, etc.
themselves.

This commit adds `ActiveJob.perform_all_later(<job1>, <job2>)`, backed by
Sidekiq's `push_bulk` and with a fallback to enqueuing serially if the queue
adapter does not support bulk enqueue.

The performance benefit for 1000 jobs can be more than an order of magnitude:

| Enqueue type       | Serial time (ms) | Bulk time (ms) | Speedup |
| ------------------ | ---------------- | -------------- | ------- |
| Raw Sidekiq        |             2661 |            119 |     22x |
| Active Job Sidekiq |             2853 |            208 |     14x |

(Measured in a simple test app in our production environment.)

Instrumentation for perform_all_later uses a new event, `enqueue_all.active_job`.
@Mangara Mangara force-pushed the mangara-multi-enqueue branch from e3fabdc to 9b62f88 Compare February 2, 2023 21:39
@Mangara (Contributor, author) commented Feb 2, 2023

I changed the return value to nil for now, to give us more flexibility for the future. Since we're passing in job instances, queue adapters can give per-job feedback by setting successfully_enqueued and/or enqueue_error on these, if they want.
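The per-job feedback pattern this enables can be sketched in plain Ruby. `FakeJob` is a hypothetical stand-in for a job instance exposing Active Job's successfully_enqueued flag; the filtering pattern is illustrative:

```ruby
# After a bulk enqueue, the caller can inspect the job instances
# themselves to see which ones the adapter failed to enqueue.
FakeJob = Struct.new(:job_id, :successfully_enqueued) do
  def successfully_enqueued?
    !!successfully_enqueued
  end
end

jobs = [FakeJob.new(1, true), FakeJob.new(2, false), FakeJob.new(3, true)]
failed = jobs.reject(&:successfully_enqueued?)
failed.map(&:job_id) # => [2]
```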

@byroot byroot merged commit 9b79798 into rails:main Feb 13, 2023
nalabjp added several commits to nalabjp/solid_queue referencing this pull request (May 11 and 12, 2024)

9 participants