Skip to content

Commit

Permalink
allow specifying which queue throttled jobs should be sent to
Browse files Browse the repository at this point in the history
  • Loading branch information
lavaturtle committed Jun 30, 2023
1 parent 37c905a commit b314a8d
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 40 deletions.
2 changes: 1 addition & 1 deletion lib/sidekiq/throttled.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def requeue_throttled(work)
job_class = Object.const_get(message.fetch("wrapped") { message.fetch("class") { return false } })

Registry.get job_class do |strategy|
strategy.requeue_throttled(work, requeue_with: job_class.sidekiq_throttled_requeue_with)
strategy.requeue_throttled(work, **job_class.sidekiq_throttled_requeue_options)
end
end
end
Expand Down
17 changes: 11 additions & 6 deletions lib/sidekiq/throttled/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module Sidekiq
module Throttled
# Configuration holder.
class Configuration
attr_reader :default_requeue_with
attr_reader :default_requeue_options

# Class constructor.
def initialize
Expand All @@ -16,7 +16,7 @@ def initialize
# @return [self]
def reset!
@inherit_strategies = false
@default_requeue_with = :enqueue
@default_requeue_options = { with: :enqueue }

self
end
Expand Down Expand Up @@ -50,11 +50,16 @@ def inherit_strategies?
end

# Specifies how we should return throttled jobs to the queue so they can be executed later.
# Options are `:enqueue` (put them on the end of the queue) and `:schedule` (schedule for later).
# Default: `:enqueue`
# Expects a hash with keys that may include :with and :to
# For :with, options are `:enqueue` (put them on the end of the queue) and `:schedule` (schedule for later).
# For :to, the name of a sidekiq queue should be specified. If none is specified, jobs will by default be
# requeued to the same queue they were originally enqueued in.
# Default: {with: `:enqueue`}
#
def default_requeue_with=(value)
@default_requeue_with = value.intern
def default_requeue_options=(options)
requeue_with = options.delete(:with).intern || :enqueue

@default_requeue_options = options.merge({ with: requeue_with })
end
end
end
Expand Down
15 changes: 8 additions & 7 deletions lib/sidekiq/throttled/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ module Throttled
# include Sidekiq::Throttled::Job
#
# sidkiq_options :queue => :my_queue
# sidekiq_throttle :threshold => { :limit => 123, :period => 1.hour }, :requeue_with => :schedule
# sidekiq_throttle :threshold => { :limit => 123, :period => 1.hour },
# :requeue => { :to => :other_queue, :with => :schedule }
#
# def perform
# # ...
Expand All @@ -32,7 +33,7 @@ module Job
#
# @private
def self.included(worker)
worker.sidekiq_class_attribute :sidekiq_throttled_requeue_with # :enqueue | :schedule
worker.sidekiq_class_attribute :sidekiq_throttled_requeue_options
worker.send(:extend, ClassMethods)
end

Expand Down Expand Up @@ -74,16 +75,16 @@ module ClassMethods
# })
# end
#
# @param [#to_s] requeue_with What to do with jobs that are throttled
# @param [#to_s] requeue What to do with jobs that are throttled
# @see Registry.add for other parameters
# @return [void]
def sidekiq_throttle(**kwargs)
requeue_with = kwargs.delete(:requeue_with) || Throttled.configuration.default_requeue_with
unless VALID_VALUES_FOR_REQUEUE_WITH.include?(requeue_with)
raise ArgumentError, "#{requeue_with} is not a valid value for :requeue_with"
requeue_options = Throttled.configuration.default_requeue_options.merge(kwargs.delete(:requeue) || {})
unless VALID_VALUES_FOR_REQUEUE_WITH.include?(requeue_options[:with])
raise ArgumentError, "requeue: #{requeue_options[:with]} is not a valid value for :with"
end

self.sidekiq_throttled_requeue_with = requeue_with
self.sidekiq_throttled_requeue_options = requeue_options

Registry.add(self, **kwargs)
end
Expand Down
22 changes: 13 additions & 9 deletions lib/sidekiq/throttled/strategy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,23 +72,26 @@ def throttled?(jid, *job_args)
false
end

# Return throttled job to be executed later. Implementation depends on the value of requeue_with:
# Return throttled job to be executed later. Implementation depends on the value of `with`:
# :enqueue means put the job back at the end of the queue immediately
# :schedule means schedule enqueueing the job for a later time when we expect to have capacity
#
# @param [#to_s] requeue_with How to handle the throttled job
# @param [#to_s] with How to handle the throttled job
# @param [#to_s] to Name of the queue to re-queue the job to. If not specified, will use the job's original queue.
# @return [void]
def requeue_throttled(work, requeue_with:)
case requeue_with
def requeue_throttled(work, with:, to: nil)
case with
when :enqueue
# Push the job back to the head of the queue.
target_list = to.nil? ? work.queue : "queue:#{to}"

