Skip to content

Commit

Permalink
Fix a fairly bad bug in event de-duplication
Browse files Browse the repository at this point in the history
This is fairly edge-case-y but could bite someone. If you'd set a watch
when doing a get that failed because the node didn't exist, any subsequent
attempts to set a watch would fail silently, because the client thought that the
watch had already been set.

We now wrap the operation in the setup_watcher! method, which rolls back the
record-keeping of what watches have already been set for what nodes if an
exception is raised.

This change has the side-effect that certain operations (get,stat,exists?,children)
will block event delivery until completion, because they need to have a consistent
idea about what events are pending, and which have been delivered. This also means
that calling these methods represent a synchronization point between user threads
(these operations can only occur serially, not simultaneously).
  • Loading branch information
slyphon committed Apr 26, 2012
1 parent 8fa292c commit 9ca2f90
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 35 deletions.
33 changes: 17 additions & 16 deletions lib/z_k/client/base.rb
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -326,9 +326,9 @@ def create(path, data='', opts={})
def get(path, opts={}) def get(path, opts={})
h = { :path => path }.merge(opts) h = { :path => path }.merge(opts)


setup_watcher!(:data, h) rv = setup_watcher!(:data, h) do

check_rc(cnx.get(h), h)
rv = check_rc(cnx.get(h), h) end


opts[:callback] ? rv : rv.values_at(:data, :stat) opts[:callback] ? rv : rv.values_at(:data, :stat)
end end
Expand Down Expand Up @@ -454,17 +454,17 @@ def stat(path, opts={})


h = { :path => path }.merge(opts) h = { :path => path }.merge(opts)


setup_watcher!(:data, h) setup_watcher!(:data, h) do
rv = cnx.stat(h)


rv = cnx.stat(h) return rv if opts[:callback]


return rv if opts[:callback] case rv[:rc]

when Zookeeper::ZOK, Zookeeper::ZNONODE
case rv[:rc] rv[:stat]
when Zookeeper::ZOK, Zookeeper::ZNONODE else
rv[:stat] check_rc(rv, h) # throws the appropriate error
else end
check_rc(rv, h) # throws the appropriate error
end end
end end


Expand Down Expand Up @@ -548,9 +548,10 @@ def children(path, opts={})


h = { :path => path }.merge(opts) h = { :path => path }.merge(opts)


setup_watcher!(:child, h) rv = setup_watcher!(:child, h) do
check_rc(cnx.get_children(h), h)
end


rv = check_rc(cnx.get_children(h), h)
opts[:callback] ? rv : rv[:children] opts[:callback] ? rv : rv[:children]
end end


Expand Down Expand Up @@ -798,8 +799,8 @@ def check_rc(hash, inputs=nil)
end end


# @private # @private
def setup_watcher!(watch_type, opts) def setup_watcher!(watch_type, opts, &b)
event_handler.setup_watcher!(watch_type, opts) event_handler.setup_watcher!(watch_type, opts, &b)
end end


# used in #inspect, doesn't raise an error if we're not connected # used in #inspect, doesn't raise an error if we're not connected
Expand Down
36 changes: 31 additions & 5 deletions lib/z_k/event_handler.rb
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -153,26 +153,52 @@ def get_default_watcher_block
end end
end end


# returns true if there's a pending watch of type for path
# @private
def restricting_new_watches_for?(watch_type, path)
synchronize do
if set = @outstanding_watches[watch_type]
return set.include?(path)
end
end

false
end

# implements not only setting up the watcher callback, but deduplicating # implements not only setting up the watcher callback, but deduplicating
# event delivery. Keeps track of in-flight watcher-type+path requests and # event delivery. Keeps track of in-flight watcher-type+path requests and
# doesn't re-register the watcher with the server until a response has been # doesn't re-register the watcher with the server until a response has been
# fired. This prevents one event delivery to *every* callback per :watch => true # fired. This prevents one event delivery to *every* callback per :watch => true
# argument. # argument.
# #
# due to somewhat poor design, we destructively modify opts before we yield
# and the client implictly knows this
#
# @private # @private
def setup_watcher!(watch_type, opts) def setup_watcher!(watch_type, opts)
return unless opts.delete(:watch) return yield unless opts.delete(:watch)


synchronize do synchronize do
set = @outstanding_watches.fetch(watch_type) set = @outstanding_watches.fetch(watch_type)
path = opts[:path] path = opts[:path]


if set.add?(path) if set.add?(path)
# this path has no outstanding watchers, let it do its thing # if we added the path to the set, blocking further registration of
opts[:watcher] = watcher_callback # watches and an exception is raised then we rollback
begin
# this path has no outstanding watchers, let it do its thing
opts[:watcher] = watcher_callback

