-
Notifications
You must be signed in to change notification settings - Fork 314
Expand file tree
/
Copy pathconsumer.rb
More file actions
128 lines (111 loc) · 4.25 KB
/
consumer.rb
File metadata and controls
128 lines (111 loc) · 4.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
module Bunny
# Base class that represents consumer interface. Subclasses of this class implement
# specific logic of handling consumer life cycle events. Note that when the only event
# you are interested in is message deliveries, it is recommended to just use
# {Bunny::Queue#subscribe} instead of subclassing this class.
#
# @see Bunny::Queue#subscribe
# @see Bunny::Queue#subscribe_with
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
class Consumer
#
# API
#
attr_reader :channel
attr_reader :queue
attr_accessor :consumer_tag
attr_reader :arguments
attr_reader :no_ack
attr_reader :exclusive
# @param [Bunny::Channel] channel Channel this consumer will use
# @param [Bunny::Queue,String] queue Queue messages will be consumed from
# @param [String] consumer_tag Consumer tag (unique identifier). Generally it is better to let Bunny generate one.
# Empty string means RabbitMQ will generate consumer tag.
# @param [Boolean] no_ack (true) If true, delivered messages will be automatically acknowledged.
# If false, manual acknowledgements will be necessary.
# @param [Boolean] exclusive (false) Should this consumer be exclusive?
# @param [Hash] arguments (nil) Optional arguments that may be used by RabbitMQ extensions, etc
# @api public
def initialize(channel, queue, consumer_tag = channel.generate_consumer_tag, no_ack = true, exclusive = false, arguments = {})
@channel = channel || raise(ArgumentError, "channel is nil")
@queue = queue || raise(ArgumentError, "queue is nil")
@consumer_tag = consumer_tag
@exclusive = exclusive
@arguments = arguments
# no_ack set to true = no manual ack = automatic ack. MK.
@no_ack = no_ack
@on_cancellation = []
end
# Defines message delivery handler
# @api public
def on_delivery(&block)
@on_delivery = block
self
end
# Invokes message delivery handler
# @private
def call(*args)
@on_delivery.call(*args) if @on_delivery
end
alias handle_delivery call
# Defines consumer cancellation notification handler
#
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions guide
# @api public
def on_cancellation(&block)
@on_cancellation << block
self
end
# Invokes consumer cancellation notification handler
# @private
def handle_cancellation(basic_cancel)
@on_cancellation.each do |fn|
fn.call(basic_cancel)
end
end
# Cancels this consumer. Messages for this consumer will no longer be delivered. If the queue
# it was on is auto-deleted and this consumer was the last one, the queue will be deleted.
#
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def cancel
@channel.basic_cancel(@consumer_tag)
end
# @return [String] More detailed human-readable string representation of this consumer
def inspect
"#<#{self.class.name}:#{object_id} @channel_id=#{@channel.number} @queue=#{self.queue_name}> @consumer_tag=#{@consumer_tag} @exclusive=#{@exclusive} @no_ack=#{@no_ack}>"
end
# @return [String] Brief human-readable string representation of this consumer
def to_s
"#<#{self.class.name}:#{object_id} @channel_id=#{@channel.number} @queue=#{self.queue_name}> @consumer_tag=#{@consumer_tag}>"
end
# @return [Boolean] true if this consumer uses automatic acknowledgement mode
# @api public
def automatic_acknowledgement?
@no_ack == true
end
# @return [Boolean] true if this consumer uses manual (explicit) acknowledgement mode
# @api public
def manual_acknowledgement?
@no_ack == false
end
# @return [String] Name of the queue this consumer is on
# @api public
def queue_name
if @queue.respond_to?(:name)
@queue.name
else
@queue
end
end
#
# Recovery
#
# @private
def recover_from_network_failure
@channel.basic_consume_with(self)
end
end
end