Skip to content

Commit

Permalink
refactor: Extract redis repository
Browse files Browse the repository at this point in the history
Leave Queues with single responsibility - track eventually-consistent
list of paused queues.
  • Loading branch information
ixti committed Dec 10, 2023
1 parent de5afae commit b3265ea
Show file tree
Hide file tree
Showing 10 changed files with 273 additions and 203 deletions.
44 changes: 22 additions & 22 deletions lib/sidekiq/pauzer.rb
Original file line number Diff line number Diff line change
@@ -1,52 +1,51 @@
# frozen_string_literal: true

require "forwardable"
require "sidekiq"
require "sidekiq/api"

require_relative "./pauzer/config"
require_relative "./pauzer/patches/basic_fetch"
require_relative "./pauzer/patches/queue"
require_relative "./pauzer/queues"
require_relative "./pauzer/repository"
require_relative "./pauzer/version"

begin
# :nocov:
require "sidekiq-ent/version"
raise "sidekiq-pauzer is incompatible with Sidekiq Enterprise"
# :nocov:
rescue LoadError
# All good - no compatibility issues
end

begin
# :nocov:
require "sidekiq/pro/version"
raise "sidekiq-pauzer is incompatible with Sidekiq Pro"
# :nocov:
rescue LoadError
# All good - no compatibility issues
end

raise "sidekiq-pauzer is incompatible with Sidekiq Pro" if Sidekiq.pro?
raise "sidekiq-pauzer is incompatible with Sidekiq Enterprise" if Sidekiq.ent?

module Sidekiq
module Pauzer
REDIS_KEY = "sidekiq-pauzer"

MUTEX = Mutex.new
private_constant :MUTEX

@config = Config.new.freeze
@queues = Queues.new(@config)
@config = Config.new.freeze
@repository = Repository.new
@queues = Queues.new(@config.refresh_rate, repository: @repository)

class << self
extend Forwardable

# @example
# Sidekiq::Pauzer.pause!("minor")
# Sidekiq::Pauzer.paused?("minor") # => true
#
# @param (see Queues#pause!)
# @param (see Repository#add)
# @return [void]
def pause!(name)
@queues.pause!(name)

def pause!(queue_name)
@repository.add(queue_name)
nil
end

Expand All @@ -56,11 +55,10 @@ def pause!(name)
# Sidekiq::Pauzer.unpause!("minor")
# Sidekiq::Pauzer.paused?("minor") # => false
#
# @param (see Queues#unpause!)
# @param (see Repository#delete)
# @return [void]
def unpause!(name)
@queues.unpause!(name)

def unpause!(queue_name)
@repository.delete(queue_name)
nil
end

Expand All @@ -69,11 +67,13 @@ def unpause!(name)
# Sidekiq::Pauzer.paused?("minor") # => true
# Sidekiq::Pauzer.paused?("threat") # => false
#
# @see Queues#paused?
def paused?(name)
@queues.paused?(name)
# @return (see Repository#include?)
def paused?(queue_name)
@repository.include?(queue_name)
end

# Eventually consistent list of paused queues.
#
# @example
# Sidekiq::Pauzer.pause!("minor")
# Sidekiq::Pauzer.paused_queues # => ["minor"]
Expand Down Expand Up @@ -121,7 +121,7 @@ def shutdown

def reinit_queues
@queues.stop_refresher
@queues = Queues.new(@config)
@queues = Queues.new(@config.refresh_rate, repository: @repository)
end
end
end
Expand Down
91 changes: 26 additions & 65 deletions lib/sidekiq/pauzer/queues.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,93 +6,54 @@
module Sidekiq
module Pauzer
# @api internal
# Eventually consistent list of paused queues. Used by Sidekiq fetchers to
# avoid hitting Redis on every fetch call.
class Queues
extend Forwardable
include Enumerable

# @!attribute [r] refresh_rate
# @see (Config#refresh_rate)
def_delegators :@config, :refresh_rate

# @param config [Config]
def initialize(config)
@config = config
@names = [].freeze
@names_mutex = Mutex.new
@refresher = nil
@refresher_mutex = Mutex.new
# @param refresh_rate [Float]
# @param repository [Repository]
def initialize(refresh_rate, repository:)
@names = [].freeze
@refresher = Concurrent::TimerTask.new(execution_interval: refresh_rate, run_now: true) do
@names = repository.to_a.freeze
end
end

