Permalink
Browse files

Refactor strand usage for eventmachine binding

Implementation now works like rubyio, use a single
event strand to process callbacks/watchers instead
of separate strands per event
Old model was broken for nested synchronous event
This model is much more performant
  • Loading branch information...
1 parent 38de7f5 commit 98ff620ce1538e1118829c4543ca3b7c17ad9bb1 @lwoggardner committed Mar 5, 2012
Showing with 42 additions and 25 deletions.
  1. +41 −24 lib/zkruby/eventmachine.rb
  2. +1 −1 spec/shared/basic.rb
View
65 lib/zkruby/eventmachine.rb
@@ -96,9 +96,33 @@ def self.context(&context_block)
def start(client,session)
@client = client
@session = session
+
+ @event_strand = Strand.new do
+ Strand.current[ZooKeeper::CURRENT] = [ @client ]
+ loop do
+ break unless pop_event_queue
+ end
+ logger.debug { "Event strand finished"}
+ end
@session.start()
end
+ def pop_event_queue()
+ #TODO - use Strand.yield when Strand is updated
+ queued = Fiber.yield
+ return false unless queued
+ callback,*args = queued
+ callback.call(*args)
+ return true
+ rescue Exception => ex
+ logger.error("Exception in event strand", ex)
+ #TODO - should this be raised?
+ end
+
+ def event_strand?()
+ Strand.current.equal?(@event_strand)
+ end
+
def connect(host,port,delay,timeout)
EM.add_timer(delay) do
EM.connect(host,port,ZooKeeper::EventMachine::ClientConn,@session,timeout)
@@ -107,7 +131,7 @@ def connect(host,port,delay,timeout)
# You are working in event machine it is up to you to ensure your callbacks do not block
def invoke(callback,*args)
- callback.call(*args)
+ @event_strand.fiber.resume(callback,*args)
end
def queue_request(*args,&callback)
@@ -132,41 +156,34 @@ class AsyncOp < ZooKeeper::AsyncOp
def initialize(binding,callback,&operation)
@em_binding = binding
+ @cv = Strand::ConditionVariable.new()
super(callback,&operation)
end
private
+ attr_reader :cv,:error,:result
def process_resume(error,response)
- if @strand
- @strand.fiber.resume(error,response)
- else
- @error,@result = process_response(error,response)
- end
+ @error,@result = process_response(error,response)
+ cv.signal() if resumed?
end
def wait_value()
- if resumed?
- raise @error if @error
- @result
- else
- # Start a new strand and wait for it
- @strand = Strand.new do
- Strand.current[ZooKeeper::CURRENT] = [ @em_binding.client ]
-
- #if we are not resumed then it means we've been retried during
- #processing of the callback or rescue handlers
- until resumed? do
- error, response = Strand.yield
- error, result = process_response(error,response)
- end
- raise error if error
- result
+ if @em_binding.event_strand?
+ until resumed?
+ break unless @em_binding.pop_event_queue()
end
- @strand.value
+
+ #TODO there's a problem if we have not been resumed
+ #and the event strand it dead.
+ logger.error { "Not resumed and event strand is dead" } unless resumed?
+ else
+ cv.wait() unless resumed?
end
-
+
+ raise error if error
+ result
end
end #class AsyncOp
View
2 spec/shared/basic.rb
@@ -152,7 +152,7 @@
watcher.should_receive(:process_watch).with(ZK::KeeperState::CONNECTED,nil,ZK::WatchEvent::NONE)
watcher.should_not_receive(:process_watch).with(ZK::KeeperState::EXPIRED,nil,ZK::WatchEvent::NONE)
@zk.watcher = watcher
- restart_cluster(2)
+ restart_cluster(1.5)
@zk.exists?("/zkruby").should be_true
end

0 comments on commit 98ff620

Please sign in to comment.