Browse files

Separate continuations for basic.get operations

Helps support edge cases when blocking operations (e.g. basic.get) are
executed in a tight loop and network recovery has to happen.
  • Loading branch information...
1 parent a136114 commit 89055e5562d66a2535e02e7b7e6acbd07e7009d3 Michael Klishin committed Jan 17, 2013
View
2 examples/connection/automatic_recovery_with_basic_get.rb
@@ -24,7 +24,9 @@
x.publish(body, :routing_key => ["abc", "def"].sample)
sleep 1.5
+ puts "Before q.pop"
_, _, payload = q.pop
+ puts "After q.pop"
if payload
puts "Consumed #{payload}"
else
View
9 examples/connection/automatic_recovery_with_client_named_queues.rb
@@ -22,7 +22,10 @@
end
loop do
- sleep 3
- puts "Tick"
- x.publish(rand.to_s, :routing_key => ["abc", "def", "ghi", "xyz"].sample)
+ sleep 2
+ data = rand.to_s
+ rk = ["abc", "def"].sample
+
+ puts "Published #{data}, routing key: #{rk}"
+ x.publish(data, :routing_key => rk)
end
View
9 examples/connection/automatic_recovery_with_server_named_queues.rb
@@ -22,7 +22,10 @@
end
loop do
- sleep 3
- puts "Tick"
- x.publish(rand.to_s, :routing_key => ["abc", "def", "ghi", "xyz"].sample)
+ sleep 2
+ data = rand.to_s
+ rk = ["abc", "def"].sample
+
+ puts "Published #{data}, routing key: #{rk}"
+ x.publish(data, :routing_key => rk)
end
View
111 lib/bunny/channel.rb
@@ -169,15 +169,26 @@ def initialize(connection = nil, id = nil, work_pool = ConsumerWorkPool.new(1))
@unconfirmed_set_mutex = Mutex.new
- @continuations = ::Queue.new
- @confirms_continuations = ::Queue.new
+ @continuations = ::Queue.new
+ @confirms_continuations = ::Queue.new
+ @basic_get_continuations = ::Queue.new
+ # threads awaiting on continuations. Used to unblock
+ # them when network connection goes down so that busy loops
+ # that perform synchronous operations can work. MK.
+ @threads_waiting_on_continuations = Set.new
+ @threads_waiting_on_confirms_continuations = Set.new
+ @threads_waiting_on_basic_get_continuations = Set.new
@next_publish_seq_no = 0
end
# Opens the channel and resets its internal state
# @return [Bunny::Channel] Self
def open
+ @threads_waiting_on_continuations = Set.new
+ @threads_waiting_on_confirms_continuations = Set.new
+ @threads_waiting_on_basic_get_continuations = Set.new
+
@connection.open_channel(self)
# clear last channel error
@last_channel_error = nil
@@ -517,7 +528,7 @@ def basic_get(queue, opts = {:ack => true})
raise_if_no_longer_open!
@connection.send_frame(AMQ::Protocol::Basic::Get.encode(@id, queue, !opts[:ack]))
- @last_basic_get_response = wait_on_continuations
+ @last_basic_get_response = wait_on_basic_get_continuations
raise_if_continuation_resulted_in_a_channel_error!
@last_basic_get_response
@@ -950,22 +961,6 @@ def confirm_select(callback=nil)
@last_confirm_select_ok
end
- def wait_on_continuations
- unless @connection.threaded
- connection.event_loop.run_once until @continuations.length > 0
- end
-
- @continuations.pop
- end
-
- def wait_on_confirms_continuations
- unless @connection.threaded
- connection.event_loop.run_once until @confirms_continuations.length > 0
- end
-
- @confirms_continuations.pop
- end
-
# Blocks calling thread until confirms are received for all
# currently unacknowledged published messages
def wait_for_confirms
@@ -1006,6 +1001,7 @@ def generate_consumer_tag(name = "bunny")
#
# @api plugin
def recover_from_network_failure
+ # puts "Recovering channel #{@id}"
release_all_continuations
recover_prefetch_setting
@@ -1039,6 +1035,7 @@ def recover_exchanges
# @api plugin
def recover_queues
@queues.values.dup.each do |q|
+ # puts "Recovering queue #{q.name}"
q.recover_from_network_failure
end
end
@@ -1143,12 +1140,12 @@ def handle_method(method)
# @private
def handle_basic_get_ok(basic_get_ok, properties, content)
- @continuations.push([basic_get_ok, properties, content])
+ @basic_get_continuations.push([basic_get_ok, properties, content])
end
# @private
def handle_basic_get_empty(basic_get_empty)
- @continuations.push([nil, nil, nil])
+ @basic_get_continuations.push([nil, nil, nil])
end
# @private
@@ -1202,11 +1199,79 @@ def handle_ack_or_nack(delivery_tag, multiple, nack)
end
end
+ # @private
+ def wait_on_continuations
+ if @connection.threaded
+ t = Thread.current
+ @threads_waiting_on_continuations << t
+
+ v = @continuations.pop
+ @threads_waiting_on_continuations.delete(t)
+
+ v
+ else
+ connection.event_loop.run_once until @continuations.length > 0
+
+ @continuations.pop
+ end
+ end
+
+ # @private
+ def wait_on_basic_get_continuations
+ if @connection.threaded
+ t = Thread.current
+ @threads_waiting_on_basic_get_continuations << t
+
+ v = @basic_get_continuations.pop
+ @threads_waiting_on_basic_get_continuations.delete(t)
+
+ v
+ else
+ connection.event_loop.run_once until @continuations.length > 0
+
+ @basic_get_continuations.pop
+ end
+ end
+
+ # @private
+ def wait_on_confirms_continuations
+ if @connection.threaded
+ t = Thread.current
+ @threads_waiting_on_confirms_continuations << t
+
+ v = @confirms_continuations.pop
+ @threads_waiting_on_confirms_continuations.delete(t)
+
+ v
+ else
+ connection.event_loop.run_once until @confirms_continuations.length > 0
+
+ @confirms_continuations.pop
+ end
+ end
+
# Releases all continuations. Used by automatic network recovery.
# @private
def release_all_continuations
- @confirms_continuations.push(true)
- @continuations.push(nil)
+ if @confirms_continuations.num_waiting > 0
+ @threads_waiting_on_confirms_continuations.each do |t|
+ t.run
+ end
+ end
+ if @continuations.num_waiting > 0
+ @threads_waiting_on_continuations.each do |t|
+ t.run
+ end
+ end
+ if @basic_get_continuations.num_waiting > 0
+ @threads_waiting_on_basic_get_continuations.each do |t|
+ t.run
+ end
+ end
+
+ @continuations = ::Queue.new
+ @confirms_continuations = ::Queue.new
+ @basic_get_continuations = ::Queue.new
end
# Starts consumer work pool. Lazily called by #basic_consume to avoid creating new threads
View
2 lib/bunny/session.rb
@@ -167,7 +167,7 @@ def closed?
end
def open?
- status == :open || status == :connected
+ (status == :open || status == :connected) && @transport.open?
end
alias connected? open?

0 comments on commit 89055e5

Please sign in to comment.