From fce05c9d4b4c0411c982078a4cf3a63f20f739bc Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Fri, 19 Jun 2020 08:39:18 -0700 Subject: [PATCH] Fetch API refactor, WIP (#4602) * Fetch API refactor, WIP * save options for later * changes * Fix test failures --- Changes.md | 1 + Pro-Changes.md | 1 + lib/sidekiq/fetch.rb | 38 ++++++++++++++++++++------------------ lib/sidekiq/launcher.rb | 3 ++- lib/sidekiq/manager.rb | 6 +++--- lib/sidekiq/processor.rb | 8 ++++---- test/test_actors.rb | 13 ++++++++----- test/test_cli.rb | 1 + test/test_fetch.rb | 2 +- test/test_manager.rb | 2 +- test/test_middleware.rb | 6 ++---- test/test_processor.rb | 22 ++++++++-------------- 12 files changed, 52 insertions(+), 51 deletions(-) diff --git a/Changes.md b/Changes.md index 9bb01b614..ecb152253 100644 --- a/Changes.md +++ b/Changes.md @@ -12,6 +12,7 @@ HEAD - Remove rack-protection, reimplement CSRF protection [#4588] - Require redis-rb 4.2 [#4591] - Update to jquery 1.12.4 [#4593] +- Refactor internal fetch logic and API [#4602] 6.0.7 --------- diff --git a/Pro-Changes.md b/Pro-Changes.md index c6e0759b2..335e3b64f 100644 --- a/Pro-Changes.md +++ b/Pro-Changes.md @@ -10,6 +10,7 @@ HEAD - Remove `concurrent-ruby` gem dependency [#4586] - Update `constantize` for batch callbacks. [#4469] - Add queue tag to `jobs.recovered.fetch` metric [#4594] +- Refactor Pro's fetch infrastructure [#4602] 5.0.1 --------- diff --git a/lib/sidekiq/fetch.rb b/lib/sidekiq/fetch.rb index 8ce9970d6..4f4764131 100644 --- a/lib/sidekiq/fetch.rb +++ b/lib/sidekiq/fetch.rb @@ -25,8 +25,10 @@ def requeue } def initialize(options) - @strictly_ordered_queues = !!options[:strict] - @queues = options[:queues].map { |q| "queue:#{q}" } + raise ArgumentError, "missing queue list" unless options[:queues] + @options = options + @strictly_ordered_queues = !!@options[:strict] + @queues = @options[:queues].map { |q| "queue:#{q}" } if @strictly_ordered_queues @queues.uniq! @queues << TIMEOUT @@ -38,24 +40,9 @@ def retrieve_work UnitOfWork.new(*work) if work end - # Creating the Redis#brpop command takes into account any - # configured queue weights. By default Redis#brpop returns - # data from the first queue that has pending elements. We - # recreate the queue command each time we invoke Redis#brpop - # to honor weights and avoid queue starvation. - def queues_cmd - if @strictly_ordered_queues - @queues - else - queues = @queues.shuffle!.uniq - queues << TIMEOUT - queues - end - end - # By leaving this as a class method, it can be pluggable and used by the Manager actor. Making it # an instance method will make it async to the Fetcher actor - def self.bulk_requeue(inprogress, options) + def bulk_requeue(inprogress, options) return if inprogress.empty? Sidekiq.logger.debug { "Re-queueing terminated jobs" } @@ -76,5 +63,20 @@ def self.bulk_requeue(inprogress, options) rescue => ex Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.message}") end + + # Creating the Redis#brpop command takes into account any + # configured queue weights. By default Redis#brpop returns + # data from the first queue that has pending elements. We + # recreate the queue command each time we invoke Redis#brpop + # to honor weights and avoid queue starvation. + def queues_cmd + if @strictly_ordered_queues + @queues + else + queues = @queues.shuffle!.uniq + queues << TIMEOUT + queues + end + end end end diff --git a/lib/sidekiq/launcher.rb b/lib/sidekiq/launcher.rb index e29d25797..644c16f68 100644 --- a/lib/sidekiq/launcher.rb +++ b/lib/sidekiq/launcher.rb @@ -22,6 +22,7 @@ class Launcher attr_accessor :manager, :poller, :fetcher def initialize(options) + options[:fetch] ||= BasicFetch.new(options) @manager = Sidekiq::Manager.new(options) @poller = Sidekiq::Scheduled::Poller.new @done = false @@ -56,7 +57,7 @@ def stop # Requeue everything in case there was a worker who grabbed work while stopped # This call is a no-op in Sidekiq but necessary for Sidekiq Pro. - strategy = (@options[:fetch] || Sidekiq::BasicFetch) + strategy = @options[:fetch] strategy.bulk_requeue([], @options) clear_heartbeat diff --git a/lib/sidekiq/manager.rb b/lib/sidekiq/manager.rb index 27e3d6e65..f059bcef9 100644 --- a/lib/sidekiq/manager.rb +++ b/lib/sidekiq/manager.rb @@ -35,7 +35,7 @@ def initialize(options = {}) @done = false @workers = Set.new @count.times do - @workers << Processor.new(self) + @workers << Processor.new(self, options) end @plock = Mutex.new end @@ -90,7 +90,7 @@ def processor_died(processor, reason) @plock.synchronize do @workers.delete(processor) unless @done - p = Processor.new(self) + p = Processor.new(self, options) @workers << p p.start end @@ -123,7 +123,7 @@ def hard_shutdown # contract says that jobs are run AT LEAST once. Process termination # is delayed until we're certain the jobs are back in Redis because # it is worse to lose a job than to run it twice. - strategy = (@options[:fetch] || Sidekiq::BasicFetch) + strategy = @options[:fetch] strategy.bulk_requeue(jobs, @options) end diff --git a/lib/sidekiq/processor.rb b/lib/sidekiq/processor.rb index 2ae5c2c05..39d3752c6 100644 --- a/lib/sidekiq/processor.rb +++ b/lib/sidekiq/processor.rb @@ -28,15 +28,15 @@ class Processor attr_reader :thread attr_reader :job - def initialize(mgr) + def initialize(mgr, options) @mgr = mgr @down = false @done = false @job = nil @thread = nil - @strategy = (mgr.options[:fetch] || Sidekiq::BasicFetch).new(mgr.options) - @reloader = Sidekiq.options[:reloader] - @job_logger = (mgr.options[:job_logger] || Sidekiq::JobLogger).new + @strategy = options[:fetch] + @reloader = options[:reloader] || proc { |&block| block.call } + @job_logger = (options[:job_logger] || Sidekiq::JobLogger).new @retrier = Sidekiq::JobRetry.new end diff --git a/test/test_actors.rb b/test/test_actors.rb index 739397a94..63cbfac42 100644 --- a/test/test_actors.rb +++ b/test/test_actors.rb @@ -50,7 +50,8 @@ def perform(slp) end it 'can start and stop' do - f = Sidekiq::Processor.new(Mgr.new) + m = Mgr.new + f = Sidekiq::Processor.new(m, m.options) f.terminate end @@ -74,14 +75,16 @@ def processor_stopped(inst) end end def options - { :concurrency => 3, :queues => ['default'] } + opts = { :concurrency => 3, :queues => ['default'] } + opts[:fetch] = Sidekiq::BasicFetch.new(opts) + opts end end it 'can process' do mgr = Mgr.new - p = Sidekiq::Processor.new(mgr) + p = Sidekiq::Processor.new(mgr, mgr.options) JoeWorker.perform_async(0) a = $count @@ -93,7 +96,7 @@ def options it 'deals with errors' do mgr = Mgr.new - p = Sidekiq::Processor.new(mgr) + p = Sidekiq::Processor.new(mgr, mgr.options) JoeWorker.perform_async("boom") q = Sidekiq::Queue.new assert_equal 1, q.size @@ -116,7 +119,7 @@ def options it 'gracefully kills' do mgr = Mgr.new - p = Sidekiq::Processor.new(mgr) + p = Sidekiq::Processor.new(mgr, mgr.options) JoeWorker.perform_async(1) q = Sidekiq::Queue.new assert_equal 1, q.size diff --git a/test/test_cli.rb b/test/test_cli.rb index 74726393b..832ab4fe0 100644 --- a/test/test_cli.rb +++ b/test/test_cli.rb @@ -419,6 +419,7 @@ def logdev describe '#run' do before do + Sidekiq.options[:concurrency] = 2 Sidekiq.options[:require] = './test/fake_env.rb' end diff --git a/test/test_fetch.rb b/test/test_fetch.rb index 449d801dc..70d776f37 100644 --- a/test/test_fetch.rb +++ b/test/test_fetch.rb @@ -52,7 +52,7 @@ assert_equal 0, q1.size assert_equal 0, q2.size - Sidekiq::BasicFetch.bulk_requeue(works, {:queues => []}) + fetch.bulk_requeue(works, {:queues => []}) assert_equal 2, q1.size assert_equal 1, q2.size end diff --git a/test/test_manager.rb b/test/test_manager.rb index b3870c084..0d165985c 100644 --- a/test/test_manager.rb +++ b/test/test_manager.rb @@ -8,7 +8,7 @@ end def new_manager(opts) - Sidekiq::Manager.new(opts) + Sidekiq::Manager.new(opts.merge(fetch: Sidekiq::BasicFetch.new(opts))) end it 'creates N processor instances' do diff --git a/test/test_middleware.rb b/test/test_middleware.rb index 90e1044db..b9847f0ea 100644 --- a/test/test_middleware.rb +++ b/test/test_middleware.rb @@ -78,10 +78,8 @@ def call(*args) end boss = Minitest::Mock.new - boss.expect(:options, {:queues => ['default'] }, []) - boss.expect(:options, {:queues => ['default'] }, []) - boss.expect(:options, {:queues => ['default'] }, []) - processor = Sidekiq::Processor.new(boss) + opts = {:queues => ['default'] } + processor = Sidekiq::Processor.new(boss, opts) boss.expect(:processor_done, nil, [processor]) processor.process(Sidekiq::BasicFetch::UnitOfWork.new('queue:default', msg)) assert_equal %w(2 before 3 before 1 before work_performed 1 after 3 after 2 after), $recorder.flatten diff --git a/test/test_processor.rb b/test/test_processor.rb index 63b0291d5..ae1fcc4d7 100644 --- a/test/test_processor.rb +++ b/test/test_processor.rb @@ -11,10 +11,9 @@ before do $invokes = 0 @mgr = Minitest::Mock.new - @mgr.expect(:options, {:queues => ['default']}) - @mgr.expect(:options, {:queues => ['default']}) - @mgr.expect(:options, {:queues => ['default']}) - @processor = ::Sidekiq::Processor.new(@mgr) + opts = {:queues => ['default']} + opts[:fetch] = Sidekiq::BasicFetch.new(opts) + @processor = ::Sidekiq::Processor.new(@mgr, opts) end class MockWorker @@ -327,11 +326,9 @@ def call(item, queue) end before do + opts = {:queues => ['default'], job_logger: CustomJobLogger} @mgr = Minitest::Mock.new - @mgr.expect(:options, {:queues => ['default'], job_logger: CustomJobLogger}) - @mgr.expect(:options, {:queues => ['default'], job_logger: CustomJobLogger}) - @mgr.expect(:options, {:queues => ['default'], job_logger: CustomJobLogger}) - @processor = ::Sidekiq::Processor.new(@mgr) + @processor = ::Sidekiq::Processor.new(@mgr, opts) end end end @@ -356,18 +353,15 @@ def successful_job describe 'custom job logger class' do before do - @mgr = Minitest::Mock.new - @mgr.expect(:options, {:queues => ['default'], :job_logger => CustomJobLogger}) - @mgr.expect(:options, {:queues => ['default'], :job_logger => CustomJobLogger}) - @mgr.expect(:options, {:queues => ['default'], :job_logger => CustomJobLogger}) - @processor = ::Sidekiq::Processor.new(@mgr) + opts = {:queues => ['default'], :job_logger => CustomJobLogger} + opts[:fetch] = Sidekiq::BasicFetch.new(opts) + @processor = ::Sidekiq::Processor.new(nil, opts) end it 'is called instead default Sidekiq::JobLogger' do msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] }) @processor.process(work(msg)) assert_equal 1, $invokes - @mgr.verify end end end