Skip to content

Commit

Permalink
More work on queue, binding and consumer recovery
Browse files Browse the repository at this point in the history
To recover consumers we will have to make sure Bunny::Consumer instance
always have a reference to their respective queue object. The reason for
that is that when a server-named queue is recovered, consumer needs to
use the new name.
  • Loading branch information
Michael Klishin committed Jan 11, 2013
1 parent 5d831ba commit 0245585
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 4 deletions.
17 changes: 15 additions & 2 deletions lib/bunny/channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -610,25 +610,34 @@ def recover_from_network_failure

recover_prefetch_setting
recover_exchanges
# this includes recovering bindings
recover_queues
recover_consumers
end

def recover_prefetch_setting
basic_qos(@prefetch_count) if @prefetch_count
end

def recover_exchanges
@exchanges.each do |_, x|
@exchanges.values.dup.each do |x|
x.recover_from_network_failure
end
end

def recover_queues
@queues.each do |_, q|
@queues.values.dup.each do |q|
q.recover_from_network_failure
end
end

def recover_consumers
@consumers.values.dup.each do |c|
c.recover_from_network_failure
end
end



#
# Implementation
Expand Down Expand Up @@ -785,6 +794,10 @@ def deregister_queue(queue)
@queues.delete(queue.name)
end

def deregister_queue_named(name)
@queues.delete(name)
end

def register_queue(queue)
@queues[queue.name] = queue
end
Expand Down
8 changes: 8 additions & 0 deletions lib/bunny/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,13 @@ def cancel
def inspect
"#<#{self.class.name}:#{object_id} @channel_id=#{@channel.number} @queue=#{self.queue_name}> @consumer_tag=#{@consumer_tag} @exclusive=#{@exclusive} @no_ack=#{@no_ack}>"
end

#
# Recovery
#

def recover_from_network_failure
# TODO
end
end
end
29 changes: 27 additions & 2 deletions lib/bunny/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def bind(exchange, opts = {})

# store bindings for automatic recovery. We need to be very careful to
# not cause an infinite rebinding loop here when we recover. MK.
binding = { :exchange => exchange_name, :routing_key => (opts[:routing_key] || opts[:key]), :arguments => arguments }
binding = { :exchange => exchange_name, :routing_key => (opts[:routing_key] || opts[:key]), :arguments => opts[:arguments] }
@bindings.push(binding) unless @bindings.include?(binding)

self
Expand Down Expand Up @@ -195,8 +195,33 @@ def consumer_count
s[:consumer_count]
end

#
# Recovery
#

def recover_from_network_failure
puts "Recovering queue #{@name} from network failure"
# puts "Recovering queue #{@name} from network failure"

if self.server_named?
old_name = @name.dup
@name = AMQ::Protocol::EMPTY_STRING

@channel.deregister_queue_named(old_name)
end

declare!
begin
@channel.register_queue(self)
rescue Exception => e
puts "Caught #{e.inspect} while registering #{@name}!"
end
recover_bindings
end

def recover_bindings
@bindings.each do |b|
self.bind(b[:exchange], b)
end
end


Expand Down

0 comments on commit 0245585

Please sign in to comment.