Permalink
Browse files

much needed refactor of event and async result delivery

event and async blocks were previously mixed in with a bunch of other
common code, and shared a Monitor with other unrelated functionality.
This code has been gently refactored into its own self-contained class
(with its own task-specific lock), which should help with maintenance.

Also includes a cleanup of the assert_*_keys methods into one method
that checks both required and supported options.
  • Loading branch information...
1 parent 30c365c commit 74239256c3145039993bf2953e364394c50abbe1 @slyphon slyphon committed Aug 17, 2012
View
@@ -1,3 +1,10 @@
+v1.3.0 much needed refactor of event and async result delivery
+
+ * event and async blocks were previously mixed in with a bunch of other
+ common code, and shared a Monitor with other unrelated functionality. This
+ code has been gently refactored into its own self-contained class (with its
+ own task-specific lock), which should help with maintenance.
+
v1.2.14 merge add_auth pull request, reduce chances for ContinuationTimeoutError
* added support for the add_auth call (h/t: @bradhe) see pull req #25
View
@@ -20,18 +20,12 @@ class ZookeeperBase
# @private
class ClientShutdownException < StandardError; end
- # @private
- KILL_TOKEN = Object.new unless defined?(KILL_TOKEN)
-
- ZKRB_GLOBAL_CB_REQ = -1
-
# debug levels
ZOO_LOG_LEVEL_ERROR = 1
ZOO_LOG_LEVEL_WARN = 2
ZOO_LOG_LEVEL_INFO = 3
ZOO_LOG_LEVEL_DEBUG = 4
-
def_delegators :czk, :get_children, :exists, :delete, :get, :set,
:set_acl, :get_acl, :client_id, :sync, :add_auth, :wait_until_connected
@@ -66,11 +60,8 @@ def reopen_after_fork!
end
private :reopen_after_fork!
-
def reopen(timeout = 10, watcher=nil)
- if watcher and (watcher != @default_watcher)
- raise "You cannot set the watcher to a different value this way anymore!"
- end
+ raise "You cannot set the watcher to a different value this way anymore!" if watcher
reopen_after_fork! if forked?
@@ -79,8 +70,7 @@ def reopen(timeout = 10, watcher=nil)
@czk = CZookeeper.new(@host, @event_queue)
# flushes all outstanding watcher reqs.
- @watcher_reqs.clear
- set_default_global_watcher
+ @req_registry.clear_watchers!
@czk.wait_until_connected(timeout)
end
@@ -90,24 +80,21 @@ def reopen(timeout = 10, watcher=nil)
end
def initialize(host, timeout = 10, watcher=nil)
- @watcher_reqs = {}
- @completion_reqs = {}
-
- @current_req_id = 0
-
- @dispatcher = @czk = nil
-
- update_pid!
- reopen_after_fork!
-
# approximate the java behavior of raising java.lang.IllegalArgumentException if the host
# argument ends with '/'
raise ArgumentError, "Host argument #{host.inspect} may not end with /" if host.end_with?('/')
@host = host.dup
- @default_watcher = (watcher or get_default_global_watcher)
+ watcher ||= get_default_global_watcher
+
+ @req_registry = RequestRegistry.new(watcher, :chroot_path => chroot_path)
+ @dispatcher = @czk = nil
+
+ update_pid!
+ reopen_after_fork!
+
yield self if block_given?
reopen(timeout)
@@ -166,7 +153,7 @@ def close
def create(*args)
# since we don't care about the inputs, just glob args
rc, new_path = czk.create(*args)
- [rc, strip_chroot_from(new_path)]
+ [rc, @req_registry.strip_chroot_from(new_path)]
end
def set_debug_level(int)
@@ -176,12 +163,7 @@ def set_debug_level(int)
# set the watcher object/proc that will receive all global events (such as session/state events)
def set_default_global_watcher
- warn "DEPRECATION WARNING: #{self.class}#set_default_global_watcher ignores block" if block_given?
-
- @mutex.synchronize do
-# @default_watcher = block # save this here for reopen() to use
- @watcher_reqs[ZKRB_GLOBAL_CB_REQ] = { :watcher => @default_watcher, :watcher_context => nil }
- end
+ raise "NO! YOU CANNOT HAZ set_default_global_watcher"
end
def state
@@ -233,34 +215,9 @@ def resume_after_fork_in_parent
end
protected
- # this is a hack: to provide consistency between the C and Java drivers when
- # using a chrooted connection, we wrap the callback in a block that will
- # strip the chroot path from the returned path (important in an async create
- # sequential call). This is the only place where we can hook *just* the C
- # version. The non-async manipulation is handled in ZookeeperBase#create.
- #
- # TODO: need to move the continuation setup into here, so that it can get
- # added to the callback hash
- #
- def setup_completion(req_id, meth_name, call_opts)
- if (meth_name == :create) and cb = call_opts[:callback]
- call_opts[:callback] = lambda do |hash|
- # in this case the string will be the absolute zookeeper path (i.e.
- # with the chroot still prepended to the path). Here's where we strip it off
- hash[:string] = strip_chroot_from(hash[:string])
-
- # call the original callback
- cb.call(hash)
- end
- end
-
- # pass this along to the Zookeeper::Common implementation
- super(req_id, meth_name, call_opts)
- end
-
def czk
rval = @mutex.synchronize { @czk }
- raise Exceptions::NotConnected unless rval
+ raise Exceptions::NotConnected, "underlying connection was nil" unless rval
rval
end
View
@@ -186,9 +186,7 @@ def reopen(timeout=10, watcher=nil)
# watcher ||= @default_watcher
@mutex.synchronize do
- # flushes all outstanding watcher reqs.
- @watcher_reqs.clear
- set_default_global_watcher
+ @req_registry.clear_watchers!
replace_jzk!
wait_until_connected
@@ -205,19 +203,19 @@ def wait_until_connected(timeout=10)
def initialize(host, timeout=10, watcher=nil, options={})
@host = host
@event_queue = QueueWithPipe.new
- @current_req_id = 0
+
+ watcher ||= get_default_global_watcher()
+
+ @req_registry = RequestRegistry.new(watcher)
@mutex = Monitor.new
@dispatch_shutdown_cond = @mutex.new_cond
@connected_latch = Latch.new
- @watcher_reqs = {}
- @completion_reqs = {}
@_running = nil
@_closed = false
@options = {}
- @default_watcher = (watcher || get_default_global_watcher)
# allows connected-state handlers to be registered before
yield self if block_given?
@@ -415,19 +413,6 @@ def assert_open
raise NotConnected unless connected?
end
- # set the watcher object/proc that will receive all global events (such as session/state events)
- #---
- # XXX: this code needs to be duplicated from ext/zookeeper_base.rb because
- # it's called from the initializer, and because of the C impl. we can't have
- # the two decend from a common base, and a module wouldn't work
- #
- # XXX: this is probably a relic?
- def set_default_global_watcher
- @mutex.synchronize do
- @watcher_reqs[ZKRB_GLOBAL_CB_REQ] = { :watcher => @default_watcher, :watcher_context => nil }
- end
- end
-
def session_id
jzk.session_id
end
View
@@ -40,6 +40,7 @@ def self.require_root(*relpaths)
'zookeeper/exceptions',
'zookeeper/continuation',
'zookeeper/common',
+ 'zookeeper/request_registry',
'zookeeper/callbacks',
'zookeeper/stat',
'zookeeper/client_methods'
@@ -1,9 +1,14 @@
module Zookeeper
module ClientMethods
+ extend Forwardable
include Constants
include ACLs
include Logger
+ # @req_registry is set up in the platform-specific base classes
+ def_delegators :@req_registry, :setup_call
+ private :setup_call
+
def reopen(timeout=10, watcher=nil)
warn "WARN: ZookeeperBase#reopen watcher argument is now ignored" if watcher
super
@@ -15,8 +20,9 @@ def initialize(host, timeout=10, watcher=nil)
def add_auth(options = {})
assert_open
- assert_supported_keys(options, [:scheme, :cert])
- assert_required_keys(options, [:scheme, :cert])
+ assert_keys(options,
+ :supported => [:scheme, :cert],
+ :required => [:scheme, :cert])
req_id = setup_call(:add_auth, options)
rc = super(req_id, options[:scheme], options[:cert])
@@ -26,8 +32,9 @@ def add_auth(options = {})
def get(options = {})
assert_open
- assert_supported_keys(options, [:path, :watcher, :watcher_context, :callback, :callback_context])
- assert_required_keys(options, [:path])
+ assert_keys(options,
+ :supported => [:path, :watcher, :watcher_context, :callback, :callback_context],
+ :required => [:path])
req_id = setup_call(:get, options)
rc, value, stat = super(req_id, options[:path], options[:callback], options[:watcher])
@@ -38,8 +45,10 @@ def get(options = {})
def set(options = {})
assert_open
- assert_supported_keys(options, [:path, :data, :version, :callback, :callback_context])
- assert_required_keys(options, [:path])
+ assert_keys(options,
+ :supported => [:path, :data, :version, :callback, :callback_context],
+ :required => [:path])
+
assert_valid_data_size!(options[:data])
options[:version] ||= -1
@@ -52,8 +61,9 @@ def set(options = {})
def get_children(options = {})
assert_open
- assert_supported_keys(options, [:path, :callback, :callback_context, :watcher, :watcher_context])
- assert_required_keys(options, [:path])
+ assert_keys(options,
+ :supported => [:path, :callback, :callback_context, :watcher, :watcher_context],
+ :required => [:path])
req_id = setup_call(:get_children, options)
rc, children, stat = super(req_id, options[:path], options[:callback], options[:watcher])
@@ -64,8 +74,9 @@ def get_children(options = {})
def stat(options = {})
assert_open
- assert_supported_keys(options, [:path, :callback, :callback_context, :watcher, :watcher_context])
- assert_required_keys(options, [:path])
+ assert_keys(options,
+ :supported => [:path, :callback, :callback_context, :watcher, :watcher_context],
+ :required => [:path])
req_id = setup_call(:stat, options)
rc, stat = exists(req_id, options[:path], options[:callback], options[:watcher])
@@ -76,8 +87,10 @@ def stat(options = {})
def create(options = {})
assert_open
- assert_supported_keys(options, [:path, :data, :acl, :ephemeral, :sequence, :callback, :callback_context])
- assert_required_keys(options, [:path])
+ assert_keys(options,
+ :supported => [:path, :data, :acl, :ephemeral, :sequence, :callback, :callback_context],
+ :required => [:path])
+
assert_valid_data_size!(options[:data])
flags = 0
@@ -95,8 +108,10 @@ def create(options = {})
def delete(options = {})
assert_open
- assert_supported_keys(options, [:path, :version, :callback, :callback_context])
- assert_required_keys(options, [:path])
+ assert_keys(options,
+ :supported => [:path, :version, :callback, :callback_context],
+ :required => [:path])
+
options[:version] ||= -1
req_id = setup_call(:delete, options)
@@ -113,8 +128,9 @@ def delete(options = {})
#
def sync(options = {})
assert_open
- assert_supported_keys(options, [:path, :callback, :callback_context])
- assert_required_keys(options, [:path, :callback])
+ assert_keys(options,
+ :supported => [:path, :callback, :callback_context],
+ :required => [:path, :callback])
req_id = setup_call(:sync, options)
@@ -125,8 +141,9 @@ def sync(options = {})
def set_acl(options = {})
assert_open
- assert_supported_keys(options, [:path, :acl, :version, :callback, :callback_context])
- assert_required_keys(options, [:path, :acl])
+ assert_keys(options,
+ :supported => [:path, :acl, :version, :callback, :callback_context],
+ :required => [:path, :acl])
options[:version] ||= -1
req_id = setup_call(:set_acl, options)
@@ -137,8 +154,9 @@ def set_acl(options = {})
def get_acl(options = {})
assert_open
- assert_supported_keys(options, [:path, :callback, :callback_context])
- assert_required_keys(options, [:path])
+ assert_keys(options,
+ :supported => [:path, :callback, :callback_context],
+ :required => [:path])
req_id = setup_call(:get_acl, options)
rc, acls, stat = super(req_id, options[:path], options[:callback])
@@ -241,14 +259,19 @@ def assert_valid_data_size!(data)
end
private
- # TODO: Sanitize user mistakes by unregistering watchers from ops that
- # don't return ZOK (except wexists)? Make users clean up after themselves for now.
- #
- # XXX: is this dead code?
- def unregister_watcher(req_id)
- @mutex.synchronize {
- @watcher_reqs.delete(req_id)
- }
+ def assert_keys(args, opts={})
+ supported = opts[:supported] || []
+ required = opts[:required] || []
+
+ unless (args.keys - supported).empty?
+ raise Zookeeper::Exceptions::BadArguments,
+ "Supported arguments are: #{supported.inspect}, but arguments #{args.keys.inspect} were supplied instead"
+ end
+
+ unless (required - args.keys).empty?
+ raise Zookeeper::Exceptions::BadArguments,
+ "Required arguments are: #{required.inspect}, but only the arguments #{args.keys.inspect} were supplied."
+ end
end
# must be supplied by parent class impl.
Oops, something went wrong.

0 comments on commit 7423925

Please sign in to comment.