forked from defunkt/resque-lock
-
Notifications
You must be signed in to change notification settings - Fork 20
/
lock_timeout.rb
308 lines (279 loc) · 9.61 KB
/
lock_timeout.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
module Resque
module Plugins
# If you want only one instance of your job running at a time,
# extend it with this module:
#
# require 'resque-lock-timeout'
#
# class UpdateNetworkGraph
# extend Resque::Plugins::LockTimeout
# @queue = :network_graph
#
# def self.perform(repo_id)
# heavy_lifting
# end
# end
#
# If you wish to limit the duration a lock may be held for, you can
# set/override `lock_timeout`. e.g.
#
# class UpdateNetworkGraph
# extend Resque::Plugins::LockTimeout
# @queue = :network_graph
#
# # lock may be held for up to an hour.
# @lock_timeout = 3600
#
# def self.perform(repo_id)
# heavy_lifting
# end
# end
#
# If you wish that only one instance of the job defined by #identifier may be
# enqueued or running, you can set/override `loner`. e.g.
#
# class PdfExport
# extend Resque::Plugins::LockTimeout
# @queue = :exports
#
# # only one job can be running/enqueued at a time. For instance a button
# # to run a PDF export. If the user clicks several times on it, enqueue
# # the job if and only if
# # - the same export is not currently running
# # - the same export is not currently queued.
# # ('same' being defined by `identifier`)
# @loner = true
#
# def self.perform(repo_id)
# heavy_lifting
# end
# end
module LockTimeout
# @abstract Override to supply a custom identifier. Worth doing when the
#   job arguments are numerous, long, or do not convert cleanly to strings.
#
# Derives an identifier from the job arguments; `redis_lock_key` embeds it
# in the redis lock key.
#
# @param [Array] args job arguments
# @return [String, nil] job identifier (this default always returns a
#   String: the arguments joined with '-')
def identifier(*args)
  separator = '-'
  args.join(separator)
end
# Supplies the redis connection on which the lock keys are stored.
# Override to keep the locks somewhere other than the default.
#
# Default: `Resque.redis`.
#
# @return [Redis] redis object
def lock_redis
  Resque.redis
end
# Builds the redis key that guards execution of the job. Receives the job
# arguments; override to take full control of the key.
#
# Default shape: `lock:<class name>:<identifier>`. Nil segments (for
# example a nil identifier) are dropped before joining.
#
# @param [Array] args job arguments
# @return [String] redis key
def redis_lock_key(*args)
  segments = ['lock', name, identifier(*args)]
  segments.compact.join(':')
end
# Builds the redis key backing the `@loner` option. Receives the job
# arguments.
#
# Default shape: `loner:lock:<class name>:<identifier>`, i.e. the regular
# lock key prefixed with `loner`.
#
# @param [Array] args job arguments
# @return [String] redis key
def redis_loner_lock_key(*args)
  base_key = redis_lock_key(*args)
  ['loner', base_key].compact.join(':')
end
# Maximum number of seconds the lock may be held.
# Zero or a negative value means the lock is taken without a timeout.
#
# Reads `@lock_timeout`, defaulting (and memoizing) it to 0 when unset.
#
# @param [Array] args job arguments (ignored by this default)
# @return [Integer] timeout in seconds
def lock_timeout(*args)
  @lock_timeout ||= 0
end
# Whether only a single instance of the job may be running or enqueued.
#
# Reads `@loner`, defaulting it to false when unset.
#
# @param [Array] args job arguments (ignored by this default)
# @return [Boolean]
def loner(*args)
  @loner ||= false
end
# Checks whether the job is locked, or - for loner jobs only - already
# enqueued.
#
# @param [Array] args job arguments
# @return [Boolean] truthy when the job is running, or when it is a loner
#   and an instance is already queued
def loner_locked?(*args)
  running = locked?(*args)
  return running if running

  loner && enqueued?(*args)
end
# Shortcut: is the run lock (the `redis_lock_key` key) held and unexpired?
#
# @param [Array] args job arguments
# @return [Boolean] true if the job is locked by someone
def locked?(*args)
  key_builder = :redis_lock_key
  inspect_lock(key_builder, *args)
