-
-
Notifications
You must be signed in to change notification settings - Fork 273
/
base_lock.rb
157 lines (135 loc) · 4.41 KB
/
base_lock.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# frozen_string_literal: true
module SidekiqUniqueJobs
class Lock
# Abstract base class for locks
#
# @abstract
# @author Mikael Henriksson <mikael@mhenrixon.com>
class BaseLock
extend Forwardable
# includes "SidekiqUniqueJobs::Logging"
# @!parse include SidekiqUniqueJobs::Logging
include SidekiqUniqueJobs::Logging
# includes "SidekiqUniqueJobs::Reflectable"
# @!parse include SidekiqUniqueJobs::Reflectable
include SidekiqUniqueJobs::Reflectable
#
# Checks the sidekiq_options declared on a worker by handing them
# to Validator.validate.
#
# @param [Hash] options the sidekiq_options given to the worker
#   (an empty hash when omitted)
#
# @return [void]
#
def self.validate_options(options = {})
  Validator.validate(options)
end
# Exposes #locked? on the lock itself by forwarding the call to #locksmith.
#
# NOTE: Mainly used for a clean testing API
#
def_delegators :locksmith, :locked?
# Sets up the lock state for a single Sidekiq job.
#
# @param [Hash] item the Sidekiq job hash
# @param [Proc] callback the callback to use after unlock
# @param [Sidekiq::RedisConnection, ConnectionPool] redis_pool the redis connection
def initialize(item, callback, redis_pool = nil)
  @attempt    = 0
  @redis_pool = redis_pool
  @callback   = callback
  @item       = item

  # Used to ease testing. Kept ahead of LockConfig.new because both
  # work on the very same item hash.
  prepare_item
  @lock_config = LockConfig.new(item)
end
#
# Locks a sidekiq job
#
# This base implementation is abstract and always raises; the contract
# below is what the file documents for concrete lock classes.
#
# @note Will call a conflict strategy if lock can't be achieved.
#
# @return [String, nil] the locked jid when properly locked, else nil.
#
# @yield to the caller when given a block
#
# @raise [NotImplementedError] needs to be implemented in child class
#
def lock
  message = "##{__method__} needs to be implemented in #{self.class}"
  raise NotImplementedError, message
end
# Execute the job in the Sidekiq server processor
#
# This base implementation is abstract and always raises.
#
# @raise [NotImplementedError] needs to be implemented in child class
def execute
  message = "##{__method__} needs to be implemented in #{self.class}"
  raise NotImplementedError, message
end
#
# The lock manager/client
#
# Built on first use from the job hash and the redis pool, then reused
# for every later call.
#
# @api private
# @return [SidekiqUniqueJobs::Locksmith] the locksmith for this sidekiq job
#
def locksmith
  return @locksmith if @locksmith

  @locksmith = SidekiqUniqueJobs::Locksmith.new(item, redis_pool)
end
private
# @!attribute [r] item
#   @return [Hash<String, Object>] the Sidekiq job hash given to #initialize
attr_reader :item
# @!attribute [r] lock_config
#   @return [LockConfig] a lock configuration, built from item in #initialize
attr_reader :lock_config
# @!attribute [r] redis_pool
#   @return [Sidekiq::RedisConnection, ConnectionPool, NilClass] the redis connection
#     (nil when none was given); handed to the locksmith and the conflict strategies
attr_reader :redis_pool
# @!attribute [r] callback
#   @return [Proc] the block to call after unlock (invoked by #callback_safely)
attr_reader :callback
# @!attribute [r] attempt
#   @return [Integer] the current locking attempt; starts at 0 and is
#     incremented each time #call_strategy runs
attr_reader :attempt
# Makes sure the job hash has been prepared, unless it already
# carries a lock digest.
#
# The preparation should only be needed to ease testing;
# in production this will be done by the middleware.
#
# @return [Object, nil] nil when the digest is already present, otherwise
#   the result of SidekiqUniqueJobs::Job.prepare
def prepare_item
  SidekiqUniqueJobs::Job.prepare(item) unless item.key?(LOCK_DIGEST)
end
#
# Handle when lock failed
#
# Emits the :lock_failed reflection for the job and then runs the
# conflict strategy for the given origin.
#
# @param [Symbol] origin either :client or :server (defaults to :client)
#
# @return [void]
#
def lock_failed(origin: :client)
  reflect(:lock_failed, item)

  call_strategy(origin: origin)
end
#
# Runs the conflict strategy configured for the given origin.
#
# The strategy receives a block that retries #lock, but only when that
# very strategy is a replacing one and this is the first attempt.
#
# @param [Symbol] origin either :client or :server
#
# @return [Object] whatever the strategy's #call returns
#
# @raise [SidekiqUniqueJobs::InvalidArgument] when origin is neither
#   :client nor :server
#
def call_strategy(origin:)
  @attempt += 1

  strategy =
    case origin
    when :client then client_strategy
    when :server then server_strategy
    else
      raise SidekiqUniqueJobs::InvalidArgument,
            "`origin:` needs to be either `:client` or `:server`"
    end

  # Ask the strategy that is actually being called whether it replaces,
  # rather than always consulting the client strategy.
  strategy.call { lock if replace?(strategy) }
end

#
# Whether a new lock attempt should follow the conflict strategy.
#
# @param [#replace?] strategy the conflict strategy to consult; defaults
#   to the client strategy so zero-argument callers keep working
#
# @return [true, false] true only for a replacing strategy while fewer
#   than two attempts have been made (guards against endless retries)
#
def replace?(strategy = client_strategy)
  strategy.replace? && attempt < 2
end
# Unlocks through the locksmith, then runs the after-unlock callback.
# When the unlock does not succeed, an :unlock_failed reflection is
# emitted instead and the callback is skipped.
#
# @return [Object] the result of #callback_safely on success, otherwise
#   whatever #reflect returns
def unlock_and_callback
  if locksmith.unlock
    callback_safely
  else
    reflect(:unlock_failed, item)
  end
end
# Runs the after-unlock callback (when one was given) and returns the
# job id stored in the item.
#
# @return [Object] the value stored under JID in the job hash
#
# @raise [StandardError] any error from the callback is re-raised after an
#   :after_unlock_callback_failed reflection has been emitted
def callback_safely
  after_unlock = callback
  after_unlock.call unless after_unlock.nil?
  item[JID]
rescue StandardError
  reflect(:after_unlock_callback_failed, item)
  raise
end
# The conflict strategy used on the client side, looked up from
# lock_config.on_client_conflict and memoized after the first call.
#
# @return [Object] an instance of the strategy class found by
#   OnConflict.find_strategy, built with the item and redis pool
def client_strategy
  return @client_strategy if @client_strategy

  strategy_class = OnConflict.find_strategy(lock_config.on_client_conflict)
  @client_strategy = strategy_class.new(item, redis_pool)
end
# The conflict strategy used on the server side, looked up from
# lock_config.on_server_conflict and memoized after the first call.
#
# @return [Object] an instance of the strategy class found by
#   OnConflict.find_strategy, built with the item and redis pool
def server_strategy
  return @server_strategy if @server_strategy

  strategy_class = OnConflict.find_strategy(lock_config.on_server_conflict)
  @server_strategy = strategy_class.new(item, redis_pool)
end
end
end
end