Skip to content

Commit

Permalink
If connection is down, make Bunny::Queues#pop return empty responses
Browse files Browse the repository at this point in the history
To support the same edge case with Bunny::Queue#pop being called
in a tight loop when network connection goes down.

It is better to return something immediately rather than keep
blocking the thread we have no easy way to unblock with the
current "blocking pop" approach that replaces not-entirely-correct
wait/notify implementation.
  • Loading branch information
Michael Klishin committed Jan 16, 2013
1 parent 89055e5 commit 0f5ba46
Showing 1 changed file with 15 additions and 4 deletions.
19 changes: 15 additions & 4 deletions lib/bunny/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,11 @@ def subscribe_with(consumer, opts = {:block => false})
end

def pop(opts = {:ack => false}, &block)
delivery_info, properties, content = @channel.basic_get(@name, opts)
delivery_info, properties, content = if @channel.connection.open?
@channel.basic_get(@name, opts)
else
[nil, nil, nil]
end

if block
block.call(delivery_info, properties, content)
Expand All @@ -145,7 +149,11 @@ def pop(opts = {:ack => false}, &block)
alias get pop

def pop_as_hash(opts = {:ack => false}, &block)
delivery_info, properties, content = @channel.basic_get(@name, opts)
delivery_info, properties, content = if @channel.connection.open?
@channel.basic_get(@name, opts)
else
[nil, nil, nil]
end

result = {:header => properties, :payload => content, :delivery_details => delivery_info}

Expand Down Expand Up @@ -208,17 +216,20 @@ def recover_from_network_failure
@channel.deregister_queue_named(old_name)
end

declare!
# puts "Recovering queue #{@name}"
begin
declare!

@channel.register_queue(self)
rescue Exception => e
puts "Caught #{e.inspect} while registering #{@name}!"
puts "Caught #{e.inspect} while redeclaring and registering #{@name}!"
end
recover_bindings
end

def recover_bindings
@bindings.each do |b|
# puts "Recovering binding #{b.inspect}"
self.bind(b[:exchange], b)
end
end
Expand Down

0 comments on commit 0f5ba46

Please sign in to comment.