diff --git a/lib/sidekiq/throttled/job.rb b/lib/sidekiq/throttled/job.rb index 5b4f66f..3dc09b7 100644 --- a/lib/sidekiq/throttled/job.rb +++ b/lib/sidekiq/throttled/job.rb @@ -75,7 +75,7 @@ module ClassMethods # }) # end # - # @example Allow max 123 MyJob jobs per hour, and when jobs are throttled, schedule them for later in :other_queue + # @example Allow max 123 MyJob jobs per hour; when jobs are throttled, schedule them for later in :other_queue # # class MyJob # include Sidekiq::Job diff --git a/lib/sidekiq/throttled/strategy.rb b/lib/sidekiq/throttled/strategy.rb index 7759300..b5d6e81 100644 --- a/lib/sidekiq/throttled/strategy.rb +++ b/lib/sidekiq/throttled/strategy.rb @@ -76,20 +76,26 @@ def throttled?(jid, *job_args) # :enqueue means put the job back at the end of the queue immediately # :schedule means schedule enqueueing the job for a later time when we expect to have capacity # - # @param [#to_s] with How to handle the throttled job - # @param [#to_s] to Name of the queue to re-queue the job to. If not specified, will use the job's original queue. + # @param [#to_s, #call] with How to handle the throttled job + # @param [#to_s, #call] to Name of the queue to re-queue the job to. + # If not specified, will use the job's original queue. # @return [void] - def requeue_throttled(work, with:, to: nil) - case with + def requeue_throttled(work, with:, to: nil) # rubocop:disable Metrics/MethodLength + # Resolve :with and :to arguments, calling them if they are Procs + job_args = JSON.parse(work.job)["args"] + requeue_with = with.respond_to?(:call) ? with.call(*job_args) : with + requeue_to = to.respond_to?(:call) ? to.call(*job_args) : to + + case requeue_with when :enqueue # Push the job back to the head of the queue. - target_list = to.nil? ? work.queue : "queue:#{to}" + target_list = requeue_to.nil? ? work.queue : "queue:#{requeue_to}" # This is the same operation Sidekiq performs upon `Sidekiq::Worker.perform_async` call. Sidekiq.redis { |conn| conn.lpush(target_list, work.job) } when :schedule # Find out when we will next be able to execute this job, and reschedule for then. - reschedule_throttled(work, to: to) + reschedule_throttled(work, requeue_to: requeue_to) else raise "unrecognized :with option #{with}" end @@ -110,12 +116,12 @@ def reset! private - def reschedule_throttled(work, to: nil) + def reschedule_throttled(work, requeue_to: nil) message = JSON.parse(work.job) job_class = message.fetch("wrapped") { message.fetch("class") { return false } } job_args = message["args"] - target_queue = to.nil? ? work.queue : "queue:#{to}" + target_queue = requeue_to.nil? ? work.queue : "queue:#{requeue_to}" Sidekiq::Client.enqueue_to_in(target_queue, retry_in(work), Object.const_get(job_class), *job_args) end diff --git a/spec/lib/sidekiq/throttled/strategy_spec.rb b/spec/lib/sidekiq/throttled/strategy_spec.rb index 2325204..57f3cee 100644 --- a/spec/lib/sidekiq/throttled/strategy_spec.rb +++ b/spec/lib/sidekiq/throttled/strategy_spec.rb @@ -264,6 +264,31 @@ def enqueued_jobs(queue) expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) expect(enqueued_jobs("other_queue")).to eq([["ThrottledTestJob", [1]]]) end + + it "accepts a Proc for :with argument" do + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Requeue the work + subject.requeue_throttled(work, with: ->(_arg) { :enqueue }) + + # See that it is now on the end of the queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [1]], ["ThrottledTestJob", [3]], + ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to be_empty + end + + it "accepts a Proc for :to argument" do + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Requeue the work + subject.requeue_throttled(work, with: :enqueue, to: ->(_arg) { :other_queue }) + + # See that it is now on the end of the queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to eq([["ThrottledTestJob", [1]]]) + end end describe "with parameter with: :schedule" do @@ -308,6 +333,29 @@ def scheduled_redis_item_and_score "queue" => "queue:other_queue") expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) end + + it "accepts a Proc for :with argument" do + # Requeue the work, see that it ends up in 'schedule' + expect do + subject.requeue_throttled(work, with: ->(_arg) { :schedule }) + end.to change { Sidekiq.redis { |conn| conn.zcard("schedule") } }.by(1) + + item, score = scheduled_redis_item_and_score + expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], "queue" => "queue:default") + expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) + end + + it "accepts a Proc for :to argument" do + # Requeue the work, see that it ends up in 'schedule' + expect do + subject.requeue_throttled(work, with: :schedule, to: ->(_arg) { :other_queue }) + end.to change { Sidekiq.redis { |conn| conn.zcard("schedule") } }.by(1) + + item, score = scheduled_redis_item_and_score + expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], + "queue" => "queue:other_queue") + expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) + end end context "when concurrency constraints given" do