yield opts
rescue Exception
set.delete(path)
raise
end
else else
# outstanding watch for path and data pair already exists, so ignore # we did not add the path to the set, which means we are not
# logger.debug { "outstanding watch request for path #{path.inspect} and watcher type #{watch_type.inspect}, not re-registering" } # responsible for removing a block on further adds if the operation
# fails, therefore, we just yield
yield opts
end end
end end
end end
Expand Down
61 changes: 47 additions & 14 deletions spec/watch_spec.rb
Original file line number Original file line Diff line number Diff line change
@@ -1,4 +1,4 @@
require File.join(File.dirname(__FILE__), %w[spec_helper]) require 'spec_helper'


describe ZK do describe ZK do
describe do describe do
Expand All @@ -8,15 +8,18 @@


@path = "/_testWatch" @path = "/_testWatch"
wait_until { @zk.connected? } wait_until { @zk.connected? }

# make sure we start w/ clean state
@zk.rm_rf(@path)
end end


after do after do
if @zk.connected?
@zk.close!
wait_until { !@zk.connected? }
end

mute_logger do mute_logger do
if @zk.connected?
@zk.close!
wait_until { !@zk.connected? }
end

ZK.open(@cnx_str) { |zk| zk.rm_rf(@path) } ZK.open(@cnx_str) { |zk| zk.rm_rf(@path) }
end end
end end
Expand All @@ -25,7 +28,7 @@
locker = Mutex.new locker = Mutex.new
callback_called = false callback_called = false


@zk.watcher.register(@path) do |event| @zk.register(@path) do |event|
locker.synchronize do locker.synchronize do
callback_called = true callback_called = true
end end
Expand All @@ -52,7 +55,7 @@ def wait_for_events_to_not_be_delivered(events)
it %[should only deliver an event once to each watcher registered for exists?] do it %[should only deliver an event once to each watcher registered for exists?] do
events = [] events = []


sub = @zk.watcher.register(@path) do |ev| sub = @zk.register(@path) do |ev|
logger.debug "got event #{ev}" logger.debug "got event #{ev}"
events << ev events << ev
end end
Expand All @@ -73,7 +76,7 @@ def wait_for_events_to_not_be_delivered(events)


@zk.create(@path, 'one', :mode => :ephemeral) @zk.create(@path, 'one', :mode => :ephemeral)


sub = @zk.watcher.register(@path) do |ev| sub = @zk.register(@path) do |ev|
logger.debug "got event #{ev}" logger.debug "got event #{ev}"
events << ev events << ev
end end
Expand All @@ -96,7 +99,7 @@ def wait_for_events_to_not_be_delivered(events)


@zk.create(@path, '') @zk.create(@path, '')


sub = @zk.watcher.register(@path) do |ev| sub = @zk.register(@path) do |ev|
logger.debug "got event #{ev}" logger.debug "got event #{ev}"
events << ev events << ev
end end
Expand All @@ -112,6 +115,40 @@ def wait_for_events_to_not_be_delivered(events)


events.length.should == 1 events.length.should == 1
end end

it %[should restrict_new_watches_for? if a successul watch has been set] do
@zk.stat(@path, watch: true)
@zk.event_handler.should be_restricting_new_watches_for(:data, @path)
end

it %[should not a block on new watches after an operation fails] do
# this is a situation where we did get('/blah', :watch => true) but
# got an exception, the next watch set should work

events = []

sub = @zk.register(@path) do |ev|
logger.debug { "got event #{ev}" }
events << ev
end

# get a path that doesn't exist with a watch

lambda { @zk.get(@path, watch: true) }.should raise_error(ZK::Exceptions::NoNode)

@zk.event_handler.should_not be_restricting_new_watches_for(:data, @path)

@zk.stat(@path, watch: true)

@zk.event_handler.should be_restricting_new_watches_for(:data, @path)

@zk.create(@path, '')

wait_while { events.empty? }

events.should_not be_empty

end
end end


describe 'state watcher' do describe 'state watcher' do
Expand Down Expand Up @@ -141,10 +178,6 @@ def wait_for_events_to_not_be_delivered(events)
m.should_receive(:state).and_return(ZookeeperConstants::ZOO_CONNECTED_STATE) m.should_receive(:state).and_return(ZookeeperConstants::ZOO_CONNECTED_STATE)
end end
end end

it %[should only fire the callback once] do
pending "not sure if this is the behavior we want"
end
end end
end end
end end
Expand Down

0 comments on commit 9ca2f90

Please sign in to comment.