Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

NodeDeletionWatcher#wait_until_blocked now takes a :timeout

This is the basis for the Locker's blocking behavior. Now that this has
been implemented, we can add support in the various Lockers for passing
this value through.
  • Loading branch information...
commit 4a627a38bc416c3b244b95e0481856acb6f5dc29 1 parent 2af50c5
@slyphon slyphon authored
View
4 lib/zk/exceptions.rb
@@ -141,6 +141,7 @@ class LockAssertionFailedError < ZKError; end
# called when the client is reopened, resumed, or paused when in an invalid state
class InvalidStateError < ZKError; end
+ # Raised when a NodeDeletionWatcher is interrupted by another thread
class WakeUpException < ZKError; end
# raised when a chrooted conection is requested but the root doesn't exist
@@ -155,6 +156,9 @@ def initialize(erroneous_string)
super("Chroot strings must start with a '/' you provided: #{erroneous_string.inspect}")
end
end
+
+ # raised when we are blocked waiting on a lock and the timeout expires
+ class LockWaitTimeoutError < ZKError; end
end
end
View
50 lib/zk/node_deletion_watcher.rb
@@ -10,6 +10,7 @@ module Constants
BLOCKED = :yes
NOT_ANYMORE = :not_anymore
INTERRUPTED = :interrupted
+ TIMED_OUT = :timed_out
end
include Constants
@@ -36,6 +37,10 @@ def blocked?
@mutex.synchronize { @blocked == BLOCKED }
end
+ def timed_out?
+ @mutex.synchronize { @result == TIMED_OUT }
+ end
+
# this is for testing, allows us to wait until this object has gone into
# blocking state.
#
@@ -66,7 +71,7 @@ def wait_until_blocked(timeout=nil)
end
end
- # cause a thread blocked us to be awakened and have a WakeUpException
+ # cause a thread blocked by us to be awakened and have a WakeUpException
# raised.
#
# if a result has already been delivered, then this does nothing
@@ -86,7 +91,13 @@ def interrupt!
end
end
- def block_until_deleted
+ # @option opts [Numeric] :timeout (nil) if a positive integer, represents a duration in
+ # seconds after which, if we have not acquired the lock, a LockWaitTimeoutError will
+ # be raised in all waiting threads
+ #
+ def block_until_deleted(opts={})
+ timeout = opts[:timeout]
+
@mutex.synchronize do
raise InvalidStateError, "Already fired for #{path}" if @result
register_callbacks
@@ -103,13 +114,19 @@ def block_until_deleted
@blocked = BLOCKED
@cond.broadcast # wake threads waiting for @blocked to change
- @cond.wait_until { @result } # wait until we get a result
+
+ wait_for_result(timeout)
+
@blocked = NOT_ANYMORE
+ logger.debug { "got result for path: #{path}, result: #{@result.inspect}" }
+
case @result
when :deleted
logger.debug { "path #{path} was deleted" }
return true
+ when TIMED_OUT
+ raise ZK::Exceptions::LockWaitTimeoutError
when INTERRUPTED
raise ZK::Exceptions::WakeUpException
when ZOO_EXPIRED_SESSION_STATE
@@ -127,6 +144,29 @@ def block_until_deleted
end
private
+ # this method must be synchronized on @mutex, obviously
+ def wait_for_result(timeout)
+ # do the deadline maths
+ time_to_stop = timeout ? (Time.now + timeout) : nil # slight time slippage between here
+
+ until @result
+ if timeout # and here
+ now = Time.now
+
+ if (now > time_to_stop)
+ @result ||= TIMED_OUT # don't overwrite the @result
+ return
+ elsif @result
+ return
+ end
+
+ @cond.wait(time_to_stop.to_f - now.to_f)
+ else
+ @cond.wait_until { @result }
+ end
+ end
+ end
+
def unregister_callbacks
@subs.each(&:unregister)
end
@@ -142,11 +182,11 @@ def register_callbacks
def node_deletion_cb(event)
@mutex.synchronize do
if event.node_deleted?
- @result = :deleted
+ @result ||= :deleted
@cond.broadcast
else
unless zk.exists?(path, :watch => true)
- @result = :deleted
+ @result ||= :deleted
@cond.broadcast
end
end
View
26 spec/zk/node_deletion_watcher_spec.rb
@@ -7,6 +7,7 @@
@path = "#{@base_path}/node_deleteion_watcher_victim"
@n = ZK::NodeDeletionWatcher.new(@zk, @path)
+ @exc = nil
end
describe %[when the node already exists] do
@@ -31,8 +32,7 @@
it %[should wake up if interrupt! is called] do
@zk.mkdir_p(@path)
- @exc = nil
-
+ # see _eric!! i had to do this because of 1.8.7!
th = Thread.new do
begin
@n.block_until_deleted
@@ -50,6 +50,28 @@
@exc.should be_kind_of(ZK::Exceptions::WakeUpException)
end
+
+ it %[should raise LockWaitTimeoutError if we time out waiting for a node to be deleted] do
+ @zk.mkdir_p(@path)
+
+ th = Thread.new do
+ begin
+ @n.block_until_deleted(:timeout => 0.02)
+ rescue Exception => e
+ @exc = e
+ end
+ end
+
+ @n.wait_until_blocked(5).should be_true
+
+ logger.debug { "wait_until_blocked returned" }
+
+ th.join(5).should == th
+
+ @exc.should be_kind_of(ZK::Exceptions::LockWaitTimeoutError)
+ @n.should be_done
+ @n.should be_timed_out
+ end
end
describe %[when the node doesn't exist] do
Please sign in to comment.
Something went wrong with that request. Please try again.