/
queue.rb
81 lines (68 loc) · 2.52 KB
/
queue.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
class Sneakers::Queue
attr_reader :name, :opts, :exchange
def initialize(name, opts)
@name = name
@opts = opts
@handler_klass = Sneakers::CONFIG[:handler]
end
#
# :exchange
# :heartbeat_interval
# :prefetch
# :durable
# :ack
#
def subscribe(worker)
# If we've already got a bunny object, use it. This allows people to
# specify all kinds of options we don't need to know about (e.g. for ssl).
@bunny = @opts[:connection]
@bunny ||= create_bunny_connection
@bunny.start
@channel = @bunny.create_channel
@channel.prefetch(@opts[:prefetch])
exchange_name = @opts[:exchange]
@exchange = @channel.exchange(exchange_name, @opts[:exchange_options])
routing_key = @opts[:routing_key] || @name
routing_keys = [*routing_key]
# TODO: get the arguments from the handler? Retry handler wants this so you
# don't have to line up the queue's dead letter argument with the exchange
# you'll create for retry.
queue = @channel.queue(@name, @opts[:queue_options])
if exchange_name.length > 0
routing_keys.each do |key|
queue.bind(@exchange, :routing_key => key)
end
end
# NOTE: we are using the worker's options. This is necessary so the handler
# has the same configuration as the worker. Also pass along the exchange and
# queue in case the handler requires access to them (for things like binding
# retry queues, etc).
handler_klass = worker.opts[:handler] || Sneakers::CONFIG.fetch(:handler)
handler = handler_klass.new(@channel, queue, worker.opts)
@consumer = queue.subscribe(:block => false, :manual_ack => @opts[:ack]) do | delivery_info, metadata, msg |
worker.do_work(delivery_info, metadata, msg, handler)
end
nil
end
def unsubscribe
return unless @consumer
# TODO: should we simply close the channel here?
Sneakers.logger.info("Queue: will try to cancel consumer #{@consumer.inspect}")
cancel_ok = @consumer.cancel
if cancel_ok
Sneakers.logger.info "Queue: consumer #{cancel_ok.consumer_tag} cancelled"
@consumer = nil
else
Sneakers.logger.warn "Queue: could not cancel consumer #{@consumer.inspect}"
sleep(1)
unsubscribe
end
end
def create_bunny_connection
Bunny.new(@opts[:amqp], :vhost => @opts[:vhost],
:heartbeat => @opts[:heartbeat],
:properties => @opts.fetch(:properties, {}),
:logger => Sneakers::logger)
end
private :create_bunny_connection
end