end
# Shortcut: is the loner enqueue lock (the `redis_loner_lock_key` key) held
# and unexpired?
#
# @param [Array] args job arguments
# @return [Boolean] true if the job is already queued
def enqueued?(*args)
  key_builder = :redis_loner_lock_key
  inspect_lock(key_builder, *args)
end
# Reports whether the lock stored under the key built by `lock_key_method`
# is currently held.
#
# Without a timeout the lock is held whenever the key exists. With a
# timeout the stored value is the expiry timestamp, and the lock counts as
# held through that second inclusive: `acquire_lock_algorithm!` and
# `around_perform_lock` only treat a lock as expired once the timestamp is
# strictly in the past, so this check uses the same boundary.
#
# @param [Symbol] lock_key_method the method returning redis key to lock
# @param [Array] args job arguments
# @return [Boolean] true if the lock exists and has not expired
def inspect_lock(lock_key_method, *args)
  lock_until = lock_redis.get(send(lock_key_method, *args))
  return false if lock_until.nil?
  return true unless lock_timeout(*args) > 0

  # Expired means `lock_until < now`, so equality still counts as held.
  lock_until.to_i >= Time.now.to_i
end
# @abstract
# Hook method; called when we were unable to acquire the lock.
#
# @param [Array] args job arguments
def lock_failed(*args)
  # Intentionally empty: extending classes override this to react.
end
# @abstract
# Hook method; called when we were unable to enqueue a loner job.
#
# @param [Array] args job arguments
def loner_enqueue_failed(*args)
  # Intentionally empty: extending classes override this to react.
end
# @abstract
# Hook method; called when the lock's timeout elapsed before the job
# released it.
#
# @param [Array] args job arguments
def lock_expired_before_release(*args)
  # Intentionally empty: extending classes override this to react.
end
# Enqueue-time guard (follows Resque's `before_enqueue_*` hook naming).
# For a `loner` job, allows the enqueue only when no identical job is
# already running or enqueued.
#
# @param [Array] args job arguments
# @return [nil, Boolean, Integer] nil when the job is not a loner; false
#   when the same job is currently running (after `loner_enqueue_failed`
#   has been called); otherwise whatever `acquire_loner_lock!` returns
def before_enqueue_lock(*args)
  return unless loner
  return acquire_loner_lock!(*args) unless locked?(*args)

  # Same job is currently running.
  loner_enqueue_failed(*args)
  false
end
# Attempts to take the lock that guards running the job. On failure the
# `lock_failed` hook is invoked.
#
# @param [Array] args job arguments
# @return [Boolean, Integer] see `acquire_lock_impl!`
def acquire_lock!(*args)
  acquire_lock_impl!(
    :redis_lock_key,
    :lock_failed,
    *args
  )
end
# Attempts to take the lock that guards enqueueing a loner job. On failure
# the `loner_enqueue_failed` hook is invoked.
#
# @param [Array] args job arguments
# @return [Boolean, Integer] see `acquire_lock_impl!`
def acquire_loner_lock!(*args)
  acquire_lock_impl!(
    :redis_loner_lock_key,
    :loner_enqueue_failed,
    *args
  )
end
# Shared locking routine behind `acquire_lock!` and `acquire_loner_lock!`.
#
# * Returns false; when unable to acquire the lock (after calling
#   `failed_hook`).
# * Returns true; when lock acquired, without a timeout.
# * Returns timestamp; when lock acquired with a timeout, timestamp is
#   when the lock timeout expires.
#
# @param [Symbol] lock_key_method the method returning redis key to lock
# @param [Symbol] failed_hook the method called if lock failed
# @param [Array] args job arguments
# @return [Boolean, Integer]
def acquire_lock_impl!(lock_key_method, failed_hook, *args)
  lock_key = send(lock_key_method, *args)

  if lock_timeout(*args) > 0
    # Acquire using the timeout algorithm.
    acquired, lock_until = acquire_lock_algorithm!(lock_key, *args)
  else
    # Acquire without using a timeout.
    acquired = lock_redis.setnx(lock_key, true) ? true : false
    lock_until = nil
  end

  send(failed_hook, *args) unless acquired
  acquired && lock_until ? lock_until : acquired
