diff --git a/.reek.yml b/.reek.yml index 82dc0a5e..4f2e69ea 100644 --- a/.reek.yml +++ b/.reek.yml @@ -51,6 +51,7 @@ detectors: FeatureEnvy: exclude: - SidekiqUniqueJobs::BatchDelete#batch_delete + - SidekiqUniqueJobs::Cli#list_entries - SidekiqUniqueJobs::Digests#page - SidekiqUniqueJobs::Digests#page - SidekiqUniqueJobs::InvalidUniqueArguments#initialize diff --git a/lib/sidekiq_unique_jobs.rb b/lib/sidekiq_unique_jobs.rb index cda21aa3..249551b7 100644 --- a/lib/sidekiq_unique_jobs.rb +++ b/lib/sidekiq_unique_jobs.rb @@ -72,6 +72,7 @@ require "sidekiq_unique_jobs/on_conflict" require "sidekiq_unique_jobs/changelog" require "sidekiq_unique_jobs/digests" +require "sidekiq_unique_jobs/expiring_digests" require "sidekiq_unique_jobs/config" require "sidekiq_unique_jobs/sidekiq_unique_jobs" diff --git a/lib/sidekiq_unique_jobs/cli.rb b/lib/sidekiq_unique_jobs/cli.rb index a83720b6..a669ef05 100644 --- a/lib/sidekiq_unique_jobs/cli.rb +++ b/lib/sidekiq_unique_jobs/cli.rb @@ -20,9 +20,11 @@ def self.banner(command, _namespace = nil, _subcommand = false) # rubocop:disabl option :count, aliases: :c, type: :numeric, default: 1000, desc: "The max number of digests to return" # :nodoc: def list(pattern = "*") - entries = digests.entries(pattern: pattern, count: options[:count]) - say "Found #{entries.size} digests matching '#{pattern}':" - print_in_columns(entries.sort) if entries.any? + max_count = options[:count] + say "Searching for regular digests" + list_entries(digests.entries(pattern: pattern, count: max_count), pattern) + say "Searching for expiring digests" + list_entries(expiring_digests.entries(pattern: pattern, count: max_count), pattern) end desc "del PATTERN", "deletes unique digests from redis by pattern" @@ -32,11 +34,9 @@ def list(pattern = "*") def del(pattern) max_count = options[:count] if options[:dry_run] - result = digests.entries(pattern: pattern, count: max_count) - say "Would delete #{result.size} digests matching '#{pattern}'" + count_entries_for_del(max_count, pattern) else - deleted_count = digests.delete_by_pattern(pattern, count: max_count) - say "Deleted #{deleted_count} digests matching '#{pattern}'" + del_entries(max_count, pattern) end end @@ -51,12 +51,17 @@ def console console_class.start end - no_commands do + no_commands do # rubocop:disable Metrics/BlockLength # :nodoc: def digests @digests ||= SidekiqUniqueJobs::Digests.new end + # :nodoc: + def expiring_digests + @expiring_digests ||= SidekiqUniqueJobs::ExpiringDigests.new + end + # :nodoc: def console_class require "pry" @@ -65,6 +70,26 @@ def console_class require "irb" IRB end + + # :nodoc: + def list_entries(entries, pattern) + say "Found #{entries.size} digests matching '#{pattern}':" + print_in_columns(entries.sort) if entries.any? + end + + # :nodoc: + def count_entries_for_del(max_count, pattern) + count = digests.entries(pattern: pattern, count: max_count).size + + expiring_digests.entries(pattern: pattern, count: max_count).size + say "Would delete #{count} digests matching '#{pattern}'" + end + + # :nodoc: + def del_entries(max_count, pattern) + deleted_count = digests.delete_by_pattern(pattern, count: max_count).to_i + + expiring_digests.delete_by_pattern(pattern, count: max_count).to_i + say "Deleted #{deleted_count} digests matching '#{pattern}'" + end end end end diff --git a/lib/sidekiq_unique_jobs/constants.rb b/lib/sidekiq_unique_jobs/constants.rb index 14c55684..b9af7131 100644 --- a/lib/sidekiq_unique_jobs/constants.rb +++ b/lib/sidekiq_unique_jobs/constants.rb @@ -14,6 +14,7 @@ module SidekiqUniqueJobs CREATED_AT = "created_at" DEAD_VERSION = "uniquejobs:dead" DIGESTS = "uniquejobs:digests" + EXPIRING_DIGESTS = "uniquejobs:expiring_digests" ERRORS = "errors" JID = "jid" LIMIT = "limit" diff --git a/lib/sidekiq_unique_jobs/digests.rb b/lib/sidekiq_unique_jobs/digests.rb index c6552116..efc30965 100644 --- a/lib/sidekiq_unique_jobs/digests.rb +++ b/lib/sidekiq_unique_jobs/digests.rb @@ -14,8 +14,8 @@ class Digests < Redis::SortedSet # @return [String] the default pattern to use for matching SCAN_PATTERN = "*" - def initialize - super(DIGESTS) + def initialize(digests_key = DIGESTS) + super(digests_key) end # diff --git a/lib/sidekiq_unique_jobs/expiring_digests.rb b/lib/sidekiq_unique_jobs/expiring_digests.rb new file mode 100644 index 00000000..04e33a90 --- /dev/null +++ b/lib/sidekiq_unique_jobs/expiring_digests.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +module SidekiqUniqueJobs + # + # Class ExpiringDigests provides access to the expiring digests used by until_expired locks + # + # @author Mikael Henriksson + # + class ExpiringDigests < Digests + def initialize + super(EXPIRING_DIGESTS) + end + end +end diff --git a/lib/sidekiq_unique_jobs/key.rb b/lib/sidekiq_unique_jobs/key.rb index bee8dea0..296224f3 100644 --- a/lib/sidekiq_unique_jobs/key.rb +++ b/lib/sidekiq_unique_jobs/key.rb @@ -33,6 +33,10 @@ class Key # @!attribute [r] digests # @return [String] the zset with locked digests attr_reader :digests + # + # @!attribute [r] expiring_digests + # @return [String] the zset with locked expiring_digests + attr_reader :expiring_digests # # Initialize a new Key @@ -40,13 +44,14 @@ class Key # @param [String] digest the digest to use as key # def initialize(digest) - @digest = digest - @queued = suffixed_key("QUEUED") - @primed = suffixed_key("PRIMED") - @locked = suffixed_key("LOCKED") - @info = suffixed_key("INFO") - @changelog = CHANGELOGS - @digests = DIGESTS + @digest = digest + @queued = suffixed_key("QUEUED") + @primed = suffixed_key("PRIMED") + @locked = suffixed_key("LOCKED") + @info = suffixed_key("INFO") + @changelog = CHANGELOGS + @digests = DIGESTS + @expiring_digests = EXPIRING_DIGESTS end # @@ -81,7 +86,7 @@ def ==(other) # @return [Array] an ordered array with all keys # def to_a - [digest, queued, primed, locked, info, changelog, digests] + [digest, queued, primed, locked, info, changelog, digests, expiring_digests] end private diff --git a/lib/sidekiq_unique_jobs/lua/lock.lua b/lib/sidekiq_unique_jobs/lua/lock.lua index bfaf97e4..2104e65f 100644 --- a/lib/sidekiq_unique_jobs/lua/lock.lua +++ b/lib/sidekiq_unique_jobs/lua/lock.lua @@ -1,11 +1,12 @@ -------- BEGIN keys --------- -local digest = KEYS[1] -local queued = KEYS[2] -local primed = KEYS[3] -local locked = KEYS[4] -local info = KEYS[5] -local changelog = KEYS[6] -local digests = KEYS[7] +local digest = KEYS[1] +local queued = KEYS[2] +local primed = KEYS[3] +local locked = KEYS[4] +local info = KEYS[5] +local changelog = KEYS[6] +local digests = KEYS[7] +local expiring_digests = KEYS[8] -------- END keys --------- @@ -57,8 +58,13 @@ if limit_exceeded then return nil end -log_debug("ZADD", digests, current_time, digest) -redis.call("ZADD", digests, current_time, digest) +if lock_type == "until_expired" and pttl and pttl > 0 then + log_debug("ZADD", expiring_digests, current_time + pttl, digest) + redis.call("ZADD", expiring_digests, current_time + pttl, digest) +else + log_debug("ZADD", digests, current_time, digest) + redis.call("ZADD", digests, current_time, digest) +end log_debug("HSET", locked, job_id, current_time) redis.call("HSET", locked, job_id, current_time) diff --git a/lib/sidekiq_unique_jobs/lua/reap_orphans.lua b/lib/sidekiq_unique_jobs/lua/reap_orphans.lua index e3845ec3..e3c39dd9 100644 --- a/lib/sidekiq_unique_jobs/lua/reap_orphans.lua +++ b/lib/sidekiq_unique_jobs/lua/reap_orphans.lua @@ -1,9 +1,10 @@ redis.replicate_commands() -------- BEGIN keys --------- -local digests_set = KEYS[1] -local schedule_set = KEYS[2] -local retry_set = KEYS[3] +local digests_set = KEYS[1] +local expiring_digests_set = KEYS[2] +local schedule_set = KEYS[3] +local retry_set = KEYS[4] -------- END keys --------- -------- BEGIN argv --------- @@ -90,5 +91,32 @@ repeat index = index + per until index >= total or del_count >= reaper_count +if del_count < reaper_count then + index = 0 + total = redis.call("ZCOUNT", expiring_digests_set, 0, current_time) + repeat + local digests = redis.call("ZRANGEBYSCORE", expiring_digests_set, 0, current_time, "LIMIT", index, index + per -1) + + for _, digest in pairs(digests) do + local queued = digest .. ":QUEUED" + local primed = digest .. ":PRIMED" + local locked = digest .. ":LOCKED" + local info = digest .. ":INFO" + local run_digest = digest .. ":RUN" + local run_queued = digest .. ":RUN:QUEUED" + local run_primed = digest .. ":RUN:PRIMED" + local run_locked = digest .. ":RUN:LOCKED" + local run_info = digest .. ":RUN:INFO" + + redis.call(del_cmd, digest, queued, primed, locked, info, run_digest, run_queued, run_primed, run_locked, run_info) + + redis.call("ZREM", expiring_digests_set, digest) + del_count = del_count + 1 + end + + index = index + per + until index >= total or del_count >= reaper_count +end + log_debug("END") return del_count diff --git a/lib/sidekiq_unique_jobs/orphans/lua_reaper.rb b/lib/sidekiq_unique_jobs/orphans/lua_reaper.rb index fc9fd010..08feab81 100644 --- a/lib/sidekiq_unique_jobs/orphans/lua_reaper.rb +++ b/lib/sidekiq_unique_jobs/orphans/lua_reaper.rb @@ -20,7 +20,7 @@ def call call_script( :reap_orphans, conn, - keys: [DIGESTS, SCHEDULE, RETRY, PROCESSES], + keys: [DIGESTS, EXPIRING_DIGESTS, SCHEDULE, RETRY, PROCESSES], argv: [reaper_count, (Time.now - reaper_timeout).to_f], ) end diff --git a/lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb b/lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb index dcfcca6b..faa47054 100644 --- a/lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb +++ b/lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb @@ -66,6 +66,11 @@ def call return if queues_very_full? BatchDelete.call(orphans, conn) + BatchDelete.call(expired_digests, conn) + end + + def expired_digests + conn.zrangebyscore(EXPIRING_DIGESTS, 0, @start_time) end # diff --git a/lib/sidekiq_unique_jobs/web.rb b/lib/sidekiq_unique_jobs/web.rb index fe9d7964..06a2d21a 100644 --- a/lib/sidekiq_unique_jobs/web.rb +++ b/lib/sidekiq_unique_jobs/web.rb @@ -8,7 +8,7 @@ module SidekiqUniqueJobs # # @author Mikael Henriksson module Web - def self.registered(app) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize + def self.registered(app) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity app.helpers do include Web::Helpers end @@ -49,8 +49,25 @@ def self.registered(app) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize erb(unique_template(:locks)) end + app.get "/expiring_locks" do + @filter = params[:filter] || "*" + @filter = "*" if @filter == "" + @count = (params[:count] || 100).to_i + @current_cursor = params[:cursor] + @prev_cursor = params[:prev_cursor] + + @total_size, @next_cursor, @locks = expiring_digests.page( + cursor: @current_cursor, + pattern: @filter, + page_size: @count, + ) + + erb(unique_template(:locks)) + end + app.get "/locks/delete_all" do digests.delete_by_pattern("*", count: digests.count) + expiring_digests.delete_by_pattern("*", count: digests.count) redirect_to :locks end @@ -63,6 +80,7 @@ def self.registered(app) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize app.get "/locks/:digest/delete" do digests.delete_by_digest(params[:digest]) + expiring_digests.delete_by_digest(params[:digest]) redirect_to :locks end @@ -82,8 +100,9 @@ def self.registered(app) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize require "sidekiq/web" unless defined?(Sidekiq::Web) Sidekiq::Web.register(SidekiqUniqueJobs::Web) - Sidekiq::Web.tabs["Locks"] = "locks" - Sidekiq::Web.tabs["Changelogs"] = "changelogs" + Sidekiq::Web.tabs["Locks"] = "locks" + Sidekiq::Web.tabs["Expiring Locks"] = "expiring_locks" + Sidekiq::Web.tabs["Changelogs"] = "changelogs" Sidekiq::Web.settings.locales << File.join(File.dirname(__FILE__), "locales") rescue NameError, LoadError => ex SidekiqUniqueJobs.logger.error(ex) diff --git a/lib/sidekiq_unique_jobs/web/helpers.rb b/lib/sidekiq_unique_jobs/web/helpers.rb index 621168ca..73e8df5c 100644 --- a/lib/sidekiq_unique_jobs/web/helpers.rb +++ b/lib/sidekiq_unique_jobs/web/helpers.rb @@ -51,6 +51,16 @@ def digests @digests ||= SidekiqUniqueJobs::Digests.new end + # + # The collection of digests + # + # + # @return [SidekiqUniqueJobs::ExpiringDigests] the sorted set with expiring digests + # + def expiring_digests + @expiring_digests ||= SidekiqUniqueJobs::ExpiringDigests.new + end + # # The collection of changelog entries # diff --git a/spec/sidekiq_unique_jobs/cli_spec.rb b/spec/sidekiq_unique_jobs/cli_spec.rb index bb6db17e..c1113031 100644 --- a/spec/sidekiq_unique_jobs/cli_spec.rb +++ b/spec/sidekiq_unique_jobs/cli_spec.rb @@ -68,7 +68,14 @@ subject(:list) { capture(:stdout) { described_class.start(%w[list * --count 1000]) } } context "when no digests exist" do - it { is_expected.to eq("Found 0 digests matching '#{pattern}':\n") } + it do + expect(list).to eq <<~HEADER + Searching for regular digests + Found 0 digests matching '#{pattern}': + Searching for expiring digests + Found 0 digests matching '#{pattern}': + HEADER + end end context "when a key exists" do diff --git a/spec/sidekiq_unique_jobs/key_spec.rb b/spec/sidekiq_unique_jobs/key_spec.rb index bef031ac..72819551 100644 --- a/spec/sidekiq_unique_jobs/key_spec.rb +++ b/spec/sidekiq_unique_jobs/key_spec.rb @@ -27,6 +27,7 @@ #{digest_one}:INFO uniquejobs:changelog uniquejobs:digests + uniquejobs:expiring_digests ], ) end diff --git a/spec/sidekiq_unique_jobs/lua/lock_spec.rb b/spec/sidekiq_unique_jobs/lua/lock_spec.rb index 6faff756..ed0dddb6 100644 --- a/spec/sidekiq_unique_jobs/lua/lock_spec.rb +++ b/spec/sidekiq_unique_jobs/lua/lock_spec.rb @@ -65,7 +65,7 @@ it { expect { lock }.to change { hget(key.locked, job_id_one) }.from(nil) } it { expect { lock }.to change { llen(key.queued) }.by(0) } it { expect { lock }.to change { llen(key.primed) }.by(-1) } - it { expect { lock }.to change { zcard("uniquejobs:digests") }.by(1) } + it { expect { lock }.to change { zcard("uniquejobs:expiring_digests") }.by(1) } end context "when given lock_ttl nil" do diff --git a/spec/sidekiq_unique_jobs/lua/reap_orphans_spec.rb b/spec/sidekiq_unique_jobs/lua/reap_orphans_spec.rb index 88d983d0..9a5c9d80 100644 --- a/spec/sidekiq_unique_jobs/lua/reap_orphans_spec.rb +++ b/spec/sidekiq_unique_jobs/lua/reap_orphans_spec.rb @@ -12,6 +12,7 @@ let(:redis_keys) do [ SidekiqUniqueJobs::DIGESTS, + SidekiqUniqueJobs::EXPIRING_DIGESTS, SidekiqUniqueJobs::SCHEDULE, SidekiqUniqueJobs::RETRY, ] diff --git a/spec/sidekiq_unique_jobs/lua/unlock_spec.rb b/spec/sidekiq_unique_jobs/lua/unlock_spec.rb index e0c2c788..a47d13fc 100644 --- a/spec/sidekiq_unique_jobs/lua/unlock_spec.rb +++ b/spec/sidekiq_unique_jobs/lua/unlock_spec.rb @@ -69,7 +69,7 @@ it { expect { unlock }.to change { llen(key.primed) }.by(0) } it { expect { unlock }.not_to change { ttl(key.locked) } } it { expect { unlock }.not_to change { hget(key.locked, job_id_one) } } - it { expect { unlock }.to change { zcard(key.digests) }.by(-1) } + it { expect { unlock }.not_to change { zcard(key.expiring_digests) } } end context "when given lock_ttl nil" do diff --git a/spec/support/sidekiq_unique_jobs/testing.rb b/spec/support/sidekiq_unique_jobs/testing.rb index 054949ed..ccda62ad 100644 --- a/spec/support/sidekiq_unique_jobs/testing.rb +++ b/spec/support/sidekiq_unique_jobs/testing.rb @@ -827,7 +827,8 @@ def locking_jids end def unique_keys - keys("uniquejobs:*") - [SidekiqUniqueJobs::CHANGELOGS, SidekiqUniqueJobs::DIGESTS] + keys("uniquejobs:*") - [SidekiqUniqueJobs::CHANGELOGS, SidekiqUniqueJobs::DIGESTS, + SidekiqUniqueJobs::EXPIRING_DIGESTS] end def changelogs