-
Notifications
You must be signed in to change notification settings - Fork 58
/
exclusive_locker.rb
104 lines (89 loc) · 3.16 KB
/
exclusive_locker.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
module ZK
module Locker
# An exclusive lock implementation
#
# If the name 'dingus' is given, then in the case of an exclusive lock, the
# algorithm works like:
#
# * lock_path = `zk.create("/_zklocking/dingus/ex", :sequential => true, :ephemeral => true)`
# * extract the digit from the lock path
# * of all the children under '/_zklocking/dingus', do we have the lowest digit?
#   * __yes__: then we hold the lock, if we're non-blocking, return true
#   * __no__: is the lock blocking?
#     * __yes__: then set a watch on the node that is next-lowest in sequence to ours and sleep the current thread until that node has been deleted
#     * __no__: return false, you lose
#
class ExclusiveLocker < LockerBase
  # (see LockerBase#lock)
  # obtain an exclusive lock.
  #
  def lock(opts={})
    super
  end

  # (see LockerBase#assert!)
  #
  # checks that:
  #
  # * we have obtained the lock (i.e. {#locked?} is true)
  # * we have a lock path
  # * our lock path still exists
  # * there are no locks, _exclusive or shared_, with lower numbers than ours
  #
  def assert!
    super
  end

  # (see LockerBase#acquirable?)
  #
  # Reports true when we already hold the lock, or when the root lock path
  # either does not exist or currently has no children.
  #
  def acquirable?
    return true if locked?
    stat = zk.stat(root_lock_path)
    !stat.exists? || stat.num_children == 0
  rescue Exceptions::NoNode # XXX: is this ever hit? stat shouldn't raise
    true
  end

  private

  # Creates our lock node (using EXCLUSIVE_LOCK_PREFIX) and then tries to
  # take the lock.
  #
  # @param opts [Hash] only :block and :timeout are read here
  # @option opts :block when truthy, wait for the lock instead of giving up
  # @option opts :timeout forwarded to {#block_until_write_lock!}
  #
  # @return false when the lock is held by someone else and :block is not
  #   set (our lock node is cleaned up first); otherwise the result of
  #   marking the lock as held
  def lock_with_opts_hash(opts)
    create_lock_path!(EXCLUSIVE_LOCK_PREFIX)
    block, timeout = opts.values_at(:block, :timeout)

    if got_write_lock?
      synchronize { @locked = true }
    elsif block
      block_until_write_lock!(:timeout => timeout)
    else
      # non-blocking and somebody is ahead of us: withdraw our node
      cleanup_lock_path!
      false
    end
  end

  # the node that is next-lowest in sequence number to ours, the one we
  # watch for updates to
  #
  # @return the entry of ordered_lock_children immediately before our own
  # @raise [WeAreTheLowestLockNumberException] if our node is the first child
  # @raise [Exceptions::NoNode] if our node is not among the children at all
  def next_lowest_node
    children = ordered_lock_children
    my_idx = children.index(lock_basename)

    # index returns nil when our node is missing; fail with a descriptive
    # error instead of a NoMethodError from `nil - 1`
    unless my_idx
      raise Exceptions::NoNode, "lock node #{lock_basename.inspect} not found under #{root_lock_path.inspect}"
    end

    raise WeAreTheLowestLockNumberException if my_idx == 0

    children[my_idx - 1]
  end

  # true when our node is the first entry of ordered_lock_children
  def got_write_lock?
    ordered_lock_children.first == lock_basename
  end
  alias got_lock? got_write_lock?

  # Blocks the calling thread until our node is the lowest-numbered child,
  # then marks the lock as held.
  #
  # The deletion of the node we were watching does not by itself mean the
  # lock is ours: that node may have belonged to another waiter that gave up
  # (e.g. a non-blocking attempt that removed its node) while a
  # lower-numbered node still holds the lock. So after every wake-up the
  # children are listed again and, if anything is still ahead of us, we
  # watch the new next-lowest node. The loop ends when next_lowest_node
  # raises WeAreTheLowestLockNumberException.
  #
  # @param opts [Hash] passed on to NodeDeletionWatcher#block_until_deleted.
  #   When :timeout is set it is treated as a budget for the whole wait, so
  #   each re-wait only receives the time that is left.
  #
  # NOTE(review): this assumes block_until_deleted returns normally only
  # when the watched node is gone and signals a timeout by raising -- the
  # unconditional @locked = true that follows already depended on that.
  # Confirm against NodeDeletionWatcher.
  def block_until_write_lock!(opts={})
    timeout = opts[:timeout]
    # Time.now is used to stay compatible with the older rubies this file's
    # syntax targets
    deadline = timeout ? (Time.now + timeout) : nil

    begin
      loop do
        begin
          # raises WeAreTheLowestLockNumberException once nothing is ahead of us
          path = "#{root_lock_path}/#{next_lowest_node}"
          logger.debug { "#{self.class}##{__method__} path=#{path.inspect}" }

          @mutex.synchronize do
            logger.debug { "assigning the @node_deletion_watcher" }
            @node_deletion_watcher = NodeDeletionWatcher.new(zk, path)
            logger.debug { "broadcasting" }
            @cond.broadcast
          end

          logger.debug { "calling block_until_deleted" }
          Thread.pass

          @node_deletion_watcher.block_until_deleted(wait_opts_until(deadline, opts))
        ensure
          logger.debug { "block_until_deleted returned" }
        end
      end
    rescue WeAreTheLowestLockNumberException
      # nothing is ahead of us any more: the lock is ours
    end

    @mutex.synchronize { @locked = true }
  end

  # Options for the next wait: opts unchanged when there is no deadline,
  # otherwise opts with :timeout replaced by the time remaining (never
  # negative).
  def wait_opts_until(deadline, opts)
    return opts unless deadline

    remaining = deadline - Time.now
    opts.merge(:timeout => (remaining > 0 ? remaining : 0))
  end
end # ExclusiveLocker
end # Locker
end # ZK