end
# Tries to take the lock using a timeout / deadlock-avoiding algorithm.
#
# Locking algorithm: http://code.google.com/p/redis/wiki/SetnxCommand
#
# Steps: SETNX the new expiry; if that fails, read the current expiry and,
# when it lies strictly in the past, swap in ours with GETSET - the swap
# only counts as a win if the value it returns was absent or expired.
# When the current holder has not expired (or the key has vanished in the
# meantime) a single SETNX retry is made.
#
# @param [String] lock_key redis lock key
# @param [Array] args job arguments
# @return [Array(Boolean, Integer)] acquired flag and the expiry timestamp
#   this attempt tried to write
def acquire_lock_algorithm!(lock_key, *args)
  current_time = Time.now.to_i
  expires_at = current_time + lock_timeout(*args)

  return [true, expires_at] if lock_redis.setnx(lock_key, expires_at)

  # Can't acquire the lock, see if it has expired.
  held_until = lock_redis.get(lock_key)
  won =
    if held_until && held_until.to_i < current_time
      # Expired, try to take it over.
      previous = lock_redis.getset(lock_key, expires_at)
      previous.nil? || previous.to_i < current_time
    else
      # Try once more...
      lock_redis.setnx(lock_key, expires_at) ? true : false
    end

  [won, expires_at]
end
# Releases the run lock by deleting its redis key.
#
# @param [Array] args job arguments
# @return whatever the redis `del` call returns
def release_lock!(*args)
  store = lock_redis
  store.del(redis_lock_key(*args))
end
# Releases the enqueue lock used by loner jobs by deleting its redis key.
#
# @param [Array] args job arguments
# @return whatever the redis `del` call returns
def release_loner_lock!(*args)
  store = lock_redis
  store.del(redis_loner_lock_key(*args))
end
# Refreshes the run lock: rewrites its key with a new expiry of
# now + `lock_timeout`. The write is an unconditional SET, so no check is
# made on whether the lock is currently held.
#
# @param [Array] args job arguments
# @return whatever the redis `set` call returns
def refresh_lock!(*args)
  expires_at = Time.now.to_i + lock_timeout(*args)
  lock_redis.set(redis_lock_key(*args), expires_at)
end
# Wraps job execution in the lock (follows Resque's `around_perform_*`
# hook naming).
#
# Acquires the run lock, drops the loner enqueue lock if the job is a
# loner, and yields only when the run lock was obtained. Afterwards the
# lock is released - unless a timeout was in use and it has already
# elapsed, in which case `lock_expired_before_release` is called instead.
# The expiry check uses the timestamp captured at acquisition.
#
# @param [Array] args job arguments
# @return the block's result, or nil when the lock could not be acquired
def around_perform_lock(*args)
  held = acquire_lock!(*args)

  # Release loner lock as job has been dequeued.
  release_loner_lock!(*args) if loner

  # Abort if another job holds the lock.
  return unless held

  begin
    yield
  ensure
    # Runs on success and on error. `held` is `true` for a lock without a
    # timeout, otherwise the expiry timestamp.
    now = Time.now.to_i
    expired = held != true && held < now
    if expired
      # Eeek! Lock expired before perform finished. Trigger callback.
      lock_expired_before_release(*args)
    else
      release_lock!(*args)
    end
  end
end
# Failure callback (follows Resque's `on_failure_*` hook naming).
# Releases the run lock when the job failed with `Resque::DirtyExit`.
#
# @param [Exception] exception the error the job failed with
# @param [Array] args job arguments
# @return result of `release_lock!` for a DirtyExit, nil otherwise
def on_failure_lock(exception, *args)
  # In case of a DirtyExit, the ensure block of the around hook is not called
  release_lock!(*args) if exception.is_a?(Resque::DirtyExit)
end
end
end
end