-
Notifications
You must be signed in to change notification settings - Fork 75
/
zookeeper_base.rb
233 lines (188 loc) · 5.99 KB
/
zookeeper_base.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# The low-level wrapper-specific methods for the C lib
# subclassed by the top-level Zookeeper class
class ZookeeperBase < CZookeeper
include ZookeeperCommon
include ZookeeperCallbacks
include ZookeeperConstants
include ZookeeperExceptions
include ZookeeperACLs
include ZookeeperStat
ZKRB_GLOBAL_CB_REQ = -1
# debug levels
ZOO_LOG_LEVEL_ERROR = 1
ZOO_LOG_LEVEL_WARN = 2
ZOO_LOG_LEVEL_INFO = 3
ZOO_LOG_LEVEL_DEBUG = 4
def reopen(timeout = 10, watcher=nil)
watcher ||= @default_watcher
@req_mutex.synchronize do
# flushes all outstanding watcher reqs.
@watcher_req = {}
set_default_global_watcher(&watcher)
end
@start_stop_mutex.synchronize do
# $stderr.puts "%s: calling init, self.obj_id: %x" % [self.class, object_id]
init(@host)
# XXX: replace this with a callback
if timeout > 0
time_to_stop = Time.now + timeout
until state == Zookeeper::ZOO_CONNECTED_STATE
break if Time.now > time_to_stop
sleep 0.1
end
end
end
state
end
def initialize(host, timeout = 10, watcher=nil)
@watcher_reqs = {}
@completion_reqs = {}
@req_mutex = Monitor.new
@current_req_id = 1
# approximate the java behavior of raising java.lang.IllegalArgumentException if the host
# argument ends with '/'
raise ArgumentError, "Host argument #{host.inspect} may not end with /" if host.end_with?('/')
@host = host
@start_stop_mutex = Monitor.new
watcher ||= get_default_global_watcher
@_running = nil # used by the C layer
@_closed = false # also used by the C layer
yield self if block_given?
reopen(timeout, watcher)
return nil unless connected?
setup_dispatch_thread!
end
# if either of these happen, the user will need to renegotiate a connection via reopen
def assert_open
raise ZookeeperException::SessionExpired if state == ZOO_EXPIRED_SESSION_STATE
raise ZookeeperException::NotConnected unless connected?
end
def connected?
state == ZOO_CONNECTED_STATE
end
def connecting?
state == ZOO_CONNECTING_STATE
end
def associating?
state == ZOO_ASSOCIATING_STATE
end
def close
@start_stop_mutex.synchronize do
@_running = false if @_running
end
if @dispatcher
unless @_closed
wake_event_loop!
end
@dispatcher.join
end
@start_stop_mutex.synchronize do
unless @_closed
close_handle
end
end
# this is set up in the C init method, but it's easier to
# do the teardown here
begin
@selectable_io.close if @selectable_io
rescue IOError
end
end
# the C lib doesn't strip the chroot path off of returned path values, which
# is pretty damn annoying. this is used to clean things up.
def create(*args)
# since we don't care about the inputs, just glob args
rc, new_path = super(*args)
[rc, strip_chroot_from(new_path)]
end
def set_debug_level(int)
warn "DEPRECATION WARNING: #{self.class.name}#set_debug_level, it has moved to the class level and will be removed in a future release"
self.class.set_debug_level(int)
end
# set the watcher object/proc that will receive all global events (such as session/state events)
def set_default_global_watcher(&block)
@req_mutex.synchronize do
@default_watcher = block # save this here for reopen() to use
@watcher_reqs[ZKRB_GLOBAL_CB_REQ] = { :watcher => @default_watcher, :watcher_context => nil }
end
end
def closed?
@start_stop_mutex.synchronize { false|@_closed }
end
def running?
@start_stop_mutex.synchronize { false|@_running }
end
def state
return ZOO_CLOSED_STATE if closed?
super
end
def session_id
client_id.session_id
end
def session_passwd
client_id.passwd
end
protected
# this is a hack: to provide consistency between the C and Java drivers when
# using a chrooted connection, we wrap the callback in a block that will
# strip the chroot path from the returned path (important in an async create
# sequential call). This is the only place where we can hook *just* the C
# version. The non-async manipulation is handled in ZookeeperBase#create.
#
def setup_completion(req_id, meth_name, call_opts)
if (meth_name == :create) and cb = call_opts[:callback]
call_opts[:callback] = lambda do |hash|
# in this case the string will be the absolute zookeeper path (i.e.
# with the chroot still prepended to the path). Here's where we strip it off
hash[:string] = strip_chroot_from(hash[:string])
# call the original callback
cb.call(hash)
end
end
# pass this along to the ZookeeperCommon implementation
super(req_id, meth_name, call_opts)
end
# if we're chrooted, this method will strip the chroot prefix from +path+
def strip_chroot_from(path)
return path unless (chrooted? and path and path.start_with?(chroot_path))
path[chroot_path.length..-1]
end
def barf_unless_running!
@start_stop_mutex.synchronize do
raise ShuttingDownException unless (@_running and not @_closed)
yield
end
end
def setup_dispatch_thread!
@dispatcher = Thread.new do
while running?
begin # calling user code, so protect ourselves
dispatch_next_callback
rescue Exception => e
$stderr.puts "Error in dispatch thread, #{e.class}: #{e.message}\n" << e.backtrace.map{|n| "\t#{n}"}.join("\n")
end
end
end
end
# TODO: Make all global puts configurable
def get_default_global_watcher
Proc.new { |args|
logger.debug { "Ruby ZK Global CB called type=#{event_by_value(args[:type])} state=#{state_by_value(args[:state])}" }
true
}
end
def chrooted?
!chroot_path.empty?
end
def chroot_path
if @chroot_path.nil?
@chroot_path =
if idx = @host.index('/')
@host.slice(idx, @host.length)
else
''
end
end
@chroot_path
end
end