diff --git a/.gitignore b/.gitignore index 2cfd3c8..091b08d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,11 +1,11 @@ #No hidden files (except for .gitignore) .* !.gitignore +!.yardopts +# Gems should not checking Gemfile.lock +Gemfile.lock #Files generated by rake -*.gemspec -zk.out -zookeeper.out +lib/jute/ +*.out doc pkg -#Jedit stupid default backup settings -*'`' diff --git a/.yardopts b/.yardopts new file mode 100644 index 0000000..daa4a71 --- /dev/null +++ b/.yardopts @@ -0,0 +1,2 @@ +- +History.txt diff --git a/Gemfile b/Gemfile new file mode 100644 index 0000000..70176dd --- /dev/null +++ b/Gemfile @@ -0,0 +1,4 @@ +source "http://rubygems.org" + +# Specify your gem's dependencies in rfuse.gemspec +gemspec diff --git a/Manifest.txt b/Manifest.txt deleted file mode 100644 index b23a31a..0000000 --- a/Manifest.txt +++ /dev/null @@ -1,39 +0,0 @@ -History.txt -Manifest.txt -README.rdoc -Rakefile -jute/jute.citrus -jute/lib/hoe/jute.rb -jute/lib/jute.rb -lib/zkruby/bindata.rb -lib/zkruby/enum.rb -lib/zkruby/eventmachine.rb -lib/zkruby/multi.rb -lib/zkruby/protocol.rb -lib/zkruby/rubyio.rb -lib/zkruby/session.rb -lib/zkruby/client.rb -lib/zkruby/util.rb -lib/zkruby/zkruby.rb -lib/zkruby.rb -lib/em_zkruby.rb -spec/bindata_spec.rb -spec/enum_spec.rb -spec/eventmachine_spec.rb -spec/multi_spec.rb -spec/protocol_spec.rb -spec/recipe_helper.rb -spec/rubyio_spec.rb -spec/sequences_spec.rb -spec/shared/auth.rb -spec/shared/basic.rb -spec/shared/binding.rb -spec/shared/chroot.rb -spec/shared/multi.rb -spec/shared/util.rb -spec/shared/watch.rb -spec/spec_helper.rb -spec/server_helper.rb -src/jute/zookeeper.jute -lib/jute/zookeeper.rb -yard_ext/enum_handler.rb diff --git a/README.rdoc b/README.rdoc index c2f4783..c0048ca 100644 --- a/README.rdoc +++ b/README.rdoc @@ -1,6 +1,6 @@ = zkruby -* https://github.com/lwoggardner/zkruby +* http://rubygems.org/gems/zkruby == DESCRIPTION: @@ -10,6 +10,9 @@ Pure ruby client for ZooKeeper (http://zookeeper.apache.org) Supports full ZooKeeper API, synchronous or asynchronous style, watches etc.. with implementations over EventMachine or plain old Ruby IO/Threads +Other ruby libraries for zookeeper tend to use the underlying C/Java client libraries while zkruby +implements the zookeeper wire protocol directly. + Advantages: * Rubyist API - with block is asynchronous, without block is synchronous * Avoids conflicts between various Ruby threading models and the C/Java apis @@ -18,9 +21,9 @@ Advantages: Disadvantages: * Duplicated code from Java/C libraries, particularly around herd effect protection -* Maintain in parallel with breaking changes in protocol which are possibly more likely - than breaking changes in the client API -* Probably not as optimised in terms of performance (but your client code is ruby anyway) +* Needs to keep up with changes in wire protocol which are possibly more likely + than changes in the client API +* Possibly not as optimised in terms of performance (but your client code is ruby anyway) * Not production tested (yet- do you want to be the first?) == SYNOPSIS: @@ -77,27 +80,27 @@ Disadvantages: == DEVELOPERS: -Checkout the zookeeper source from http://zookeeper.apache.org +Download ZooKeeper from http://zookeeper.apache.org -Checkout the zkruby source into the contrib directory +Create a conf/zoo.cfg file (copying sample.zoo.cfg is fine) -Install hoe +Checkout the zkruby source into the contrib directory - $ (sudo) gem install hoe +Copy (if different) src/zookeeper.jute to contrib/zkruby/src/zookeeper.jute -Install other dependencies +Get gem dependencies - $ rake check_extra_deps + $ bundle install Generate docs and run the tests/specs - $ rake newb + $ rake == LICENSE: (The MIT License) -Copyright (c) 2011 +Copyright (c) 2012 Grant Gardner Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the diff --git a/Rakefile b/Rakefile index 502bd38..9aaf36c 100644 --- a/Rakefile +++ b/Rakefile @@ -1,39 +1,34 @@ -# -*- ruby -*- -# hoe 2.12.5 goes looking for the plugins so we have to do it this way.. -$LOAD_PATH.unshift File.dirname(__FILE__) + '/jute/lib' +#!/usr/bin/env rake +$:.unshift "jute/lib" -require 'rubygems' -require 'hoe' +require 'rake/clean' +require 'yard' +require './yard_ext/enum_handler.rb' +require "bundler/gem_tasks" +require 'rspec/core/rake_task' +require 'jute/task' -begin - require './yard_ext/enum_handler' -rescue LoadError => err - warn "%p while trying to load yard extensions: %s" % [ err.class, err.message ] +RSpec::Core::RakeTask.new +RSpec::Core::RakeTask.new(:perf_spec) do |t| + t.rspec_opts = "--tag perf" end - +YARD::Rake::YardocTask.new -# Hoe.plugin :compiler -#Hoe.plugin :gem_prelude_sucks -Hoe.plugin :git -# Hoe.plugin :inline -# Hoe.plugin :racc -# Hoe.plugin :rubyforge -Hoe.plugin :yard -Hoe.plugin :jute - -Hoe.spec 'zkruby' do - self.readme_file="README.rdoc" - developer('Grant Gardner', 'grant@lastweekend.com.au') - dependency 'slf4r' , '~> 0.4.2' - dependency 'eventmachine', '~> 0.12.10', :development - dependency 'strand', '~> 0.1.0', :development - dependency 'logging', '>= 1.4.1', :development - dependency 'rspec', '>=2.7.0', :development - dependency 'hoe-yard', '>=0.1.2', :development - - self.jute_modules = { +Jute::Task.new() do |t| + t.modules = { "org.apache.zookeeper.data" => "ZooKeeper::Data", "org.apache.zookeeper.proto" => "ZooKeeper::Proto"} end -# vim: syntax=ruby + +task :perf_spec => :jute +task :spec => :jute +task :build => :jute +task :install => :jute +task :release => :jute +task :yard => :jute + +task :default => [:spec,:yard] + +CLEAN.include "*.out","Gemfile.lock",".yardoc/" +CLOBBER.include "doc/","pkg/","lib/jute" diff --git a/jute/lib/hoe/jute.rb b/jute/lib/hoe/jute.rb deleted file mode 100644 index 5b951a6..0000000 --- a/jute/lib/hoe/jute.rb +++ /dev/null @@ -1,56 +0,0 @@ -# coding: utf-8 - -module Hoe::Jute - attr_accessor :jute - attr_accessor :jute_tasks - attr_accessor :jute_modules - - #attr_accessor :jute_compiler - def initialize_jute - self.jute_tasks = [:test,:spec,:package] - dependency 'citrus', '~> 2.4.0', :development - #dependency 'jute' # if jute is ever a separate gem - dependency 'bindata', '~> 1.4.1' - end - - def define_jute_tasks - - found = try_load_jute() - - if found - jute_compiler = ::Jute::Compiler.new() - jute_files = self.spec.files.find_all { |f| f =~ /\.jute$/ } - - record_files = jute_files.map { |f| f.pathmap("%{src,lib}X.rb") } - self.clean_globs += record_files - - - rule ".rb" => ["%{lib,src}X.jute"] do |t| - File.open(t.source) do |input| - File.open(t.name,"w") do |output| - puts "compiling #{input.inspect} to #{output.inspect}" - jute_compiler.compile(input,output,jute_modules) - end - end - end - - desc "generate jute records" unless jute_files.empty? - task :jute - - task :jute => record_files - - jute_tasks.each do |t| - task t => [:jute] - end - end - end - - def try_load_jute() - require 'jute' - rescue LoadError => err - warn "%p while trying to load jute: %s" % [ err.class, err.message ] - false - end -end - - diff --git a/jute/lib/jute/task.rb b/jute/lib/jute/task.rb new file mode 100644 index 0000000..d3d2e6b --- /dev/null +++ b/jute/lib/jute/task.rb @@ -0,0 +1,62 @@ +# coding: utf-8 +require 'rake/tasklib' +require 'jute' + +class Jute::Task < Rake::TaskLib + + attr_accessor :modules + attr_accessor :files + attr_accessor :pathmap + + def initialize name = :jute + + defaults + + @name = name + + yield self if block_given? + + define_jute_tasks + end + + def defaults + @files = "src/jute/*.jute" + @pathmap = "%{src,lib}X.rb" + end + + def define_jute_tasks + desc "Compile jute files to ruby classes" + task jute_task_name + + raise "modules hash must be defined" unless Hash === @modules + FileList.new(@files).each do | source | + target = source.pathmap(@pathmap) + + target_dir = target.pathmap("%d") + directory target_dir + + file target => [source,target_dir] do + compile_jute(source,target) + end + task jute_task_name => target + end + end + + def jute_task_name + @name + end + + def compile_jute(source,target) + + @jute_compiler = ::Jute::Compiler.new() unless @jute_compiler + + File.open(source) do |input| + File.open(target,"w") do |output| + puts "Compiling #{input.inspect} to #{output.inspect}" + @jute_compiler.compile(input,output,modules) + end + end + end +end + + diff --git a/lib/em_zkruby.rb b/lib/em_zkruby.rb index 307e165..8274407 100644 --- a/lib/em_zkruby.rb +++ b/lib/em_zkruby.rb @@ -1,4 +1,6 @@ -# This is the main require for standard ruby io/thread based binding - +# This is the main require for the eventmachine based binding +# Only use this if all use of zkruby will be within the EM Reactor require 'zkruby/zkruby' require 'zkruby/eventmachine' + +Empathy::EM.empathise(ZooKeeper) diff --git a/lib/jute/.gitignore b/lib/jute/.gitignore deleted file mode 100644 index d6b7ef3..0000000 --- a/lib/jute/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -* -!.gitignore diff --git a/lib/zkruby.rb b/lib/zkruby.rb index 93b19a2..0599be3 100644 --- a/lib/zkruby.rb +++ b/lib/zkruby.rb @@ -1,4 +1,3 @@ # This is the main require for standard ruby io/thread based binding require 'zkruby/zkruby' -require 'zkruby/rubyio' diff --git a/lib/zkruby/async_op.rb b/lib/zkruby/async_op.rb new file mode 100644 index 0000000..d169131 --- /dev/null +++ b/lib/zkruby/async_op.rb @@ -0,0 +1,194 @@ +module ZooKeeper + + # Returned by asynchronous calls + # + # @example + # op = zk.stat("\apath") { |stat| something_with_stat() } + # + # op.async_rescue ZK::Error::SESSION_EXPIRED do + # puts "Ignoring session expired" + # result_when_session_expired() + # end + # + # op.async_rescue ZK::Error::CONNECTION_LOST do |ex| + # puts "Retrying stat due to connection lost" + # op.async_retry() + # # the result of this block is ignored! + # end + # + # begin + # result_of_somthing_with_stat = op.value + # rescue StandardError => ex + # puts "Oops" + # end + # + # + class AsyncOp + + # @api private + def initialize(event_loop,callback,&operation) + @event_loop = event_loop + @operation = operation + @callback = callback + @mutex,@cv = Mutex.new(), ConditionVariable.new() + begin + execute() + rescue ZooKeeper::Error => ex + # Capture any initial exception + # The only expected condition is :session_expired + @op_error = ex + logger.debug { "Error during initial call #{ex}" } + end + end + + # @param [Proc,#to_proc] block the error callback as a Proc + # @deprecated use {#async_rescue} + def errback=(block) + async_rescue(&block) + end + + #Rescue asynchronous exceptions in a similar manner to normal + #ruby. Unfortunately rescue is a reserved word + # @param matches [Class,...] subclasses of Exception to match, defaults to {::StandardError} + # @param errblock the block to call if an error matches + # @return self + # @yieldparam [Exception] ex the exception raised by the async operation OR by its callback + def async_rescue(*matches,&errblock) + matches << StandardError if matches.empty? + rescue_blocks << [ matches ,errblock ] + if @op_error && matches.any? { |m| m === @op_error } + begin + @allow_retry = true + @op_rescue= [ nil, errblock.call(@op_error) ] + rescue StandardError => ex + @operation_rescue_result = [ ex, nil ] + ensure + @resumed = true + @op_error = nil + @allow_retry = false + end + end + self + end + alias :on_error :async_rescue + alias :op_rescue :async_rescue + + # @deprecated + alias :errback :async_rescue + + + # Must only be called inside a block supplied to {#async_rescue} + # Common case is to retry on connection lost + # Retrying :session_expired is guaranteed to give infinite loops! + def async_retry() + raise ProtocolError "cannot retry outside of a rescue block" unless @allow_retry + begin + execute() + nil + rescue ZooKeeper::Error => ex + error,result = process_response(ex,nil) + if resumed? + raise error if error + return result + end + end + end + + alias :op_retry :async_retry + alias :try_again :async_retry + + # Wait for the async op to finish and returns its value + # @return result of the operation's callback or matched rescue handler + # @raise [StandardError] any unrescued exception + def value(); + if @op_error + raise @op_error + elsif @op_rescue + error, result = @op_rescue + raise error if error + return result + else + wait_value() + end + end + + # @api private + attr_accessor :backtrace + + # @api private + def resume(op_error,response) + mutex.synchronize do + # remember this mutex is only used to wait for this response anyway + # so synchronizing here is not harmful even if processing the response + # includes a long running callback. (which can't create deadlocks + # by referencing this op! + @resumed = true + @error, @result = process_response(op_error,response) + cv.signal() if resumed? + end + end + + private + attr_reader :callback, :operation, :event_loop + attr_reader :mutex, :cv, :error, :result + + def execute() + @op_error = nil + @op_rescue = nil + @resumed = false + operation.call(self) + true + end + + def resumed? + @resumed + end + + def rescue_blocks + @rescue_blocks ||= [] + end + + def process_response(op_error,response) + logger.debug { "Processing response #{op_error} #{response}" } + + # For ZooKeeper errors, set the backtrace to the original caller, rather than the ZK event loop + op_error.set_backtrace(@backtrace) if @backtrace && ZooKeeper::Error === op_error + + begin + return [ nil, callback.call(response) ] unless op_error + rescue Exception => ex #enable clients to rescue Exceptions + op_error = ex + end + + matches,rb = rescue_blocks.detect() { |matches,errblock| matches.any? { |m| m === op_error } } + return [ op_error, nil ] unless rb + + begin + @allow_retry = true + return [ nil, rb.call(op_error) ] + rescue StandardError => ex + return [ ex , nil ] + ensure + @allow_retry = false + end + end + + def wait_value() + if event_loop.current? + #Waiting in the event loop (eg made a synchronous call inside a callback) + #Keep processing events until we are resumed + until resumed? || event_loop.dead? + event_loop.pop_event_queue() + end + else + mutex.synchronize { + logger.debug("Async op is waiting") + cv.wait(mutex) unless resumed? + } + end + + raise error if error + result + end + end +end diff --git a/lib/zkruby/client.rb b/lib/zkruby/client.rb index 42e2b0f..752158f 100644 --- a/lib/zkruby/client.rb +++ b/lib/zkruby/client.rb @@ -48,7 +48,7 @@ class Perms # Combine permissions constants - # @param [Perms] perms... list of permissions to combine, can be {Perms} constants, symbols or ints + # @param [Perms...] perms list of permissions to combine, can be {Perms} constants, symbols or ints # @return [Fixnum] integer representing the combined permission def self.perms(*perms) perms.inject(0) { | result, perm | result = result | Perms.get(perm) } @@ -56,7 +56,7 @@ def self.perms(*perms) # Convenience method to create a zk Identity # @param [String] scheme - # @param [String] identity + # @param [String] id # @return [Data::Identity] the encapsulated identity for the given scheme def self.id(scheme,id) Data::Identity.new(:scheme => scheme, :identity => id) @@ -65,7 +65,7 @@ def self.id(scheme,id) # Convenience method to create a zk ACL # ZK.acl(ZK.id("world","anyone"), ZK::Perms.DELETE, ZL::Perms.WRITE) # @param [Data::Identity] id - # @param [Perms] *perms list of permissions + # @param [Perms...] perms list of permissions # @return [Data::ACL] an access control list # @see #perms # @@ -108,8 +108,6 @@ def self.path_to_seq(path) CURRENT = :zookeeper_current # Main method for connecting to a client # @param addresses [Array] list of host:port for the ZK cluster as Array or comma separated String - # @option options [Class] :binding binding optional implementation class - # either {EventMachine::Binding} or {RubyIO::Binding} but normally autodiscovered # @option options [String] :chroot chroot path. # All client calls will be made relative to this path # @option options [Watcher] :watch the default watcher @@ -118,43 +116,35 @@ def self.path_to_seq(path) # @yieldparam [Client] # @return [Client] def self.connect(addresses,options={},&block) - if options.has_key?(:binding) - binding_type = options[:binding] - else - binding_type = @bindings.detect { |b| b.available? } - raise ProtocolError,"No available binding" unless binding_type - end - binding = binding_type.new() - session = Session.new(binding,addresses,options) - client = Client.new(binding) - binding.start(client,session) + session = Session.new(addresses,options) + client = Client.new(session) + + session.start(client) + return client unless block_given? - binding_type.context() do |storage| - @binding_storage = storage - storage.current[CURRENT] ||= [] - storage.current[CURRENT].push(client) - begin - block.call(client) - ensure - storage.current[CURRENT].pop - client.close() unless session.closed? - end + storage = Thread.current[CURRENT] ||= [] + storage.push(client) + begin + yield client + ensure + storage.pop + #TODO this will throw an exception if expired + client.close() end end - # within the block supplied to {#connect} this will return the + # within the block supplied to {ZooKeeper.connect} this will return the # current ZK client def self.current #We'd use if key? here if strand supported it - @binding_storage.current[CURRENT].last if @binding_storage.current[CURRENT] + Thread.current[CURRENT].last if Thread.current.key?(CURRENT) end # Allow ZK a chance to send its data/ping - # particularly required for the eventmachine binding def self.pass - @binding_storage.pass + Thread.pass end class WatchEvent @@ -181,7 +171,8 @@ class KeeperState end - # @abstract. + # @abstract + # The watch interface class Watcher # @param [KeeperState] state representing the session state # (:connected, :disconnected, :auth_failed, :session_expired) @@ -263,29 +254,30 @@ def op_check(path,version,&callback) # or with state :expired and event :none when the session is finalised class Client + attr_reader :session include Operations # @api private # See {::ZooKeeper.connect} - def initialize(binding) - @binding = binding + def initialize(session) + @session = session end # Session timeout, initially as supplied, but once connected is the negotiated # timeout with the server. def timeout - @binding.session.timeout + session.timeout end # The currently registered default watcher def watcher - @binding.session.watcher + session.watcher end # Assign the watcher to the session. This watcher will receive session connect/disconnect/expired # events as well as any path based watches registered to the API calls using the literal value "true" # @param [Watcher,#process_watch,Proc] watcher def watcher=(watcher) - @binding.session.watcher=watcher + session.watcher=watcher end # Retrieve the list of children at the given path @@ -451,13 +443,13 @@ def sync(path,&blk) # Close the session # @overload close() - # @raise [Error] + # @raise [Error] # @overload close() # @return [AsyncOp] asynchronous operation # @yield [] callback invoked when session is closed def close(&blk) return synchronous_call(:close) unless block_given? - @binding.close(&blk) + session.close(&blk) end # @api private @@ -507,23 +499,26 @@ def transaction(&block) yield txn txn.commit end - private - def session - @binding.session - end + private + + # This is where the magic happens! def synchronous_call(method,*args) + # Re-enter the calling method in asynchronous style op = self.send(method,*args) do |*results| results end + + # Remove this call from the stored backtrace op.backtrace = op.backtrace[2..-1] if op.backtrace + # Wait for the asynchronous op to finish and return its value op.value end def queue_request(*args,&blk) - op = @binding.queue_request(*args,&blk) + op = session.request(*args,&blk) op.backtrace = caller[1..-1] op end @@ -546,7 +541,8 @@ def unchroot(path) # If the transaction fails none of these callbacks will be executed. class Transaction include Operations - #:nodoc + # @api private + # See {Client#transaction} def initialize(client,session) @client = client @session = session diff --git a/lib/zkruby/conn.rb b/lib/zkruby/conn.rb new file mode 100644 index 0000000..342bf99 --- /dev/null +++ b/lib/zkruby/conn.rb @@ -0,0 +1,108 @@ +require 'zkruby/socket' +module ZooKeeper + + class Connection + include ZooKeeper::Protocol + include Slf4r::Logger + + attr_reader :session + + def initialize(session) + @session = session + end + + def run(host,port,timeout) + @write_queue = Queue.new() + + begin + sock = Socket.tcp_connect_timeout(host,port,timeout) + sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) + sock.sync=true + write_thread = Thread.new(sock) { |sock| write_loop(sock) } + begin + session.prime_connection(self) + read_loop(sock) + ensure + disconnect(sock) + end + write_thread.join + rescue Errno::ECONNREFUSED + logger.warn{"Connection refused to #{host}:#{port}"} + end + end + + # This is called from random client threads (including the event loop) + def send_data(data) + @write_queue.push(data) if data + end + + private + + # Since this runs in its very own thread and has no timers + # we can use boring blocking IO + def write_loop(socket) + Thread.current[:name] = "ZK::WriteLoop #{self} #{socket} #{session}" + begin + while !socket.closed? && data = @write_queue.pop() + socket.write(data) + logger.debug { "Sent: #{data.unpack("H*")[0]}" } + end + logger.debug { "Write loop finished" } + rescue Exception => ex + logger.warn( "Exception in write loop",ex ) + # Make sure we break out of the read loop + disconnect(socket) + end + end + + def read_loop(socket) + ping = 0 + until socket.closed? + begin + data = socket.read_timeout(512,session.ping_interval) + if data.nil? + logger.debug { "Read timed out" } + ping += 1 + case ping + when 1 ; session.ping() + when 2 + logger.warn{"No response to ping in #{session.ping_interval}*2"} + break + end + else + logger.debug { "Received (#{data.length})" + data.unpack("H*")[0] } + receive_data(data) + ping = 0 + end + rescue EOFError + # This is how we expect to end - send a close packet and the + # server closes the socket + logger.debug { "EOF reading from socket" } + break + end + end + end + + # @api protocol + def receive_records(packet_io) + session.receive_records(packet_io) + end + + def disconnect(socket) + @write_queue.push(nil) + socket.close if socket and !socket.closed? + rescue Exception => ex + #oh well + logger.debug("Exception closing socket",ex) + end + end +end + +module ZooKeeperBinding + + # connect and read from the socket until disconnected + def self.connect(session,host,port,timeout) + ZooKeeper::Connection.new(session).run(host,port,timeout) + end +end + diff --git a/lib/zkruby/enum.rb b/lib/zkruby/enum.rb index 7143df1..ac16ad6 100644 --- a/lib/zkruby/enum.rb +++ b/lib/zkruby/enum.rb @@ -32,11 +32,11 @@ def to_int end def |(num) - to_int | num + @index | num end def &(num) - to_int & num + @index & num end def to_sym @@ -46,17 +46,21 @@ def to_sym def to_s "#{super} (:#{@name} [#{@index}])" end + + def coerce(other) + [self,other] + end end module ClassMethods - # @param [] ref Symbol, Fixnum or Enumeration + # @param [Symbol,Fixnum,Enumeration] ref # @return [Enumeration] instance representing ref or nil if not found def get(ref) @enums[ref] end - # @param [] ref Symbol, Fixnum + # @param [Symbol,Fixnum] ref # @raise [KeyError] if ref not found # @return [Enumeration] def fetch(ref) diff --git a/lib/zkruby/eventmachine.rb b/lib/zkruby/eventmachine.rb index 41f7eda..d44f4a6 100644 --- a/lib/zkruby/eventmachine.rb +++ b/lib/zkruby/eventmachine.rb @@ -1,18 +1,18 @@ require 'eventmachine' +require 'empathy' if defined?(JRUBY_VERSION) && JRUBY_VERSION =~ /1\.6\.5.*/ raise "Fibers are broken in JRuby 1.6.5 (See JRUBY-6170)" end -require 'strand' - module ZooKeeper module EventMachine - class ClientConn < ::EM::Connection + class Connection < ::EM::Connection include Protocol include Slf4r::Logger + attr_reader :session unless EventMachine.methods.include?(:set_pending_connect_timeout) def set_pending_connect_timeout(timeout) @@ -20,6 +20,7 @@ def set_pending_connect_timeout(timeout) end def initialize(session,connect_timeout) + @fiber = Fiber.current @session = session @connect_timeout = connect_timeout set_pending_connect_timeout(connect_timeout) @@ -27,167 +28,94 @@ def initialize(session,connect_timeout) logger.warn("Exception in initialize",ex) end - def post_init() - rescue Exception => ex - logger.warn("Exception in post_init",ex) - end + # This "loop" is a means of keeping all the session activity + # on the session fiber + def read_loop() + event,*args = Fiber.yield + if (event == :connection_completed) + logger.debug("Connection completed") + session.prime_connection(self) - def connection_completed() - @session.prime_connection(self) - - # Make sure we connect within the timeout period - # TODO this should really be the amount of connect timeout left over - @timer = EM.add_timer(@connect_timeout) do - if @session.connected? - # Start the ping timer - ping = @session.ping_interval - @timer = EM.add_periodic_timer ( ping ) do - case @ping - when 1 then @session.ping() - when 2 then close_connection() + @timer = ::EventMachine.add_timer(session.ping_interval) do + resume(:connect_timer) + end + + ping = 0 + unbound = false + # If session sleeps or waits in here then our yield/resumes are going to get out of sync + until unbound + event,*args = Fiber.yield + logger.debug { "Received event #{event} with #{args}" } + case event + when :connect_timer + if session.connected? + @timer = ::EventMachine.add_periodic_timer(session.ping_interval) do + resume(:ping_timer) + end + else + logger.warn("Connection timed out") + break; + end + when :ping_timer + case ping + when 1 then session.ping + when 2 then break; end - @ping += 1 + ping += 1 + when :receive_records + packet_io = args[0] + ping = 0 + session.receive_records(packet_io) + when :unbind + unbound = true + else + logger.error("Unexpected resume - #{event}") + break; end - else - close_connection() end end + ensure + @fiber = nil + ::EventMachine.cancel_timer(@timer) if @timer + close_connection() unless unbound + end - rescue Exception => ex - logger.warn("Exception in connection_completed",ex) + def connection_completed() + resume(:connection_completed) end def receive_records(packet_io) - @ping = 0 - @session.receive_records(packet_io) + resume(:receive_records,packet_io) end - def disconnect() - close_connection() + def unbind(reason) + logger.warn{"Connection #{self} unbound due to #{reason}"} if reason + resume(:unbind) end - def unbind - EM.cancel_timer(@timer) if @timer - @session.disconnected() + private + def resume(event,*args) + @fiber.resume(event,*args) if @fiber rescue Exception => ex - logger.warn("Exception in unbind",ex) + logger.error("Exception resuming #{@fiber} for event #{event}",ex) end - end + end #module EventMachine +end #module ZooKeeper - # The EventMachine binding is very simple because there is only one thread! - # and we have good stuff like timers provided for us - class Binding - include Slf4r::Logger - # We can use this binding if we are running in the reactor thread - def self.available?() - EM.reactor_running? && EM.reactor_thread? - end - - def self.context(&context_block) - s = Strand.new() do - context_block.call(Strand) - end - s.join - end - - attr_reader :client, :session - def start(client,session) - @client = client - @session = session - - @event_strand = Strand.new do - Strand.current[ZooKeeper::CURRENT] = [ @client ] - loop do - break unless pop_event_queue - end - logger.debug { "Event strand finished"} - end - @session.start() - end - - def pop_event_queue() - #TODO - use Strand.yield when Strand is updated - queued = Fiber.yield - return false unless queued - callback,*args = queued - callback.call(*args) - return true - rescue Exception => ex - logger.error("Exception in event strand", ex) - #TODO - should this be raised? - end - - def event_strand?() - Strand.current.equal?(@event_strand) - end - - def connect(host,port,delay,timeout) - EM.add_timer(delay) do - EM.connect(host,port,ZooKeeper::EventMachine::ClientConn,@session,timeout) - end - end - - # You are working in event machine it is up to you to ensure your callbacks do not block - def invoke(callback,*args) - @event_strand.fiber.resume(callback,*args) - end - - def queue_request(*args,&callback) - AsyncOp.new(self,callback) do |op| - @session.queue_request(*args) do |error,response| - op.resume(error,response) - end - end - end +module Empathy - def close(&callback) - AsyncOp.new(self,callback) do |op| - @session.close() do |error,response| - op.resume(error,response) - end - end + module EM + module ZooKeeperBinding + def self.connect(session,host,port,timeout) + conn = ::EventMachine.connect(host,port,ZooKeeper::EventMachine::Connection,session,timeout) + conn.read_loop end - end #class Binding + end #module EM - class AsyncOp < ZooKeeper::AsyncOp + create_delegate_module('ZooKeeperBinding',:connect) - def initialize(binding,callback,&operation) - @em_binding = binding - @cv = Strand::ConditionVariable.new() - super(callback,&operation) - end - - private - - attr_reader :cv,:error,:result - - def process_resume(error,response) - @error,@result = process_response(error,response) - cv.signal() if resumed? - end - - def wait_value() - if @em_binding.event_strand? - until resumed? - break unless @em_binding.pop_event_queue() - end - - #TODO there's a problem if we have not been resumed - #and the event strand it dead. - logger.error { "Not resumed and event strand is dead" } unless resumed? - else - cv.wait() unless resumed? - end - - raise error if error - result - end - - end #class AsyncOp - end #module EventMachine -end #module ZooKeeper +end #module Empathy -ZooKeeper.add_binding(ZooKeeper::EventMachine::Binding) diff --git a/lib/zkruby/protocol.rb b/lib/zkruby/protocol.rb index 5223742..4285e73 100644 --- a/lib/zkruby/protocol.rb +++ b/lib/zkruby/protocol.rb @@ -15,7 +15,7 @@ module Protocol def receive_data data # :nodoc: - @buffer ||= StringIO.new() + @buffer ||= StringIO.new().set_encoding('binary') @buffer.seek(0, IO::SEEK_END) @buffer << data @buffer.rewind @@ -71,7 +71,6 @@ def initialize(op,opcode,request,response,callback) def path #Every request has a path! - #TODO - path may be chrooted! request.path if request.respond_to?(:path) end end @@ -94,7 +93,7 @@ def result(rc) error = nil unless (Error::NONE === rc) then error = Error.lookup(rc) - error = error.exception("ZooKeeper error for #{@op}(#{path}) ") + error = error.exception("ZooKeeper error #{error.to_sym} for #{@op}(#{path}) ") end [ callback, error ,response, watch_type ] end @@ -114,183 +113,5 @@ def result(rc) Error::SESSION_EXPIRED == rc ? [ callback, nil, nil, nil ] : super(rc) end end - - - # Returned by asynchronous calls - # - # @example - # op = zk.stat("\apath") { |stat| something_with_stat() } - # - # op.async_rescue ZK::Error::SESSION_EXPIRED do - # puts "Ignoring session expired" - # result_when_session_expired() - # end - # - # op.async_rescue ZK::Error::CONNECTION_LOST do |ex| - # puts "Retrying stat due to connection lost" - # op.async_retry() - # # the result of this block is ignored! - # end - # - # begin - # result_of_somthing_with_stat = op.value - # rescue StandardError => ex - # puts "Oops" - # end - # - # - class AsyncOp - - # @api binding - # @note Binding API, not for client use - attr_accessor :backtrace - - # @api binding - # @note Binding API, not for client use - def initialize(callback,&operation) - self.operation = operation - self.callback = callback - begin - execute() - rescue ZooKeeper::Error => ex - # Capture any initial exception - # The only expected condition is :session_expired - @op_error = ex - logger.debug { "Error during initial call #{ex}" } - end - end - - # @param [Proc,#to_proc] block the error callback as a Proc - # @deprecated use {#async_rescue} - def errback=(block) - async_rescue(&block) - end - - #Rescue asynchronous exceptions in a similar manner to normal - #ruby. Unfortunately rescue is a reserved word - # @param matches [Class,...] subclasses of Exception to match, defaults to {StandardError} - # @param errblock the block to call if an error matches - # @return self - # @yieldparam [Exception] ex the exception raised by the async operation OR by its callback - def async_rescue(*matches,&errblock) - matches << StandardError if matches.empty? - rescue_blocks << [ matches ,errblock ] - if @op_error && matches.any? { |m| m === @op_error } - begin - @allow_retry = true - @op_rescue= [ nil, errblock.call(@op_error) ] - rescue StandardError => ex - @operation_rescue_result = [ ex, nil ] - ensure - @resumed = true - @op_error = nil - @allow_retry = false - end - end - self - end - alias :on_error :async_rescue - alias :op_rescue :async_rescue - - # @deprecated - alias :errback :async_rescue - - - # Must only be called inside a block supplied to {#async_rescue} - # Common case is to retry on connection lost - # Retrying :session_expired is guaranteed to give infinite loops! - def async_retry() - raise ProtocolError "trying to retry outside of a rescue block" unless @allow_retry - begin - execute() - rescue ZooKeeper::Error => ex - error,result = process_response(ex,nil) - if resumed? - raise error if error - return result - end - end - end - - alias :op_retry :async_retry - alias :try_again :async_retry - - # Wait for the async op to finish and returns its value - # @return result of the operation's callback or matched rescue handler - # @raise [StandardError] any unrescued exception - def value(); - if @op_error - raise @op_error - elsif @op_rescue - error, result = @op_rescue - raise error if error - return result - else - begin - wait_value() - rescue ZooKeeper::Error => ex - # Set the backtrace to the original caller, rather than the ZK event loop - ex.set_backtrace(@backtrace) if @backtrace - raise ex - end - end - end - - # @api binding - # @note Binding API, not for client use - def resume(error,response) - process_resume(error,response) - end - - protected - attr_accessor :callback, :operation - - private - def execute() - @op_error = nil - @op_rescue = nil - @resumed = false - operation.call(self) - true - end - - def resumed? - @resumed - end - - def process_resume(error,response) - raise NotImplementedError, ":process_resume to be privately implemented by binding" - end - - def wait_value(); - raise NotImplementedError, ":wait_result to be privately implemented by binding" - end - - def rescue_blocks - @rescue_blocks ||= [] - end - - def process_response(error,response) - @resumed = true - begin - return [ nil, callback.call(response) ] unless error - rescue Exception => ex #enable clients to rescue Exceptions - error = ex - end - - matches,rb = rescue_blocks.detect() { |matches,errblock| matches.any? { |m| m === error } } - return [ error, nil ] unless rb - - begin - @allow_retry = true - return [ nil, rb.call(error) ] - rescue StandardError => ex - return [ ex , nil ] - ensure - @allow_retry = false - end - - end - end end diff --git a/lib/zkruby/rubyio.rb b/lib/zkruby/rubyio.rb deleted file mode 100644 index 4c26471..0000000 --- a/lib/zkruby/rubyio.rb +++ /dev/null @@ -1,284 +0,0 @@ -require 'socket' -require 'thread' -require 'monitor' - -# Binding over standard ruby sockets -# -# Manages 3 threads per zookeeper session -# -# Read thread -# manages connecting to and reading from the tcp socket. Uses non blocking io to manage timeouts -# and initiate the required ping requests. -# -# Write thread -# each new connection spawns a new thread. Requests coming from the session in response -# to multiple threads are written to a blocking queue. While the connection is alive -# this thread reads from the queue and writes to the socket, all in blocking fashion -# TODO: Is it really ok to do a non-blocking read during a blocking write? -# -# Event thread -# All response and watch callbacks are put on another blocking queue to be read and executed -# by this thread. -# -# All interaction with the session is synchronized -# -# Client synchronous code is implemented with a condition variable that waits on the callback/errback -module ZooKeeper::RubyIO - - class Connection - include ZooKeeper::Protocol - include Slf4r::Logger - include Socket::Constants - - def initialize(host,port,timeout,session) - @session = session - @write_queue = Queue.new() - - # JRuby cannot do non-blocking connects, which means there is - # no way to properly implement the connection-timeout - # See http://jira.codehaus.org/browse/JRUBY-5165 - # In any case this should be encapsulated in TCPSocket.open(host,port,timeout) - if RUBY_PLATFORM == "java" - begin - sock = TCPSocket.new(host,port.to_i) - rescue Errno::ECONNREFUSED - logger.warn("TCP Connection refused to #{host}:#{port}") - sock = nil - end - else - addr = Socket.getaddrinfo(host, nil) - sock = Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0) - sock.setsockopt(SOL_SOCKET, SO_LINGER, [0,-1].pack("ii")) - sock.setsockopt(SOL_TCP, TCP_NODELAY,[0].pack("i_")) - sockaddr = Socket.pack_sockaddr_in(port, addr[0][3]) - begin - sock.connect_nonblock(sockaddr) - rescue Errno::EINPROGRESS - resp = IO.select(nil, [sock], nil, timeout) - begin - sock.connect_nonblock(sockaddr) - rescue Errno::ECONNREFUSED - logger.warn("Connection refused to #{ host }:#{ port }") - sock = nil - rescue Errno::EISCONN - end - end - end - @socket = sock - Thread.new(sock) { |sock| write_loop(sock) } if sock - end - - # This is called from random client threads, but only within - # a @session.synchronized() block - def send_data(data) - @write_queue.push(data) - end - - # Since this runs in its very own thread - # we can use boring blocking IO - def write_loop(socket) - Thread.current[:name] = "ZooKeeper::RubyIO::WriteLoop" - begin - while socket - data = @write_queue.pop() - if socket.write(data) != data.length() - #TODO - will this really ever happen - logger.warn("Incomplete write!") - end - logger.debug { "Sending: " + data.unpack("H*")[0] } - end - logger.debug { "Write loop finished" } - rescue Exception => ex - logger.warn("Exception in write loop",ex) - disconnect() - end - - end - - def read_loop() - socket = @socket - ping = 0 - while socket # effectively forever - begin - data = socket.read_nonblock(1024) - logger.debug { "Received (#{data.length})" + data.unpack("H*")[0] } - receive_data(data) - ping = 0 - rescue IO::WaitReadable - select_result = IO.select([socket],[],[],@session.ping_interval) - unless select_result - ping += 1 - # two timeouts in a row mean we need to send a ping - case ping - when 1 ; @session.synchronize { @session.ping() } - when 2 - logger.debug{"No response to ping in #{@session.ping_interval}*2"} - break - end - end - rescue EOFError - logger.debug { "EOF reading from socket" } - break - rescue Exception => ex - logger.warn( "#{ex.class} exception in readloop",ex ) - break - end - end - disconnect() - end - - def disconnect() - socket = @socket - @socket = nil - socket.close if socket - rescue Exception => ex - #oh well - logger.debug("Exception closing socket",ex) - end - - # Protocol requirement - def receive_records(packet_io) - @session.synchronize { @session.receive_records(packet_io) } - end - - end #Class connection - - class Binding - include Slf4r::Logger - attr_reader :session - def self.available? - true - end - - def self.context(&context_block) - yield Thread - end - - def initialize() - @event_queue = Queue.new() - end - - def pop_event_queue() - queued = @event_queue.pop() - return false unless queued - logger.debug { "Invoking #{queued[0]}" } - callback,*args = queued - callback.call(*args) - logger.debug { "Completed #{queued[0]}" } - return true - rescue Exception => ex - logger.warn( "Exception in event thread", ex ) - return true - end - - def start(client,session) - @session = session - @session.extend(MonitorMixin) - - # start the event thread - @event_thread = Thread.new() do - Thread.current[:name] = "ZooKeeper::RubyIO::EventLoop" - - # In this thread, the current client is always this client! - Thread.current[ZooKeeper::CURRENT] = [client] - loop do - break unless pop_event_queue() - end - logger.debug { "Event thread finished" } - end - - # and the read thread - Thread.new() do - begin - Thread.current[:name] = "ZooKeeper::RubyIO::ReadLoop" - conn = session.synchronize { session.start(); session.conn() } # will invoke connect - loop do - break unless conn - conn.read_loop() - conn = session.synchronize { session.disconnected(); session.conn() } - end - #event of death - logger.debug { "Ending read loop, pushing nil (event of death) to event queue" } - @event_queue.push(nil) - rescue Exception => ex - logger.error( "Exception in session thread", ex ) - end - end - end - - # session callback, IO thread - def connect(host,port,delay,timeout) - sleep(delay) - conn = Connection.new(host,port,timeout,session) - session.synchronize() { session.prime_connection(conn) } - end - - - def close(&callback) - AsyncOp.new(self,callback) do |op| - session.synchronize { - session.close() { |error,response| op.resume(error,response) } - } - end - end - - def queue_request(*args,&callback) - AsyncOp.new(self,callback) do |op| - session.synchronize { - session.queue_request(*args) { |error,response| op.resume(error,response) } - } - end - end - - def event_thread? - Thread.current.equal?(@event_thread) - end - - def invoke(*args) - @event_queue.push(args) - end - - end #Binding - - class AsyncOp < ::ZooKeeper::AsyncOp - - def initialize(binding,callback,&op_block) - @mutex = Monitor.new - @cv = @mutex.new_cond() - @rubyio = binding - super(callback,&op_block) - end - - private - - attr_reader :mutex, :cv, :error, :result - - def process_resume(error,response) - logger.debug("Resuming with error #{error} #{response}") - mutex.synchronize do - @error,@result = process_response(error,response) - #if we're no longer resumed? then it means we have retried - #so we don't want to signal - cv.signal() if resumed? - end - end - - def wait_value() - if @rubyio.event_thread? - #Waiting in the event thread (eg made a synchronous call inside a callback) - #Keep processing events until we are resumed - until resumed? - break unless @rubyio.pop_event_queue() - end - else - mutex.synchronize { cv.wait() unless resumed? } - end - - raise error if error - result - end - end - -end #ZooKeeper::RubyIO -# Add our binding -ZooKeeper.add_binding(ZooKeeper::RubyIO::Binding) diff --git a/lib/zkruby/session.rb b/lib/zkruby/session.rb index 31168c2..a5ed280 100644 --- a/lib/zkruby/session.rb +++ b/lib/zkruby/session.rb @@ -1,8 +1,26 @@ require 'set' +require 'thread' +require 'monitor' +require 'zkruby/async_op' module ZooKeeper - # Represents an session that may span connections + # Represents a session that may span connect/disconnect + # + # @note this is a private API not intended for client use class Session + # There are multiple threads of execution involved in a session + # Client threads - send requests + # Connection/Read thread + # EventLoop - callback execution + # + # All client activity is synchronised on this session as a monitor + # The read thread synchronises with client activity + # only around connection/disconnection + # ie any changes to @session_state which always happens in + # conjunction with processing all entries in @pending_queue + # + # All interaction with the event loop occurs via a Queue + include MonitorMixin DEFAULT_TIMEOUT = 4 DEFAULT_CONNECT_DELAY = 0.2 @@ -16,24 +34,30 @@ class Session attr_reader :conn attr_accessor :watcher - def initialize(binding,addresses,options=nil) - - @binding = binding - + # @api zookeeper + # See {ZooKeeper.connect} + def initialize(addresses,options=nil) + super() @addresses = parse_addresses(addresses) parse_options(options) - # These are the server states - # :disconnected, :connected, :auth_failed, :expired - @keeper_state = nil + # our transaction id + @xid=0 - # Client state is - # :ready, :closing, :closed - @client_state = :ready + # The state of the connection, nil, :disconnected, :connected, :closed, :expired + @session_state = nil - @xid=0 + # The connection we'll send packets to + @conn = nil + + # The list of pending requests + # When disconnected this builds up a list to send through when we are connected + # When connected represents the order in which we expect to see responses @pending_queue = [] + # Client state is :ready, :closing, :closed + @client_state = :ready + # Create the watch list # hash by watch type of hashes by path of set of watchers @watches = [ :children, :data, :exists ].inject({}) do |ws,wtype| @@ -41,34 +65,14 @@ def initialize(binding,addresses,options=nil) ws end + # the default watcher @watcher = nil @ping_logger = Slf4r::LoggerFacade.new("ZooKeeper::Session::Ping") - - end - - def chroot(path) - return @chroot if path == "/" - return @chroot + path end - def unchroot(path) - return path unless path - path.slice(@chroot.length..-1) - end - - # close won't run your block if the connection is - # already closed, so this is how you can check - def closed? - @client_state == :closed - end - - # Connection API - testing whether to send a ping - def connected?() - @keeper_state == :connected - end - - # Connection API - Injects a new connection that is ready to receive records + # @api connection + # Injects a new connection that is ready to receive records # @param conn that responds to #send_records(record...) and #disconnect() def prime_connection(conn) @conn = conn @@ -77,96 +81,159 @@ def prime_connection(conn) reset_watches() end - - # Connection API - called when data is available, reads and processes one packet/event - # @param io + # @api connection + # called when data is available, reads and processes one packet/event + # @param [IO] io def receive_records(io) - case @keeper_state + case @session_state when :disconnected complete_connection(io) when :connected process_reply(io) else - logger.warn { "Receive packet for closed session #{@keeper_state}" } + logger.warn { "Receive packet for closed session #{@session_state}" } end end - # Connection API - called when no data has been received for #ping_interval + # @api connection + # Connection API - testing whether to send a ping + def connected?() + @session_state == :connected + end + + + # @api connection + # called when no data has been received for #ping_interval def ping() - if @keeper_state == :connected + if connected? ping_logger.debug { "Ping send" } hdr = Proto::RequestHeader.new(:xid => -2, :_type => 11) - conn.send_records(hdr) + conn.send_records(hdr) end end - # Connection API - called when the connection has dropped from either end + # TODO: Merge all this into a connect loop called from start def disconnected() - @conn = nil - logger.info { "Disconnected id=#{@session_id}, keeper=:#{@keeper_state}, client=:#{@client_state}" } + logger.info { "Disconnected id=#{@session_id}, keeper=:#{@session_state}, client=:#{@client_state}" } # We keep trying to reconnect until the session expiration time is reached - @disconnect_time = Time.now if @keeper_state == :connected + @disconnect_time = Time.now if @session_state == :connected time_since_first_disconnect = (Time.now - @disconnect_time) - if @client_state == :closed || time_since_first_disconnect > timeout - session_expired() + if @session_state == :closing + #We were expecting this disconnect + session_expired(:closed) + elsif time_since_first_disconnect > timeout + session_expired(:expired) else - # if we are connected then everything in the pending queue has been sent so - # we must clear - # if not, then we'll keep them and hope the next reconnect works - if @keeper_state == :connected + if @session_state == :connected + #first disconnect clear_pending_queue(:disconnected) invoke_watch(@watcher,KeeperState::DISCONNECTED,nil,WatchEvent::NONE) if @watcher + @conn = nil end - @keeper_state = :disconnected - reconnect() end end - # Start the session - called by the ProtocolBinding - def start() - raise ProtocolError, "Already started!" unless @keeper_state.nil? - @keeper_state = :disconnected + # @api zookeeper + # See {ZooKeeper.connect} + def start(client) + raise ProtocolError, "Already started!" unless @session_state.nil? + @session_state = :disconnected @disconnect_time = Time.now - logger.debug("Starting new zookeeper client session") - reconnect() + logger.debug {"Starting new zookeeper client session for #{client}"} + @event_loop = EventLoop.new(client) + # This is the read/connect thread + Thread.new { + Thread.current[:name] = "ZK::Session #{self}" + reconnect() + while active? + delay = rand() * @max_connect_delay + sleep(delay) + reconnect() + end + logger.debug {"Session #{self} complete" } + } end - def queue_request(request,op,opcode,response=nil,watch_type=nil,watcher=nil,ptype=Packet,&callback) - raise Error.SESSION_EXPIRED, "Session expired due to client state #{@client_state}" unless @client_state == :ready - watch_type, watcher = resolve_watcher(watch_type,watcher) - xid = next_xid + # @api client + def chroot(path) + return @chroot if path == "/" + return @chroot + path + end - packet = ptype.new(xid,op,opcode,request,response,watch_type,watcher, callback) + # @api client + def unchroot(path) + return path unless path + path.slice(@chroot.length..-1) + end - queue_packet(packet) + # @api client + def request(*args,&callback) + AsyncOp.new(@event_loop,callback) do |op| + queue_request(*args) do |error,response| + op.resume(error,response) + end + end end + # @api client def close(&callback) - case @client_state - when :ready - # we keep the requested block in a close packet - @close_packet = ClosePacket.new(next_xid(),:close,-11,nil,nil,nil,nil,callback) - close_packet = @close_packet - @client_state = :closing - - # If there are other requests in flight, then we wait for them to finish - # before sending the close packet since it immediately causes the socket - # to close. - queue_close_packet_if_necessary() - @close_packet - when :closed, :closing - raise ProtocolError, "Already closed" - else - raise ProtocolError, "Unexpected state #{@client_state}" + AsyncOp.new(@event_loop,callback) do |op| + close_session() do |error,response| + op.resume(error,response) + end end end private + def active? + [:connected,:disconnected].include?(@session_state) + end + + def calculate_timeouts() + @ping_interval = timeout * 2.0/7.0 + @connect_timeout = timeout / 2.0 + end + + def queue_request(request,op,opcode,response=nil,watch_type=nil,watcher=nil,ptype=Packet,&callback) + synchronize do + raise ProtocolError, "Client closed #{@client_state}" unless @client_state == :ready + raise Error.SESSION_EXPIRED, "Session has expired #{@session_state}" unless active? + watch_type, watcher = resolve_watcher(watch_type,watcher) + + xid = next_xid + + packet = ptype.new(xid,op,opcode,request,response,watch_type,watcher, callback) + + queue_packet(packet) + end + end + + def close_session(&callback) + synchronize do + if @client_state == :ready + if active? + # we keep the requested block in a close packet but we don't send it + # until we've received all pending reponses + @close_packet = ClosePacket.new(next_xid(),:close,-11,nil,nil,nil,nil,callback) + + # but we can force a response by sending a ping + ping() + + else + # We've already expired put the close callback on the event loop + @event_loop.invoke_close(callback,nil,true) + end + @client_state = :closed + else + raise ProtocolError, "Client already #{@client_state}" + end + end + end + attr_reader :watches - attr_reader :binding def parse_addresses(addresses) case addresses @@ -194,50 +261,51 @@ def parse_address(address) def parse_options(options) @timeout = options.fetch(:timeout,DEFAULT_TIMEOUT) + calculate_timeouts() @max_connect_delay = options.fetch(:connect_delay,DEFAULT_CONNECT_DELAY) - @connect_timeout = options.fetch(:connect_timeout,@timeout * 1.0 / 7.0) @scheme = options.fetch(:scheme,nil) @auth = options.fetch(:auth,nil) @chroot = options.fetch(:chroot,"").chomp("/") end def reconnect() - #Rotate address host,port = @addresses.shift @addresses.push([host,port]) - - delay = rand() * @max_connect_delay - - logger.debug { "Connecting id=#{@session_id} to #{host}:#{port} with delay=#{delay}, timeout=#{@connect_timeout}" } - binding.connect(host,port,delay,@connect_timeout) + logger.debug { "Connecting id=#{@session_id} to #{host}:#{port} with timeout=#{@connect_timeout} #{ZooKeeperBinding.inspect}" } + begin + ZooKeeperBinding.connect(self,host,port,@connect_timeout) + rescue Exception => ex + logger.warn("Exception in connect loop", ex) + ensure + disconnected() + end end - def session_expired(reason=:expired) - clear_pending_queue(reason) - - invoke_response(*@close_packet.error(reason)) if @close_packet - - if @client_state == :closed - logger.info { "Session closed id=#{@session_id}, keeper=:#{@keeper_state}, client=:#{@client_state}" } + if reason == :closed + logger.info { "Session closed id=#{@session_id}, keeper=:#{@session_state}, client=:#{@client_state}" } else - logger.warn { "Session expired id=#{@session_id}, keeper=:#{@keeper_state}, client=:#{@client_state}" } + logger.warn { "Session expired reason=#{reason} id=#{@session_id}, keeper=:#{@session_state}, client=:#{@client_state}" } end + clear_pending_queue(reason) + #TODO Clients will want to distinguish between EXPIRED and CLOSED invoke_watch(@watcher,KeeperState::EXPIRED,nil,WatchEvent::NONE) if @watcher - @keeper_state = reason - @client_state = :closed + @event_loop.stop() end def complete_connection(response) result = Proto::ConnectResponse.read(response) if (result.time_out <= 0) #We're dead! - session_expired() + session_expired() else @timeout = result.time_out.to_f / 1000.0 - @keeper_state = :connected + calculate_timeouts() + @session_id = result.session_id + @session_passwd = result.passwd + logger.info { "Connected session_id=#{@session_id}, timeout=#{@timeout}, ping=#{@ping_interval}" } # Why 2 / 7 of the timeout?. If a binding sees no server response in this period it is required to # generate a ping request @@ -245,14 +313,14 @@ def complete_connection(response) # so we are already more than half way through the session timeout # and we need to give ourselves time to reconnect to another server @ping_interval = @timeout * 2.0 / 7.0 - @session_id = result.session_id - @session_passwd = result.passwd - logger.info { "Connected session_id=#{@session_id}, timeout=#{@time_out}, ping=#{@ping_interval}" } - logger.debug { "Sending #{@pending_queue.length} queued packets" } - @pending_queue.each { |p| send_packet(p) } + synchronize do + logger.debug { "Sending #{@pending_queue.length} queued packets" } + @session_state = :connected + @pending_queue.each { |p| send_packet(p) } + send_close_packet_if_necessary() + end - queue_close_packet_if_necessary() invoke_watch(@watcher,KeeperState::CONNECTED,nil,WatchEvent::NONE) if @watcher end end @@ -262,7 +330,6 @@ def send_session_connect() req.last_zxid_seen = @last_zxid_seen if @last_zxid_seen req.session_id = @session_id if @session_id req.passwd = @session_passwd if @session_passwd - conn.send_records(req) end @@ -297,6 +364,7 @@ def process_reply(packet_io) case header.xid.to_i when -2 ping_logger.debug { "Ping reply" } + send_close_packet_if_necessary() when -4 logger.debug { "Auth reply" } session_expired(:auth_failed) unless header.err.to_i == 0 @@ -313,21 +381,25 @@ def process_reply(packet_io) # A normal packet reply. They should come in the order we sent them # so we just match it to the packet at the front of the queue packet = @pending_queue.shift + + if packet == nil && @close_packet + packet = @close_packet + @close_packet = nil + @session_state = :closing + end + logger.debug { "Packet reply: #{packet.inspect}" } if (packet.xid.to_i != header.xid.to_i) - logger.error { "Bad XID! expected=#{packet.xid}, received=#{header.xid}" } - # Treat this like a dropped connection, and then force the connection # to be dropped. But wait for the connection to notify us before # we actually update our keeper_state invoke_response(*packet.error(:disconnected)) - @conn.disconnect() + raise ProtocolError, "Bad XID. expected=#{packet.xid}, received=#{header.xid}" else - @last_zxid_seen = header.zxid - + callback, error, response, watch_type = packet.result(header.err.to_i) invoke_response(callback, error, response, packet_io) @@ -335,11 +407,11 @@ def process_reply(packet_io) @watches[watch_type][packet.path] << packet.watcher logger.debug { "Registered #{packet.watcher} for type=#{watch_type} at #{packet.path}" } end - queue_close_packet_if_necessary() + send_close_packet_if_necessary() end end - end + end def process_watch_notification(state,path,event) @@ -348,17 +420,16 @@ def process_watch_notification(state,path,event) keeper_state = KeeperState.fetch(state) - watches = watch_types.inject(Set.new()) do | result, watch_type | + watches = watch_types.inject(Set.new()) do |result, watch_type| more_watches = @watches[watch_type].delete(path) - result.merge(more_watches) if more_watches - result + result.merge(more_watches) if more_watches + result end if watches.empty? logger.warn { "Received notification for unregistered watch #{state} #{path} #{event}" } end watches.each { | watch | invoke_watch(watch,keeper_state,path,watch_event) } - end def invoke_watch(watch,state,path,event) @@ -368,41 +439,50 @@ def invoke_watch(watch,state,path,event) elsif watch.respond_to?(:call) callback = watch else - raise ProtocolError("Bad watcher #{watch}") + logger.error("Bad watcher #{watch}") end - - binding.invoke(callback,state,unchroot(path),event) + @event_loop.invoke(callback,state,unchroot(path),event) end def clear_pending_queue(reason) - @pending_queue.each { |p| invoke_response(*p.error(reason)) } - @pending_queue.clear + synchronize do + @session_state = reason + @pending_queue.each { |p| invoke_response(*p.error(reason)) } + @pending_queue.clear() + if @close_packet + invoke_response(*@close_packet.error(reason)) + @close_packet = nil + end + end end - def queue_close_packet_if_necessary - if @pending_queue.empty? && @keeper_state == :connected && @close_packet + def send_close_packet_if_necessary + # We don't need to synchronize this because the creation of + # the close packet was synchronized and after that all + # client requests are rejected + # we can receive watch and ping notifications after this + # but the server drops the connection as soon as this + # packet is received + if @pending_queue.empty? && @session_state == :connected && @close_packet logger.debug { "Sending close packet!" } - @client_state = :closed - queue_packet(@close_packet) - @close_packet = nil + send_packet(@close_packet) end end def invoke_response(callback,error,response,packet_io = nil) if callback - result = if error nil - elsif response.respond_to?(:read) && packet_io - response.read(packet_io) - elsif response - response - else - nil - end + elsif response.respond_to?(:read) && packet_io + response.read(packet_io) + elsif response + response + else + nil + end logger.debug { "Invoking response cb=#{callback} err=#{error} resp=#{result}" } - binding.invoke(callback,error,result) + @event_loop.invoke(callback,error,result) end end @@ -421,12 +501,10 @@ def resolve_watcher(watch_type,watcher) [watch_type,watcher] end - def queue_packet(packet) + logger.debug { "Queuing: #{packet.inspect}" } @pending_queue.push(packet) - logger.debug { "Queued: #{packet.inspect}" } - - if @keeper_state == :connected + if @session_state == :connected send_packet(packet) end end @@ -441,6 +519,69 @@ def send_packet(packet) conn.send_records(*records) end + class EventLoop + include Slf4r::Logger + + def initialize(client) + @event_queue = Queue.new() + + @alive = true + @event_thread = Thread.new() do + logger.debug { "Starting event loop" } + Thread.current[:name] = "ZK::EventLoop #{self}" + Thread.current[CURRENT] = [ client ] + begin + pop_event_queue until dead? + logger.info { "Finished event loop" } + rescue Exception => ex + logger.error("Uncaught exception in event loop",ex) + end + end + end + + def dead? + !@alive + end + + # @api async_op + def pop_event_queue + #We're alive until we get a nil result from #stop + logger.debug { "Popping event queue" } + queued = @alive ? @event_queue.pop : nil + if queued + begin + callback,*args = queued + callback.call(*args) + rescue StandardError => ex + logger.error("Uncaught error in async callback", ex) + end + else + @alive = false + end + end + + # @api session + def invoke(*args) + @event_queue.push(args) + end + + def invoke_close(callback,*args) + Thread.new do + @event_thread.join() + callback.call(*args) + end + end + # @api session + def stop + @event_queue.push(nil) + @event_thread.join() + end + + # @api async_op + def current? + Thread.current == @event_thread + end + end end # Session end diff --git a/lib/zkruby/socket.rb b/lib/zkruby/socket.rb new file mode 100644 index 0000000..19bb7eb --- /dev/null +++ b/lib/zkruby/socket.rb @@ -0,0 +1,66 @@ +require 'socket' + +class Socket + + HAS_NONBLOCKING_CONNECT = RUBY_PLATFORM != "java"|| Gem::Version.new(JRUBY_VERSION.dup) >= Gem::Version.new("1.6.7") + + def self.tcp_connect_timeout(host,port,timeout = 0) + + if HAS_NONBLOCKING_CONNECT && timeout > 0 + # TODO: This is a blocking DNS lookup!!!!, possibly even a reverse lookup if host is a numberic address + addr = self.getaddrinfo(host, nil) + sock = Socket.new(self.const_get(addr[0][0]), Socket::SOCK_STREAM, 0) + sockaddr = Socket.pack_sockaddr_in(port, addr[0][3]) + + begin + sock.connect_nonblock(sockaddr) + return sock + rescue Errno::EINPROGRESS + begin + #Note: JRuby raises Connection Refused instead of populating error array + read,write,errors = Socket.select(nil, [sock], [sock], timeout) + optval = sock.getsockopt(Socket::SOL_SOCKET,Socket::SO_ERROR) + sockerr = (optval.unpack "i")[0] + end + + case sockerr + when 0 # Errno::NOERROR::Errno + begin + sock.connect_nonblock(sockaddr) + return sock + rescue Errno::EISCONN + #Woohoo! we're connected + return sock + end + when Errno::EINPROGRESS + # must be a timeout + logger.warn{"Connect timeout to #{host}:#{port}"} + return nil + when Errno::ECONNREFUSED::Errno + raise Errno::ECONNREFUSED, "Connection refused to #{ host }:#{ port }" + else + raise Errno::ENOTCONN, "Connection to #{ host }:#{ port } failed: #{sockerr}" + end + end + else + # JRuby prior to 1.6.7 cannot do non-blocking connects, which means there is + # no way to properly implement the connection-timeout + # See http://jira.codehaus.org/browse/JRUBY-5165 + # In any case this should be encapsulated in TCPSocket.open(host,port,timeout) + self.tcp(host,port) + end + + end + + def read_timeout(maxlen,timeout) + begin + return read_nonblock(maxlen) + rescue IO::WaitReadable + selected = IO.select([self],[],[],timeout) + return nil unless selected + retry + end + end +end + + diff --git a/lib/zkruby/version.rb b/lib/zkruby/version.rb new file mode 100644 index 0000000..52ab508 --- /dev/null +++ b/lib/zkruby/version.rb @@ -0,0 +1,5 @@ + +module ZooKeeper + # Major/Minor numbers track zookeeper itself, final digit is our build number + VERSION = "3.4.4.rc4" +end diff --git a/lib/zkruby/zkruby.rb b/lib/zkruby/zkruby.rb index 785dde0..0e1d052 100644 --- a/lib/zkruby/zkruby.rb +++ b/lib/zkruby/zkruby.rb @@ -4,12 +4,6 @@ # than calling the zk client libraries # module ZooKeeper - # Major/Minor numbers track zookeeper itself, final digit is our build number - VERSION = "3.4.4" - @bindings = [] - def self.add_binding(binding) - @bindings << binding unless @bindings.include?(binding) - end end # Shorthand @@ -21,6 +15,7 @@ def self.add_binding(binding) require 'jute/zookeeper' require 'zkruby/multi' require 'zkruby/protocol' +require 'zkruby/conn' require 'zkruby/session' require 'zkruby/client' # Utilities diff --git a/spec/enum_spec.rb b/spec/enum_spec.rb index e375a93..5c40b28 100644 --- a/spec/enum_spec.rb +++ b/spec/enum_spec.rb @@ -24,6 +24,8 @@ class TestError < StandardError t.should === 1 end + it "should do coercion" + it "should be useable in a mock" do m = mock("watchevent") m.should_receive(:test).with(TestEnum::TWO) diff --git a/spec/eventmachine_spec.rb b/spec/eventmachine_spec.rb index 4f79fe5..2257727 100644 --- a/spec/eventmachine_spec.rb +++ b/spec/eventmachine_spec.rb @@ -1,50 +1,21 @@ require 'server_helper' -require 'shared/binding' require 'zkruby/eventmachine' +require 'shared/binding' -module EMHelper - alias :restart_cluster_orig :restart_cluster - def restart_cluster(delay=0) - if EM.reactor_running? - cv = Strand::ConditionVariable.new() - op = Proc.new do - begin - restart_cluster_orig(delay) - rescue Exception => ex - logger.error ("Exception restarting cluster #{ex}") - end - true - end - cb = Proc.new { |result| cv.signal() } - defer = EM.defer(op,cb) - cv.wait() - else - restart_cluster_orig(delay) - end - end +Empathy.empathise(ZooKeeper) - def sleep(delay) - Strand.sleep(delay) - end -end - -describe ZooKeeper::EventMachine::Binding do +describe Empathy::EM::ZooKeeperBinding do include Slf4r::Logger - include EMHelper around(:each) do |example| - EventMachine.run { - Strand.new() do - begin - example.run - ensure - EM::stop - end - end - } + Empathy.run { example.run } end - it_should_behave_like "a zookeeper client binding" + it "should be running in event machine" do + Empathy.event_machine?.should be_true + end + let (:pass_every) { 3 } + it_should_behave_like "a zookeeper client binding" end diff --git a/spec/rubyio_spec.rb b/spec/rubyio_spec.rb index 1a12d19..a3c8819 100644 --- a/spec/rubyio_spec.rb +++ b/spec/rubyio_spec.rb @@ -1,8 +1,8 @@ require 'server_helper' require 'shared/binding' -require 'zkruby/rubyio' -describe ZooKeeper::RubyIO::Binding do +describe ZooKeeperBinding do + let (:pass_every) { nil } it_behaves_like "a zookeeper client binding" end diff --git a/spec/server_helper.rb b/spec/server_helper.rb index 887f2a9..1fe8acf 100644 --- a/spec/server_helper.rb +++ b/spec/server_helper.rb @@ -1,22 +1,25 @@ require 'spec_helper' -module ZooKeeperServerHelper +module ZooKeeperServerManager include Slf4r::Logger + JRUBY_COMPAT_SYSTEM = (RUBY_PLATFORM == "java" && Gem::Version.new(JRUBY_VERSION.dup) < Gem::Version.new("1.6.5")) + + def jruby_safe_system(arg) + arg = "#{arg} &" if JRUBY_COMPAT_SYSTEM + system(arg) + sleep(3) if JRUBY_COMPAT_SYSTEM + end + def restart_cluster(delay=0) - system("../../bin/zkServer.sh stop >> zk.out") - Kernel::sleep(delay) if delay > 0 - if (::RUBY_PLATFORM == "java") - #in JRuby 1.6.3 system does not return - system("../../bin/zkServer.sh start >> zk.out &") - else - system("../../bin/zkServer.sh start >> zk.out") - end + jruby_safe_system("../../bin/zkServer.sh stop >> zk.out") + sleep(delay) if delay > 0 + jruby_safe_system("../../bin/zkServer.sh start >> zk.out") end def get_addresses() - "localhost:2181" + "127.0.0.1:2181" end def safe_close(zk) @@ -31,7 +34,7 @@ def connect(options = {}) end -include ZooKeeperServerHelper +include ZooKeeperServerManager restart_cluster() sleep(3) @@ -42,5 +45,5 @@ def connect(options = {}) RSpec.configure do |c| #Exclude multi unless we are on a 3.4 server c.filter_run_excluding :multi => true unless properties - c.filter_run_excluding :perf => true + #c.filter_run_excluding :perf => true end diff --git a/spec/shared/basic.rb b/spec/shared/basic.rb index d55aacc..9716496 100644 --- a/spec/shared/basic.rb +++ b/spec/shared/basic.rb @@ -4,8 +4,10 @@ @zk.create("/zkruby","node for zk ruby testing",ZK::ACL_OPEN_UNSAFE) unless @zk.exists?("/zkruby") end + context("normal functions") do it "should return a stat for the root path" do - stat = @zk.stat("/") + + stat = @zk.stat("/") stat.should be_a ZooKeeper::Data::Stat end @@ -40,7 +42,7 @@ @zk.delete("/zkruby/rspec",-1) @zk.exists?("/zkruby/rspec").should be_false end - + end context "exceptions" do it "should raise ZK::Error for synchronous method" do @@ -52,6 +54,7 @@ # only because JRuby 1.9 doesn't support the === syntax for exceptions ZooKeeper::Error::NO_NODE.should === ex ex.message.should =~ /\/anunknownpath/ + ex.message.should =~ /no_node/ skip = if defined?(JRUBY_VERSION) then 2 else 1 end ex.backtrace[skip..-1].should == get_caller end @@ -67,11 +70,13 @@ rescue ZooKeeper::Error => ex ZooKeeper::Error::NO_NODE.should === ex ex.message.should =~ /\/an\/unknown\/path/ + ex.message.should =~ /no_node/ ex.backtrace[1..-1].should == get_caller end end it "should call the error call back for asynchronous errors" do + get_caller = caller op = @zk.get("/an/unknown/path") do :callback_invoked_unexpectedly end @@ -79,6 +84,9 @@ op.on_error do |err| case err when ZK::Error::NO_NODE + err.message.should =~ /\/an\/unknown\/path/ + err.message.should =~ /no_node/ + err.backtrace[1..-1].should == get_caller :found_no_node_error else raise err @@ -175,6 +183,7 @@ end it "should handle a synchronous call inside an asynchronous callback" do + ZK.current.should equal(@zk) op = @zk.create("/zkruby/sync_async","somedata",ZK::ACL_OPEN_UNSAFE) do ZK.current.should equal(@zk) stat, data = @zk.get("/zkruby/sync_async") diff --git a/spec/shared/performance.rb b/spec/shared/performance.rb index bae764c..8aa1535 100644 --- a/spec/shared/performance.rb +++ b/spec/shared/performance.rb @@ -5,20 +5,19 @@ it "should create and retrieve lots of nodes in a reasonable amount of time" do path = "/zkruby/rspec-perf" + + @zk.mkpath(path) + op = nil start = Time.now count = 0 - pass_every = 10 first_error = true 6000.times do count += 1 this_index = count op = @zk.create("#{path}/","hello", ZK::ACL_OPEN_UNSAFE,:sequential,:ephemeral) { } op.on_error { |ex| puts "Error @ #{this_index}" if first_error; first_error = false } - if count % pass_every == 0 - #puts "Passing @ #{count}" - ZK.pass - end + ZK.pass if pass_every && count % pass_every == 0 end op.value @@ -31,10 +30,7 @@ children.each do |child| op = @zk.get("#{path}/#{child}") { } count += 1 - if count % pass_every == 0 - #puts "Passing @ #{count}" - ZK.pass - end + ZK.pass if pass_every && count % pass_every == 0 end op.value diff --git a/spec/shared/watch.rb b/spec/shared/watch.rb index 9d670dc..d3f545b 100644 --- a/spec/shared/watch.rb +++ b/spec/shared/watch.rb @@ -18,7 +18,7 @@ stat,data = @zk.get(path,watch) # set the data on the 2nd session @zk2.set(path,"newdata",stat.version) - sleep(5) + sleep(2) watch_results.size().should == 1 watch_results[0][1].should == path watch_results[0][2].should === :node_data_changed @@ -31,7 +31,7 @@ stat,children = @zk.children("/zkruby",watch) path = @zk2.create("/zkruby/rspec_watch","somedata",ZK::ACL_OPEN_UNSAFE,:ephemeral,:sequential) - sleep(5) + sleep(2) watch_results.size().should == 1 watch_results[0][1].should == "/zkruby" watch_results[0][2].should === :node_children_changed diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 66b1ec1..b744319 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -1,12 +1,15 @@ require 'slf4r/logging_logger' require 'zkruby/zkruby' -Logging.logger.root.level = :error -Logging.logger.root.appenders = Logging.appenders.stdout(:layout => Logging.layouts.pattern(:pattern => '%c [%T] %-5l: %m\n')) -#Logging.logger[ZooKeeper::RubyIO::Connection].level = :error +Logging.logger.root.level = :warn +Logging.logger.root.appenders = Logging.appenders.stdout(:layout => Logging.layouts.pattern(:pattern => '%r %c [%T] %-5l: %m\n')) +#Logging.logger[ZooKeeper::Connection].level = :debug +#Logging.logger['ZooKeeper::EventMachine::Connection'].level = :debug #Logging.logger["ZooKeeper::RubyIO::Binding"].level = :debug #Logging.logger[ZooKeeper::Session].level = :debug +#Logging.logger[ZooKeeper::AsyncOp].level = :debug #Logging.logger["ZooKeeper::EventMachine::ClientConn"].level = :debug #Logging.logger["ZooKeeper::Session::Ping"].level = :error Thread.current[:name] = "Rspec::Main" + diff --git a/zkruby.gemspec b/zkruby.gemspec new file mode 100644 index 0000000..a5d2be7 --- /dev/null +++ b/zkruby.gemspec @@ -0,0 +1,39 @@ +# -*- encoding: utf-8 -*- +$:.push File.expand_path("../lib", __FILE__) + +require "zkruby/version" + +Gem::Specification.new do |s| + s.name = "zkruby" + s.version = ZooKeeper::VERSION + s.platform = Gem::Platform::RUBY + s.authors = ["Grant Gardner"] + s.email = ["grant@lastweekend.com.au"] + s.homepage = "http://rubygems.org/gems/zkruby" + s.summary = %q{Pure Ruby language binding for ZooKeeper} + s.description = %q{Supports full ZooKeeper API, synchronous or asynchronous style, watches etc.. with implementations over EventMachine or plain old Ruby IO/Threads} + + s.files = `git ls-files`.split("\n") + s.files << 'lib/jute/zookeeper.rb' + s.test_files = `git ls-files -- {spec}/*`.split("\n") + s.require_paths = ["lib"] + + # Yard options in .yardopts + + s.add_dependency 'slf4r' , '~> 0.4.2' + s.add_dependency 'bindata', '~> 1.4.1' + + s.add_development_dependency 'eventmachine', '>= 0.12.10' + s.add_development_dependency 'empathy', '>=0.1.0' + s.add_development_dependency 'logging', '>= 1.4.1' + s.add_development_dependency 'ruby-prof' + + s.add_development_dependency("rake") + s.add_development_dependency("rspec") + s.add_development_dependency("yard") + s.add_development_dependency("kramdown") + + # s.add_development_dependency("jute") + s.add_development_dependency "citrus" , '~> 2.4.0' + +end