From b3265ea54ea906da8c1a9a882cdf9188c92dd121 Mon Sep 17 00:00:00 2001 From: Alexey Zapparov Date: Sat, 9 Dec 2023 05:40:16 +0100 Subject: [PATCH] refactor: Extract redis repository Leave Queues with single responsibility - track eventually-consistent list of paused queues. --- lib/sidekiq/pauzer.rb | 44 +++---- lib/sidekiq/pauzer/queues.rb | 91 ++++--------- lib/sidekiq/pauzer/repository.rb | 52 ++++++++ .../pauzer/patches/basic_fetch_spec.rb | 6 +- spec/lib/sidekiq/pauzer/patches/queue_spec.rb | 4 +- spec/lib/sidekiq/pauzer/queues_spec.rb | 123 +++++++----------- spec/lib/sidekiq/pauzer/repository_spec.rb | 87 +++++++++++++ spec/lib/sidekiq/pauzer/web_spec.rb | 16 +-- spec/lib/sidekiq/pauzer_spec.rb | 45 +++---- spec/support/sidekiq.rb | 8 +- 10 files changed, 273 insertions(+), 203 deletions(-) create mode 100644 lib/sidekiq/pauzer/repository.rb create mode 100644 spec/lib/sidekiq/pauzer/repository_spec.rb diff --git a/lib/sidekiq/pauzer.rb b/lib/sidekiq/pauzer.rb index edfe04c..7e32802 100644 --- a/lib/sidekiq/pauzer.rb +++ b/lib/sidekiq/pauzer.rb @@ -1,6 +1,5 @@ # frozen_string_literal: true -require "forwardable" require "sidekiq" require "sidekiq/api" @@ -8,45 +7,45 @@ require_relative "./pauzer/patches/basic_fetch" require_relative "./pauzer/patches/queue" require_relative "./pauzer/queues" +require_relative "./pauzer/repository" require_relative "./pauzer/version" begin + # :nocov: require "sidekiq-ent/version" + raise "sidekiq-pauzer is incompatible with Sidekiq Enterprise" + # :nocov: rescue LoadError # All good - no compatibility issues end begin + # :nocov: require "sidekiq/pro/version" + raise "sidekiq-pauzer is incompatible with Sidekiq Pro" + # :nocov: rescue LoadError # All good - no compatibility issues end -raise "sidekiq-pauzer is incompatible with Sidekiq Pro" if Sidekiq.pro? -raise "sidekiq-pauzer is incompatible with Sidekiq Enterprise" if Sidekiq.ent? - module Sidekiq module Pauzer - REDIS_KEY = "sidekiq-pauzer" - MUTEX = Mutex.new private_constant :MUTEX - @config = Config.new.freeze - @queues = Queues.new(@config) + @config = Config.new.freeze + @repository = Repository.new + @queues = Queues.new(@config.refresh_rate, repository: @repository) class << self - extend Forwardable - # @example # Sidekiq::Pauzer.pause!("minor") # Sidekiq::Pauzer.paused?("minor") # => true # - # @param (see Queues#pause!) + # @param (see Repository#add) # @return [void] - def pause!(name) - @queues.pause!(name) - + def pause!(queue_name) + @repository.add(queue_name) nil end @@ -56,11 +55,10 @@ def pause!(name) # Sidekiq::Pauzer.unpause!("minor") # Sidekiq::Pauzer.paused?("minor") # => false # - # @param (see Queues#unpause!) + # @param (see Repository#delete) # @return [void] - def unpause!(name) - @queues.unpause!(name) - + def unpause!(queue_name) + @repository.delete(queue_name) nil end @@ -69,11 +67,13 @@ def unpause!(name) # Sidekiq::Pauzer.paused?("minor") # => true # Sidekiq::Pauzer.paused?("threat") # => false # - # @see Queues#paused? - def paused?(name) - @queues.paused?(name) + # @return (see Repository#include?) + def paused?(queue_name) + @repository.include?(queue_name) end + # Eventually consistent list of paused queues. + # # @example # Sidekiq::Pauzer.pause!("minor") # Sidekiq::Pauzer.paused_queues # => ["minor"] @@ -121,7 +121,7 @@ def shutdown def reinit_queues @queues.stop_refresher - @queues = Queues.new(@config) + @queues = Queues.new(@config.refresh_rate, repository: @repository) end end end diff --git a/lib/sidekiq/pauzer/queues.rb b/lib/sidekiq/pauzer/queues.rb index ed72fe6..49863a1 100644 --- a/lib/sidekiq/pauzer/queues.rb +++ b/lib/sidekiq/pauzer/queues.rb @@ -6,93 +6,54 @@ module Sidekiq module Pauzer # @api internal + # Eventually consistent list of paused queues. Used by Sidekiq fetchers to + # avoid hitting Redis on every fetch call. class Queues - extend Forwardable include Enumerable - # @!attribute [r] refresh_rate - # @see (Config#refresh_rate) - def_delegators :@config, :refresh_rate - - # @param config [Config] - def initialize(config) - @config = config - @names = [].freeze - @names_mutex = Mutex.new - @refresher = nil - @refresher_mutex = Mutex.new + # @param refresh_rate [Float] + # @param repository [Repository] + def initialize(refresh_rate, repository:) + @names = [].freeze + @refresher = Concurrent::TimerTask.new(execution_interval: refresh_rate, run_now: true) do + @names = repository.to_a.freeze + end end + # @overload each + # @return [Enumerator] + # + # @overload each(&block) + # For a block { |queue_name| ... } + # @yieldparam queue_name [String] + # @return [self] def each(&block) return to_enum __method__ unless block start_refresher unless refresher_running? - @names_mutex.synchronize { @names.dup }.each(&block) - - self - end - - # @param name [#to_s] - # @return [Queues] self - def pause!(name) - redis_call("SADD", Pauzer::REDIS_KEY, name.to_s) - refresh - self - end + @names.each(&block) - # @param name [#to_s] - # @return [Queues] self - def unpause!(name) - redis_call("SREM", Pauzer::REDIS_KEY, name.to_s) - refresh self end - # @param name [#to_s] - # @return [Boolean] - def paused?(name) - include?(name.to_s) - end - + # Starts paused queues list async poller. + # + # @return [self] def start_refresher - @refresher_mutex.synchronize do - @refresher&.shutdown - @refresher = Concurrent::TimerTask.execute(execution_interval: refresh_rate, run_now: true) { refresh } - end - + @refresher.execute self end + # Stops paused queues list async poller. + # + # @return [self] def stop_refresher - @refresher_mutex.synchronize do - @refresher&.shutdown - @refresher = nil - end - + @refresher.shutdown self end def refresher_running? - @refresher_mutex.synchronize do - @refresher&.running? || false - end - end - - private - - # @return [nil] - def refresh - names = redis_call("SMEMBERS", Pauzer::REDIS_KEY).to_a - - @names_mutex.synchronize do - @names = names.each(&:freeze).freeze - end - - nil - end - - def redis_call(...) - Sidekiq.redis { |conn| conn.call(...) } + @refresher.running? end end end diff --git a/lib/sidekiq/pauzer/repository.rb b/lib/sidekiq/pauzer/repository.rb new file mode 100644 index 0000000..1d9f6e4 --- /dev/null +++ b/lib/sidekiq/pauzer/repository.rb @@ -0,0 +1,52 @@ +# frozen_string_literal: true + +module Sidekiq + module Pauzer + class Repository + include Enumerable + + REDIS_KEY = "sidekiq-pauzer" + + # @overload each + # @return [Enumerator] + # + # @overload each(&block) + # For a block { |queue_name| ... } + # @yieldparam queue_name [String] + # @return [self] + def each + return to_enum __method__ unless block_given? + + redis_call("SMEMBERS", REDIS_KEY).each { yield _1.freeze } + + self + end + + # @param queue_name [#to_s] + # @return [void] + def add(queue_name) + redis_call("SADD", REDIS_KEY, queue_name.to_s) + nil + end + + # @param queue_name [#to_s] + # @return [void] + def delete(queue_name) + redis_call("SREM", REDIS_KEY, queue_name.to_s) + nil + end + + # @param name [#to_s] + # @return [void] + def include?(queue_name) + redis_call("SISMEMBER", REDIS_KEY, queue_name.to_s).positive? + end + + private + + def redis_call(...) + Sidekiq.redis { _1.call(...) } + end + end + end +end diff --git a/spec/lib/sidekiq/pauzer/patches/basic_fetch_spec.rb b/spec/lib/sidekiq/pauzer/patches/basic_fetch_spec.rb index dada74c..9a80297 100644 --- a/spec/lib/sidekiq/pauzer/patches/basic_fetch_spec.rb +++ b/spec/lib/sidekiq/pauzer/patches/basic_fetch_spec.rb @@ -10,7 +10,11 @@ let(:queues) { ["foo,1", "bar,10", "baz,100"] } before do - Sidekiq::Pauzer.pause!(:foo) + Sidekiq::Pauzer.pause!("foo") + Sidekiq::Pauzer.configure { |c| c.refresh_rate = 0.1 } + Sidekiq::Pauzer.startup + sleep 0.1 + stub_const("Sidekiq::BasicFetch::TIMEOUT", 0.1) end diff --git a/spec/lib/sidekiq/pauzer/patches/queue_spec.rb b/spec/lib/sidekiq/pauzer/patches/queue_spec.rb index d2c4e8e..4c1a523 100644 --- a/spec/lib/sidekiq/pauzer/patches/queue_spec.rb +++ b/spec/lib/sidekiq/pauzer/patches/queue_spec.rb @@ -40,7 +40,7 @@ Sidekiq::Pauzer.unpause!("foo") expect { subject } - .to change { redis_smembers(Sidekiq::Pauzer::REDIS_KEY) }.to(contain_exactly("foo")) + .to change { redis_smembers }.to(contain_exactly("foo")) .and change { Sidekiq::Pauzer.paused? "foo" }.to(true) end end @@ -60,7 +60,7 @@ Sidekiq::Pauzer.pause!("foo") expect { subject } - .to change { redis_smembers(Sidekiq::Pauzer::REDIS_KEY) }.to(be_empty) + .to change { redis_smembers }.to(be_empty) .and change { Sidekiq::Pauzer.paused? "foo" }.to(false) end end diff --git a/spec/lib/sidekiq/pauzer/queues_spec.rb b/spec/lib/sidekiq/pauzer/queues_spec.rb index 4d3eb3e..125bbdc 100644 --- a/spec/lib/sidekiq/pauzer/queues_spec.rb +++ b/spec/lib/sidekiq/pauzer/queues_spec.rb @@ -1,108 +1,83 @@ # frozen_string_literal: true RSpec.describe Sidekiq::Pauzer::Queues do - subject(:queues) { described_class.new(config) } + subject(:queues) { described_class.new(refresh_rate, repository: repository) } - let(:config) { Sidekiq::Pauzer::Config.new } + let(:refresh_rate) { 0.1 } + let(:repository) { Sidekiq::Pauzer::Repository.new } after { queues.stop_refresher } it { is_expected.to be_an Enumerable } - describe "#each" do - before do - queues.pause! "foo" - queues.pause! "bar" - end + it "polls repository regularily" do + allow(repository).to receive(:to_a).and_call_original - context "with block given" do - subject { queues.each { |q| yielded_results << q } } + queues.start_refresher + sleep(4 * refresh_rate) - let(:yielded_results) { [] } + expect(repository).to have_received(:to_a).at_least(4).times + expect(repository).to have_received(:to_a).at_most(5).times + end - it "yields each paused queue" do - expect { subject }.to change { yielded_results }.to(match_array(%w[foo bar])) - end + context "when repository poll fails" do + before do + attempt = 0 - it { is_expected.to be queues } - end + allow(repository).to receive(:to_a).and_wrap_original do |m| + attempt += 1 - context "without block given" do - subject { queues.each } + if attempt <= 4 + raise "nope" if attempt.odd? - it { is_expected.to be_an Enumerator } + repository.add("q#{attempt}") + end - it { is_expected.to match_array %w[foo bar] } - end - end + m.call + end - describe "#pause!" do - it "adds queue to the paused list" do - expect { %w[foo bar].each { |q| queues.pause!(q) } } - .to change { redis_smembers(Sidekiq::Pauzer::REDIS_KEY) }.to(match_array(%w[foo bar])) - .and change(queues, :to_a).to(match_array(%w[foo bar])) + queues.start_refresher + sleep(4 * refresh_rate) end - it "support queue name given as Symbol" do - expect { %i[foo bar].each { |q| queues.pause!(q) } } - .to change { redis_smembers(Sidekiq::Pauzer::REDIS_KEY) }.to(match_array(%w[foo bar])) - .and change(queues, :to_a).to(match_array(%w[foo bar])) + it "keeps refresher runnning" do + expect(queues.refresher_running?).to be true end - it "avoids duplicates" do - queues.pause! "foo" - - expect { %w[foo bar].each { |q| queues.pause!(q) } } - .to change { redis_smembers(Sidekiq::Pauzer::REDIS_KEY) }.to(match_array(%w[foo bar])) - .and change(queues, :to_a).to(match_array(%w[foo bar])) + it "keeps updating the local cache" do + expect(queues.to_a).to contain_exactly("q2", "q4") end end - describe "#unpause!" do - before do - queues.pause! "foo" - queues.pause! "bar" - end + describe "#each" do + subject { queues.each { |q| yielded_results << q } } - it "removes queue from the paused list" do - expect { queues.unpause!("foo") } - .to change { redis_smembers(Sidekiq::Pauzer::REDIS_KEY) }.to(contain_exactly("bar")) - .and change(queues, :to_a).to(contain_exactly("bar")) - end + let(:yielded_results) { [] } - it "support queue name given as Symbol" do - expect { queues.unpause!(:foo) } - .to change { redis_smembers(Sidekiq::Pauzer::REDIS_KEY) }.to(contain_exactly("bar")) - .and change(queues, :to_a).to(contain_exactly("bar")) + before do + repository.add("foo") + repository.add("bar") end - it "skips non-paused queues" do - expect { queues.unpause!("baz") } - .to keep_unchanged { redis_smembers(Sidekiq::Pauzer::REDIS_KEY) } - .and keep_unchanged(queues, :to_a) - end - end + it { is_expected.to be queues } - describe "#paused?" do - context "when queue is not paused" do - it "returns ‹false›" do - expect(queues.paused?("foo")).to be false - end + it "yields each paused queue" do + queues.start_refresher + sleep refresh_rate - it "support queue name given as Symbol" do - expect(queues.paused?(:foo)).to be false - end + expect { subject }.to change { yielded_results }.to(match_array(%w[foo bar])) end - context "when queue is paused" do - before { queues.pause!("foo") } + context "without block given" do + subject { queues.each } - it "returns ‹true›" do - expect(queues.paused?("foo")).to be true - end + it { is_expected.to be_an Enumerator } + + it "returns each paused queue" do + queues.start_refresher + sleep refresh_rate - it "support queue name given as Symbol" do - expect(queues.paused?(:foo)).to be true + expect(subject).to contain_exactly("foo", "bar") end end end @@ -114,9 +89,9 @@ end describe "#stop_refresher" do - it "stops asynchronous refresher" do - queues.start_refresher + before { queues.start_refresher } + it "stops asynchronous refresher" do expect { queues.stop_refresher }.to change(queues, :refresher_running?).to(false) end end @@ -124,8 +99,6 @@ describe "#refresher_running?" do subject { queues.refresher_running? } - after { queues.stop_refresher } - it { is_expected.to be false } context "when refresher was stopped" do diff --git a/spec/lib/sidekiq/pauzer/repository_spec.rb b/spec/lib/sidekiq/pauzer/repository_spec.rb new file mode 100644 index 0000000..93eb942 --- /dev/null +++ b/spec/lib/sidekiq/pauzer/repository_spec.rb @@ -0,0 +1,87 @@ +# frozen_string_literal: true + +RSpec.describe Sidekiq::Pauzer::Repository do + subject(:repository) { described_class.new } + + it { is_expected.to be_an Enumerable } + + describe "#each" do + subject { repository.each { |inhibitor| yielded_results << inhibitor } } + + let(:yielded_results) { [] } + + before do + redis_sadd("a") + redis_sadd("b") + redis_sadd("c") + end + + it { is_expected.to be repository } + + it "yields each paused queue" do + expect { subject }.to(change { yielded_results }.to(contain_exactly("a", "b", "c"))) + end + + context "without block given" do + subject { repository.each } + + it { is_expected.to be_an Enumerator } + + it "returns each valid inhibitor" do + expect(subject).to contain_exactly("a", "b", "c") + end + end + end + + describe "#add" do + before do + redis_sadd("a") + redis_sadd("b") + end + + it "adds paused queue to redis" do + expect { repository.add("c") }.to( + change { redis_smembers }.to(contain_exactly("a", "b", "c")) + ) + end + + context "when given queue is already in the paused list" do + it "does nothing" do + expect { repository.add("b") }.to(keep_unchanged { redis_smembers }) + end + end + end + + describe "#delete" do + subject { repository.delete("b") } + + before do + redis_sadd("a") + redis_sadd("b") + end + + it "removes paused queue" do + expect { subject }.to(change { redis_smembers }.to(contain_exactly("a"))) + end + + context "when queue is not paused" do + subject { repository.delete("deadbeef") } + + it "does nothing" do + expect { subject }.to(keep_unchanged { redis_smembers }) + end + end + end + + describe "#include?" do + subject { repository.include?("a") } + + it { is_expected.to be false } + + context "when given queue name is in the paused queues list" do + before { redis_sadd("a") } + + it { is_expected.to be true } + end + end +end diff --git a/spec/lib/sidekiq/pauzer/web_spec.rb b/spec/lib/sidekiq/pauzer/web_spec.rb index 18f9bf1..43a1021 100644 --- a/spec/lib/sidekiq/pauzer/web_spec.rb +++ b/spec/lib/sidekiq/pauzer/web_spec.rb @@ -36,13 +36,13 @@ def csrf_token it "allows pausing queues" do post "/queues/foo", "pause" => "1", "authenticity_token" => csrf_token expect(last_response.status).to eq 302 - expect(redis_smembers(Sidekiq::Pauzer::REDIS_KEY)).to contain_exactly("foo") - expect(Sidekiq::Pauzer.paused_queues).to contain_exactly("foo") + expect(Sidekiq::Pauzer.paused?("foo")).to be true + expect(Sidekiq::Pauzer.paused?("bar")).to be false post "/queues/bar", "pause" => "1", "authenticity_token" => csrf_token expect(last_response.status).to eq 302 - expect(redis_smembers(Sidekiq::Pauzer::REDIS_KEY)).to contain_exactly("foo", "bar") - expect(Sidekiq::Pauzer.paused_queues).to contain_exactly("foo", "bar") + expect(Sidekiq::Pauzer.paused?("foo")).to be true + expect(Sidekiq::Pauzer.paused?("bar")).to be true end it "allows unpausing queues" do @@ -51,13 +51,13 @@ def csrf_token post "/queues/foo", "unpause" => "1", "authenticity_token" => csrf_token expect(last_response.status).to eq 302 - expect(redis_smembers(Sidekiq::Pauzer::REDIS_KEY)).to contain_exactly("bar") - expect(Sidekiq::Pauzer.paused_queues).to contain_exactly("bar") + expect(Sidekiq::Pauzer.paused?("foo")).to be false + expect(Sidekiq::Pauzer.paused?("bar")).to be true post "/queues/bar", "unpause" => "1", "authenticity_token" => csrf_token expect(last_response.status).to eq 302 - expect(redis_smembers(Sidekiq::Pauzer::REDIS_KEY)).to be_empty - expect(Sidekiq::Pauzer.paused_queues).to be_empty + expect(Sidekiq::Pauzer.paused?("foo")).to be false + expect(Sidekiq::Pauzer.paused?("bar")).to be false end it "allows clearing the queue" do diff --git a/spec/lib/sidekiq/pauzer_spec.rb b/spec/lib/sidekiq/pauzer_spec.rb index c5ce93c..6eb7e1b 100644 --- a/spec/lib/sidekiq/pauzer_spec.rb +++ b/spec/lib/sidekiq/pauzer_spec.rb @@ -20,22 +20,12 @@ describe ".pause!" do it "adds queue to the paused list" do expect { %w[foo bar].each { |q| described_class.pause!(q) } } - .to change { redis_smembers(described_class::REDIS_KEY) }.to(match_array(%w[foo bar])) - .and change(described_class, :paused_queues).to(match_array(%w[foo bar])) - end - - it "support queue name given as Symbol" do - expect { %i[foo bar].each { |q| described_class.pause!(q) } } - .to change { redis_smembers(described_class::REDIS_KEY) }.to(match_array(%w[foo bar])) - .and change(described_class, :paused_queues).to(match_array(%w[foo bar])) + .to change { redis_smembers }.to(match_array(%w[foo bar])) end it "avoids duplicates" do - described_class.pause!("foo") - - expect { %w[foo bar].each { |q| described_class.pause!(q) } } - .to change { redis_smembers(described_class::REDIS_KEY) }.to(match_array(%w[foo bar])) - .and change(described_class, :paused_queues).to(match_array(%w[foo bar])) + expect { %w[foo bar foo].each { |q| described_class.pause!(q) } } + .to change { redis_smembers }.to(match_array(%w[foo bar])) end end @@ -47,20 +37,12 @@ it "removes queue from the paused list" do expect { described_class.unpause!("foo") } - .to change { redis_smembers(described_class::REDIS_KEY) }.to(contain_exactly("bar")) - .and change(described_class, :paused_queues).to(contain_exactly("bar")) - end - - it "support queue name given as Symbol" do - expect { described_class.unpause!(:foo) } - .to change { redis_smembers(described_class::REDIS_KEY) }.to(contain_exactly("bar")) - .and change(described_class, :paused_queues).to(contain_exactly("bar")) + .to change { redis_smembers }.to(contain_exactly("bar")) end it "skips non-paused queues" do expect { described_class.unpause!("baz") } - .to keep_unchanged { redis_smembers(described_class::REDIS_KEY) } - .and keep_unchanged(described_class, :paused_queues) + .to(keep_unchanged { redis_smembers }) end end @@ -89,9 +71,20 @@ end describe ".paused_queues" do - it "returns list of paused queue names" do - expect { %w[foo bar].each { |q| described_class.pause!(q) } } - .to change(described_class, :paused_queues).to(match_array(%w[foo bar])) + before do + described_class.configure { |c| c.refresh_rate = 0.1 } + end + + it "returns eventually consistent list of paused queue names" do + described_class.pause! "foo" + described_class.pause! "bar" + + expect(described_class.paused_queues).to be_empty + + described_class.startup + sleep 0.1 + + expect(described_class.paused_queues).to contain_exactly("foo", "bar") end end diff --git a/spec/support/sidekiq.rb b/spec/support/sidekiq.rb index ad893a7..2df3b0b 100644 --- a/spec/support/sidekiq.rb +++ b/spec/support/sidekiq.rb @@ -16,12 +16,12 @@ def sidekiq_fire_event(...) Sidekiq.default_configuration.default_capsule.fire_event(...) end - def redis_smembers(key) - Sidekiq.redis { |conn| conn.call("SMEMBERS", key) } + def redis_smembers + Sidekiq.redis { |conn| conn.call("SMEMBERS", "sidekiq-pauzer") } end - def redis_sadd(key, value) - Sidekiq.redis { |conn| conn.call("SADD", key, value) } + def redis_sadd(value) + Sidekiq.redis { |conn| conn.call("SADD", "sidekiq-pauzer", value) } end end