Skip to content

Commit

Permalink
accept a Proc for :with and :to arguments
Browse files Browse the repository at this point in the history
  • Loading branch information
lavaturtle committed Jun 30, 2023
1 parent ce5745d commit 122e6e1
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 9 deletions.
2 changes: 1 addition & 1 deletion lib/sidekiq/throttled/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ module ClassMethods
# })
# end
#
# @example Allow max 123 MyJob jobs per hour, and when jobs are throttled, schedule them for later in :other_queue
# @example Allow max 123 MyJob jobs per hour; when jobs are throttled, schedule them for later in :other_queue
#
# class MyJob
# include Sidekiq::Job
Expand Down
22 changes: 14 additions & 8 deletions lib/sidekiq/throttled/strategy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,26 @@ def throttled?(jid, *job_args)
# :enqueue means put the job back at the end of the queue immediately
# :schedule means schedule enqueueing the job for a later time when we expect to have capacity
#
# @param [#to_s] with How to handle the throttled job
# @param [#to_s] to Name of the queue to re-queue the job to. If not specified, will use the job's original queue.
# @param [#to_s, #call] with How to handle the throttled job
# @param [#to_s, #call] to Name of the queue to re-queue the job to.
# If not specified, will use the job's original queue.
# @return [void]
def requeue_throttled(work, with:, to: nil)
case with
def requeue_throttled(work, with:, to: nil) # rubocop:disable Metrics/MethodLength
# Resolve :with and :to arguments, calling them if they are Procs
job_args = JSON.parse(work.job)["args"]
requeue_with = with.respond_to?(:call) ? with.call(*job_args) : with
requeue_to = to.respond_to?(:call) ? to.call(*job_args) : to

case requeue_with
when :enqueue
# Push the job back to the head of the queue.
target_list = to.nil? ? work.queue : "queue:#{to}"
target_list = requeue_to.nil? ? work.queue : "queue:#{requeue_to}"

# This is the same operation Sidekiq performs upon `Sidekiq::Worker.perform_async` call.
Sidekiq.redis { |conn| conn.lpush(target_list, work.job) }
when :schedule
# Find out when we will next be able to execute this job, and reschedule for then.
reschedule_throttled(work, to: to)
reschedule_throttled(work, requeue_to: requeue_to)
else
raise "unrecognized :with option #{with}"
end
Expand All @@ -110,12 +116,12 @@ def reset!

private

def reschedule_throttled(work, to: nil)
def reschedule_throttled(work, requeue_to: nil)
message = JSON.parse(work.job)
job_class = message.fetch("wrapped") { message.fetch("class") { return false } }
job_args = message["args"]

target_queue = to.nil? ? work.queue : "queue:#{to}"
target_queue = requeue_to.nil? ? work.queue : "queue:#{requeue_to}"
Sidekiq::Client.enqueue_to_in(target_queue, retry_in(work), Object.const_get(job_class), *job_args)
end

Expand Down
48 changes: 48 additions & 0 deletions spec/lib/sidekiq/throttled/strategy_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,31 @@ def enqueued_jobs(queue)
expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]])
expect(enqueued_jobs("other_queue")).to eq([["ThrottledTestJob", [1]]])
end

it "accepts a Proc for :with argument" do
expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]])
expect(enqueued_jobs("other_queue")).to be_empty

# Requeue the work
subject.requeue_throttled(work, with: ->(_arg) { :enqueue })

# See that it is now on the end of the queue
expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [1]], ["ThrottledTestJob", [3]],
["ThrottledTestJob", [2]]])
expect(enqueued_jobs("other_queue")).to be_empty
end

it "accepts a Proc for :to argument" do
expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]])
expect(enqueued_jobs("other_queue")).to be_empty

# Requeue the work
subject.requeue_throttled(work, with: :enqueue, to: ->(_arg) { :other_queue })

# See that it is now on the end of the queue
expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]])
expect(enqueued_jobs("other_queue")).to eq([["ThrottledTestJob", [1]]])
end
end

describe "with parameter with: :schedule" do
Expand Down Expand Up @@ -308,6 +333,29 @@ def scheduled_redis_item_and_score
"queue" => "queue:other_queue")
expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0)
end

it "accepts a Proc for :with argument" do
# Requeue the work, see that it ends up in 'schedule'
expect do
subject.requeue_throttled(work, with: ->(_arg) { :schedule })
end.to change { Sidekiq.redis { |conn| conn.zcard("schedule") } }.by(1)

item, score = scheduled_redis_item_and_score
expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], "queue" => "queue:default")
expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0)
end

it "accepts a Proc for :to argument" do
# Requeue the work, see that it ends up in 'schedule'
expect do
subject.requeue_throttled(work, with: :schedule, to: ->(_arg) { :other_queue })
end.to change { Sidekiq.redis { |conn| conn.zcard("schedule") } }.by(1)

item, score = scheduled_redis_item_and_score
expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1],
"queue" => "queue:other_queue")
expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0)
end
end

context "when concurrency constraints given" do
Expand Down

0 comments on commit 122e6e1

Please sign in to comment.