From 9b73a11922943a3faead9be329e6bb328586a140 Mon Sep 17 00:00:00 2001 From: Timothy Marks Date: Thu, 13 Mar 2014 16:15:51 +1000 Subject: [PATCH 01/10] Initial format for message retry handler. --- lib/sneakers/handlers/maxretry.rb | 42 +++++++++++++++++++++++++++++++ lib/sneakers/worker.rb | 10 ++++---- 2 files changed, 47 insertions(+), 5 deletions(-) create mode 100644 lib/sneakers/handlers/maxretry.rb diff --git a/lib/sneakers/handlers/maxretry.rb b/lib/sneakers/handlers/maxretry.rb new file mode 100644 index 00000000..923f98af --- /dev/null +++ b/lib/sneakers/handlers/maxretry.rb @@ -0,0 +1,42 @@ +module Sneakers + module Handlers + class Maxretry + + ### + # Maxretry uses dead letter policies on Rabbitmq to requeue and retry messages after a timeout + # + ### + + def initialize(channel) + @channel = channel + end + + def acknowledge(tag) + @channel.acknowledge(tag, false) + end + + def reject(tag, props, msg, requeue=false) + # Check how many times it has been requeued + if props[:headers].nil? or props[:headers]['x-death'].nil? or props[:headers]['x-death'].count < 5 + @channel.reject(tag, requeue) + else + # Retried more than the max times + @exchange.publish({:msg => msg, :routing_key => rops[:headers]['routing-keys'][0]}.to_json, :routing_key => 'error') + @channel.acknowledge(tag, false) + end + end + + def error(tag, props, msg, err) + reject(tag, props, msg) + end + + def timeout(tag, props, msg) + reject(tag, props, msg) + end + + def noop(tag) + + end + end + end +end diff --git a/lib/sneakers/worker.rb b/lib/sneakers/worker.rb index 4058c786..e694d765 100644 --- a/lib/sneakers/worker.rb +++ b/lib/sneakers/worker.rb @@ -71,15 +71,15 @@ def do_work(hdr, props, msg, handler) # note to future-self. never acknowledge multiple (multiple=true) messages under threads. handler.acknowledge(hdr.delivery_tag) elsif res == :timeout - handler.timeout(hdr.delivery_tag) + handler.timeout(hdr.delivery_tag, props, msg) elsif res == :error - handler.error(hdr.delivery_tag, error) + handler.error(hdr.delivery_tag, props, msg, error) elsif res == :reject - handler.reject(hdr.delivery_tag) + handler.reject(hdr.delivery_tag, props, msg) elsif res == :requeue - handler.reject(hdr.delivery_tag, true) + handler.reject(hdr.delivery_tag, props, msg, true) else - handler.noop(hdr.delivery_tag) + handler.noop(hdr.delivery_tag, props, msg) end metrics.increment("work.#{self.class.name}.handled.#{res || 'reject'}") end From c4fe0490cae064de5deca216d76422da005bec9a Mon Sep 17 00:00:00 2001 From: Timothy Marks Date: Thu, 13 Mar 2014 17:12:30 +1000 Subject: [PATCH 02/10] Commit of example, retry_handler and some other logic to handle the retry queue setup. --- examples/max_retry_handler.rb | 42 +++++++++++++++++++++++++++++++ lib/sneakers/handlers/maxretry.rb | 19 +++++++++++++- lib/sneakers/queue.rb | 2 +- lib/sneakers/worker.rb | 2 ++ 4 files changed, 63 insertions(+), 2 deletions(-) create mode 100644 examples/max_retry_handler.rb diff --git a/examples/max_retry_handler.rb b/examples/max_retry_handler.rb new file mode 100644 index 00000000..4916ebbf --- /dev/null +++ b/examples/max_retry_handler.rb @@ -0,0 +1,42 @@ +$: << File.expand_path('../lib', File.dirname(__FILE__)) +require 'sneakers' +require 'sneakers/runner' +require 'sneakers/handlers/maxretry' +require 'logger' + + +Sneakers.configure(:handler => Sneakers::Handlers::Maxretry, :workers => 1) +Sneakers.logger.level = Logger::INFO + +class MaxRetryWorker + include Sneakers::Worker + from_queue 'downloads', + :ack => true, + :threads => 1, + :prefetch => 1, + :timeout_job_after => 60, + :exchange => 'sneakers', + :heartbeat => 5, + :arguments => { + :'x-dead-letter-exchange' => 'sneakers-retry' + } + + def work(msg) + + puts "Got message #{msg}" + + # We always want to reject to see if we do the proper timeout + return reject! + + end +end + +messages = 1 +puts "feeding messages in" +messages.times { + MaxRetryWorker.enqueue("{}") +} +puts "done" + +r = Sneakers::Runner.new([ MaxRetryWorker ]) +r.run \ No newline at end of file diff --git a/lib/sneakers/handlers/maxretry.rb b/lib/sneakers/handlers/maxretry.rb index 923f98af..60d4165d 100644 --- a/lib/sneakers/handlers/maxretry.rb +++ b/lib/sneakers/handlers/maxretry.rb @@ -9,6 +9,19 @@ class Maxretry def initialize(channel) @channel = channel + + # We need to setup the sneakers-retry exchange and queue + retry_exchange = @channel.exchange('sneakers-retry', + :type => 'fanout', + :durable => 'true') + retry_queue = @channel.queue('sneakers-retry', + :durable => 'true', + :arguments => { + :'x-dead-letter-exchange' => 'sneakers', + :'x-message-ttl' => 10000 + }) + retry_queue.bind(retry_exchange) + end def acknowledge(tag) @@ -17,11 +30,15 @@ def acknowledge(tag) def reject(tag, props, msg, requeue=false) # Check how many times it has been requeued + puts "Got xdeath #{props.inspect}" + if props[:headers].nil? or props[:headers]['x-death'].nil? or props[:headers]['x-death'].count < 5 + puts "Retrying" @channel.reject(tag, requeue) else # Retried more than the max times - @exchange.publish({:msg => msg, :routing_key => rops[:headers]['routing-keys'][0]}.to_json, :routing_key => 'error') + puts "Publishing to error queue" + #@exchange.publish({:msg => msg, :routing_key => rops[:headers]['routing-keys'][0]}.to_json, :routing_key => 'error') @channel.acknowledge(tag, false) end end diff --git a/lib/sneakers/queue.rb b/lib/sneakers/queue.rb index b2d354a5..bd535a0e 100644 --- a/lib/sneakers/queue.rb +++ b/lib/sneakers/queue.rb @@ -31,7 +31,7 @@ def subscribe(worker) routing_key = @opts[:routing_key] || @name routing_keys = [*routing_key] - queue = @channel.queue(@name, :durable => @opts[:durable]) + queue = @channel.queue(@name, :durable => @opts[:durable], :arguments => @opts[:arguments]) routing_keys.each do |key| queue.bind(@exchange, :routing_key => key) diff --git a/lib/sneakers/worker.rb b/lib/sneakers/worker.rb index e694d765..c00c9347 100644 --- a/lib/sneakers/worker.rb +++ b/lib/sneakers/worker.rb @@ -66,6 +66,8 @@ def do_work(hdr, props, msg, handler) logger.error(ex) end + puts "Got res #{res} and handler #{handler}" + if @should_ack if res == :ack # note to future-self. never acknowledge multiple (multiple=true) messages under threads. From d4845207f33a850899969d0189a5cf8726686822 Mon Sep 17 00:00:00 2001 From: Timothy Marks Date: Fri, 14 Mar 2014 12:22:50 +1000 Subject: [PATCH 03/10] Changes to queue to pass opts into handlers, worker to use hdr not hdr.delivery_tag, max_retry_handler to create and bind required queues. --- examples/max_retry_handler.rb | 4 +- lib/sneakers/handlers/maxretry.rb | 86 ++++++++++++++++++++----------- lib/sneakers/queue.rb | 2 +- lib/sneakers/worker.rb | 14 +++-- 4 files changed, 66 insertions(+), 40 deletions(-) diff --git a/examples/max_retry_handler.rb b/examples/max_retry_handler.rb index 4916ebbf..b194821f 100644 --- a/examples/max_retry_handler.rb +++ b/examples/max_retry_handler.rb @@ -5,7 +5,7 @@ require 'logger' -Sneakers.configure(:handler => Sneakers::Handlers::Maxretry, :workers => 1) +Sneakers.configure(:handler => Sneakers::Handlers::Maxretry, :workers => 1, :threads => 1, :prefetch => 1) Sneakers.logger.level = Logger::INFO class MaxRetryWorker @@ -23,7 +23,7 @@ class MaxRetryWorker def work(msg) - puts "Got message #{msg}" + puts "Got message #{msg} and rejecting now" # We always want to reject to see if we do the proper timeout return reject! diff --git a/lib/sneakers/handlers/maxretry.rb b/lib/sneakers/handlers/maxretry.rb index 60d4165d..236b3563 100644 --- a/lib/sneakers/handlers/maxretry.rb +++ b/lib/sneakers/handlers/maxretry.rb @@ -7,51 +7,79 @@ class Maxretry # ### - def initialize(channel) + def initialize(channel, opts) @channel = channel + @opts = opts - # We need to setup the sneakers-retry exchange and queue - retry_exchange = @channel.exchange('sneakers-retry', - :type => 'fanout', - :durable => 'true') - retry_queue = @channel.queue('sneakers-retry', - :durable => 'true', - :arguments => { - :'x-dead-letter-exchange' => 'sneakers', - :'x-message-ttl' => 10000 - }) - retry_queue.bind(retry_exchange) + # If there is no retry exchange specified, use #{exchange}-retry + retry_name = @opts[:retryexchange] || "#{@opts[:exchange]}-retry" - end + # Create the retry exchange as a durable topic so we retain original routing keys but bind the queue using a wildcard + retry_exchange = @channel.exchange( retry_name, + :type => 'topic', + :durable => 'true') + + # Create the retry queue with the same name as the retry exchange and a dead letter exchange + # The dead letter exchange is the default exchange and the ttl can be from the opts or defaults to 60 seconds + retry_queue = @channel.queue( retry_name, + :durable => 'true', + :arguments => { + :'x-dead-letter-exchange' => @opts[:exchange], + :'x-message-ttl' => @opts[:retry_timeout] || 60000 + }) + + # Bind the retry queue to the retry topic exchange with a wildcard + retry_queue.bind(retry_exchange, :routing_key => '#') + + ## Setup the error queue + + # If there is no error exchange specified, use #{exchange}-error + error_name = @opts[:errorexchange] || "#{@opts[:exchange]}-error" + + # Create the error exchange as a durable topic so we retain original routing keys but bind the queue using a wildcard + @error_exchange = @channel.exchange(error_name, + :type => 'topic', + :durable => 'true') + + # Create the error queue with the same name as the error exchange and a dead letter exchange + # The dead letter exchange is the default exchange and the ttl can be from the opts or defaults to 60 seconds + error_queue = @channel.queue( error_name, + :durable => 'true') + + # Bind the error queue to the error topic exchange with a wildcard + error_queue.bind(@error_exchange, :routing_key => '#') - def acknowledge(tag) - @channel.acknowledge(tag, false) end - def reject(tag, props, msg, requeue=false) - # Check how many times it has been requeued - puts "Got xdeath #{props.inspect}" + def acknowledge(hdr) + @channel.acknowledge(hdr.delivery_tag, false) + end + def reject(hdr, props, msg, requeue=false) + + # Note to readers, the count of the x-death will increment by 2 for each retry, once for the reject and once for the expiration from the retry queue if props[:headers].nil? or props[:headers]['x-death'].nil? or props[:headers]['x-death'].count < 5 - puts "Retrying" - @channel.reject(tag, requeue) + # We call reject which will route the message to the x-dead-letter-exchange (ie. retry exchange) on the queue + @channel.reject(hdr.delivery_tag, requeue) + else - # Retried more than the max times - puts "Publishing to error queue" - #@exchange.publish({:msg => msg, :routing_key => rops[:headers]['routing-keys'][0]}.to_json, :routing_key => 'error') - @channel.acknowledge(tag, false) + # Retried more than the max times + # Publish the original message with the routing_key to the error exchange + @error_exchange.publish(msg, :routing_key => hdr.routing_key) + @channel.acknowledge(hdr.delivery_tag, false) + end end - def error(tag, props, msg, err) - reject(tag, props, msg) + def error(hdr, props, msg, err) + reject(hdr.delivery_tag, props, msg) end - def timeout(tag, props, msg) - reject(tag, props, msg) + def timeout(hdr, props, msg) + reject(hdr.delivery_tag, props, msg) end - def noop(tag) + def noop(hdr) end end diff --git a/lib/sneakers/queue.rb b/lib/sneakers/queue.rb index bd535a0e..c6f52130 100644 --- a/lib/sneakers/queue.rb +++ b/lib/sneakers/queue.rb @@ -26,7 +26,7 @@ def subscribe(worker) :type => @opts[:exchange_type], :durable => @opts[:durable]) - handler = @handler_klass.new(@channel) + handler = @handler_klass.new(@channel, @opts) routing_key = @opts[:routing_key] || @name routing_keys = [*routing_key] diff --git a/lib/sneakers/worker.rb b/lib/sneakers/worker.rb index c00c9347..7fb7f90a 100644 --- a/lib/sneakers/worker.rb +++ b/lib/sneakers/worker.rb @@ -66,22 +66,20 @@ def do_work(hdr, props, msg, handler) logger.error(ex) end - puts "Got res #{res} and handler #{handler}" - if @should_ack if res == :ack # note to future-self. never acknowledge multiple (multiple=true) messages under threads. - handler.acknowledge(hdr.delivery_tag) + handler.acknowledge(hdr) elsif res == :timeout - handler.timeout(hdr.delivery_tag, props, msg) + handler.timeout(hdr, props, msg) elsif res == :error - handler.error(hdr.delivery_tag, props, msg, error) + handler.error(hdr, props, msg, error) elsif res == :reject - handler.reject(hdr.delivery_tag, props, msg) + handler.reject(hdr, props, msg) elsif res == :requeue - handler.reject(hdr.delivery_tag, props, msg, true) + handler.reject(hdr, props, msg, true) else - handler.noop(hdr.delivery_tag, props, msg) + handler.noop(hdr, props, msg) end metrics.increment("work.#{self.class.name}.handled.#{res || 'reject'}") end From 25939c0c96d9e52f9659a39cbfbc1986985558fc Mon Sep 17 00:00:00 2001 From: Justin Mills Date: Sun, 4 May 2014 10:45:09 -0400 Subject: [PATCH 04/10] Maxretry Handler This was originally started here: https://github.com/otherlevels/sneakers/tree/maxretry and discussed as a sneakers issue here: https://github.com/jondot/sneakers/issues/18. I took this, updated the rest of sneakers to work with it and wrote tests of both Handlers to ensure things are still working as expected. NOTE: This is a breaking change in that any Handlers out there will no longer work as the interface has changed. I expect there may be one more interface-changing commit to follow on to this depending on how to tackle the metrics issue. --- .gitignore | 2 + examples/max_retry_handler.rb | 14 +- lib/sneakers/handlers/maxretry.rb | 57 +++++-- lib/sneakers/handlers/oneshot.rb | 21 +-- lib/sneakers/worker.rb | 2 +- spec/sneakers/worker_handlers_spec.rb | 224 ++++++++++++++++++++++++++ spec/sneakers/worker_spec.rb | 50 +++--- spec/spec_helper.rb | 4 +- 8 files changed, 309 insertions(+), 65 deletions(-) create mode 100644 spec/sneakers/worker_handlers_spec.rb diff --git a/.gitignore b/.gitignore index d672b808..cd35642f 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,5 @@ sneakers.pid pkg/ coverage/ tmp/ +.ruby-version +.ruby-gemset diff --git a/examples/max_retry_handler.rb b/examples/max_retry_handler.rb index b194821f..fbb2b585 100644 --- a/examples/max_retry_handler.rb +++ b/examples/max_retry_handler.rb @@ -4,10 +4,16 @@ require 'sneakers/handlers/maxretry' require 'logger' - Sneakers.configure(:handler => Sneakers::Handlers::Maxretry, :workers => 1, :threads => 1, :prefetch => 1) Sneakers.logger.level = Logger::INFO +# Example of how to write a retry worker. If your rabbit system is empty, then +# you must run this twice. Once to setup the exchanges, queues and bindings a +# second time to have the sent message end up on the downloads queue. +# +# Run this via: +# bundle exec ruby examples/max_retry_handler.rb +# class MaxRetryWorker include Sneakers::Worker from_queue 'downloads', @@ -22,11 +28,11 @@ class MaxRetryWorker } def work(msg) - + puts "Got message #{msg} and rejecting now" # We always want to reject to see if we do the proper timeout - return reject! + reject! end end @@ -39,4 +45,4 @@ def work(msg) puts "done" r = Sneakers::Runner.new([ MaxRetryWorker ]) -r.run \ No newline at end of file +r.run diff --git a/lib/sneakers/handlers/maxretry.rb b/lib/sneakers/handlers/maxretry.rb index 236b3563..83d06bf5 100644 --- a/lib/sneakers/handlers/maxretry.rb +++ b/lib/sneakers/handlers/maxretry.rb @@ -1,11 +1,11 @@ module Sneakers module Handlers + # + # Maxretry uses dead letter policies on Rabbitmq to requeue and retry + # messages after failure (rejections, errors and timeouts). When the maximum + # number of retries is reached it will put the message on an error queue. + # class Maxretry - - ### - # Maxretry uses dead letter policies on Rabbitmq to requeue and retry messages after a timeout - # - ### def initialize(channel, opts) @channel = channel @@ -21,6 +21,8 @@ def initialize(channel, opts) # Create the retry queue with the same name as the retry exchange and a dead letter exchange # The dead letter exchange is the default exchange and the ttl can be from the opts or defaults to 60 seconds + # TODO: respect @opts[:durable]? Are there cases where you want the + # retry and error exchanges to match the originating exchange? retry_queue = @channel.queue( retry_name, :durable => 'true', :arguments => { @@ -32,7 +34,7 @@ def initialize(channel, opts) retry_queue.bind(retry_exchange, :routing_key => '#') ## Setup the error queue - + # If there is no error exchange specified, use #{exchange}-error error_name = @opts[:errorexchange] || "#{@opts[:exchange]}-error" @@ -41,47 +43,68 @@ def initialize(channel, opts) :type => 'topic', :durable => 'true') - # Create the error queue with the same name as the error exchange and a dead letter exchange - # The dead letter exchange is the default exchange and the ttl can be from the opts or defaults to 60 seconds + # Create the error queue with the same name as the error exchange error_queue = @channel.queue( error_name, :durable => 'true') # Bind the error queue to the error topic exchange with a wildcard error_queue.bind(@error_exchange, :routing_key => '#') + @max_retries = @opts[:retry_max_times] || 5 + end - def acknowledge(hdr) + def acknowledge(hdr, props, msg) @channel.acknowledge(hdr.delivery_tag, false) end def reject(hdr, props, msg, requeue=false) - - # Note to readers, the count of the x-death will increment by 2 for each retry, once for the reject and once for the expiration from the retry queue - if props[:headers].nil? or props[:headers]['x-death'].nil? or props[:headers]['x-death'].count < 5 + + # Note to readers, the count of the x-death will increment by 2 for each + # retry, once for the reject and once for the expiration from the retry + # queue + if requeue || ((failure_count(props[:headers]) + 1) < @max_retries) # We call reject which will route the message to the x-dead-letter-exchange (ie. retry exchange) on the queue @channel.reject(hdr.delivery_tag, requeue) - + # TODO: metrics else # Retried more than the max times # Publish the original message with the routing_key to the error exchange @error_exchange.publish(msg, :routing_key => hdr.routing_key) @channel.acknowledge(hdr.delivery_tag, false) - + # TODO: metrics end end def error(hdr, props, msg, err) - reject(hdr.delivery_tag, props, msg) + reject(hdr, props, msg) end def timeout(hdr, props, msg) - reject(hdr.delivery_tag, props, msg) + reject(hdr, props, msg) end - def noop(hdr) + def noop(hdr, props, msg) end + + # Uses the x-death header to determine the number of failures this job has + # seen in the past. This does not count the current failure. So for + # instance, the first time the job fails, this will return 0, the second + # time, 1, etc. + # @param headers [Hash] Hash of headers that Rabbit delivers as part of + # the message + # @return [Integer] Count of number of failures. + def failure_count(headers) + if headers.nil? || headers['x-death'].nil? + 0 + else + headers['x-death'].select do |x_death| + x_death['queue'] == @opts[:exchange] + end.count + end + end + private :failure_count end end end diff --git a/lib/sneakers/handlers/oneshot.rb b/lib/sneakers/handlers/oneshot.rb index 2008267a..0df27652 100644 --- a/lib/sneakers/handlers/oneshot.rb +++ b/lib/sneakers/handlers/oneshot.rb @@ -1,27 +1,28 @@ module Sneakers module Handlers class Oneshot - def initialize(channel) + def initialize(channel, opts) @channel = channel + @opts = opts end - def acknowledge(tag) - @channel.acknowledge(tag, false) + def acknowledge(hdr, props, msg) + @channel.acknowledge(hdr.delivery_tag, false) end - def reject(tag, requeue=false) - @channel.reject(tag, requeue) + def reject(hdr, props, msg, requeue=false) + @channel.reject(hdr.delivery_tag, requeue) end - def error(tag, err) - reject(tag) + def error(hdr, props, msg, err) + reject(hdr, props, msg) end - def timeout(tag) - reject(tag) + def timeout(hdr, props, msg) + reject(hdr, props, msg) end - def noop(tag) + def noop(hdr, props, msg) end end diff --git a/lib/sneakers/worker.rb b/lib/sneakers/worker.rb index 7fb7f90a..1da085f6 100644 --- a/lib/sneakers/worker.rb +++ b/lib/sneakers/worker.rb @@ -69,7 +69,7 @@ def do_work(hdr, props, msg, handler) if @should_ack if res == :ack # note to future-self. never acknowledge multiple (multiple=true) messages under threads. - handler.acknowledge(hdr) + handler.acknowledge(hdr, props, msg) elsif res == :timeout handler.timeout(hdr, props, msg) elsif res == :error diff --git a/spec/sneakers/worker_handlers_spec.rb b/spec/sneakers/worker_handlers_spec.rb new file mode 100644 index 00000000..3538b623 --- /dev/null +++ b/spec/sneakers/worker_handlers_spec.rb @@ -0,0 +1,224 @@ +require 'spec_helper' +require 'sneakers' +require 'timeout' +require 'sneakers/handlers/oneshot' +require 'sneakers/handlers/maxretry' + + +# Specific tests of the Handler implementations you can use to deal with job +# results. These tests only make sense with a worker that requires acking. + +class HandlerTestWorker + include Sneakers::Worker + from_queue 'defaults', + :ack => true + + def work(msg) + if msg.is_a?(StandardError) + raise msg + else + msg + end + end +end + +class TestPool + def process(*args,&block) + block.call + end +end + + +describe 'Handlers' do + let(:queue) { Object.new } + let(:worker) { HandlerTestWorker.new(@queue, TestPool.new) } + + before(:each) do + Sneakers.configure(:daemonize => true, :log => 'sneakers.log') + Sneakers::Worker.configure_logger(Logger.new('/dev/null')) + Sneakers::Worker.configure_metrics + end + + describe 'Oneshot' do + before(:each) do + @channel = Object.new + @opts = Object.new + @handler = Sneakers::Handlers::Oneshot.new(@channel, @opts) + + @header = Object.new + stub(@header).delivery_tag { 37 } + end + + describe '#do_work' do + it 'should work and handle acks' do + mock(@channel).acknowledge(37, false) + + worker.do_work(@header, nil, :ack, @handler) + end + + it 'should work and handle rejects' do + mock(@channel).reject(37, false) + + worker.do_work(@header, nil, :reject, @handler) + end + + it 'should work and handle requeues' do + mock(@channel).reject(37, true) + + worker.do_work(@header, nil, :requeue, @handler) + end + + it 'should work and handle user-land timeouts' do + mock(@channel).reject(37, false) + + worker.do_work(@header, nil, :timeout, @handler) + end + + it 'should work and handle user-land error' do + mock(@channel).reject(37, false) + + worker.do_work(@header, nil, StandardError.new('boom!'), @handler) + end + + it 'should work and handle noops' do + worker.do_work(@header, nil, :wait, @handler) + end + end + + end + + describe 'Maxretry' do + let(:max_retries) { nil } + + before(:each) do + @channel = Object.new + @opts = { + :exchange => 'sneakers', + }.tap do |opts| + opts[:retry_max_times] = max_retries unless max_retries.nil? + end + + @retry_exchange = Object.new + @retry_queue = Object.new + @error_exchange = Object.new + @error_queue = Object.new + + mock(@channel).exchange('sneakers-retry', + :type => 'topic', + :durable => 'true').once { @retry_exchange } + mock(@channel).queue('sneakers-retry', + :durable => 'true', + :arguments => { + :'x-dead-letter-exchange' => 'sneakers', + :'x-message-ttl' => 60000 + } + ).once { @retry_queue } + mock(@retry_queue).bind(@retry_exchange, :routing_key => '#') + + mock(@channel).exchange('sneakers-error', + :type => 'topic', + :durable => 'true').once { @error_exchange } + mock(@channel).queue('sneakers-error', + :durable => 'true').once { @error_queue } + mock(@error_queue).bind(@error_exchange, :routing_key => '#') + + @handler = Sneakers::Handlers::Maxretry.new(@channel, @opts) + + @header = Object.new + stub(@header).delivery_tag { 37 } + + @props = {} + @props_with_x_death = { + :headers => { + "x-death" => [ + { + "reason" => "expired", + "queue" => "sneakers-retry", + "time" => Time.now, + "exchange" => "RawMail-retry", + "routing-keys" => ["RawMail"] + }, + { + "reason" => "rejected", + "queue" => "sneakers", + "time" => Time.now, + "exchange" => "", + "routing-keys" => ["RawMail"] + } + ] + }, + :delivery_mode => 1} + end + + # it 'allows overriding the retry exchange name' + # it 'allows overriding the error exchange name' + # it 'allows overriding the retry timeout' + + describe '#do_work' do + it 'should work and handle acks' do + mock(@channel).acknowledge(37, false) + + worker.do_work(@header, @props, :ack, @handler) + end + + describe 'rejects' do + describe 'more retries ahead' do + it 'should work and handle rejects' do + mock(@channel).reject(37, false) + + worker.do_work(@header, @props_with_x_death, :reject, @handler) + end + end + + describe 'no more retries' do + let(:max_retries) { 1 } + + it 'sends the rejection to the error queue' do + mock(@header).routing_key { '#' } + mock(@channel).acknowledge(37, false) + mock(@error_exchange).publish(:reject, :routing_key => '#') + + worker.do_work(@header, @props_with_x_death, :reject, @handler) + end + + end + end + + describe 'requeues' do + it 'should work and handle requeues' do + mock(@channel).reject(37, true) + + worker.do_work(@header, @props_with_x_death, :requeue, @handler) + end + + describe 'no more retries left' do + let(:max_retries) { 1 } + + it 'continues to reject with requeue' do + mock(@channel).reject(37, true) + + worker.do_work(@header, @props_with_x_death, :requeue, @handler) + end + end + + end + + it 'should work and handle user-land timeouts' do + mock(@channel).reject(37, false) + + worker.do_work(@header, @props, :timeout, @handler) + end + + it 'should work and handle user-land error' do + mock(@channel).reject(37, false) + + worker.do_work(@header, @props, StandardError.new('boom!'), @handler) + end + + it 'should work and handle noops' do + worker.do_work(@header, @props, :wait, @handler) + end + end + + end +end diff --git a/spec/sneakers/worker_spec.rb b/spec/sneakers/worker_spec.rb index bb867884..459dbe45 100644 --- a/spec/sneakers/worker_spec.rb +++ b/spec/sneakers/worker_spec.rb @@ -108,13 +108,6 @@ def process(*args,&block) end end -class TestHandler - def acknowledge(tag); end - def reject(tag); end - def error(tag, err); end - def timeout(tag); end -end - def with_test_queuefactory(ctx, ack=true, msg=nil, nowork=false) qf = Object.new q = Object.new @@ -222,9 +215,8 @@ def with_test_queuefactory(ctx, ack=true, msg=nil, nowork=false) w = AcksWorker.new(@queue, TestPool.new) mock(w).work("msg").once{ raise "foo" } handler = Object.new - mock(handler).error("tag", anything) header = Object.new - stub(header).delivery_tag { "tag" } + mock(handler).error(header, nil, "msg", anything) w.do_work(header, nil, "msg", handler) end @@ -233,10 +225,9 @@ def with_test_queuefactory(ctx, ack=true, msg=nil, nowork=false) stub(w).work("msg"){ sleep 10 } handler = Object.new - mock(handler).timeout("tag") - header = Object.new - stub(header).delivery_tag { "tag" } + + mock(handler).timeout(header, nil, "msg") w.do_work(header, nil, "msg", handler) end @@ -251,35 +242,35 @@ def with_test_queuefactory(ctx, ack=true, msg=nil, nowork=false) it "should work and handle acks" do handler = Object.new - mock(handler).acknowledge("tag") + mock(handler).acknowledge(@header, nil, :ack) @worker.do_work(@header, nil, :ack, handler) end it "should work and handle rejects" do handler = Object.new - mock(handler).reject("tag") + mock(handler).reject(@header, nil, :reject) @worker.do_work(@header, nil, :reject, handler) end it "should work and handle requeues" do handler = Object.new - mock(handler).reject("tag", true) + mock(handler).reject(@header, nil, :requeue, true) @worker.do_work(@header, nil, :requeue, handler) end it "should work and handle user-land timeouts" do handler = Object.new - mock(handler).timeout("tag") + mock(handler).timeout(@header, nil, :timeout) @worker.do_work(@header, nil, :timeout, handler) end it "should work and handle user-land error" do handler = Object.new - mock(handler).error("tag",anything) + mock(handler).error(@header, nil, :error, anything) @worker.do_work(@header, nil, :error, handler) end @@ -323,14 +314,14 @@ def with_test_queuefactory(ctx, ack=true, msg=nil, nowork=false) describe 'Metrics' do before do @handler = Object.new - stub(@handler).acknowledge("tag") - stub(@handler).reject("tag") - stub(@handler).timeout("tag") - stub(@handler).error("tag", anything) - stub(@handler).noop("tag") - @header = Object.new - stub(@header).delivery_tag { "tag" } + + # We don't care how these are called, we're focusing on metrics here. + stub(@handler).acknowledge + stub(@handler).reject + stub(@handler).timeout + stub(@handler).error + stub(@handler).noop @w = MetricsWorker.new(@queue, TestPool.new) mock(@w.metrics).increment("work.MetricsWorker.started").once @@ -367,22 +358,17 @@ def with_test_queuefactory(ctx, ack=true, msg=nil, nowork=false) describe 'With Params' do before do + @props = { :foo => 1 } @handler = Object.new - stub(@handler).acknowledge("tag") - stub(@handler).reject("tag") - stub(@handler).timeout("tag") - stub(@handler).error("tag", anything) - stub(@handler).noop("tag") - @header = Object.new - stub(@header).delivery_tag { "tag" } + + stub(@handler).acknowledge(@header, @props, :ack).once @w = WithParamsWorker.new(@queue, TestPool.new) mock(@w.metrics).timing("work.WithParamsWorker.time").yields.once end it 'should call work_with_params and not work' do - mock(@w).work_with_params(:ack, @header, {:foo => 1}).once @w.do_work(@header, {:foo => 1 }, :ack, @handler) end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index c17513e2..e9e26a31 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -1,5 +1,7 @@ require 'simplecov' -SimpleCov.start +SimpleCov.start do + add_filter "/spec/" +end require 'minitest/autorun' From d4c6388dfa2473bf05ebeea11a4425d3774cdd00 Mon Sep 17 00:00:00 2001 From: Justin Mills Date: Tue, 6 May 2014 16:19:41 -0400 Subject: [PATCH 05/10] Make Maxretry Handler only retry per-queue. This required a bit of a more complicated setup, but before this change, the retry was retrying failures to the exchange which meant if you had a fanout exchange, every queue would see the retry, not just the queue that the failure occurred upon. To fix this, I created another exchange and bound the queue you're retrying on to this one (in addition to the exchange it was currently bound to). This allows us to put the retry dead lettering to it and it will ONLY send the message to the queue you're attempting retry on. Updated the example to demonstrate this more clearly. As part of this I made a couple of changes: - Add another argument to handler constructor - Use worker config when creating the handler. - Fix bug in worker where it didn't respect incoming options. This didn't look to be used anywhere. --- examples/max_retry_handler.rb | 60 ++++++++++---- lib/sneakers/handlers/maxretry.rb | 109 ++++++++++++++++---------- lib/sneakers/handlers/oneshot.rb | 2 +- lib/sneakers/queue.rb | 13 ++- lib/sneakers/worker.rb | 6 +- spec/sneakers/queue_spec.rb | 19 ++++- spec/sneakers/sneakers_spec.rb | 1 + spec/sneakers/worker_handlers_spec.rb | 78 ++++++++++-------- 8 files changed, 190 insertions(+), 98 deletions(-) diff --git a/examples/max_retry_handler.rb b/examples/max_retry_handler.rb index fbb2b585..d737cbc2 100644 --- a/examples/max_retry_handler.rb +++ b/examples/max_retry_handler.rb @@ -4,8 +4,25 @@ require 'sneakers/handlers/maxretry' require 'logger' -Sneakers.configure(:handler => Sneakers::Handlers::Maxretry, :workers => 1, :threads => 1, :prefetch => 1) -Sneakers.logger.level = Logger::INFO +Sneakers.configure(:handler => Sneakers::Handlers::Maxretry, + :workers => 1, + :threads => 1, + :prefetch => 1, + :exchange => 'sneakers', + :exchange_type => 'topic', + :routing_key => ['#', 'something'], + :durable => true, + ) +Sneakers.logger.level = Logger::DEBUG + +WORKER_OPTIONS = { + :ack => true, + :threads => 1, + :prefetch => 1, + :timeout_job_after => 60, + :heartbeat => 5, + :retry_timeout => 5000 +} # Example of how to write a retry worker. If your rabbit system is empty, then # you must run this twice. Once to setup the exchanges, queues and bindings a @@ -17,32 +34,45 @@ class MaxRetryWorker include Sneakers::Worker from_queue 'downloads', - :ack => true, - :threads => 1, - :prefetch => 1, - :timeout_job_after => 60, - :exchange => 'sneakers', - :heartbeat => 5, - :arguments => { - :'x-dead-letter-exchange' => 'sneakers-retry' - } + WORKER_OPTIONS.merge({ + :arguments => { + :'x-dead-letter-exchange' => 'downloads-retry' + }, + }) def work(msg) - - puts "Got message #{msg} and rejecting now" + logger.info("MaxRetryWorker rejecting msg: #{msg.inspect}") # We always want to reject to see if we do the proper timeout reject! + end +end + +# Example of a worker on the same exchange that does not fail, so it should only +# see the message once. +class SucceedingWorker + include Sneakers::Worker + from_queue 'uploads', + WORKER_OPTIONS.merge({ + :arguments => { + :'x-dead-letter-exchange' => 'uploads-retry' + }, + }) + def work(msg) + logger.info("SucceedingWorker succeeding on msg: #{msg.inspect}") + ack! end end messages = 1 puts "feeding messages in" messages.times { - MaxRetryWorker.enqueue("{}") + Sneakers.publish(" -- message -- ", + :to_queue => 'anywhere', + :persistence => true) } puts "done" -r = Sneakers::Runner.new([ MaxRetryWorker ]) +r = Sneakers::Runner.new([MaxRetryWorker, SucceedingWorker]) r.run diff --git a/lib/sneakers/handlers/maxretry.rb b/lib/sneakers/handlers/maxretry.rb index 83d06bf5..2079b5d2 100644 --- a/lib/sneakers/handlers/maxretry.rb +++ b/lib/sneakers/handlers/maxretry.rb @@ -4,51 +4,76 @@ module Handlers # Maxretry uses dead letter policies on Rabbitmq to requeue and retry # messages after failure (rejections, errors and timeouts). When the maximum # number of retries is reached it will put the message on an error queue. + # This handler will only retry at the queue level. To accomplish that, the + # setup is a bit complex. + # + # Input: + # worker_exchange (eXchange) + # worker_queue (Queue) + # We create: + # worker_queue-retry - (X) where we setup the worker queue to dead-letter. + # worker_queue-retry - (Q) queue bound to ^ exchange, dead-letters to + # worker_queue-retry-requeue. + # worker_queue-error - (X) where to send max-retry failures + # worker_queue-error - (Q) bound to worker_queue-error. + # worker_queue-retry-requeue - (X) exchange to bind worker_queue to for + # requeuing directly to the worker_queue. + # + # This requires that you setup arguments to the worker queue to line up the + # dead letter queue. See the example for more information. + # + # Many of these can be override with options: + # - retry_exchange - sets retry exchange & queue + # - retry_error_exchange - sets error exchange and queue + # - retry_requeue_exchange - sets the exchange created to re-queue things + # back to the worker queue. # class Maxretry - def initialize(channel, opts) + def initialize(channel, queue, opts) + @worker_queue_name = queue.name + Sneakers.logger.debug do + "Creating a Maxretry handler for queue(#{@worker_queue_name}),"\ + " opts(#{opts})" + end + @channel = channel @opts = opts - # If there is no retry exchange specified, use #{exchange}-retry - retry_name = @opts[:retryexchange] || "#{@opts[:exchange]}-retry" - - # Create the retry exchange as a durable topic so we retain original routing keys but bind the queue using a wildcard - retry_exchange = @channel.exchange( retry_name, - :type => 'topic', - :durable => 'true') - - # Create the retry queue with the same name as the retry exchange and a dead letter exchange - # The dead letter exchange is the default exchange and the ttl can be from the opts or defaults to 60 seconds - # TODO: respect @opts[:durable]? Are there cases where you want the - # retry and error exchanges to match the originating exchange? - retry_queue = @channel.queue( retry_name, - :durable => 'true', - :arguments => { - :'x-dead-letter-exchange' => @opts[:exchange], - :'x-message-ttl' => @opts[:retry_timeout] || 60000 - }) - - # Bind the retry queue to the retry topic exchange with a wildcard - retry_queue.bind(retry_exchange, :routing_key => '#') - - ## Setup the error queue - - # If there is no error exchange specified, use #{exchange}-error - error_name = @opts[:errorexchange] || "#{@opts[:exchange]}-error" - - # Create the error exchange as a durable topic so we retain original routing keys but bind the queue using a wildcard - @error_exchange = @channel.exchange(error_name, - :type => 'topic', - :durable => 'true') + # Construct names, defaulting where suitable + retry_name = @opts[:retry_exchange] || "#{@worker_queue_name}-retry" + error_name = @opts[:retry_error_exchange] || "#{@worker_queue_name}-error" + requeue_name = @opts[:retry_requeue_exchange] || "#{@worker_queue_name}-retry-requeue" + + # Create the exchanges + @retry_exchange, @error_exchange, @requeue_exchange = [retry_name, error_name, requeue_name].map do |name| + Sneakers.logger.debug { "Creating exchange #{name} for retry handler on worker queue #{@worker_queue_name}" } + @channel.exchange(name, + :type => 'topic', + :durable => opts[:durable]) + end - # Create the error queue with the same name as the error exchange - error_queue = @channel.queue( error_name, - :durable => 'true') + # Create the queues and bindings + Sneakers.logger.debug do + "Creating queue #{retry_name}, dead lettering to #{requeue_name}" + end + @retry_queue = @channel.queue(retry_name, + :durable => opts[:durable], + :arguments => { + :'x-dead-letter-exchange' => requeue_name, + :'x-message-ttl' => @opts[:retry_timeout] || 60000 + }) + @retry_queue.bind(@retry_exchange, :routing_key => '#') + + Sneakers.logger.debug do + "Creating queue #{error_name}" + end + @error_queue = @channel.queue(error_name, + :durable => opts[:durable]) + @error_queue.bind(@error_exchange, :routing_key => '#') - # Bind the error queue to the error topic exchange with a wildcard - error_queue.bind(@error_exchange, :routing_key => '#') + # Finally, bind the worker queue to our requeue exchange + queue.bind(@requeue_exchange, :routing_key => '#') @max_retries = @opts[:retry_max_times] || 5 @@ -63,8 +88,12 @@ def reject(hdr, props, msg, requeue=false) # Note to readers, the count of the x-death will increment by 2 for each # retry, once for the reject and once for the expiration from the retry # queue - if requeue || ((failure_count(props[:headers]) + 1) < @max_retries) - # We call reject which will route the message to the x-dead-letter-exchange (ie. retry exchange) on the queue + if requeue || ((failure_count(props[:headers]) + 1) <= @max_retries) + # We call reject which will route the message to the + # x-dead-letter-exchange (ie. retry exchange)on the queue + Sneakers.logger.debug do + "Retrying failure, count #{failure_count(props[:headers]) + 1}, headers #{props[:headers]}" + end unless requeue # This is only relevant if we're in the failure path. @channel.reject(hdr.delivery_tag, requeue) # TODO: metrics else @@ -100,7 +129,7 @@ def failure_count(headers) 0 else headers['x-death'].select do |x_death| - x_death['queue'] == @opts[:exchange] + x_death['queue'] == @worker_queue_name end.count end end diff --git a/lib/sneakers/handlers/oneshot.rb b/lib/sneakers/handlers/oneshot.rb index 0df27652..2194e695 100644 --- a/lib/sneakers/handlers/oneshot.rb +++ b/lib/sneakers/handlers/oneshot.rb @@ -1,7 +1,7 @@ module Sneakers module Handlers class Oneshot - def initialize(channel, opts) + def initialize(channel, queue, opts) @channel = channel @opts = opts end diff --git a/lib/sneakers/queue.rb b/lib/sneakers/queue.rb index c6f52130..825a8391 100644 --- a/lib/sneakers/queue.rb +++ b/lib/sneakers/queue.rb @@ -5,7 +5,6 @@ class Sneakers::Queue def initialize(name, opts) @name = name @opts = opts - @handler_klass = Sneakers::Config[:handler] end # @@ -26,17 +25,25 @@ def subscribe(worker) :type => @opts[:exchange_type], :durable => @opts[:durable]) - handler = @handler_klass.new(@channel, @opts) - routing_key = @opts[:routing_key] || @name routing_keys = [*routing_key] + # TODO: get the arguments from the handler? Retry handler wants this so you + # don't have to line up the queue's dead letter argument with the exchange + # you'll create for retry. queue = @channel.queue(@name, :durable => @opts[:durable], :arguments => @opts[:arguments]) routing_keys.each do |key| queue.bind(@exchange, :routing_key => key) end + # NOTE: we are using the worker's options. This is necessary so the handler + # has the same configuration as the worker. Also pass along the exchange and + # queue in case the handler requires access to them (for things like binding + # retry queues, etc). + handler_klass = worker.opts[:handler] || Sneakers::Config[:handler] + handler = handler_klass.new(@channel, queue, worker.opts) + @consumer = queue.subscribe(:block => false, :ack => @opts[:ack]) do | hdr, props, msg | worker.do_work(hdr, props, msg, handler) end diff --git a/lib/sneakers/worker.rb b/lib/sneakers/worker.rb index 1da085f6..556696fb 100644 --- a/lib/sneakers/worker.rb +++ b/lib/sneakers/worker.rb @@ -4,15 +4,15 @@ module Sneakers module Worker - attr_reader :queue, :id + attr_reader :queue, :id, :opts # For now, a worker is hardly dependant on these concerns # (because it uses methods from them directly.) include Concerns::Logging include Concerns::Metrics - def initialize(queue=nil, pool=nil, opts=nil) - opts = self.class.queue_opts + def initialize(queue = nil, pool = nil, opts = {}) + opts = opts.merge(self.class.queue_opts || {}) queue_name = self.class.queue_name opts = Sneakers::Config.merge(opts) diff --git a/spec/sneakers/queue_spec.rb b/spec/sneakers/queue_spec.rb index 15f79123..7895c652 100644 --- a/spec/sneakers/queue_spec.rb +++ b/spec/sneakers/queue_spec.rb @@ -26,6 +26,7 @@ @mkchan = Object.new @mkex = Object.new @mkqueue = Object.new + @mkworker = Object.new mock(@mkbunny).start {} mock(@mkbunny).create_channel{ @mkchan } @@ -34,6 +35,8 @@ mock(@mkchan).prefetch(25) mock(@mkchan).exchange("sneakers", :type => :direct, :durable => true){ @mkex } mock(@mkchan).queue("downloads", :durable => true){ @mkqueue } + + stub(@mkworker).opts { { :exchange => 'test-exchange' } } end it "should setup a bunny queue according to configuration values" do @@ -42,7 +45,7 @@ mock(@mkqueue).bind(@mkex, :routing_key => "downloads") mock(@mkqueue).subscribe(:block => false, :ack => true) - q.subscribe(Object.new) + q.subscribe(@mkworker) end it "supports multiple routing_keys" do @@ -53,7 +56,19 @@ mock(@mkqueue).bind(@mkex, :routing_key => "beta") mock(@mkqueue).subscribe(:block => false, :ack => true) - q.subscribe(Object.new) + q.subscribe(@mkworker) + end + + it "will use whatever handler the worker specifies" do + @handler = Object.new + worker_opts = { :handler => @handler } + stub(@mkworker).opts { worker_opts } + mock(@handler).new(@mkchan, @mkqueue, worker_opts).once + + stub(@mkqueue).bind + stub(@mkqueue).subscribe + q = Sneakers::Queue.new("downloads", queue_vars) + q.subscribe(@mkworker) end end diff --git a/spec/sneakers/sneakers_spec.rb b/spec/sneakers/sneakers_spec.rb index b7c7d5ab..b23ddc7c 100644 --- a/spec/sneakers/sneakers_spec.rb +++ b/spec/sneakers/sneakers_spec.rb @@ -23,6 +23,7 @@ def work(msg) it 'should configure itself' do Sneakers.configure Sneakers.logger.wont_be_nil + Sneakers.configured?.must_equal(true) end end diff --git a/spec/sneakers/worker_handlers_spec.rb b/spec/sneakers/worker_handlers_spec.rb index 3538b623..7176e839 100644 --- a/spec/sneakers/worker_handlers_spec.rb +++ b/spec/sneakers/worker_handlers_spec.rb @@ -30,6 +30,7 @@ def process(*args,&block) describe 'Handlers' do + let(:channel) { Object.new } let(:queue) { Object.new } let(:worker) { HandlerTestWorker.new(@queue, TestPool.new) } @@ -41,9 +42,8 @@ def process(*args,&block) describe 'Oneshot' do before(:each) do - @channel = Object.new @opts = Object.new - @handler = Sneakers::Handlers::Oneshot.new(@channel, @opts) + @handler = Sneakers::Handlers::Oneshot.new(channel, queue, @opts) @header = Object.new stub(@header).delivery_tag { 37 } @@ -51,31 +51,31 @@ def process(*args,&block) describe '#do_work' do it 'should work and handle acks' do - mock(@channel).acknowledge(37, false) + mock(channel).acknowledge(37, false) worker.do_work(@header, nil, :ack, @handler) end it 'should work and handle rejects' do - mock(@channel).reject(37, false) + mock(channel).reject(37, false) worker.do_work(@header, nil, :reject, @handler) end it 'should work and handle requeues' do - mock(@channel).reject(37, true) + mock(channel).reject(37, true) worker.do_work(@header, nil, :requeue, @handler) end it 'should work and handle user-land timeouts' do - mock(@channel).reject(37, false) + mock(channel).reject(37, false) worker.do_work(@header, nil, :timeout, @handler) end it 'should work and handle user-land error' do - mock(@channel).reject(37, false) + mock(channel).reject(37, false) worker.do_work(@header, nil, StandardError.new('boom!'), @handler) end @@ -91,38 +91,48 @@ def process(*args,&block) let(:max_retries) { nil } before(:each) do - @channel = Object.new @opts = { :exchange => 'sneakers', + :durable => 'true', }.tap do |opts| opts[:retry_max_times] = max_retries unless max_retries.nil? end + mock(queue).name { 'downloads' } + @retry_exchange = Object.new - @retry_queue = Object.new @error_exchange = Object.new + @requeue_exchange = Object.new + + @retry_queue = Object.new @error_queue = Object.new - mock(@channel).exchange('sneakers-retry', - :type => 'topic', - :durable => 'true').once { @retry_exchange } - mock(@channel).queue('sneakers-retry', - :durable => 'true', - :arguments => { - :'x-dead-letter-exchange' => 'sneakers', - :'x-message-ttl' => 60000 - } - ).once { @retry_queue } + mock(channel).exchange('downloads-retry', + :type => 'topic', + :durable => 'true').once { @retry_exchange } + mock(channel).exchange('downloads-error', + :type => 'topic', + :durable => 'true').once { @error_exchange } + mock(channel).exchange('downloads-retry-requeue', + :type => 'topic', + :durable => 'true').once { @requeue_exchange } + + mock(channel).queue('downloads-retry', + :durable => 'true', + :arguments => { + :'x-dead-letter-exchange' => 'downloads-retry-requeue', + :'x-message-ttl' => 60000 + } + ).once { @retry_queue } mock(@retry_queue).bind(@retry_exchange, :routing_key => '#') - mock(@channel).exchange('sneakers-error', - :type => 'topic', - :durable => 'true').once { @error_exchange } - mock(@channel).queue('sneakers-error', - :durable => 'true').once { @error_queue } + mock(channel).queue('downloads-error', + :durable => 'true').once { @error_queue } mock(@error_queue).bind(@error_exchange, :routing_key => '#') - @handler = Sneakers::Handlers::Maxretry.new(@channel, @opts) + mock(queue).bind(@requeue_exchange, :routing_key => '#') + + @handler = Sneakers::Handlers::Maxretry.new(channel, queue, @opts) @header = Object.new stub(@header).delivery_tag { 37 } @@ -133,14 +143,14 @@ def process(*args,&block) "x-death" => [ { "reason" => "expired", - "queue" => "sneakers-retry", + "queue" => "downloads-retry", "time" => Time.now, "exchange" => "RawMail-retry", "routing-keys" => ["RawMail"] }, { "reason" => "rejected", - "queue" => "sneakers", + "queue" => "downloads", "time" => Time.now, "exchange" => "", "routing-keys" => ["RawMail"] @@ -156,7 +166,7 @@ def process(*args,&block) describe '#do_work' do it 'should work and handle acks' do - mock(@channel).acknowledge(37, false) + mock(channel).acknowledge(37, false) worker.do_work(@header, @props, :ack, @handler) end @@ -164,7 +174,7 @@ def process(*args,&block) describe 'rejects' do describe 'more retries ahead' do it 'should work and handle rejects' do - mock(@channel).reject(37, false) + mock(channel).reject(37, false) worker.do_work(@header, @props_with_x_death, :reject, @handler) end @@ -175,7 +185,7 @@ def process(*args,&block) it 'sends the rejection to the error queue' do mock(@header).routing_key { '#' } - mock(@channel).acknowledge(37, false) + mock(channel).acknowledge(37, false) mock(@error_exchange).publish(:reject, :routing_key => '#') worker.do_work(@header, @props_with_x_death, :reject, @handler) @@ -186,7 +196,7 @@ def process(*args,&block) describe 'requeues' do it 'should work and handle requeues' do - mock(@channel).reject(37, true) + mock(channel).reject(37, true) worker.do_work(@header, @props_with_x_death, :requeue, @handler) end @@ -195,7 +205,7 @@ def process(*args,&block) let(:max_retries) { 1 } it 'continues to reject with requeue' do - mock(@channel).reject(37, true) + mock(channel).reject(37, true) worker.do_work(@header, @props_with_x_death, :requeue, @handler) end @@ -204,13 +214,13 @@ def process(*args,&block) end it 'should work and handle user-land timeouts' do - mock(@channel).reject(37, false) + mock(channel).reject(37, false) worker.do_work(@header, @props, :timeout, @handler) end it 'should work and handle user-land error' do - mock(@channel).reject(37, false) + mock(channel).reject(37, false) worker.do_work(@header, @props, StandardError.new('boom!'), @handler) end From 009a6cfc0300ac3d17727e5c016fce071bbc6717 Mon Sep 17 00:00:00 2001 From: Justin Mills Date: Tue, 10 Jun 2014 11:11:21 -0400 Subject: [PATCH 06/10] Log backtraces if present when jobs fail. This is already done on sneakers master, but by logging each line of the backtrace on a separate line. While this looks nice, it makes it very hard for log tools to get the entire backtrace by searching for lines that contain "ERROR" for instance. I'll probably merge sneakers master into our fork and undo that change so we are all sync'd up again. --- lib/sneakers/worker.rb | 6 +++++- spec/sneakers/worker_spec.rb | 10 ++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/lib/sneakers/worker.rb b/lib/sneakers/worker.rb index 556696fb..47101eb7 100644 --- a/lib/sneakers/worker.rb +++ b/lib/sneakers/worker.rb @@ -63,7 +63,11 @@ def do_work(hdr, props, msg, handler) rescue => ex res = :error error = ex - logger.error(ex) + log_msg = "error=#{ex.message.inspect} error_class=#{ex.class.name}" + unless ex.backtrace.nil? + log_msg += " backtrace=#{ex.backtrace.take(50).join("\n")}" + end + logger.error(log_msg) end if @should_ack diff --git a/spec/sneakers/worker_spec.rb b/spec/sneakers/worker_spec.rb index 459dbe45..b2ab2fe2 100644 --- a/spec/sneakers/worker_spec.rb +++ b/spec/sneakers/worker_spec.rb @@ -220,6 +220,16 @@ def with_test_queuefactory(ctx, ack=true, msg=nil, nowork=false) w.do_work(header, nil, "msg", handler) end + it "should log exceptions from workers" do + handler = Object.new + header = Object.new + w = AcksWorker.new(@queue, TestPool.new) + mock(w).work("msg").once{ raise "foo" } + mock(w.logger).error(/error="foo" error_class=RuntimeError backtrace=/) + mock(handler).error(header, nil, "msg", anything) + w.do_work(header, nil, "msg", handler) + end + it "should timeout if a work takes too long" do w = TimeoutWorker.new(@queue, TestPool.new) stub(w).work("msg"){ sleep 10 } From bf89c8624eef2a761205d3ad39c1e5b52d0a3509 Mon Sep 17 00:00:00 2001 From: Justin Mills Date: Fri, 20 Jun 2014 13:40:04 -0400 Subject: [PATCH 07/10] Merge upstream master and cleanup backtrace logging Undo the backtrace logging how it was done and do it on a single line. Also include some more information so it's easier to find all timeouts, all unexpected errors, etc. --- lib/sneakers/worker.rb | 23 +++++++++++++++++++---- spec/sneakers/worker_spec.rb | 18 ++++++++++++++++++ 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/lib/sneakers/worker.rb b/lib/sneakers/worker.rb index 1cbbcf54..389c18b4 100644 --- a/lib/sneakers/worker.rb +++ b/lib/sneakers/worker.rb @@ -59,12 +59,11 @@ def do_work(hdr, props, msg, handler) end rescue Timeout::Error res = :timeout - logger.error("timeout") + worker_error('timeout') rescue => ex res = :error error = ex - logger.error(ex) - ex.backtrace.each {|line| logger.error(line)} + worker_error('unexpected error', ex) end if @should_ack @@ -101,8 +100,24 @@ def run worker_trace "New worker: I'm alive." end + # Construct a log message with some standard prefix for this worker + def log_msg(msg) + "[#{@id}][#{Thread.current}][#{@queue.name}][#{@queue.opts}] #{msg}" + end + + # Helper to log an error message with an optional exception + def worker_error(msg, exception = nil) + s = log_msg(msg) + if exception + s += " [Exception: #{exception.class}, #{exception.message}" + s += ", backtrace:#{exception.backtrace.join(',')}" if exception.backtrace + s += "]" + end + logger.error(s) + end + def worker_trace(msg) - logger.debug "[#{@id}][#{Thread.current}][#{@queue.name}][#{@queue.opts}] #{msg}" + logger.debug(log_msg(msg)) end def self.included(base) diff --git a/spec/sneakers/worker_spec.rb b/spec/sneakers/worker_spec.rb index 459dbe45..4d3c0607 100644 --- a/spec/sneakers/worker_spec.rb +++ b/spec/sneakers/worker_spec.rb @@ -217,6 +217,7 @@ def with_test_queuefactory(ctx, ack=true, msg=nil, nowork=false) handler = Object.new header = Object.new mock(handler).error(header, nil, "msg", anything) + mock(w.logger).error(/unexpected error.*\[Exception: RuntimeError, foo, backtrace.*/) w.do_work(header, nil, "msg", handler) end @@ -228,6 +229,7 @@ def with_test_queuefactory(ctx, ack=true, msg=nil, nowork=false) header = Object.new mock(handler).timeout(header, nil, "msg") + mock(w.logger).error(/timeout/) w.do_work(header, nil, "msg", handler) end @@ -308,6 +310,22 @@ def with_test_queuefactory(ctx, ack=true, msg=nil, nowork=false) w = LoggingWorker.new(@queue, TestPool.new) w.do_work(nil,nil,'msg',nil) end + + it 'has a helper to constuct log prefix values' do + w = DummyWorker.new(@queue, TestPool.new) + w.instance_variable_set(:@id, 'worker-id') + m = w.log_msg('foo') + w.log_msg('foo').must_match(/\[worker-id\]\[#\]\[test-queue\]\[\{\}\] foo/) + end + + describe '#worker_error' do + it 'only logs backtraces if present' do + w = DummyWorker.new(@queue, TestPool.new) + mock(w.logger).error(/cuz \[Exception: RuntimeError, boom!\]/) + w.worker_error('cuz', RuntimeError.new('boom!')) + end + end + end From 23ffb32d84d01ff6e1afce3eaec19de0b8a467e1 Mon Sep 17 00:00:00 2001 From: Justin Mills Date: Fri, 20 Jun 2014 16:38:59 -0400 Subject: [PATCH 08/10] Improve logging in Maxretry handler and fix fallthrough metric in worker We needed some more insight into when a worker was retrying a job. Would be nice to have stats too, but that really requires that these handlers also have access to the worker. I think eventually we'll replace the exchange and queue with the worker as an argument. Until then, this logging should give us enough information to know how often it's happening and possibly why. Also the fallthrough case is noop when Sneakers cannot interpret the response. The metric being collected was retry which was wrong as the handler's retry method was not called and instead the noop method was. Better to keep these two in line. --- lib/sneakers/handlers/maxretry.rb | 31 ++++++++++++++++++++----------- lib/sneakers/worker.rb | 2 +- spec/sneakers/worker_spec.rb | 14 +++++++++++++- 3 files changed, 34 insertions(+), 13 deletions(-) diff --git a/lib/sneakers/handlers/maxretry.rb b/lib/sneakers/handlers/maxretry.rb index 2079b5d2..f9a9d01b 100644 --- a/lib/sneakers/handlers/maxretry.rb +++ b/lib/sneakers/handlers/maxretry.rb @@ -33,8 +33,7 @@ class Maxretry def initialize(channel, queue, opts) @worker_queue_name = queue.name Sneakers.logger.debug do - "Creating a Maxretry handler for queue(#{@worker_queue_name}),"\ - " opts(#{opts})" + "#{log_prefix} creating handler, opts=#{opts}" end @channel = channel @@ -47,7 +46,7 @@ def initialize(channel, queue, opts) # Create the exchanges @retry_exchange, @error_exchange, @requeue_exchange = [retry_name, error_name, requeue_name].map do |name| - Sneakers.logger.debug { "Creating exchange #{name} for retry handler on worker queue #{@worker_queue_name}" } + Sneakers.logger.debug { "#{log_prefix} creating exchange=#{name}" } @channel.exchange(name, :type => 'topic', :durable => opts[:durable]) @@ -55,7 +54,7 @@ def initialize(channel, queue, opts) # Create the queues and bindings Sneakers.logger.debug do - "Creating queue #{retry_name}, dead lettering to #{requeue_name}" + "#{log_prefix} creating queue=#{retry_name} x-dead-letter-exchange=#{requeue_name}" end @retry_queue = @channel.queue(retry_name, :durable => opts[:durable], @@ -66,7 +65,7 @@ def initialize(channel, queue, opts) @retry_queue.bind(@retry_exchange, :routing_key => '#') Sneakers.logger.debug do - "Creating queue #{error_name}" + "#{log_prefix} creating queue=#{error_name}" end @error_queue = @channel.queue(error_name, :durable => opts[:durable]) @@ -85,20 +84,22 @@ def acknowledge(hdr, props, msg) def reject(hdr, props, msg, requeue=false) - # Note to readers, the count of the x-death will increment by 2 for each - # retry, once for the reject and once for the expiration from the retry - # queue - if requeue || ((failure_count(props[:headers]) + 1) <= @max_retries) + # +1 for the current attempt + num_attempts = failure_count(props[:headers]) + 1 + if requeue || (num_attempts <= @max_retries) # We call reject which will route the message to the # x-dead-letter-exchange (ie. retry exchange)on the queue - Sneakers.logger.debug do - "Retrying failure, count #{failure_count(props[:headers]) + 1}, headers #{props[:headers]}" + Sneakers.logger.info do + "#{log_prefix} msg=retrying count=#{num_attempts}, headers=#{props[:headers]}" end unless requeue # This is only relevant if we're in the failure path. @channel.reject(hdr.delivery_tag, requeue) # TODO: metrics else # Retried more than the max times # Publish the original message with the routing_key to the error exchange + Sneakers.logger.info do + "#{log_prefix} msg=failing retry_count=#{num_attempts}" + end @error_exchange.publish(msg, :routing_key => hdr.routing_key) @channel.acknowledge(hdr.delivery_tag, false) # TODO: metrics @@ -134,6 +135,14 @@ def failure_count(headers) end end private :failure_count + + # Prefix all of our log messages so they are easier to find. We don't have + # the worker, so the next best thing is the queue name. + def log_prefix + "Maxretry handler [queue=#{@worker_queue_name}]" + end + private :log_prefix + end end end diff --git a/lib/sneakers/worker.rb b/lib/sneakers/worker.rb index 47101eb7..44187ef0 100644 --- a/lib/sneakers/worker.rb +++ b/lib/sneakers/worker.rb @@ -85,7 +85,7 @@ def do_work(hdr, props, msg, handler) else handler.noop(hdr, props, msg) end - metrics.increment("work.#{self.class.name}.handled.#{res || 'reject'}") + metrics.increment("work.#{self.class.name}.handled.#{res || 'noop'}") end metrics.increment("work.#{self.class.name}.ended") diff --git a/spec/sneakers/worker_spec.rb b/spec/sneakers/worker_spec.rb index b2ab2fe2..1879fead 100644 --- a/spec/sneakers/worker_spec.rb +++ b/spec/sneakers/worker_spec.rb @@ -348,7 +348,13 @@ def with_test_queuefactory(ctx, ack=true, msg=nil, nowork=false) it 'should be able to meter rejects' do mock(@w.metrics).increment("foobar").once mock(@w.metrics).increment("work.MetricsWorker.handled.reject").once - @w.do_work(@header, nil, nil, @handler) + @w.do_work(@header, nil, :reject, @handler) + end + + it 'should be able to meter requeue' do + mock(@w.metrics).increment("foobar").once + mock(@w.metrics).increment("work.MetricsWorker.handled.requeue").once + @w.do_work(@header, nil, :requeue, @handler) end it 'should be able to meter errors' do @@ -362,6 +368,12 @@ def with_test_queuefactory(ctx, ack=true, msg=nil, nowork=false) mock(@w).work('msg'){ sleep 10 } @w.do_work(@header, nil, 'msg', @handler) end + + it 'defaults to noop when no response is specified' do + mock(@w.metrics).increment("foobar").once + mock(@w.metrics).increment("work.MetricsWorker.handled.noop").once + @w.do_work(@header, nil, nil, @handler) + end end From 24a25b4d54451fb5fb9758ee68b5eac81b410272 Mon Sep 17 00:00:00 2001 From: Justin Mills Date: Mon, 23 Jun 2014 11:59:50 -0400 Subject: [PATCH 09/10] Reject a JSON payload with error details When the retry worker runs out of retries and sends a message on to the error queue, capture as much as we can so you have some hope of understanding what's going on. This will require a more complicated replay than simply moving them off this queue onto another exchange, but that seems reasonable. --- lib/sneakers/handlers/maxretry.rb | 75 +++++++++++---- spec/sneakers/worker_handlers_spec.rb | 126 +++++++++++++++++++++++++- 2 files changed, 176 insertions(+), 25 deletions(-) diff --git a/lib/sneakers/handlers/maxretry.rb b/lib/sneakers/handlers/maxretry.rb index f9a9d01b..b7e67903 100644 --- a/lib/sneakers/handlers/maxretry.rb +++ b/lib/sneakers/handlers/maxretry.rb @@ -1,3 +1,6 @@ +require 'base64' +require 'json' + module Sneakers module Handlers # @@ -82,41 +85,73 @@ def acknowledge(hdr, props, msg) @channel.acknowledge(hdr.delivery_tag, false) end - def reject(hdr, props, msg, requeue=false) + def reject(hdr, props, msg, requeue = false) + if requeue + # This was explicitly rejected specifying it be requeued so we do not + # want it to pass through our retry logic. + @channel.reject(hdr.delivery_tag, requeue) + else + handle_retry(hdr, props, msg, :reject) + end + end + + + def error(hdr, props, msg, err) + handle_retry(hdr, props, msg, err) + end + + def timeout(hdr, props, msg) + handle_retry(hdr, props, msg, :timeout) + end + + def noop(hdr, props, msg) + + end + # Helper logic for retry handling. This will reject the message if there + # are remaining retries left on it, otherwise it will publish it to the + # error exchange along with the reason. + # @param hdr [Bunny::DeliveryInfo] + # @param props [Bunny::MessageProperties] + # @param msg [String] The message + # @param reason [String, Symbol, Exception] Reason for the retry, included + # in the JSON we put on the error exchange. + def handle_retry(hdr, props, msg, reason) # +1 for the current attempt num_attempts = failure_count(props[:headers]) + 1 - if requeue || (num_attempts <= @max_retries) + if num_attempts <= @max_retries # We call reject which will route the message to the - # x-dead-letter-exchange (ie. retry exchange)on the queue + # x-dead-letter-exchange (ie. retry exchange) on the queue Sneakers.logger.info do - "#{log_prefix} msg=retrying count=#{num_attempts}, headers=#{props[:headers]}" - end unless requeue # This is only relevant if we're in the failure path. - @channel.reject(hdr.delivery_tag, requeue) + "#{log_prefix} msg=retrying, count=#{num_attempts}, headers=#{props[:headers]}" + end + @channel.reject(hdr.delivery_tag, false) # TODO: metrics else # Retried more than the max times # Publish the original message with the routing_key to the error exchange Sneakers.logger.info do - "#{log_prefix} msg=failing retry_count=#{num_attempts}" + "#{log_prefix} msg=failing, retry_count=#{num_attempts}, reason=#{reason}" end - @error_exchange.publish(msg, :routing_key => hdr.routing_key) + data = { + error: reason, + num_attempts: num_attempts, + failed_at: Time.now.iso8601, + payload: Base64.encode64(msg.to_s) + }.tap do |hash| + if reason.is_a?(Exception) + hash[:error_class] = reason.class + if reason.backtrace + hash[:backtrace] = reason.backtrace.take(10).join(', ') + end + end + end.to_json + @error_exchange.publish(data, :routing_key => hdr.routing_key) @channel.acknowledge(hdr.delivery_tag, false) # TODO: metrics end end - - def error(hdr, props, msg, err) - reject(hdr, props, msg) - end - - def timeout(hdr, props, msg) - reject(hdr, props, msg) - end - - def noop(hdr, props, msg) - - end + private :handle_retry # Uses the x-death header to determine the number of failures this job has # seen in the past. This does not count the current failure. So for diff --git a/spec/sneakers/worker_handlers_spec.rb b/spec/sneakers/worker_handlers_spec.rb index 7176e839..0c8f0450 100644 --- a/spec/sneakers/worker_handlers_spec.rb +++ b/spec/sneakers/worker_handlers_spec.rb @@ -3,6 +3,7 @@ require 'timeout' require 'sneakers/handlers/oneshot' require 'sneakers/handlers/maxretry' +require 'json' # Specific tests of the Handler implementations you can use to deal with job @@ -16,10 +17,23 @@ class HandlerTestWorker def work(msg) if msg.is_a?(StandardError) raise msg + elsif msg.is_a?(String) + hash = maybe_json(msg) + if hash.is_a?(Hash) + hash['response'].to_sym + else + hash + end else msg end end + + def maybe_json(string) + JSON.parse(string) + rescue + string + end end class TestPool @@ -165,6 +179,22 @@ def process(*args,&block) # it 'allows overriding the retry timeout' describe '#do_work' do + before do + @now = Time.now + end + + # Used to stub out the publish method args. Sadly RR doesn't support + # this, only proxying existing methods. + module MockPublish + attr_reader :data, :opts, :called + + def publish(data, opts) + @data = data + @opts = opts + @called = true + end + end + it 'should work and handle acks' do mock(channel).acknowledge(37, false) @@ -186,9 +216,16 @@ def process(*args,&block) it 'sends the rejection to the error queue' do mock(@header).routing_key { '#' } mock(channel).acknowledge(37, false) - mock(@error_exchange).publish(:reject, :routing_key => '#') + @error_exchange.extend MockPublish worker.do_work(@header, @props_with_x_death, :reject, @handler) + @error_exchange.called.must_equal(true) + @error_exchange.opts.must_equal({ :routing_key => '#' }) + data = JSON.parse(@error_exchange.data) + data['error'].must_equal('reject') + data['num_attempts'].must_equal(2) + data['payload'].must_equal(Base64.encode64(:reject.to_s)) + Time.parse(data['failed_at']).wont_be_nil end end @@ -213,10 +250,64 @@ def process(*args,&block) end - it 'should work and handle user-land timeouts' do - mock(channel).reject(37, false) + describe 'timeouts' do + describe 'more retries ahead' do + it 'should reject the message' do + mock(channel).reject(37, false) + + worker.do_work(@header, @props_with_x_death, :timeout, @handler) + end + end + + describe 'no more retries left' do + let(:max_retries) { 1 } + + it 'sends the rejection to the error queue' do + mock(@header).routing_key { '#' } + mock(channel).acknowledge(37, false) + @error_exchange.extend MockPublish + + worker.do_work(@header, @props_with_x_death, :timeout, @handler) + @error_exchange.called.must_equal(true) + @error_exchange.opts.must_equal({ :routing_key => '#' }) + data = JSON.parse(@error_exchange.data) + data['error'].must_equal('timeout') + data['num_attempts'].must_equal(2) + data['payload'].must_equal(Base64.encode64(:timeout.to_s)) + Time.parse(data['failed_at']).wont_be_nil + end + end + end + + describe 'exceptions' do + describe 'more retries ahead' do + it 'should reject the message' do + mock(channel).reject(37, false) + + worker.do_work(@header, @props_with_x_death, StandardError.new('boom!'), @handler) + end + end + + describe 'no more retries left' do + let(:max_retries) { 1 } - worker.do_work(@header, @props, :timeout, @handler) + it 'sends the rejection to the error queue' do + mock(@header).routing_key { '#' } + mock(channel).acknowledge(37, false) + @error_exchange.extend MockPublish + + worker.do_work(@header, @props_with_x_death, StandardError.new('boom!'), @handler) + @error_exchange.called.must_equal(true) + @error_exchange.opts.must_equal({ :routing_key => '#' }) + data = JSON.parse(@error_exchange.data) + data['error'].must_equal('boom!') + data['error_class'].must_equal(StandardError.to_s) + data['backtrace'].wont_be_nil + data['num_attempts'].must_equal(2) + data['payload'].must_equal(Base64.encode64('boom!')) + Time.parse(data['failed_at']).wont_be_nil + end + end end it 'should work and handle user-land error' do @@ -228,7 +319,32 @@ def process(*args,&block) it 'should work and handle noops' do worker.do_work(@header, @props, :wait, @handler) end - end + # Since we encode in json, we want to make sure if the actual payload is + # json, then it's something you can get back out. + describe 'JSON payloads' do + let(:max_retries) { 1 } + + it 'properly encodes the json payload' do + mock(@header).routing_key { '#' } + mock(channel).acknowledge(37, false) + @error_exchange.extend MockPublish + + payload = { + data: 'hello', + response: :timeout + } + worker.do_work(@header, @props_with_x_death, payload.to_json, @handler) + @error_exchange.called.must_equal(true) + @error_exchange.opts.must_equal({ :routing_key => '#' }) + data = JSON.parse(@error_exchange.data) + data['error'].must_equal('timeout') + data['num_attempts'].must_equal(2) + data['payload'].must_equal(Base64.encode64(payload.to_json)) + end + + end + + end end end From 7e7f8cc4fad9b077e19436b34017daeceda5a612 Mon Sep 17 00:00:00 2001 From: lkang Date: Wed, 23 Jul 2014 09:49:40 -0700 Subject: [PATCH 10/10] Add option :queue_durable to allow creation of a non-durable queue --- lib/sneakers/queue.rb | 3 ++- spec/sneakers/queue_spec.rb | 17 +++++++++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/lib/sneakers/queue.rb b/lib/sneakers/queue.rb index 825a8391..b2773a43 100644 --- a/lib/sneakers/queue.rb +++ b/lib/sneakers/queue.rb @@ -31,7 +31,8 @@ def subscribe(worker) # TODO: get the arguments from the handler? Retry handler wants this so you # don't have to line up the queue's dead letter argument with the exchange # you'll create for retry. - queue = @channel.queue(@name, :durable => @opts[:durable], :arguments => @opts[:arguments]) + queue_durable = @opts[:queue_durable].nil? ? @opts[:durable] : @opts[:queue_durable] + queue = @channel.queue(@name, :durable => queue_durable, :arguments => @opts[:arguments]) routing_keys.each do |key| queue.bind(@exchange, :routing_key => key) diff --git a/spec/sneakers/queue_spec.rb b/spec/sneakers/queue_spec.rb index 7895c652..72150fec 100644 --- a/spec/sneakers/queue_spec.rb +++ b/spec/sneakers/queue_spec.rb @@ -26,6 +26,7 @@ @mkchan = Object.new @mkex = Object.new @mkqueue = Object.new + @mkqueue_nondurable = Object.new @mkworker = Object.new mock(@mkbunny).start {} @@ -34,12 +35,12 @@ mock(@mkchan).prefetch(25) mock(@mkchan).exchange("sneakers", :type => :direct, :durable => true){ @mkex } - mock(@mkchan).queue("downloads", :durable => true){ @mkqueue } stub(@mkworker).opts { { :exchange => 'test-exchange' } } end it "should setup a bunny queue according to configuration values" do + mock(@mkchan).queue("downloads", :durable => true) { @mkqueue } q = Sneakers::Queue.new("downloads", queue_vars) mock(@mkqueue).bind(@mkex, :routing_key => "downloads") @@ -49,6 +50,7 @@ end it "supports multiple routing_keys" do + mock(@mkchan).queue("downloads", :durable => true) { @mkqueue } q = Sneakers::Queue.new("downloads", queue_vars.merge(:routing_key => ["alpha", "beta"])) @@ -60,6 +62,7 @@ end it "will use whatever handler the worker specifies" do + mock(@mkchan).queue("downloads", :durable => true) { @mkqueue } @handler = Object.new worker_opts = { :handler => @handler } stub(@mkworker).opts { worker_opts } @@ -70,8 +73,18 @@ q = Sneakers::Queue.new("downloads", queue_vars) q.subscribe(@mkworker) end - end + it "creates a non-durable queue if :queue_durable => false" do + mock(@mkchan).queue("test_nondurable", :durable => false) { @mkqueue_nondurable } + queue_vars[:queue_durable] = false + q = Sneakers::Queue.new("test_nondurable", queue_vars) + + mock(@mkqueue_nondurable).bind(@mkex, :routing_key => "test_nondurable") + mock(@mkqueue_nondurable).subscribe(:block => false, :ack => true) + q.subscribe(@mkworker) + myqueue = q.instance_variable_get(:@queue) + end + end end