# This is the same operation Sidekiq performs upon `Sidekiq::Worker.perform_async` call.
Sidekiq.redis { |conn| conn.lpush(work.queue, work.job) }
Sidekiq.redis { |conn| conn.lpush(target_list, work.job) }
when :schedule
# Find out when we will next be able to execute this job, and reschedule for then.
reschedule_throttled(work)
reschedule_throttled(work, to: to)
else
raise "unrecognized requeue_with option #{requeue_with}"
raise "unrecognized :with option #{with}"
end
end

Expand All @@ -107,12 +110,13 @@ def reset!

private

def reschedule_throttled(work)
def reschedule_throttled(work, to: nil)
message = JSON.parse(work.job)
job_class = message.fetch("wrapped") { message.fetch("class") { return false } }
job_args = message["args"]

Sidekiq::Client.enqueue_to_in(work.queue, retry_in(work), Object.const_get(job_class), *job_args)
target_queue = to.nil? ? work.queue : "queue:#{to}"
Sidekiq::Client.enqueue_to_in(target_queue, retry_in(work), Object.const_get(job_class), *job_args)
end

def retry_in(work)
Expand Down
54 changes: 45 additions & 9 deletions spec/lib/sidekiq/throttled/job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,38 @@

working_class.sidekiq_throttle(foo: :bar)

expect(working_class.sidekiq_throttled_requeue_with).to eq :enqueue
expect(working_class.sidekiq_throttled_requeue_options).to eq({ with: :enqueue })
end

it "accepts and stores a requeue_with parameter" do
it "accepts and stores a requeue parameter including :with" do
expect(Sidekiq::Throttled::Registry)
.to receive(:add).with(working_class, foo: :bar)

working_class.sidekiq_throttle(foo: :bar, requeue_with: :schedule)
working_class.sidekiq_throttle(foo: :bar, requeue: { with: :schedule })

expect(working_class.sidekiq_throttled_requeue_with).to eq :schedule
expect(working_class.sidekiq_throttled_requeue_options).to eq({ with: :schedule })
end

context "when a default_requeue_with is set" do
before { Sidekiq::Throttled.configuration.default_requeue_with = :schedule }
it "accepts and stores a requeue parameter including :to" do
expect(Sidekiq::Throttled::Registry)
.to receive(:add).with(working_class, foo: :bar)

working_class.sidekiq_throttle(foo: :bar, requeue: { to: :other_queue })

expect(working_class.sidekiq_throttled_requeue_options).to eq({ to: :other_queue, with: :enqueue })
end

it "accepts and stores a requeue parameter including both :to and :with" do
expect(Sidekiq::Throttled::Registry)
.to receive(:add).with(working_class, foo: :bar)

working_class.sidekiq_throttle(foo: :bar, requeue: { to: :other_queue, with: :schedule })

expect(working_class.sidekiq_throttled_requeue_options).to eq({ to: :other_queue, with: :schedule })
end

context "when default_requeue_options are set" do
before { Sidekiq::Throttled.configuration.default_requeue_options = { with: :schedule } }

after { Sidekiq::Throttled.configuration.reset! }

Expand All @@ -42,16 +60,34 @@

working_class.sidekiq_throttle(foo: :bar)

expect(working_class.sidekiq_throttled_requeue_with).to eq :schedule
expect(working_class.sidekiq_throttled_requeue_options).to eq({ with: :schedule })
end

it "uses the default alongside a requeue parameter including :to" do
expect(Sidekiq::Throttled::Registry)
.to receive(:add).with(working_class, foo: :bar)

working_class.sidekiq_throttle(foo: :bar, requeue: { to: :other_queue })

expect(working_class.sidekiq_throttled_requeue_options).to eq({ to: :other_queue, with: :schedule })
end

it "allows overriding the default" do
expect(Sidekiq::Throttled::Registry)
.to receive(:add).with(working_class, foo: :bar)

working_class.sidekiq_throttle(foo: :bar, requeue_with: :enqueue)
working_class.sidekiq_throttle(foo: :bar, requeue: { with: :enqueue })

expect(working_class.sidekiq_throttled_requeue_options).to eq({ with: :enqueue })
end

it "allows overriding the default and including a :to parameter" do
expect(Sidekiq::Throttled::Registry)
.to receive(:add).with(working_class, foo: :bar)

working_class.sidekiq_throttle(foo: :bar, requeue: { to: :other_queue, with: :enqueue })

