-
Notifications
You must be signed in to change notification settings - Fork 8
/
synchrony_client.rb
135 lines (113 loc) · 3.81 KB
/
synchrony_client.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
module ZK
module ZKEventMachine
class SynchronyEventHandlerWrapper
include ZK::Logging
def initialize(event_handler)
@event_handler = event_handler
end
# registers a block to be called back within a fiber context
def register(path, &block)
new_block = proc do |*a|
Fiber.new { block.call(*a) }.resume
end
@event_handler.register(path, &new_block)
end
private
def method_missing(meth, *a, &b)
@event_handler.__send__(meth, *a, &b)
end
end
# This class is an EM::Synchrony wrapper around a ZK::ZKEventMachine::Client
#
# It should behave exactly like a ZK::Client instance (when called in a
# synchronous fashion), and one should look there for documentation about
# the various methods
#
# @note this class is implemented as a wrapper instead of a subclass of Client
# so that it can support the unixisms like rm_rf and mkdir_p. The
# synchrony pattern of aliasing the base class methods and specializing for
# synchrony didn't work in this case.
#
class SynchronyClient
include ZK::Logging
attr_reader :event_handler, :client
# @overload new(client_instance)
# Wrap an existing ZK::ZKEventMachine::Client instance in an
# EM::Synchrony compatible way
# @param [ZK::ZKEventMachine::Client] client_instance an instance of Client to wrap
# @overload new(host, opts={})
# Creates a new ZK::ZKEventMachine::Client instance to manage
# takes the same arguments as ZK::Client::Base
def initialize(host, opts={})
case host
when Client
@client = host
when String
@client = Client.new(host, opts)
else
raise ArgumentError, "argument must be either a ZK::ZKEventMachine::Client instance or a hostname:port string"
end
@client.synchrony_client = self
@event_handler = SynchronyEventHandlerWrapper.new(@client.event_handler)
end
%w[connect get set create stat delete children get_acl set_acl mkdir_p rm_rf].each do |meth|
class_eval(<<-EOMETH, __FILE__, __LINE__ + 1)
def #{meth}(*args,&blk)
sync!(@client.#{meth}(*args, &blk))
end
EOMETH
end
# it is *crucially* important that close and close! be wrapped in a fiber.
# it's possible under very odd corner cases with the 1.9.3 GC to cause a
# '[BUG] cfp consistency error - send'
#
%w[close close!].each do |meth|
class_eval(<<-EOMETH, __FILE__, __LINE__ + 1)
def #{meth}(*args,&blk)
Fiber.new { sync!(@client.#{meth}(*args, &blk)) }.resume
end
EOMETH
end
# @deprecated for backwards compatibility only
def watcher
event_handler
end
def exists?(path, opts={})
stat(path, opts={}).exists?
end
# returns self
def to_synchrony
self
end
# returns the wrapped async client
def to_async
@client
end
protected
# a modification of EM::Synchrony.sync to handle multiple callback arguments properly
def sync(df)
f = Fiber.current
xback = proc do |*args|
if f == Fiber.current
return *args
else
f.resume(*args)
end
end
df.callback(&xback)
df.errback(&xback)
Fiber.yield
end
# like sync, but if the deferred returns an exception instance, re-raises
def sync!(deferred)
rval = sync(deferred)
raise rval if rval.kind_of?(Exception)
rval
end
private
def method_missing(meth, *a, &b)
@client.__send__(meth, *a, &b)
end
end
end
end