Permalink
Browse files

make assert_open more sane

Since all calls occur on the event thread (for the C module), it makes more
sense to do the check as part of the call itself inside the event thread,
rather than queue it up as a separate call.

explicit calls to 'state' will still get priority through the loop.
  • Loading branch information...
1 parent bb76bd5 commit b1e005c0949cb4a036a4b53944bf7ca7de797671 @slyphon slyphon committed May 21, 2012
Showing with 31 additions and 18 deletions.
  1. +1 −2 ext/zookeeper_base.rb
  2. +30 −16 lib/zookeeper/continuation.rb
@@ -117,8 +117,7 @@ def initialize(host, timeout = 10, watcher=nil)
# if either of these happen, the user will need to renegotiate a connection via reopen
def assert_open
@mutex.synchronize do
- raise Exceptions::SessionExpired if state == ZOO_EXPIRED_SESSION_STATE
- raise Exceptions::NotConnected unless connected?
+ raise Exceptions::NotConnected if closed?
if forked?
raise InheritedConnectionError, <<-EOS.gsub(/(?:^|\n)\s*/, ' ').strip
You tried to use a connection inherited from another process
@@ -82,6 +82,9 @@ def initialize(meth, *args)
@mutex = Mutex.new
@cond = ConditionVariable.new
@rval = nil
+
+ # make this error reporting more robust if necessary, right now, just set to state
+ @error = nil
# set to true when an event occurs that would cause the caller to
# otherwise block forever
@@ -90,18 +93,24 @@ def initialize(meth, *args)
# the caller calls this method and receives the response from the async loop
def value
- @mutex.lock
- begin
- @cond.wait(@mutex) until @rval
+ @mutex.synchronize do
+ @cond.wait(@mutex) until @rval or @error
+
+ case @error
+ when nil
+ # ok, nothing to see here, carry on
+ when ZOO_EXPIRED_SESSION_STATE
+ raise Exceptions::SessionExpired, "connection has expired"
+ else
+ raise Exceptions::NotConnected, "connection state is #{STATE_NAMES[@error]}"
+ end
case @rval.length
when 1
return @rval.first
else
return @rval
end
- ensure
- @mutex.unlock rescue nil
end
end
@@ -124,13 +133,24 @@ def user_callback?
# implementation, but it's more important to get *something* working and
# passing specs, then refactor to make everything sane
#
+ #
def submit(czk)
+ state = czk.zkrb_state # check the state of the connection
+
+ if @meth == :state # if the method is a state call
+ @rval = [state] # we're done, no error
+ return deliver!
+
+ elsif state != ZOO_CONNECTED_STATE # otherwise, we must be connected
+ @error = state # so set the error
+ return deliver! # and we're out
+ end
+
rc, *_ = czk.__send__(:"zkrb_#{@meth}", *async_args)
- # if this is an state call, async call, or we failed to submit it
- if (@meth == :state) or user_callback? or (rc != ZOK)
- @rval = [rc] # create the repsonse
- deliver! # wake the caller and we're out
+ if user_callback? or (rc != ZOK) # async call, or we failed to submit it
+ @rval = [rc] # create the repsonse
+ deliver! # wake the caller and we're out
end
end
@@ -152,9 +172,6 @@ def async_args
logger.debug { "async_args, meth: #{meth} ary: #{ary.inspect}, #{callback_arg_idx}" }
- # this is not already an async call
- # so we replace the req_id with the ZKRB_ASYNC_CONTN_ID so the
- # event thread knows to dispatch it itself
ary[callback_arg_idx] ||= self
ary
@@ -165,11 +182,8 @@ def callback_arg_idx
end
def deliver!
- @mutex.lock
- begin
+ @mutex.synchronize do
@cond.signal
- ensure
- @mutex.unlock rescue nil
end
end
end # Base

0 comments on commit b1e005c

Please sign in to comment.