expect(working_class.sidekiq_throttled_requeue_with).to eq :enqueue
expect(working_class.sidekiq_throttled_requeue_options).to eq({ to: :other_queue, with: :enqueue })
end
end
end
Expand Down
38 changes: 32 additions & 6 deletions spec/lib/sidekiq/throttled/strategy_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def perform(*); end
Sidekiq::BasicFetch::UnitOfWork.new("queue:default", job, sidekiq_config)
end

describe "with requeue_with = :enqueue" do
describe "with parameter with: :enqueue" do
let(:options) { threshold }

def enqueued_jobs(queue)
Expand All @@ -242,17 +242,31 @@ def enqueued_jobs(queue)

it "puts the job back on the queue" do
expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]])
expect(enqueued_jobs("other_queue")).to be_empty

# Requeue the work
subject.requeue_throttled(work, requeue_with: :enqueue)
subject.requeue_throttled(work, with: :enqueue)

# See that it is now on the end of the queue
expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [1]], ["ThrottledTestJob", [3]],
["ThrottledTestJob", [2]]])
expect(enqueued_jobs("other_queue")).to be_empty
end

it "puts the job back on a different queue when specified" do
expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]])
expect(enqueued_jobs("other_queue")).to be_empty

# Requeue the work
subject.requeue_throttled(work, with: :enqueue, to: :other_queue)

# See that it is now on the end of the queue
expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]])
expect(enqueued_jobs("other_queue")).to eq([["ThrottledTestJob", [1]]])
end
end

describe "with requeue_with = :schedule" do
describe "with parameter with: :schedule" do
def scheduled_redis_item_and_score
Sidekiq.redis do |conn|
# Depending on whether we have redis-client (for Sidekiq 7) or redis-rb (for older Sidekiq),
Expand All @@ -275,13 +289,25 @@ def scheduled_redis_item_and_score
it "reschedules for when the threshold strategy says to, plus some jitter" do
# Requeue the work, see that it ends up in 'schedule'
expect do
subject.requeue_throttled(work, requeue_with: :schedule)
subject.requeue_throttled(work, with: :schedule)
end.to change { Sidekiq.redis { |conn| conn.zcard("schedule") } }.by(1)

item, score = scheduled_redis_item_and_score
expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], "queue" => "queue:default")
expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0)
end

it "reschedules for a different queue if specified" do
# Requeue the work, see that it ends up in 'schedule'
expect do
subject.requeue_throttled(work, with: :schedule, to: :other_queue)
end.to change { Sidekiq.redis { |conn| conn.zcard("schedule") } }.by(1)

item, score = scheduled_redis_item_and_score
expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1],
"queue" => "queue:other_queue")
expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0)
end
end

context "when concurrency constraints given" do
Expand All @@ -294,7 +320,7 @@ def scheduled_redis_item_and_score
it "reschedules for when the concurrency strategy says to, plus some jitter" do
# Requeue the work, see that it ends up in 'schedule'
expect do
subject.requeue_throttled(work, requeue_with: :schedule)
subject.requeue_throttled(work, with: :schedule)
end.to change { Sidekiq.redis { |conn| conn.zcard("schedule") } }.by(1)

item, score = scheduled_redis_item_and_score
Expand All @@ -314,7 +340,7 @@ def scheduled_redis_item_and_score
it "reschedules for the later of what the two say, plus some jitter" do
# Requeue the work, see that it ends up in 'schedule'
expect do
subject.requeue_throttled(work, requeue_with: :schedule)
subject.requeue_throttled(work, with: :schedule)
end.to change { Sidekiq.redis { |conn| conn.zcard("schedule") } }.by(1)

item, score = scheduled_redis_item_and_score
Expand Down
4 changes: 2 additions & 2 deletions spec/lib/sidekiq/throttled_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,15 @@ def perform(*); end
let!(:strategy) { Sidekiq::Throttled::Registry.add("ThrottledTestJob", threshold: { limit: 1, period: 1 }) }

before do
ThrottledTestJob.sidekiq_throttled_requeue_with = :enqueue
ThrottledTestJob.sidekiq_throttled_requeue_options = { to: :other_queue, with: :enqueue }
end

it "invokes requeue_throttled on the strategy" do
payload_jid = jid
job = { class: "ThrottledTestJob", jid: payload_jid.inspect }.to_json
work = Sidekiq::BasicFetch::UnitOfWork.new("queue:default", job, sidekiq_config)

expect(strategy).to receive(:requeue_throttled).with(work, requeue_with: :enqueue)
expect(strategy).to receive(:requeue_throttled).with(work, to: :other_queue, with: :enqueue)

described_class.requeue_throttled work
end
Expand Down

0 comments on commit b314a8d

Please sign in to comment.