diff --git a/Rakefile b/Rakefile index 14cfe0b..b2b88b4 100644 --- a/Rakefile +++ b/Rakefile @@ -1,2 +1,13 @@ require 'bundler' Bundler::GemHelper.install_tasks + +require 'rubygems' +gem 'rdoc', '~> 2.5' +require 'rdoc/task' + +RDoc::Task.new do |rd| + rd.title = 'ZK Documentation' + rd.rdoc_files.include("lib/**/*.rb") +end + + diff --git a/lib/z_k.rb b/lib/z_k.rb index c0d8291..e80cd5d 100644 --- a/lib/z_k.rb +++ b/lib/z_k.rb @@ -1,3 +1,6 @@ +require 'rubygems' +require 'bundler/setup' + require 'logger' require 'zookeeper' require 'forwardable' @@ -19,14 +22,21 @@ module ZK ZK_ROOT = File.expand_path('../..', __FILE__) + # The logger used by the ZK library. uses a Logger to +/dev/null+ by default + # def self.logger @logger ||= Logger.new('/dev/null') end + # Assign the Logger instance to be used by ZK def self.logger=(logger) @logger = logger end + # Create a new ZK::Client instance. If no arguments are given, the default + # config of 'localhost:2181' will be used. Otherwise all args will be passed + # to ZK::Client#new + # def self.new(*args) # XXX: might need to do some param parsing here @@ -37,6 +47,8 @@ def self.new(*args) Client.new(*args) end + # Like new, yields a connection to the given block and closes it when the + # block returns def self.open(*args) cnx = new(*args) yield cnx diff --git a/lib/z_k/client.rb b/lib/z_k/client.rb index 2c4da24..0d346bc 100644 --- a/lib/z_k/client.rb +++ b/lib/z_k/client.rb @@ -1,5 +1,6 @@ module ZK - # a more ruby-friendly wrapper around the low-level drivers + # A ruby-friendly wrapper around the low-level zookeeper drivers. This is the + # class that you will likely interact with the most. # class Client extend Forwardable @@ -13,6 +14,7 @@ class Client # for backwards compatibility alias :watcher :event_handler #:nodoc: + #:stopdoc: STATE_SYM_MAP = { Zookeeper::ZOO_CLOSED_STATE => :closed, Zookeeper::ZOO_EXPIRED_SESSION_STATE => :expired_session, @@ -21,12 +23,24 @@ class Client Zookeeper::ZOO_CONNECTED_STATE => :connected, Zookeeper::ZOO_ASSOCIATING_STATE => :associating, }.freeze + #:startdoc: + # Create a new client and connect to the zookeeper server. + # + # +host+ should be a string of comma-separated host:port pairs. You can + # also supply an optional "chroot" suffix that will act as an implicit + # prefix to all paths supplied. + # + # example: + # + # ZK::Client.new("zk01:2181,zk02:2181/chroot/path") + # def initialize(host, opts={}) @event_handler = EventHandler.new(self) @cnx = ::Zookeeper.new(host, DEFAULT_TIMEOUT, get_default_watcher_block) end + # returns true if the connection has been closed def closed? defined?(::JRUBY_VERSION) ? jruby_closed? : mri_closed? end @@ -45,6 +59,15 @@ def mri_closed? end public + + # returns the current state of the connection as reported by the underlying driver + # as a symbol. The possible values are [:closed, :expired_session, :auth_failed + # :connecting, :connected, :associating]. + # + # See the Zookeeper session + # {documentation}[http://hadoop.apache.org/zookeeper/docs/current/zookeeperProgrammers.html#ch_zkSessions] + # for more information + # def state if defined?(::JRUBY_VERSION) @cnx.state.to_string.downcase.to_sym @@ -60,18 +83,22 @@ def reopen(timeout=10, watcher=nil) state end + # Returns true if the underlying connection is in the +connected+ state. def connected? wrap_state_closed_error { @cnx.connected? } end + # Returns true if the underlying connection is in the +associating+ state. def associating? wrap_state_closed_error { @cnx.associating? } end + # Returns true if the underlying connection is in the +connecting+ state. def connecting? wrap_state_closed_error { @cnx.connecting? } end + # Returns true if the underlying connection is in the +expired_session+ state. def expired_session? if defined?(::JRUBY_VERSION) @cnx.state == Java::OrgApacheZookeeper::ZooKeeper::States::EXPIRED_SESSION @@ -80,6 +107,117 @@ def expired_session? end end + + # Create a node with the given path. The node data will be the given data, + # and node acl will be the given acl. The path is returned. + # + # The ephemeral argument specifies whether the created node will be + # ephemeral or not. + # + # An ephemeral node will be removed by the server automatically when the + # session associated with the creation of the node expires. + # + # The sequence argument can also specify to create a sequential node. The + # actual path name of a sequential node will be the given path plus a + # suffix "_i" where i is the current sequential number of the node. Once + # such a node is created, the sequential number will be incremented by one. + # + # If a node with the same actual path already exists in the ZooKeeper, a + # KeeperException with error code KeeperException::NodeExists will be + # thrown. Note that since a different actual path is used for each + # invocation of creating sequential node with the same path argument, the + # call will never throw a NodeExists KeeperException. + # + # If the parent node does not exist in the ZooKeeper, a KeeperException + # with error code KeeperException::NoNode will be thrown. + # + # An ephemeral node cannot have children. If the parent node of the given + # path is ephemeral, a KeeperException with error code + # KeeperException::NoChildrenForEphemerals will be thrown. + # + # This operation, if successful, will trigger all the watches left on the + # node of the given path by exists and get API calls, and the watches left + # on the parent node by children API calls. + # + # If a node is created successfully, the ZooKeeper server will trigger the + # watches on the path left by exists calls, and the watches on the parent + # of the node by children calls. + # + # Called with a hash of arguments set. Supports being executed + # asynchronousy by passing a callback object. + # + # ==== Arguments + # * path -- path of the node + # * data -- initial data for the node, defaults to an empty string + # * :acl -- defaults to ACL::OPEN_ACL_UNSAFE, otherwise the ACL for the node + # * :ephemeral -- defaults to false, if set to true the created node will be ephemeral + # * :sequence -- defaults to false, if set to true the created node will be sequential + # * :callback -- provide a AsyncCallback::StringCallback object or + # Proc for an asynchronous call to occur + # * :context -- context object passed into callback method + # * :mode -- may be specified instead of :ephemeral and :sequence, + # accepted values are [:ephemeral_sequential, :persistent_sequential, + # :persistent, :ephemeral] + # + # ==== Examples + # + # ===== create node, no data, persistent + # + # zk.create("/path") + # # => "/path" + # + # ===== create node, ACL will default to ACL::OPEN_ACL_UNSAFE + # + # zk.create("/path", "foo") + # # => "/path" + # + # ===== create ephemeral node + # zk.create("/path", :mode => :ephemeral) + # # => "/path" + # + # ===== create sequential node + # zk.create("/path", :mode => :persistent_sequence) + # # => "/path0" + # + # ===== create ephemeral and sequential node + # zk.create("/path", "foo", :mode => :ephemeral_sequence) + # # => "/path0" + # + # ===== create a child path + # zk.create("/path/child", "bar") + # # => "/path/child" + # + # ===== create a sequential child path + # zk.create("/path/child", "bar", :mode => :ephemeral_sequence) + # # => "/path/child0" + # + #-- + # TODO: document asynchronous callback + # + # ===== create asynchronously with callback object + # + # class StringCallback + # def process_result(return_code, path, context, name) + # # do processing here + # end + # end + # + # callback = StringCallback.new + # context = Object.new + # + # zk.create("/path", "foo", :callback => callback, :context => context) + # + # ===== create asynchronously with callback proc + # + # callback = proc do |return_code, path, context, name| + # # do processing here + # end + # + # context = Object.new + # + # zk.create("/path", "foo", :callback => callback, :context => context) + # + #++ def create(path, data='', opts={}) h = { :path => path, :data => data, :ephemeral => false, :sequence => false }.merge(opts) @@ -106,7 +244,50 @@ def create(path, data='', opts={}) h[:callback] ? rv : rv[:path] end - # TODO: improve callback handling + # Return the data and stat of the node of the given path. + # + # If the watch is true and the call is successfull (no exception is + # thrown), a watch will be left on the node with the given path. The watch + # will be triggered by a successful operation that sets data on the node, + # or deletes the node. See +watcher+ for documentation on how to register + # blocks to be called when a watch event is fired. + # + # A KeeperException with error code KeeperException::NoNode will be thrown + # if no node with the given path exists. + # + # Supports being executed asynchronousy by passing a callback object. + # + # ==== Arguments + # * path -- path of the node + # * :watch -- defaults to false, set to true if you need to watch this node + # * :callback -- provide a AsyncCallback::DataCallback object or + # Proc for an asynchronous call to occur + # * :context -- context object passed into callback method + # + # ==== Examples + # ===== get data for path + # zk.get("/path") + # + # ===== get data and set watch on node + # zk.get("/path", :watch => true) + # + #-- + # ===== get data asynchronously + # + # class DataCallback + # def process_result(return_code, path, context, data, stat) + # # do processing here + # end + # end + # + # zk.get("/path") do |return_code, path, context, data, stat| + # # do processing here + # end + # + # callback = DataCallback.new + # context = Object.new + # zk.get("/path", :callback => callback, :context => context) + #++ def get(path, opts={}) h = { :path => path }.merge(opts) @@ -116,7 +297,48 @@ def get(path, opts={}) opts[:callback] ? rv : rv.values_at(:data, :stat) end - + + # Set the data for the node of the given path if such a node exists and the + # given version matches the version of the node (if the given version is + # -1, it matches any node's versions). Return the stat of the node. + # + # This operation, if successful, will trigger all the watches on the node + # of the given path left by get_data calls. + # + # A KeeperException with error code KeeperException::NoNode will be thrown + # if no node with the given path exists. A KeeperException with error code + # KeeperException::BadVersion will be thrown if the given version does not + # match the node's version. + # + # Called with a hash of arguments set. Supports being executed + # asynchronousy by passing a callback object. + # + # ==== Arguments + # * :path -- path of the node + # * :data -- data to set + # * :version -- defaults to -1, otherwise set to the expected matching version + # * :callback -- provide a AsyncCallback::StatCallback object or + # Proc for an asynchronous call to occur + # * :context -- context object passed into callback method + # + # ==== Examples + # zk.set("/path", "foo") + # zk.set("/path", "foo", :version => 0) + # + #-- + # ===== set data asynchronously + # + # class StatCallback + # def process_result(return_code, path, context, stat) + # # do processing here + # end + # end + # + # callback = StatCallback.new + # context = Object.new + # + # zk.set("/path", "foo", :callback => callback, :context => context) + #++ def set(path, data, opts={}) h = { :path => path, :data => data }.merge(opts) @@ -125,6 +347,52 @@ def set(path, data, opts={}) opts[:callback] ? nil : rv[:stat] end + # Return the stat of the node of the given path. Return nil if the node + # doesn't exist. + # + # If the watch is true and the call is successful (no exception is thrown), + # a watch will be left on the node with the given path. The watch will be + # triggered by a successful operation that creates/delete the node or sets + # the data on the node. + # + # Can be called with just the path, otherwise a hash with the arguments + # set. Supports being executed asynchronousy by passing a callback object. + # + # ==== Arguments + # * path -- path of the node + # * :watch -- defaults to false, set to true if you need to watch + # this node + # * :callback -- provide a AsyncCallback::StatCallback object or + # Proc for an asynchronous call to occur + # * :context -- context object passed into callback method + # + # ==== Examples + # ===== exists for path + # zk.stat("/path") + # # => ZK::Stat + # + # ===== exists for path with watch set + # zk.stat("/path", :watch => true) + # # => ZK::Stat + # + # ===== exists for non existent path + # zk.stat("/non_existent_path") + # # => nil + # + #-- + # ===== exist node asynchronously + # + # class StatCallback + # def process_result(return_code, path, context, stat) + # # do processing here + # end + # end + # + # callback = StatCallback.new + # context = Object.new + # + # zk.exists?("/path", :callback => callback, :context => context) + #++ def stat(path, opts={}) h = { :path => path }.merge(opts) @@ -142,13 +410,15 @@ def stat(path, opts={}) end end - # exists? is just sugar around stat, instead of - # - # zk.stat('/path').exists? + # sugar around stat # - # you can do + # ===== instead of + # zk.stat('/path').exists? + # # => true # + # ===== you can do # zk.exists?('/path') + # # => true # # this only works for the synchronous version of stat. for async version, # this method will act *exactly* like stat @@ -158,18 +428,112 @@ def exists?(path, opts={}) opts[:callback] ? rv : rv.exists? end + # closes the underlying connection and deregisters all callbacks def close! @event_handler.clear! wrap_state_closed_error { @cnx.close } end - # TODO: improve callback handling + # Delete the node with the given path. The call will succeed if such a node + # exists, and the given version matches the node's version (if the given + # version is -1, it matches any node's versions). + # + # A KeeperException with error code KeeperException::NoNode will be thrown + # if the nodes does not exist. + # + # A KeeperException with error code KeeperException::BadVersion will be + # thrown if the given version does not match the node's version. + # + # A KeeperException with error code KeeperException::NotEmpty will be + # thrown if the node has children. + # + # This operation, if successful, will trigger all the watches on the node + # of the given path left by exists API calls, and the watches on the parent + # node left by children API calls. + # + # Can be called with just the path, otherwise a hash with the arguments + # set. Supports being executed asynchronousy by passing a callback object. + # + # ==== Arguments + # * path -- path of the node to be deleted + # * :version -- defaults to -1 (deletes any version), otherwise + # set to the expected matching version + # * :callback -- provide a AsyncCallback::VoidCallback object or + # Proc for an asynchronous call to occur + # * :context -- context object passed into callback method + # + # ==== Examples + # zk.delete("/path") + # zk.delete("/path", :version => 0) + # + #-- + # ===== delete node asynchronously + # + # class VoidCallback + # def process_result(return_code, path, context) + # # do processing here + # end + # end + # + # callback = VoidCallback.new + # context = Object.new + # + # zk.delete(/path", :callback => callback, :context => context) + #++ def delete(path, opts={}) h = { :path => path, :version => -1 }.merge(opts) rv = check_rc(@cnx.delete(h)) nil end + # Return the list of the children of the node of the given path. + # + # If the watch is true and the call is successful (no exception is thrown), + # a watch will be left on the node with the given path. The watch will be + # triggered by a successful operation that deletes the node of the given + # path or creates/delete a child under the node. See +watcher+ for + # documentation on how to register blocks to be called when a watch event + # is fired. + # + # A KeeperException with error code KeeperException::NoNode will be thrown + # if no node with the given path exists. + # + # Can be called with just the path, otherwise a hash with the arguments + # set. Supports being executed asynchronousy by passing a callback object. + # + # ==== Arguments + # * path -- path of the node + # * :watch -- defaults to false, set to true if you need to watch + # this node + # * :callback -- provide a AsyncCallback::ChildrenCallback object + # or Proc for an asynchronous call to occur + # * :context -- context object passed into callback method + # + # ==== Examples + # ===== get children for path + # zk.create("/path", :data => "foo") + # zk.create("/path/child", :data => "child1", :sequence => true) + # zk.create("/path/child", :data => "child2", :sequence => true) + # zk.children("/path") + # # => ["child0", "child1"] + # + # ====== get children and set watch + # zk.children("/path", :watch => true) + # # => ["child0", "child1"] + # + #-- + # ===== get children asynchronously + # + # class ChildrenCallback + # def process_result(return_code, path, context, children) + # # do processing here + # end + # end + # + # callback = ChildrenCallback.new + # context = Object.new + # zk.children("/path", :callback => callback, :context => context) + #++ def children(path, opts={}) h = { :path => path }.merge(opts) @@ -179,18 +543,82 @@ def children(path, opts={}) opts[:callback] ? nil : rv[:children] end + # Return the ACL and stat of the node of the given path. + # + # A KeeperException with error code KeeperException::Code::NoNode will be + # thrown if no node with the given path exists. + # + # Can be called with just the path, otherwise a hash with the arguments + # set. Supports being executed asynchronousy by passing a callback object. + # + # ==== Arguments + # * path -- path of the node + # * :stat -- defaults to nil, provide a Stat object that will be + # set with the Stat information of the node path (TODO: test this) + # * :callback -- provide a AsyncCallback::AclCallback object or + # Proc for an asynchronous call to occur + # * :context -- context object passed into callback method + # + # ==== Examples + # ===== get acl + # zk.get_acl("/path") + # # => [ACL] + # + # ===== get acl with stat + # stat = ZK::Stat.new + # zk.get_acl("/path", :stat => stat) + # + #-- + # ===== get acl asynchronously + # + # class AclCallback + # def processResult(return_code, path, context, acl, stat) + # # do processing here + # end + # end + # + # callback = AclCallback.new + # context = Object.new + # zk.acls("/path", :callback => callback, :context => context) + #++ def get_acl(path, opts={}) h = { :path => path }.merge(opts) rv = check_rc(@cnx.get_acl(h)) opts[:callback] ? nil : rv.values_at(:children, :stat) end + # Set the ACL for the node of the given path if such a node exists and the + # given version matches the version of the node. Return the stat of the + # node. + # + # A KeeperException with error code KeeperException::Code::NoNode will be + # thrown if no node with the given path exists. + # + # A KeeperException with error code KeeperException::Code::BadVersion will + # be thrown if the given version does not match the node's version. + # + # Called with a hash of arguments set. Supports being executed + # asynchronousy by passing a callback object. + # + # ==== Arguments + # * path -- path of the node + # * :acl -- acl to set + # * :version -- defaults to -1, otherwise set to the expected matching version + # * :callback -- provide a AsyncCallback::StatCallback object or + # Proc for an asynchronous call to occur + # * :context -- context object passed into callback method + # + # ==== Examples + # TBA - waiting on clarification of method use + # def set_acl(path, acls, opts={}) h = { :path => path, :acl => acls }.merge(opts) rv = check_rc(@cnx.set_acl(h)) opts[:callback] ? nil : rv[:stat] end + + #-- # # EXTENSIONS @@ -199,12 +627,23 @@ def set_acl(path, acls, opts={}) # #++ - # creates all parent paths and 'path' in zookeeper as nodes with zero data - # opts should be valid options to ZooKeeper#create - #--- + # Creates all parent paths and 'path' in zookeeper as persistent nodes with + # zero data. + # + # ==== Arguments + # * path: An absolute znode path to create + # + # ==== Examples + # + # zk.exists?('/path') + # # => false + # + # zk.mkdir_p('/path/to/blah') + # # => "/path/to/blah" + # + #-- # TODO: write a non-recursive version of this. ruby doesn't have TCO, so # this could get expensive w/ psychotically long paths - # def mkdir_p(path) create(path, '', :mode => :persistent) rescue Exceptions::NodeExists @@ -264,37 +703,63 @@ def block_until_node_deleted(abs_node_path) end # creates a new locker based on the name you send in - # @param [String] name the name of the lock you wish to use - # @see ZooKeeper::Locker#initialize - # @return ZooKeeper::Locker the lock using this connection and name - # @example - # zk.locker("blah").lock! + # + # see ZK::Locker#initialize + # + # returns a ZK::Locker instance using this Client and provided + # lock name + # + # ==== Arguments + # * name name of the lock you wish to use + # + # ==== Examples + # + # zk.locker("blah") + # # => # + # def locker(name) Locker.new(self, name) end - # convenience method for acquiring a lock then executing a code block + # Convenience method for acquiring a lock then executing a code block. This + # will block the caller until the lock is acquired. + # + # ==== Examples + # + # zk.with_lock('foo') do + # # this code is executed while holding the lock + # end + # def with_lock(path, &b) locker(path).with_lock(&b) end - # convenience method for constructing an election candidate + # Convenience method for constructing a ZK::Election::Candidate object using this + # Client connection, the given election +name+ and +data+. + # def election_candidate(name, data, opts={}) opts = opts.merge(:data => data) ZK::Election::Candidate.new(self, name, opts) end - # convenience method for constructing an election observer + # Convenience method for constructing a ZK::Election::Observer object using this + # Client connection, and the given election +name+. + # def election_observer(name, opts={}) ZK::Election::Observer.new(self, name, opts) end - # creates a new message queue of name _name_ - # @param [String] name the name of the queue - # @return [ZooKeeper::MessageQueue] the queue object - # @see ZooKeeper::MessageQueue#initialize - # @example + # creates a new message queue of name +name+ + # + # returns a ZK::MessageQueue object + # + # ==== Arguments + # * name the name of the queue + # + # ==== Examples + # # zk.queue("blah").publish({:some_data => "that is yaml serializable"}) + # def queue(name) MessageQueue.new(self, name) end @@ -318,10 +783,12 @@ def set_debug_level(level) #:nodoc: end end + #:stopdoc: # the state of the underlying connection # def state #:nodoc: # @cnx.state # end + #:startdoc: # register a block to be called on connection, when the client has # connected (syncronously if connected? is true, or on a watcher thread if @@ -331,6 +798,7 @@ def set_debug_level(level) #:nodoc: # # returns an EventHandlerSubscription object that can be used to unregister # this block from further updates + # def on_connected(&block) watcher.register_state_handler(:connected, &block).tap do block.call if connected? @@ -341,6 +809,7 @@ def on_connected(&block) # to the zookeeper server. the documentation says that this state should be # taken to mean that the application should enter into "safe mode" and operate # conservatively, as it won't be getting updates until it has reconnected + # def on_connecting(&block) watcher.register_state_handler(:connecting, &block).tap do block.call if connected? @@ -350,7 +819,6 @@ def on_connecting(&block) # register a block to be called when our session has expired. This usually happens # due to a network partitioning event, and means that all callbacks and watches must # be re-registered with the server - # #--- # NOTE: need to come up with a way to test this def on_expired_session(&block) diff --git a/lib/z_k/pool.rb b/lib/z_k/pool.rb index a729572..2d3ef8e 100644 --- a/lib/z_k/pool.rb +++ b/lib/z_k/pool.rb @@ -56,7 +56,9 @@ def with_connection def checkout(blocking=true) #:nodoc: assert_open! - debugger + + # api change + raise ArgumentError, "checkout does not take a block" if block_given? @pool.pop(!blocking) rescue ThreadError @@ -168,7 +170,6 @@ def initialize(host, opts={}) # for compatibility w/ ClientPool we'll use @connections for synchronization @pool = [] # currently available connections - populate_pool!(@min_clients) end diff --git a/spec/client_pool_spec.rb b/spec/client_pool_spec.rb index 1d00307..adbd9b6 100644 --- a/spec/client_pool_spec.rb +++ b/spec/client_pool_spec.rb @@ -1,5 +1,7 @@ require File.join(File.dirname(__FILE__), %w[spec_helper]) +require 'tracer' + describe ZK::Pool do describe :Simple do @@ -86,11 +88,11 @@ @path = '/_testWatch' - @connection_pool.checkout do |zk| + @connection_pool.with_connection do |zk| zk.delete(@path) rescue ZK::Exceptions::NoNode end - @connection_pool.checkout do |zk| + @connection_pool.with_connection do |zk| $stderr.puts "registering callback" zk.watcher.register(@path) do |event| $stderr.puts "callback fired! event: #{event.inspect}" @@ -104,7 +106,7 @@ zk.exists?(@path, :watch => true).should be_false end - @connection_pool.checkout do |zk| + @connection_pool.with_connection do |zk| $stderr.puts "creating path" zk.create(@path, "", :mode => :ephemeral).should == @path end