ept edited this page Dec 11, 2012 · 10 revisions

ZooKeeper provides a consistent, highly available, fault-tolerant hierarchical data store with filesystem-like, atomic semantics. That's all well and good, but the real power of ZooKeeper is its facility for delivering asynchronous events. A ZooKeeper connection is architected around two threads: a synchronous request thread and an asynchronous event thread.

There are two kinds of events, and they are both important for different reasons.

  • Watches, which the developer uses to be notified of changes to znodes
  • Session events, which are changes in the connectivity of the client itself (connection, temporary disconnection, and permanent disconnection)

This lets an application developer register for updates when a node is created, deleted, or updated, or has children added under it. In ZooKeeper parlance this is called "a watch," and it is truly useful: your application doesn't need to block waiting for an event to occur, because it can be notified when the event happens. This has all kinds of useful applications for controlling a cluster.

ZK's Event Handler

So, what ZK layers on top of the basic stream of events that the raw ZooKeeper connection provides is a dispatch mechanism: you can register to receive events related to a specific node, rather than having to filter the whole stream yourself.
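The idea of path-based dispatch can be sketched in plain Ruby. This is a hypothetical, simplified model, not ZK's actual implementation: ToyDispatcher, register and dispatch are illustrative names. A single raw stream of [type, path] events is fanned out to the blocks registered for each path, so a handler only ever sees events for the path it asked about.

```ruby
# Hypothetical sketch of path-based event dispatch; NOT ZK's real code.
# Raw events arrive as [type, path] pairs on a single stream, and the
# dispatcher routes each one only to the blocks registered for that path.
class ToyDispatcher
  def initialize
    @handlers = Hash.new { |hash, path| hash[path] = [] }
  end

  # Remember a block to call for events on +path+.
  def register(path, &block)
    @handlers[path] << block
  end

  # Deliver one raw event to the handlers interested in its path.
  # (fetch bypasses the default proc, so unknown paths add no entries.)
  def dispatch(type, path)
    @handlers.fetch(path, []).each { |handler| handler.call(type, path) }
  end
end

dispatcher = ToyDispatcher.new
a_events = []
b_events = []
dispatcher.register('/a') { |type, path| a_events << [type, path] }
dispatcher.register('/b') { |type, path| b_events << [type, path] }

# A mixed stream of raw events, as a raw connection might deliver them.
[[:created, '/a'], [:changed, '/b'], [:deleted, '/a']].each do |type, path|
  dispatcher.dispatch(type, path)
end

p a_events # => [[:created, "/a"], [:deleted, "/a"]]
p b_events # => [[:changed, "/b"]]
```

Keying handlers by path is what saves each caller from writing its own "is this event for me?" filter over the entire stream.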

# docs/examples/events_01.rb

require 'thread'
require 'zk'

class Events
  def initialize
    @zk = ZK.new
    @queue = Queue.new
    @path = '/zk-example-events01'
  end

  def do_something_with(data)
    puts "I was told to say #{data.inspect}"
    @queue.push(:got_event)
  end

  def run
    @zk.register(@path) do |event|
      if event.node_changed? or event.node_created?
        # fetch the latest data
        data = @zk.get(@path).first
        do_something_with(data)
      end
    end

    begin
      @zk.delete(@path)
    rescue ZK::Exceptions::NoNode
      # already gone: that's the clean starting state we want
    end
    @zk.stat(@path, :watch => true)
    @zk.create(@path, 'Hello, events!')

    @queue.pop
  ensure
    @zk.close!
  end
end

Events.new.run

So in the run method, the first thing we do is register a block to be called back when events occur on @path. Currently, event handler blocks receive all watcher events related to a path, not just the ones they're interested in (this may change in a later release), so inside the block we first check whether the event is one we care about. If the event is a notification that the node has changed or been created, we fetch the data from the node and then perform some action with it.

Since we want a clean starting state for the event, we first make sure the node does not exist before "setting the watch": we delete the node and rescue the exception raised if it is already absent. Next we perform a stat on the path, purely for the side effect of setting the watch (which is what :watch => true does). Finally we create the znode, which causes the watch to fire.
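Why the order (delete, stat with a watch, then create) matters can be shown with a small in-memory model. ToyWatchStore and ToyNoNode are hypothetical stand-ins, not the ZK API; the only assumptions are that watches are one-shot and that a watch can be armed on a path that does not exist yet, as ZooKeeper's exists/stat watches can.

```ruby
# Hypothetical in-memory model of one-shot watches; NOT the ZK API.
class ToyNoNode < StandardError; end

class ToyWatchStore
  def initialize
    @nodes = {}
    @watches = Hash.new { |hash, path| hash[path] = [] }
  end

  # Like stat/exists? with a watch: reports whether the node exists and
  # arms a one-shot watch on the path even if the node is absent.
  def exists?(path, watch: nil)
    @watches[path] << watch if watch
    @nodes.key?(path)
  end

  def create(path, data)
    @nodes[path] = data
    fire(path, :created)
  end

  def delete(path)
    raise ToyNoNode, path unless @nodes.key?(path)
    @nodes.delete(path)
    fire(path, :deleted)
  end

  private

  # One-shot: the path's watches are removed before they are called.
  def fire(path, type)
    @watches.delete(path).to_a.each { |callback| callback.call(type, path) }
  end
end

path = '/zk-example-events01'

# Right order: clean up, arm the watch, then create.
store = ToyWatchStore.new
events = []
begin
  store.delete(path)
rescue ToyNoNode
  # not there, which is the clean state we want
end
store.exists?(path, watch: ->(type, _path) { events << type })
store.create(path, 'Hello, events!')
p events # => [:created]

# Wrong order: create first, watch afterwards; the creation is never reported.
late_store = ToyWatchStore.new
late = []
late_store.create(path, 'Hello, events!')
late_store.exists?(path, watch: ->(type, _path) { late << type })
p late # => []
```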

So what's with the @queue?

Events are delivered on an internal threadpool. If we had no way of waiting for that other thread to take action, we would fall off the end of the run method, Ruby would reach the end of the main thread of execution, and the program would exit. Maybe we get the event before that, maybe not. So here we use a Queue as a simple coordination primitive between the two threads: @queue.pop blocks until there is something to remove from the queue.

Some time after calling @zk.create, our event is delivered on a background thread, a friendly message is printed out, and something is pushed onto the queue. In the main thread, we pop the :got_event, and exit the method.
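Stripped of ZooKeeper entirely, this coordination is just Ruby's Queue. In the sketch below a plain Thread stands in for ZK's event-delivery thread (an assumption for illustration), and sleep simulates the event arriving some time later; the main thread blocks in pop until the background thread pushes a value.

```ruby
require 'thread'

queue = Queue.new

# Stand-in for an event delivered later on a background thread.
deliverer = Thread.new do
  sleep 0.1 # simulate the event arriving some time later
  puts 'I was told to say "Hello, events!"'
  queue.push(:got_event)
end

# Without this pop, the main thread could finish (and the program exit)
# before the background thread ever ran.
result = queue.pop # blocks until something is pushed
deliverer.join
p result # => :got_event
```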

Watches only fire once

So this is a simple example, and there is something important to note about it: the watch will only fire once. If another update happens to the znode, we won't be notified, because we haven't told ZooKeeper to notify us again.
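A small in-memory model makes the one-shot behaviour visible. OneShotStore is hypothetical, not ZK: like a ZooKeeper watch, each registered watch is discarded the moment it fires, so the second update goes unobserved.

```ruby
# Hypothetical model of one-shot watches; NOT the ZK API.
class OneShotStore
  def initialize
    @nodes = {}
    @watches = Hash.new { |hash, path| hash[path] = [] }
  end

  # Read a node, optionally arming a one-shot watch on it.
  def get(path, watch: nil)
    @watches[path] << watch if watch
    @nodes[path]
  end

  # Write a node and fire (then forget) any watches on it.
  def set(path, data)
    @nodes[path] = data
    @watches.delete(path).to_a.each { |callback| callback.call(:changed, path) }
  end
end

store = OneShotStore.new
store.set('/node', 'v0')
notifications = []

store.get('/node', watch: ->(type, _path) { notifications << type })
store.set('/node', 'v1') # fires the watch, which is then gone
store.set('/node', 'v2') # nobody is told about this one

p notifications # => [:changed]
```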

Ok, so what do we do?

# docs/examples/events_02.rb

require 'thread'
require 'zk'

class Events
  def initialize
    @zk = ZK.new
    @queue = Queue.new
    @path = '/zk-example-events01'
  end

  def do_something_with(data)
    puts "I was told to say #{data.inspect}"
  end

  def run
    # @sub here is an EventHandlerSubscription which is explained in the next section

    @sub = @zk.register(@path) do |event|
      if event.node_changed? or event.node_created?
        data = @zk.get(@path, watch: true).first    # fetch the latest data and re-set watch
        do_something_with(data)
        @queue.push(:got_event)
      end
    end

    begin
      @zk.delete(@path)
    rescue ZK::Exceptions::NoNode
      # already gone: that's the clean starting state we want
    end
    @zk.stat(@path, watch: true)
    @zk.create(@path, 'Hello, events!')

    @queue.pop

    @zk.set(@path, "ooh, an update!")

    @queue.pop
  ensure
    @zk.close!
  end
end

Events.new.run

So the difference here is inside the registered block. When we do the get, we pass watch: true, which tells ZooKeeper to notify us the next time the znode is changed or deleted. The program runs much the same way as before, but now, after we print the friendly message and push to the queue, our main thread updates the znode and waits for confirmation. Some time later, ZK delivers the change event to the callback, and we get our second friendly message. The callback pushes to the queue, the main thread wakes up, and we exit.
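The re-arming trick can be checked against the same kind of toy model. RearmStore and its methods are hypothetical, not ZK; the point is that the callback reads the new value and sets a fresh watch in a single call, so every subsequent update is seen.

```ruby
# Hypothetical model of one-shot watches; NOT the ZK API.
class RearmStore
  def initialize
    @nodes = {}
    @watches = Hash.new { |hash, path| hash[path] = [] }
  end

  # Read a node, optionally arming a one-shot watch on it.
  def get(path, watch: nil)
    @watches[path] << watch if watch
    @nodes[path]
  end

  # Write a node and fire (then forget) any watches on it.
  def set(path, data)
    @nodes[path] = data
    @watches.delete(path).to_a.each { |callback| callback.call(:changed, path) }
  end
end

store = RearmStore.new
store.set('/node', 'Hello, events!')
seen = []

# Each time it fires, the watcher re-reads the node AND re-arms itself.
watcher = lambda do |_type, path|
  seen << store.get(path, watch: watcher)
end

store.get('/node', watch: watcher) # initial watch
store.set('/node', 'ooh, an update!')
store.set('/node', 'and another')

p seen # => ["ooh, an update!", "and another"]
```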

Subscriptions

Another feature of the event handler system worth mentioning is the subscription object. When you call @zk.register, you get back an EventHandlerSubscription which provides you a way to unregister the block when you no longer want it to receive updates. This is important from an application developer's perspective, because there are times when you have registered a watcher, but in taking some action, you discover that you don't need to watch anymore (because the znode you're interested in is already in the state you want it in). If you don't unregister your watcher, the next time something happens to that znode, your watcher will still fire. This can be the source of odd bugs.

There is also the more mundane reason to unsubscribe, which is that if you're doing a lot of registering for events on many different paths, you're storing those blocks in the registry, and those blocks will not be garbage collected until they're unregistered. This is not a huge issue for short-running programs, but for longer running ones it could be a source of a slow memory leak.
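Both reasons to unsubscribe show up in a hypothetical registry sketch. ToyRegistry and ToySubscription are illustrative, not ZK's EventHandlerSubscription: register returns a handle, and only calling unsubscribe on it removes the block, so until then the registry keeps firing it and keeps a reference to it.

```ruby
# Hypothetical registry with subscription handles; NOT ZK's classes.
class ToyRegistry
  # Handle returned by #register; lets the caller remove its block later.
  class ToySubscription
    def initialize(registry, path, block)
      @registry = registry
      @path = path
      @block = block
    end

    def unsubscribe
      @registry.remove(@path, @block)
    end
  end

  def initialize
    @blocks = Hash.new { |hash, path| hash[path] = [] }
  end

  def register(path, &block)
    @blocks[path] << block
    ToySubscription.new(self, path, block)
  end

  def remove(path, block)
    @blocks[path].delete(block)
    @blocks.delete(path) if @blocks[path].empty?
  end

  def dispatch(type, path)
    @blocks.fetch(path, []).each { |block| block.call(type, path) }
  end

  # Number of blocks still held (and therefore not garbage collectable).
  def size
    @blocks.values.map(&:size).sum
  end
end

registry = ToyRegistry.new
fired = 0
sub = registry.register('/lock') { |_type, _path| fired += 1 }

registry.dispatch(:changed, '/lock') # the block runs once
sub.unsubscribe                      # we no longer care about /lock
registry.dispatch(:changed, '/lock') # no stale callback runs

p fired         # => 1
p registry.size # => 0
```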

Concurrency

Ok, so you know how to listen for asynchronous events, you know how to manipulate the state on the server, you're ready to go!

Well, let's hold on a sec. There's something you should know about how to think about writing ZooKeeper apps, and more generally about how to leverage the atomic semantics ZooKeeper offers.

At any given moment, the only actor that knows the true state of the system is ZooKeeper. There is always going to be a delay between event delivery and your next synchronous request to the server, and in that window someone can change things "behind your back." Just because you're paranoid doesn't mean they're not updating your znodes. What this means is that whenever you go to change state on the server, you must set aside your assumptions about what you think the state is and be prepared to deal with reality. Usually this means taking decisive action and rescuing exceptions.
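One common form of "taking decisive action and rescuing exceptions" is a versioned write: read the data together with its version, write only if the version is unchanged, and retry on conflict. ZooKeeper supports this on the server (setData takes an expected version and fails with a BadVersion error on mismatch). The sketch below only models that idea in memory; VersionedStore, BadVersion, increment and its interfere hook are hypothetical names, not ZK's API.

```ruby
# Hypothetical in-memory model of versioned writes; NOT the ZK API.
class BadVersion < StandardError; end

class VersionedStore
  def initialize
    @data = {}
    @versions = Hash.new(0)
  end

  # Returns [data, version], like a read that also hands back the version.
  def get(path)
    [@data[path], @versions[path]]
  end

  # Writes only if +version+ is still current; otherwise someone changed
  # the node "behind our back", so refuse, as the server would.
  def set(path, data, version:)
    raise BadVersion, path unless @versions[path] == version
    @data[path] = data
    @versions[path] += 1
  end
end

# Read-modify-write that copes with concurrent updates by retrying.
# +interfere+ is a test hook that runs between our read and our write.
def increment(store, path, interfere: nil)
  loop do
    value, version = store.get(path)
    interfere.call if interfere
    begin
      store.set(path, value + 1, version: version)
      return
    rescue BadVersion
      next # our view was stale: re-read and try again
    end
  end
end

store = VersionedStore.new
store.set('/counter', 0, version: 0)

# Another client sneaks in one update between our first read and write.
interfered = false
other_client = lambda do
  next if interfered
  interfered = true
  current, version = store.get('/counter')
  store.set('/counter', current + 10, version: version)
end

increment(store, '/counter', interfere: other_client)
p store.get('/counter') # => [11, 3]
```

The first write attempt is rejected because the other client bumped the version; the retry re-reads 10 and writes 11, so no update is lost.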

So, let's take an example from ZK's codebase (slightly simplified) that is used to implement locks. This method blocks the calling thread until the given path is deleted (for the curious, here's ZooKeeper's lock recipe).

# docs/examples/block_until_node_deleted_ex.rb

require 'thread'
require 'zk'

class BlockUntilNodeDeleted
  attr_reader :zk

  def initialize
    @zk = ZK.new
    @path = @zk.create('/zk-examples', sequence: true, ephemeral: true)
  end

  def block_until_node_deleted(abs_node_path)
    queue = Queue.new

    ev_sub = zk.register(abs_node_path) do |event|
      if event.node_deleted?
        queue.enq(:deleted) 
      else
        if zk.exists?(abs_node_path, :watch => true)
          # node still exists, wait for next event (better luck next time)
        else
          # ooh! surprise! it's gone!
          queue.enq(:deleted) 
        end
      end
    end
  
    # set up the callback, but bail if we don't need to wait
    return true unless zk.exists?(abs_node_path, :watch => true)  

    queue.pop # block waiting for node deletion
    true
  ensure
    # guard against ev_sub being nil if register itself raised
    ev_sub.unsubscribe if ev_sub
  end

  def run
    waiter = Thread.new do
      $stderr.puts "waiter thread, about to block"
      block_until_node_deleted(@path)
      $stderr.puts "waiter unblocked"
    end

    # This is not a good way to wait on another thread in general (it's
    # busy-waiting) but simple for this example.
    #
    Thread.pass until waiter.status == 'sleep'

    # we now know the other thread is waiting for deletion
    # so give 'em a thrill
    @zk.delete(@path)

    waiter.join

    $stderr.puts "hooray! success!"
  ensure
    @zk.close!
  end
end

BlockUntilNodeDeleted.new.run

This code follows a pattern you're likely to need when dealing with unpredictable state on the server. In the block_until_node_deleted method, we first create a Queue object that will be used to communicate between the event delivery thread and the thread that called us. Next is where the action is: we register a callback to handle events related to the node we're watching. When we're called back, if the event is that the node was deleted, we're done.

Next is the interesting case. For any other event, we re-register the watch by calling exists? (syntactic sugar around stat), and while doing so we may discover that the node has been deleted, in which case we have to notify the other thread to wake up. Otherwise, as a side effect of passing :watch => true, we'll be notified of the next event. This is how we make sure we never miss the deletion of the znode.

After registering the callback, we apply some of the same logic. We check whether the path exists. If the znode does not exist, we're done and return to the caller. If it does exist, we've set a watch for events, and we then hit queue.pop, which puts our thread to sleep until the event handler block wakes us up. The order matters: because the callback is registered before we check, a deletion that happens right after our check still has a handler waiting for its event.

Now the real beauty here is that we're able to perform this feat (waiting for a remote system to take an action) without polling and without race conditions. We don't need to waste cycles asking every N seconds "are we there yet? are we there yet?", and we do it in a way where, if we're careful, we can know for certain what the state of the system is.
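The register-then-check ordering can be exercised without a server. In this hypothetical model (DeletionStore and wait_until_deleted are illustrative names, not ZK), handlers are registered per path but only hear about a deletion if a one-shot watch was armed, and the notification runs on the deleting thread rather than the waiting one. wait_until_deleted mirrors the shape of block_until_node_deleted: register first, then check existence with a watch, and only then block on the queue.

```ruby
require 'thread'

# Hypothetical model, NOT the ZK API: handlers are registered per path
# (in the spirit of ZK's register), but they only hear about a deletion
# if a one-shot watch was armed on that path beforehand.
class DeletionStore
  def initialize
    @mutex = Mutex.new
    @nodes = {}
    @armed = {}
    @handlers = Hash.new { |hash, path| hash[path] = [] }
  end

  # Returns a lambda that unregisters the block (a stand-in for a
  # subscription object).
  def register(path, &block)
    @mutex.synchronize { @handlers[path] << block }
    -> { @mutex.synchronize { @handlers[path].delete(block) } }
  end

  def create(path)
    @mutex.synchronize { @nodes[path] = true }
  end

  # Reports existence and, if asked, arms a one-shot watch on the path.
  def exists?(path, watch: false)
    @mutex.synchronize do
      @armed[path] = true if watch
      @nodes.key?(path)
    end
  end

  # Deletes the node; if a watch was armed, consumes it and notifies the
  # handlers on the deleting thread (outside the lock).
  def delete(path)
    handlers = @mutex.synchronize do
      @nodes.delete(path)
      @armed.delete(path) ? @handlers[path].dup : []
    end
    handlers.each { |handler| handler.call(:deleted) }
  end
end

# Same shape as block_until_node_deleted: register, THEN check, then block.
def wait_until_deleted(store, path)
  queue = Queue.new
  unsubscribe = store.register(path) do |type|
    # For anything but a deletion, re-check (and re-arm the watch).
    queue.push(:deleted) if type == :deleted || !store.exists?(path, watch: true)
  end
  return :already_gone unless store.exists?(path, watch: true)
  queue.pop
ensure
  unsubscribe.call if unsubscribe
end

store = DeletionStore.new
store.create('/lock-0001')

result = nil
waiter = Thread.new { result = wait_until_deleted(store, '/lock-0001') }
# Busy-wait until the waiter is parked in queue.pop (fine for a demo).
Thread.pass until waiter.status == 'sleep' || !waiter.alive?
store.delete('/lock-0001')
waiter.join

p result                                  # => :deleted
p wait_until_deleted(store, '/lock-0001') # => :already_gone
```

No thread ever polls: the waiter sleeps in queue.pop, and the only wake-up comes from the deletion event itself.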

At every step along the way, we're reacting to what ZooKeeper tells us, because every call to a ZooKeeper API method on zk is a point of synchronization between us and ZooKeeper. Things may change all around us, but when we ask, ZooKeeper will always tell us the truth.