# @overload each
# @return [Enumerator<String>]
#
# @overload each(&block)
# For a block { |queue_name| ... }
# @yieldparam queue_name [String]
# @return [self]
def each(&block)
return to_enum __method__ unless block

start_refresher unless refresher_running?
@names_mutex.synchronize { @names.dup }.each(&block)

self
end

# @param name [#to_s]
# @return [Queues] self
def pause!(name)
redis_call("SADD", Pauzer::REDIS_KEY, name.to_s)
refresh
self
end
@names.each(&block)

# @param name [#to_s]
# @return [Queues] self
def unpause!(name)
redis_call("SREM", Pauzer::REDIS_KEY, name.to_s)
refresh
self
end

# @param name [#to_s]
# @return [Boolean]
def paused?(name)
include?(name.to_s)
end

# Starts paused queues list async poller.
#
# @return [self]
def start_refresher
@refresher_mutex.synchronize do
@refresher&.shutdown
@refresher = Concurrent::TimerTask.execute(execution_interval: refresh_rate, run_now: true) { refresh }
end

@refresher.execute
self
end

# Stops paused queues list async poller.
#
# @return [self]
def stop_refresher
@refresher_mutex.synchronize do
@refresher&.shutdown
@refresher = nil
end

@refresher.shutdown
self
end

def refresher_running?
@refresher_mutex.synchronize do
@refresher&.running? || false
end
end

private

# @return [nil]
def refresh
names = redis_call("SMEMBERS", Pauzer::REDIS_KEY).to_a

@names_mutex.synchronize do
@names = names.each(&:freeze).freeze
end

nil
end

def redis_call(...)
Sidekiq.redis { |conn| conn.call(...) }
@refresher.running?
end
end
end
Expand Down
52 changes: 52 additions & 0 deletions lib/sidekiq/pauzer/repository.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# frozen_string_literal: true

module Sidekiq
module Pauzer
class Repository
include Enumerable

REDIS_KEY = "sidekiq-pauzer"

# @overload each
# @return [Enumerator<String>]
#
# @overload each(&block)
# For a block { |queue_name| ... }
# @yieldparam queue_name [String]
# @return [self]
def each
return to_enum __method__ unless block_given?

redis_call("SMEMBERS", REDIS_KEY).each { yield _1.freeze }

self
end

# @param queue_name [#to_s]
# @return [void]
def add(queue_name)
redis_call("SADD", REDIS_KEY, queue_name.to_s)
nil
end

# @param queue_name [#to_s]
# @return [void]
def delete(queue_name)
redis_call("SREM", REDIS_KEY, queue_name.to_s)
nil
end

# @param name [#to_s]
# @return [void]
def include?(queue_name)
redis_call("SISMEMBER", REDIS_KEY, queue_name.to_s).positive?
end

private

def redis_call(...)
Sidekiq.redis { _1.call(...) }
end
end
end
end
6 changes: 5 additions & 1 deletion spec/lib/sidekiq/pauzer/patches/basic_fetch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
let(:queues) { ["foo,1", "bar,10", "baz,100"] }

before do
Sidekiq::Pauzer.pause!(:foo)
Sidekiq::Pauzer.pause!("foo")
Sidekiq::Pauzer.configure { |c| c.refresh_rate = 0.1 }
Sidekiq::Pauzer.startup
sleep 0.1

stub_const("Sidekiq::BasicFetch::TIMEOUT", 0.1)
end

Expand Down
4 changes: 2 additions & 2 deletions spec/lib/sidekiq/pauzer/patches/queue_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
Sidekiq::Pauzer.unpause!("foo")

expect { subject }
.to change { redis_smembers(Sidekiq::Pauzer::REDIS_KEY) }.to(contain_exactly("foo"))
.to change { redis_smembers }.to(contain_exactly("foo"))
.and change { Sidekiq::Pauzer.paused? "foo" }.to(true)
end
end
Expand All @@ -60,7 +60,7 @@
Sidekiq::Pauzer.pause!("foo")

expect { subject }
.to change { redis_smembers(Sidekiq::Pauzer::REDIS_KEY) }.to(be_empty)
.to change { redis_smembers }.to(be_empty)
.and change { Sidekiq::Pauzer.paused? "foo" }.to(false)
end
end
Expand Down
Loading

0 comments on commit b3265ea

Please sign in to comment.