Fix that :PRIMED keys are seemingly not removed (#574)
* Adds coverage for batch delete

* The created_at belongs to payload

This prevents the reaper from crashing

* Prevent keys from bleeding out into the wild

There shouldn't be any need for these keys after a lock has been acquired

* Always clear out info, primed and queued
mhenrixon committed Feb 8, 2021
1 parent 4870453 commit f147131
Showing 10 changed files with 100 additions and 22 deletions.
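
For orientation, the diff below deals with the companion keys that sidekiq-unique-jobs maintains per lock digest: the :QUEUED and :PRIMED lists used while acquiring a lock, the :LOCKED key, and the :INFO key. A minimal sketch of the naming, inferred from the specs in this commit (the digest value is illustrative):

digest = "uniquejobs:abc123" # illustrative digest
%w[QUEUED PRIMED LOCKED INFO].map { |suffix| "#{digest}:#{suffix}" }
# => ["uniquejobs:abc123:QUEUED", "uniquejobs:abc123:PRIMED",
#     "uniquejobs:abc123:LOCKED", "uniquejobs:abc123:INFO"]

The bug was that the transient :QUEUED, :PRIMED and :INFO keys could outlive the lock they helped acquire.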
2 changes: 1 addition & 1 deletion lib/sidekiq_unique_jobs/batch_delete.rb
@@ -44,7 +44,7 @@ class BatchDelete
#
# @return [void]
#
- def self.call(digests, conn)
+ def self.call(digests, conn = nil)
new(digests, conn).call
end

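The new nil default lets callers invoke BatchDelete.call(digests) without holding a Redis connection. A sketch of what the instance presumably does with a nil conn — an assumption, since the fallback lives outside this hunk; batch_delete and the redis block helper (seen in lock.rb below) stand in for the real internals:

# Hypothetical sketch of BatchDelete#call handling a nil connection:
def call
  return batch_delete(conn) if conn

  redis { |redis_conn| batch_delete(redis_conn) }
end

This is what allows the new batch_delete_spec below to call described_class.call(digests) with no connection argument.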
8 changes: 7 additions & 1 deletion lib/sidekiq_unique_jobs/json.rb
@@ -15,11 +15,17 @@ module JSON
# @return [Object]
#
def load_json(string)
- return unless string && !string.empty?
+ return if string.nil? || string.empty?

::JSON.parse(string)
end

+ def safe_load_json(string)
+   return string if string.is_a?(Hash)
+
+   load_json(string)
+ end

#
# Dumps an object into a JSON string
#
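safe_load_json makes decoding tolerant of already-parsed input. A quick illustration, assuming a context where SidekiqUniqueJobs::JSON's methods are available, as in the reaper below (values are hypothetical):

safe_load_json('{"jid":"abc"}')      # => { "jid" => "abc" }
safe_load_json({ "jid" => "abc" })   # => hash returned untouched
safe_load_json(nil)                  # => nil (via load_json's guard)

The reaper below leans on this when a worker entry's payload may arrive either as a nested hash or as a JSON string.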
32 changes: 31 additions & 1 deletion lib/sidekiq_unique_jobs/lock.rb
@@ -53,7 +53,7 @@ def initialize(key, time: nil)
#
# Locks a job_id
#
- # @note intended only for testing purposez
+ # @note intended only for testing purposes
#
# @param [String] job_id a sidekiq JID
# @param [Hash] lock_info information about the lock
@@ -73,6 +73,36 @@ def lock(job_id, lock_info = {})
end
end

+ #
+ # Create the :QUEUED key
+ #
+ # @note intended only for testing purposes
+ #
+ # @param [String] job_id a sidekiq JID
+ #
+ # @return [void]
+ #
+ def queue(job_id)
+   redis do |conn|
+     conn.lpush(key.queued, job_id)
+   end
+ end
+
+ #
+ # Create the :PRIMED key
+ #
+ # @note intended only for testing purposes
+ #
+ # @param [String] job_id a sidekiq JID
+ #
+ # @return [void]
+ #
+ def prime(job_id)
+   redis do |conn|
+     conn.lpush(key.primed, job_id)
+   end
+ end
+
#
# Unlock a specific job_id
#
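These helpers let tests fabricate the intermediate key states that the Lua scripts normally create during lock acquisition. The intended usage, mirroring the new batch_delete spec below (digest and jid are placeholders):

lock = SidekiqUniqueJobs::Lock.create(digest, jid) # create the lock itself
lock.queue(jid)                                    # LPUSH jid onto the :QUEUED list
lock.prime(jid)                                    # LPUSH jid onto the :PRIMED list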
18 changes: 9 additions & 9 deletions lib/sidekiq_unique_jobs/lua/lock.lua
@@ -74,19 +74,19 @@ if pttl and pttl > 0 then
log_debug("PEXPIRE", digest, pttl)
redis.call("PEXPIRE", digest, pttl)

log_debug("PEXPIRE", queued, pttl)
redis.call("PEXPIRE", queued, pttl)

log_debug("PEXPIRE", primed, pttl)
redis.call("PEXPIRE", primed, pttl)

log_debug("PEXPIRE", locked, pttl)
redis.call("PEXPIRE", locked, pttl)

log_debug("PEXPIRE", info, pttl)
redis.call("PEXPIRE", info, pttl)
end

log_debug("PEXPIRE", queued, 1000)
redis.call("PEXPIRE", queued, 1000)

log_debug("PEXPIRE", primed, 1000)
redis.call("PEXPIRE", primed, 1000)

log_debug("PEXPIRE", info, 1000)
redis.call("PEXPIRE", info, 1000)

log("Locked")
log_debug("END lock digest:", digest, "job_id:", job_id)
return job_id
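This is the core of the fix: :QUEUED, :PRIMED and :INFO no longer inherit the lock's pttl. Once a lock succeeds they are always given a 1-second expiry, so they cannot linger even for locks with long (or no) TTLs. One way to observe it from Ruby — a sketch assuming the redis block helper and the illustrative key naming shown above:

redis do |conn|
  conn.pttl("uniquejobs:abc123:QUEUED") # => between 0 and 1000 right after locking
  conn.pttl("uniquejobs:abc123:PRIMED") # => between 0 and 1000
  conn.pttl("uniquejobs:abc123:LOCKED") # => the lock pttl, or -1 when no TTL is set
end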
@@ -34,7 +34,7 @@ local function find_digest_in_process_set(digest, threshold)
end

local job = cjson.decode(jobstr)
- if job.created_at > threshold then
+ if job.payload.created_at > threshold then
found = true
break
end
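This one-line move explains the "prevents the reaper from crashing" note in the commit message: created_at lives inside payload, so job.created_at was nil, and in Lua comparing nil with a number raises "attempt to compare nil with number", killing the script. A Ruby analog of the before and after, with an illustrative entry:

job = { "payload" => { "created_at" => 1_612_815_425.0 } }
job["created_at"]                                 # => nil
# job["created_at"] > 1_612_815_000               # NoMethodError in Ruby, a hard error in Lua
job.dig("payload", "created_at") > 1_612_815_000  # => true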
16 changes: 9 additions & 7 deletions lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb
@@ -52,11 +52,11 @@ def call
# @return [Array<String>] an array of orphaned digests
#
def orphans
- conn.zrevrange(digests.key, 0, -1).each_with_object([]) do |digest, result|
+ conn.zrevrange(digests.key, 0, -1).each_with_object([]) do |digest, memo|
next if belongs_to_job?(digest)

- result << digest
- break if result.size >= reaper_count
+ memo << digest
+ break if memo.size >= reaper_count
end
end

@@ -117,7 +117,7 @@ def enqueued?(digest)
end
end

- def active?(digest) # rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity
+ def active?(digest) # rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
Sidekiq.redis do |conn|
procs = conn.sscan_each("processes").to_a
return false if procs.empty?
@@ -132,10 +132,12 @@ def active?(digest) # rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity
next unless workers.any?

workers.each_pair do |_tid, job|
- item = load_json(job)
+ next unless (item = safe_load_json(job))

- return true if item.dig(PAYLOAD, LOCK_DIGEST) == digest
- return true if considered_active?(item[CREATED_AT])
+ payload = safe_load_json(item[PAYLOAD])
+
+ return true if payload[LOCK_DIGEST] == digest
+ return true if considered_active?(payload[CREATED_AT])
end
end

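The double safe_load_json is the point: the workers-hash entry itself is a JSON string, and its payload field may be either a nested hash or a still-encoded JSON string, depending on how the entry was written. A sketch with hypothetical values:

raw  = '{"payload":"{\"created_at\":1612815425.0,\"lock_digest\":\"uniquejobs:abc123\"}"}'
item = safe_load_json(raw)                 # outer decode; payload is still a String here
payload = safe_load_json(item["payload"])  # tolerant inner decode: String or Hash both work

payload["lock_digest"] # => "uniquejobs:abc123"
payload["created_at"]  # => 1612815425.0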
34 changes: 34 additions & 0 deletions spec/sidekiq_unique_jobs/batch_delete_spec.rb
@@ -0,0 +1,34 @@
# frozen_string_literal: true

require "spec_helper"

RSpec.describe SidekiqUniqueJobs::BatchDelete do
let(:digests) { [] }
let(:locks) { [] }

describe ".call" do
subject(:call) { described_class.call(digests) }

before do
10.times do |_n|
digest = SecureRandom.hex
lock = SidekiqUniqueJobs::Lock.create(digest, digest)

lock.queue(digest)
lock.prime(digest)

digests << digest
locks << lock
end
end

it "deletes all digests (locked, primed, queued and run keys)" do
call

locks.all? do |lock|
expect(lock.all_jids).to match_array([])
expect(unique_keys).to match_array([])
end
end
end
end
6 changes: 5 additions & 1 deletion spec/sidekiq_unique_jobs/middleware/client_spec.rb
@@ -169,7 +169,7 @@

MyUniqueJob.perform_in(expected_expires_at, "mika", "hel")

- unique_keys.all? { |key| expect(key).to have_ttl(8_100) }
+ unique_keys.all? do |key|
+   next if key.end_with?(":INFO")
+
+   expect(key).to have_ttl(8_100)
+ end
end

it "logs duplicate payload when configured" do
2 changes: 1 addition & 1 deletion spec/sidekiq_unique_jobs/orphans/reaper_spec.rb
@@ -140,7 +140,7 @@
conn.multi do
conn.sadd("processes", process_key)
conn.set(process_key, "bogus")
- conn.hset(worker_key, thread_id, dump_json(created_at: created_at, payload: item))
+ conn.hset(worker_key, thread_id, dump_json(payload: item.merge(created_at: created_at)))
conn.expire(process_key, 60)
conn.expire(worker_key, 60)
end
2 changes: 2 additions & 0 deletions spec/workers/until_expired_job_spec.rb
@@ -43,6 +43,8 @@
it "sets keys to expire as per configuration" do
lock_ttl = described_class.get_sidekiq_options["lock_ttl"]
unique_keys.all? do |key|
+ next if key.end_with?(":INFO")
+
expect(key).to have_ttl(lock_ttl + 60).within(10)
end
end
