-
Notifications
You must be signed in to change notification settings - Fork 75
/
zookeeper_base.rb
276 lines (219 loc) · 7.57 KB
/
zookeeper_base.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
require_relative 'c_zookeeper'
require 'forwardable'
# The low-level wrapper-specific methods for the C lib
# subclassed by the top-level Zookeeper class
module Zookeeper
class ZookeeperBase
extend Forwardable
include Forked
include Common # XXX: clean this up, no need to include *everything*
include Callbacks
include Constants
include Exceptions
include ACLs
include Logger
attr_accessor :original_pid
# @private
class ClientShutdownException < StandardError; end
# @private
KILL_TOKEN = Object.new unless defined?(KILL_TOKEN)
ZKRB_GLOBAL_CB_REQ = -1
# debug levels
ZOO_LOG_LEVEL_ERROR = 1
ZOO_LOG_LEVEL_WARN = 2
ZOO_LOG_LEVEL_INFO = 3
ZOO_LOG_LEVEL_DEBUG = 4
def_delegators :@czk, :get_children, :exists, :delete, :get, :set,
:set_acl, :get_acl, :client_id, :sync, :wait_until_connected
# some state methods need to be more paranoid about locking to ensure the correct
# state is returned
#
def self.threadsafe_inquisitor(*syms)
syms.each do |sym|
class_eval(<<-EOM, __FILE__, __LINE__+1)
def #{sym}
false|@mutex.synchronize { @czk and @czk.#{sym} }
end
EOM
end
end
threadsafe_inquisitor :connected?, :connecting?, :associating?, :running?
attr_reader :event_queue
# this method may be called in either the fork case, or from the constructor
# to set up this state initially (so all of this is in one location). we rely
# on the forked? method to determine which it is
def reopen_after_fork!
logger.debug { "#{self.class}##{__method__} fork detected!" } if forked?
@mutex = Monitor.new
@dispatch_shutdown_cond = @mutex.new_cond
@event_queue = QueueWithPipe.new
@dispatcher = nil if @dispatcher and not @dispatcher.alive?
update_pid! # from Forked
end
private :reopen_after_fork!
def reopen(timeout = 10, watcher=nil)
if watcher and (watcher != @default_watcher)
raise "You cannot set the watcher to a different value this way anymore!"
end
reopen_after_fork! if forked?
@mutex.synchronize do
@czk.close if @czk
@czk = CZookeeper.new(@host, @event_queue)
# flushes all outstanding watcher reqs.
@watcher_reqs.clear
set_default_global_watcher
@czk.wait_until_connected(timeout)
end
setup_dispatch_thread!
state
end
def initialize(host, timeout = 10, watcher=nil)
@watcher_reqs = {}
@completion_reqs = {}
@current_req_id = 0
@dispatcher = @czk = nil
update_pid!
reopen_after_fork!
# approximate the java behavior of raising java.lang.IllegalArgumentException if the host
# argument ends with '/'
raise ArgumentError, "Host argument #{host.inspect} may not end with /" if host.end_with?('/')
@host = host.dup
@default_watcher = (watcher or get_default_global_watcher)
yield self if block_given?
reopen(timeout)
end
# if either of these happen, the user will need to renegotiate a connection via reopen
def assert_open
@mutex.synchronize do
raise Exceptions::NotConnected if closed?
if forked?
raise InheritedConnectionError, <<-EOS.gsub(/(?:^|\n)\s*/, ' ').strip
You tried to use a connection inherited from another process
(original pid: #{original_pid}, your pid: #{Process.pid})
You need to call reopen() after forking
EOS
end
end
end
# do not lock, do not mutex, just close the underlying handle this is
# potentially dangerous and should only be called after a fork() to close
# this instance
def close!
@czk && @czk.close
end
# close the connection normally, stops the dispatch thread and closes the
# underlying connection cleanly
def close
shutdown_thread = Thread.new do
@mutex.synchronize do
stop_dispatch_thread!
close!
end
end
shutdown_thread.join unless event_dispatch_thread?
end
# the C lib doesn't strip the chroot path off of returned path values, which
# is pretty damn annoying. this is used to clean things up.
def create(*args)
# since we don't care about the inputs, just glob args
rc, new_path = @mutex.synchronize { @czk.create(*args) }
[rc, strip_chroot_from(new_path)]
end
def set_debug_level(int)
warn "DEPRECATION WARNING: #{self.class.name}#set_debug_level, it has moved to the class level and will be removed in a future release"
self.class.set_debug_level(int)
end
# set the watcher object/proc that will receive all global events (such as session/state events)
def set_default_global_watcher
warn "DEPRECATION WARNING: #{self.class}#set_default_global_watcher ignores block" if block_given?
@mutex.synchronize do
# @default_watcher = block # save this here for reopen() to use
@watcher_reqs[ZKRB_GLOBAL_CB_REQ] = { :watcher => @default_watcher, :watcher_context => nil }
end
end
def state
return ZOO_CLOSED_STATE if closed?
@mutex.synchronize { @czk.state }
end
def session_id
@mutex.synchronize do
cid = client_id and cid.session_id
end
end
def session_passwd
@mutex.synchronize do
cid = client_id and cid.passwd
end
end
# we are closed if there is no @czk instance or @czk.closed?
def closed?
@mutex.synchronize { !@czk or @czk.closed? }
end
def pause_before_fork_in_parent
@mutex.synchronize do
logger.debug { "ZookeeperBase#pause_before_fork_in_parent" }
# XXX: add anal-retentive state checking
raise "EXPLODERATE! @czk was nil!" unless @czk
@czk.pause_before_fork_in_parent
stop_dispatch_thread!
end
end
def resume_after_fork_in_parent
@mutex.synchronize do
logger.debug { "ZookeeperBase#resume_after_fork_in_parent" }
raise "EXPLODERATE! @czk was nil!" unless @czk
event_queue.open
setup_dispatch_thread!
@czk.resume_after_fork_in_parent
end
end
protected
# this is a hack: to provide consistency between the C and Java drivers when
# using a chrooted connection, we wrap the callback in a block that will
# strip the chroot path from the returned path (important in an async create
# sequential call). This is the only place where we can hook *just* the C
# version. The non-async manipulation is handled in ZookeeperBase#create.
#
# TODO: need to move the continuation setup into here, so that it can get
# added to the callback hash
#
def setup_completion(req_id, meth_name, call_opts)
if (meth_name == :create) and cb = call_opts[:callback]
call_opts[:callback] = lambda do |hash|
# in this case the string will be the absolute zookeeper path (i.e.
# with the chroot still prepended to the path). Here's where we strip it off
hash[:string] = strip_chroot_from(hash[:string])
# call the original callback
cb.call(hash)
end
end
# pass this along to the Zookeeper::Common implementation
super(req_id, meth_name, call_opts)
end
# if we're chrooted, this method will strip the chroot prefix from +path+
def strip_chroot_from(path)
return path unless (chrooted? and path and path.start_with?(chroot_path))
path[chroot_path.length..-1]
end
def get_default_global_watcher
Proc.new { |args|
logger.debug { "Ruby ZK Global CB called type=#{event_by_value(args[:type])} state=#{state_by_value(args[:state])}" }
true
}
end
def chrooted?
!chroot_path.empty?
end
def chroot_path
if @chroot_path.nil?
@chroot_path =
if idx = @host.index('/')
@host.slice(idx, @host.length)
else
''
end
end
@chroot_path
end
end
end