/
c_zookeeper.rb
398 lines (315 loc) · 10.4 KB
/
c_zookeeper.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
Zookeeper.require_lib(
'zookeeper/logger',
'zookeeper/common',
'zookeeper/constants',
'zookeeper/exceptions' # zookeeper_c depends on exceptions defined in here
)
Zookeeper.require_root 'ext/zookeeper_c'
# require File.expand_path('../zookeeper_c', __FILE__)
module Zookeeper
# NOTE: this class extending (opening) the class defined in zkrb.c
class CZookeeper
include Forked
include Constants
include Exceptions
include Logger
DEFAULT_RECEIVE_TIMEOUT_MSEC = 10000
class GotNilEventException < StandardError; end
attr_accessor :original_pid
# assume we're at debug level
def self.get_debug_level
@debug_level ||= ZOO_LOG_LEVEL_INFO
end
def self.set_debug_level(value)
@debug_level = value
set_zkrb_debug_level(value)
end
# wrap these calls in our sync->async special sauce
%w[get set exists create delete get_acl set_acl get_children add_auth].each do |sym|
class_eval(<<-EOS, __FILE__, __LINE__+1)
def #{sym}(*args)
submit_and_block(:#{sym}, *args)
end
EOS
end
def initialize(host, event_queue, opts={})
@host = host
@event_queue = event_queue
# keep track of the pid that created us
update_pid!
# used by the C layer. CZookeeper sets this to true when the init method
# has completed. once this is set to true, it stays true.
#
# you should grab the @mutex before messing with this flag
@_running = nil
# This is set to true after destroy_zkrb_instance has been called and all
# CZookeeper state has been cleaned up
@_closed = false # also used by the C layer
# set by the ruby side to indicate we are in shutdown mode used by method_get_next_event
@_shutting_down = false
# the actual C data is stashed in this ivar. never *ever* touch this
@_data = nil
@_receive_timeout_msec = opts[:receive_timeout_msec] || DEFAULT_RECEIVE_TIMEOUT_MSEC
@mutex = Monitor.new
# used to signal that we're running
@running_cond = @mutex.new_cond
# used to signal we've received the connected event
@state_mutex = Monitor.new
@state_cond = @state_mutex.new_cond
# the current state of the connection
@state = ZOO_CLOSED_STATE
@pipe_read, @pipe_write = IO.pipe
@event_thread = nil
# hash of in-flight Continuation instances
@reg = Continuation::Registry.new
log_level = ENV['ZKC_DEBUG'] ? ZOO_LOG_LEVEL_DEBUG : ZOO_LOG_LEVEL_ERROR
logger.info { "initiating connection to #{@host}" }
zkrb_init(@host, opts)#, :zkc_log_level => log_level)
start_event_thread
logger.debug { "init returned!" }
end
def closed?
@mutex.synchronize { !!@_closed }
end
def running?
@mutex.synchronize { !!@_running }
end
def shutting_down?
@mutex.synchronize { !!@_shutting_down }
end
def connected?
state == ZOO_CONNECTED_STATE
end
def connecting?
state == ZOO_CONNECTING_STATE
end
def associating?
state == ZOO_ASSOCIATING_STATE
end
def close
return if closed?
fn_close = proc do
if !@_closed and @_data
logger.debug { "CALLING CLOSE HANDLE!!" }
close_handle
end
end
if forked?
fn_close.call
else
stop_event_thread
@mutex.synchronize(&fn_close)
end
[@pipe_read, @pipe_write].each { |io| io.close unless io.closed? }
nil
end
# call this to stop the event loop, you can resume with the
# resume method
#
# requests may still be added during this time, but they will not be
# processed until you call resume
def pause_before_fork_in_parent
logger.debug { "##{__method__}" }
@mutex.synchronize { stop_event_thread }
end
# call this if 'pause' was previously called to start the event loop again
def resume_after_fork_in_parent
logger.debug { "##{__method__}" }
@mutex.synchronize do
@_shutting_down = nil
start_event_thread
end
end
def state
return ZOO_CLOSED_STATE if closed?
@state_mutex.synchronize { @state }
end
# this implementation is gross, but i don't really see another way of doing it
# without more grossness
#
# returns true if we're connected, false if we're not
#
# if timeout is nil, we never time out, and wait forever for CONNECTED state
#
def wait_until_connected(timeout=10)
time_to_stop = timeout ? Time.now + timeout : nil
return false unless wait_until_running(timeout)
@state_mutex.synchronize do
while true
if timeout
now = Time.now
break if (@state == ZOO_CONNECTED_STATE) || unhealthy? || (now > time_to_stop)
delay = time_to_stop.to_f - now.to_f
@state_cond.wait(delay)
else
break if (@state == ZOO_CONNECTED_STATE) || unhealthy?
@state_cond.wait
end
end
end
connected?
end
private
# This method is NOT SYNCHRONIZED!
#
# you must hold the @mutex lock while calling this method
def unhealthy?
@_closed || @_shutting_down || is_unrecoverable
end
# This method is NOT SYNCHRONIZED!
#
# you must hold the @mutex lock while calling this method
def healthy?
!unhealthy?
end
# submits a job for processing
# blocks the caller until result has returned
def submit_and_block(meth, *args)
@mutex.synchronize do
raise Exceptions::NotConnected if unhealthy?
end
cnt = Continuation.new(meth, *args)
@reg.synchronize do |r|
if meth == :state
r.state_check << cnt
else
r.pending << cnt
end
end
wake_event_loop!
cnt.value
end
# this method is part of the reopen/close code, and is responsible for
# shutting down the dispatch thread.
#
# this method must be EXTERNALLY SYNCHRONIZED!
#
# @event_thread will be nil when this method exits
#
def stop_event_thread
if @event_thread
logger.debug { "##{__method__}" }
shut_down!
wake_event_loop!
@event_thread.join
@event_thread = nil
end
end
# starts the event thread running if not already started
# returns false if already running
def start_event_thread
return false if @event_thread
@event_thread = Thread.new(&method(:event_thread_body))
end
# will wait until the client has entered the running? state
# or until timeout seconds have passed.
#
# returns true if we're running, false if we timed out
def wait_until_running(timeout=5)
@mutex.synchronize do
return true if @_running
@running_cond.wait(timeout)
!!@_running
end
end
def event_thread_body
Thread.current.abort_on_exception = true
logger.debug { "##{__method__} starting event thread" }
event_thread_await_running
# this is the main loop
while healthy?
if @reg.anything_to_do? && connected?
submit_pending_calls
end
zkrb_iterate_event_loop
iterate_event_delivery
end
# ok, if we're exiting the event loop, and we still have a valid connection
# and there's still completions we're waiting to hear about, then we
# should pump the handle before leaving this loop
if @_shutting_down and not (@_closed or is_unrecoverable)
logger.debug { "we're in shutting down state, there are #{@reg.in_flight.length} in_flight completions" }
until @reg.in_flight.empty? or @_closed or is_unrecoverable
zkrb_iterate_event_loop
iterate_event_delivery
logger.debug { "there are #{@reg.in_flight} in_flight completions left" }
end
logger.debug { "finished completions" }
end
# anything left over after all that gets the finger
remaining = @reg.next_batch + @reg.in_flight.values
logger.debug { "there are #{remaining.length} completions to awaken" }
@reg.in_flight.clear
while cb = remaining.shift
cb.shutdown!
end
rescue ShuttingDownException
logger.error { "event thread saw @_shutting_down, bailing without entering loop" }
ensure
logger.debug { "##{__method__} exiting" }
end
def submit_pending_calls
calls = @reg.next_batch()
return if calls.empty?
while cntn = calls.shift
cntn.submit(self) # this delivers state check results (and does other stuff)
if req_id = cntn.req_id # state checks will not have a req_id
@reg.in_flight[req_id] = cntn # in_flight is only ever touched by us
end
end
end
def wake_event_loop!
@pipe_write && @pipe_write.write('1')
end
def iterate_event_delivery
while hash = zkrb_get_next_event_st()
logger.debug { "##{__method__} got #{hash.inspect} " }
if (hash[:req_id] == ZKRB_GLOBAL_CB_REQ) && (hash[:type] == -1)
ev_state = hash[:state]
@state_mutex.synchronize do
if @state != ev_state
@state = ev_state
@state_cond.broadcast
end
end
end
cntn = @reg.in_flight.delete(hash[:req_id])
if cntn and not cntn.user_callback? # this is one of "our" continuations
cntn.call(hash) # so we handle delivering it
next # and skip handing it to the dispatcher
end
# otherwise, the event was a session event (ZKRB_GLOBAL_CB_REQ)
# or a user-provided callback
@event_queue.push(hash)
end
end
def event_thread_await_running
logger.debug { "event_thread waiting until running: #{@_running}" }
@mutex.synchronize do
@running_cond.wait_until { @_running or @_shutting_down }
logger.debug { "event_thread running: #{@_running}" }
raise ShuttingDownException if @_shutting_down
end
end
# use this method to set the @_shutting_down flag to true
def shut_down!
logger.debug { "##{__method__}" }
@mutex.synchronize do
@_shutting_down = true
# ollie ollie oxen all home free!
@running_cond.broadcast
end
@state_mutex.synchronize do
@state_cond.broadcast
end
end
# called by underlying C code to signal we're running
def zkc_set_running_and_notify!
logger.debug { "##{__method__}" }
@mutex.synchronize do
@_running = true
@running_cond.broadcast
end
end
end
end