Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

wow, better implementation (use unshift, h/t: @eric)

  • Loading branch information...
commit b3bf64d4aaf0ddb23923a2866aeb011d13033ec7 1 parent 5ed4562
@slyphon slyphon authored
Showing with 23 additions and 23 deletions.
  1. +9 −20 ext/c_zookeeper.rb
  2. +14 −3 lib/zookeeper/continuation.rb
View
29 ext/c_zookeeper.rb
@@ -181,12 +181,15 @@ def wait_until_connected(timeout=10)
# blocks the caller until result has returned
def submit_and_block(meth, *args)
cnt = Continuation.new(meth, *args)
- @reg.synchronized do |r|
+ @reg.lock
+ begin
if meth == :state
- r.state_check << cnt
+ @reg.pending.unshift(cnt)
else
- r.pending << cnt
+ @reg.pending << cnt
end
+ ensure
+ @reg.unlock rescue nil
end
wake_event_loop!
cnt.value
@@ -263,28 +266,14 @@ def event_thread_body
end
def submit_pending_calls
- # this is ok, because the calling thread only ever *adds* to this hash,
- # and the keys are always unique
-
- calls = nil
-
- @reg.lock
- begin
- calls = @reg.state_check + @reg.pending
- @reg.state_check.clear
- @reg.pending.clear
- ensure
- @reg.unlock rescue nil
- end
-
- # ok, do state checks right here, they're synchronous anyway
+ calls = @reg.next_batch()
return if calls.empty?
while cntn = calls.shift
cntn.submit(self)
- if cntn.req_id # state checks will not have a req_id
- @reg.in_flight[cntn.req_id] = cntn # in_flight is only ever touched by us
+ if req_id = cntn.req_id # state checks will not have a req_id
+ @reg.in_flight[req_id] = cntn # in_flight is only ever touched by us
end
end
end
View
17 lib/zookeeper/continuation.rb
@@ -12,13 +12,13 @@ class Continuation
# `state_check` are high-priority checks that query the connection about
# its current state, they always run before other continuations
#
- class Registry < Struct.new(:pending, :state_check, :in_flight)
+ class Registry < Struct.new(:pending, :in_flight)
extend Forwardable
def_delegators :@mutex, :lock, :unlock
def initialize
- super([], [], {})
+ super([], {})
@mutex = Mutex.new
end
@@ -33,7 +33,18 @@ def synchronized
# does not lock the mutex, returns true if there are pending jobs
def anything_to_do?
- (pending.length + state_check.length) > 0
+ !pending.empty?
+ end
+
+ # returns the pending continuations, resetting the list
+ # this method is synchronized
+ def next_batch()
+ @mutex.lock
+ begin
+ pending.slice!(0,pending.length)
+ ensure
+ @mutex.unlock rescue nil
+ end
end
end # Registry
Please sign in to comment.
Something went wrong with that request. Please try again.