/
unique_jobs.rb
73 lines (59 loc) · 1.83 KB
/
unique_jobs.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
require 'digest'
require 'sidekiq_unique_jobs/connectors'
module SidekiqUniqueJobs
  module Middleware
    module Server
      # Server-side middleware that removes a job's uniqueness key from Redis
      # around the execution of the job.
      #
      # The key is deleted either before the job body runs (`:before_yield`)
      # or after it has run (`:after_yield`). The order is read from the
      # worker class' sidekiq options (`'unique_unlock_order'`) and falls back
      # to `SidekiqUniqueJobs.config.default_unlock_order`.
      class UniqueJobs
        attr_reader :unlock_order, :redis_pool

        # Runs the job wrapped in the unlock logic.
        #
        # @param worker [Object] worker instance; only its class is inspected
        # @param item [Hash] job payload; 'class', 'queue' and 'args' are read
        # @param _queue [Object] unused
        # @param redis_pool [Object, nil] passed through to
        #   SidekiqUniqueJobs::Connectors.connection
        # @yield executes the rest of the middleware chain / the job itself
        # @raise [Sidekiq::Shutdown] re-raised untouched; the key is kept in
        #   that case
        def call(worker, item, _queue, redis_pool = nil)
          shutdown = false
          @redis_pool = redis_pool

          decide_unlock_order(worker.class)
          lock_key = payload_hash(item)
          unlock(lock_key) if before_yield?

          yield
        rescue Sidekiq::Shutdown
          # Remember the shutdown so the ensure clause leaves the key alone.
          shutdown = true
          raise
        ensure
          # Release after the job only for :after_yield and only when the
          # process is not shutting down. `lock_key` is nil when an error was
          # raised before the key could be computed; there is nothing to
          # delete then, and attempting it could mask the original error.
          unlock(lock_key) if lock_key && !shutdown && after_yield?
        end

        # Stores the unlock order to use for this invocation in @unlock_order.
        #
        # @param klass [Class] the worker class
        # @return [Object] the chosen unlock order
        def decide_unlock_order(klass)
          @unlock_order = if unlock_order_configured?(klass)
                            klass.get_sidekiq_options['unique_unlock_order']
                          else
                            default_unlock_order
                          end
        end

        # @param klass [Class] the worker class
        # @return [Boolean] true when the class exposes sidekiq options and
        #   they contain a non-nil 'unique_unlock_order'
        def unlock_order_configured?(klass)
          klass.respond_to?(:get_sidekiq_options) &&
            !klass.get_sidekiq_options['unique_unlock_order'].nil?
        end

        # @return [Object] the globally configured fallback unlock order
        def default_unlock_order
          SidekiqUniqueJobs.config.default_unlock_order
        end

        # @return [Boolean] true when the key is released before the job runs
        def before_yield?
          unlock_order == :before_yield
        end

        # @return [Boolean] true when the key is released after the job ran
        def after_yield?
          unlock_order == :after_yield
        end

        protected

        # Builds the uniqueness key for a job payload.
        #
        # @param item [Hash] job payload with 'class', 'queue' and 'args'
        # @return [Object] whatever PayloadHelper.get_payload returns
        def payload_hash(item)
          SidekiqUniqueJobs::PayloadHelper.get_payload(item['class'], item['queue'], item['args'])
        end

        # Deletes the uniqueness key through the configured connection.
        #
        # @param payload_hash [Object] key to delete
        def unlock(payload_hash)
          connection { |c| c.del(payload_hash) }
        end

        # @return [Object] the Sidekiq logger
        def logger
          Sidekiq.logger
        end

        # Yields a connection obtained from SidekiqUniqueJobs::Connectors,
        # using the redis pool given to #call.
        def connection(&block)
          SidekiqUniqueJobs::Connectors.connection(redis_pool, &block)
        end
      end
    end
  end
end