diff --git a/.gitignore b/.gitignore index d672b808..cd35642f 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,5 @@ sneakers.pid pkg/ coverage/ tmp/ +.ruby-version +.ruby-gemset diff --git a/examples/max_retry_handler.rb b/examples/max_retry_handler.rb new file mode 100644 index 00000000..d737cbc2 --- /dev/null +++ b/examples/max_retry_handler.rb @@ -0,0 +1,78 @@ +$: << File.expand_path('../lib', File.dirname(__FILE__)) +require 'sneakers' +require 'sneakers/runner' +require 'sneakers/handlers/maxretry' +require 'logger' + +Sneakers.configure(:handler => Sneakers::Handlers::Maxretry, + :workers => 1, + :threads => 1, + :prefetch => 1, + :exchange => 'sneakers', + :exchange_type => 'topic', + :routing_key => ['#', 'something'], + :durable => true, + ) +Sneakers.logger.level = Logger::DEBUG + +WORKER_OPTIONS = { + :ack => true, + :threads => 1, + :prefetch => 1, + :timeout_job_after => 60, + :heartbeat => 5, + :retry_timeout => 5000 +} + +# Example of how to write a retry worker. If your rabbit system is empty, then +# you must run this twice. Once to setup the exchanges, queues and bindings a +# second time to have the sent message end up on the downloads queue. +# +# Run this via: +# bundle exec ruby examples/max_retry_handler.rb +# +class MaxRetryWorker + include Sneakers::Worker + from_queue 'downloads', + WORKER_OPTIONS.merge({ + :arguments => { + :'x-dead-letter-exchange' => 'downloads-retry' + }, + }) + + def work(msg) + logger.info("MaxRetryWorker rejecting msg: #{msg.inspect}") + + # We always want to reject to see if we do the proper timeout + reject! + end +end + +# Example of a worker on the same exchange that does not fail, so it should only +# see the message once. +class SucceedingWorker + include Sneakers::Worker + from_queue 'uploads', + WORKER_OPTIONS.merge({ + :arguments => { + :'x-dead-letter-exchange' => 'uploads-retry' + }, + }) + + def work(msg) + logger.info("SucceedingWorker succeeding on msg: #{msg.inspect}") + ack! + end +end + +messages = 1 +puts "feeding messages in" +messages.times { + Sneakers.publish(" -- message -- ", + :to_queue => 'anywhere', + :persistence => true) +} +puts "done" + +r = Sneakers::Runner.new([MaxRetryWorker, SucceedingWorker]) +r.run diff --git a/lib/sneakers/handlers/maxretry.rb b/lib/sneakers/handlers/maxretry.rb new file mode 100644 index 00000000..b7e67903 --- /dev/null +++ b/lib/sneakers/handlers/maxretry.rb @@ -0,0 +1,183 @@ +require 'base64' +require 'json' + +module Sneakers + module Handlers + # + # Maxretry uses dead letter policies on Rabbitmq to requeue and retry + # messages after failure (rejections, errors and timeouts). When the maximum + # number of retries is reached it will put the message on an error queue. + # This handler will only retry at the queue level. To accomplish that, the + # setup is a bit complex. + # + # Input: + # worker_exchange (eXchange) + # worker_queue (Queue) + # We create: + # worker_queue-retry - (X) where we setup the worker queue to dead-letter. + # worker_queue-retry - (Q) queue bound to ^ exchange, dead-letters to + # worker_queue-retry-requeue. + # worker_queue-error - (X) where to send max-retry failures + # worker_queue-error - (Q) bound to worker_queue-error. + # worker_queue-retry-requeue - (X) exchange to bind worker_queue to for + # requeuing directly to the worker_queue. + # + # This requires that you setup arguments to the worker queue to line up the + # dead letter queue. See the example for more information. + # + # Many of these can be override with options: + # - retry_exchange - sets retry exchange & queue + # - retry_error_exchange - sets error exchange and queue + # - retry_requeue_exchange - sets the exchange created to re-queue things + # back to the worker queue. + # + class Maxretry + + def initialize(channel, queue, opts) + @worker_queue_name = queue.name + Sneakers.logger.debug do + "#{log_prefix} creating handler, opts=#{opts}" + end + + @channel = channel + @opts = opts + + # Construct names, defaulting where suitable + retry_name = @opts[:retry_exchange] || "#{@worker_queue_name}-retry" + error_name = @opts[:retry_error_exchange] || "#{@worker_queue_name}-error" + requeue_name = @opts[:retry_requeue_exchange] || "#{@worker_queue_name}-retry-requeue" + + # Create the exchanges + @retry_exchange, @error_exchange, @requeue_exchange = [retry_name, error_name, requeue_name].map do |name| + Sneakers.logger.debug { "#{log_prefix} creating exchange=#{name}" } + @channel.exchange(name, + :type => 'topic', + :durable => opts[:durable]) + end + + # Create the queues and bindings + Sneakers.logger.debug do + "#{log_prefix} creating queue=#{retry_name} x-dead-letter-exchange=#{requeue_name}" + end + @retry_queue = @channel.queue(retry_name, + :durable => opts[:durable], + :arguments => { + :'x-dead-letter-exchange' => requeue_name, + :'x-message-ttl' => @opts[:retry_timeout] || 60000 + }) + @retry_queue.bind(@retry_exchange, :routing_key => '#') + + Sneakers.logger.debug do + "#{log_prefix} creating queue=#{error_name}" + end + @error_queue = @channel.queue(error_name, + :durable => opts[:durable]) + @error_queue.bind(@error_exchange, :routing_key => '#') + + # Finally, bind the worker queue to our requeue exchange + queue.bind(@requeue_exchange, :routing_key => '#') + + @max_retries = @opts[:retry_max_times] || 5 + + end + + def acknowledge(hdr, props, msg) + @channel.acknowledge(hdr.delivery_tag, false) + end + + def reject(hdr, props, msg, requeue = false) + if requeue + # This was explicitly rejected specifying it be requeued so we do not + # want it to pass through our retry logic. + @channel.reject(hdr.delivery_tag, requeue) + else + handle_retry(hdr, props, msg, :reject) + end + end + + + def error(hdr, props, msg, err) + handle_retry(hdr, props, msg, err) + end + + def timeout(hdr, props, msg) + handle_retry(hdr, props, msg, :timeout) + end + + def noop(hdr, props, msg) + + end + + # Helper logic for retry handling. This will reject the message if there + # are remaining retries left on it, otherwise it will publish it to the + # error exchange along with the reason. + # @param hdr [Bunny::DeliveryInfo] + # @param props [Bunny::MessageProperties] + # @param msg [String] The message + # @param reason [String, Symbol, Exception] Reason for the retry, included + # in the JSON we put on the error exchange. + def handle_retry(hdr, props, msg, reason) + # +1 for the current attempt + num_attempts = failure_count(props[:headers]) + 1 + if num_attempts <= @max_retries + # We call reject which will route the message to the + # x-dead-letter-exchange (ie. retry exchange) on the queue + Sneakers.logger.info do + "#{log_prefix} msg=retrying, count=#{num_attempts}, headers=#{props[:headers]}" + end + @channel.reject(hdr.delivery_tag, false) + # TODO: metrics + else + # Retried more than the max times + # Publish the original message with the routing_key to the error exchange + Sneakers.logger.info do + "#{log_prefix} msg=failing, retry_count=#{num_attempts}, reason=#{reason}" + end + data = { + error: reason, + num_attempts: num_attempts, + failed_at: Time.now.iso8601, + payload: Base64.encode64(msg.to_s) + }.tap do |hash| + if reason.is_a?(Exception) + hash[:error_class] = reason.class + if reason.backtrace + hash[:backtrace] = reason.backtrace.take(10).join(', ') + end + end + end.to_json + @error_exchange.publish(data, :routing_key => hdr.routing_key) + @channel.acknowledge(hdr.delivery_tag, false) + # TODO: metrics + end + end + private :handle_retry + + # Uses the x-death header to determine the number of failures this job has + # seen in the past. This does not count the current failure. So for + # instance, the first time the job fails, this will return 0, the second + # time, 1, etc. + # @param headers [Hash] Hash of headers that Rabbit delivers as part of + # the message + # @return [Integer] Count of number of failures. + def failure_count(headers) + if headers.nil? || headers['x-death'].nil? + 0 + else + headers['x-death'].select do |x_death| + x_death['queue'] == @worker_queue_name + end.count + end + end + private :failure_count + + # Prefix all of our log messages so they are easier to find. We don't have + # the worker, so the next best thing is the queue name. + def log_prefix + "Maxretry handler [queue=#{@worker_queue_name}]" + end + private :log_prefix + + end + end +end diff --git a/lib/sneakers/handlers/oneshot.rb b/lib/sneakers/handlers/oneshot.rb index 2008267a..2194e695 100644 --- a/lib/sneakers/handlers/oneshot.rb +++ b/lib/sneakers/handlers/oneshot.rb @@ -1,27 +1,28 @@ module Sneakers module Handlers class Oneshot - def initialize(channel) + def initialize(channel, queue, opts) @channel = channel + @opts = opts end - def acknowledge(tag) - @channel.acknowledge(tag, false) + def acknowledge(hdr, props, msg) + @channel.acknowledge(hdr.delivery_tag, false) end - def reject(tag, requeue=false) - @channel.reject(tag, requeue) + def reject(hdr, props, msg, requeue=false) + @channel.reject(hdr.delivery_tag, requeue) end - def error(tag, err) - reject(tag) + def error(hdr, props, msg, err) + reject(hdr, props, msg) end - def timeout(tag) - reject(tag) + def timeout(hdr, props, msg) + reject(hdr, props, msg) end - def noop(tag) + def noop(hdr, props, msg) end end diff --git a/lib/sneakers/queue.rb b/lib/sneakers/queue.rb index fb079523..a85d4ad3 100644 --- a/lib/sneakers/queue.rb +++ b/lib/sneakers/queue.rb @@ -26,17 +26,26 @@ def subscribe(worker) :type => @opts[:exchange_type], :durable => @opts[:durable]) - handler = @handler_klass.new(@channel) - routing_key = @opts[:routing_key] || @name routing_keys = [*routing_key] - queue = @channel.queue(@name, :durable => @opts[:durable], :arguments => @opts[:arguments]) + # TODO: get the arguments from the handler? Retry handler wants this so you + # don't have to line up the queue's dead letter argument with the exchange + # you'll create for retry. + queue_durable = @opts[:queue_durable].nil? ? @opts[:durable] : @opts[:queue_durable] + queue = @channel.queue(@name, :durable => queue_durable, :arguments => @opts[:arguments]) routing_keys.each do |key| queue.bind(@exchange, :routing_key => key) end + # NOTE: we are using the worker's options. This is necessary so the handler + # has the same configuration as the worker. Also pass along the exchange and + # queue in case the handler requires access to them (for things like binding + # retry queues, etc). + handler_klass = worker.opts[:handler] || Sneakers::CONFIG[:handler] + handler = handler_klass.new(@channel, queue, worker.opts) + @consumer = queue.subscribe(:block => false, :ack => @opts[:ack]) do | delivery_info, metadata, msg | worker.do_work(delivery_info, metadata, msg, handler) end diff --git a/lib/sneakers/worker.rb b/lib/sneakers/worker.rb index 2af27490..f002b893 100644 --- a/lib/sneakers/worker.rb +++ b/lib/sneakers/worker.rb @@ -4,15 +4,15 @@ module Sneakers module Worker - attr_reader :queue, :id + attr_reader :queue, :id, :opts # For now, a worker is hardly dependant on these concerns # (because it uses methods from them directly.) include Concerns::Logging include Concerns::Metrics - def initialize(queue=nil, pool=nil, opts=nil) - opts = self.class.queue_opts + def initialize(queue = nil, pool = nil, opts = {}) + opts = opts.merge(self.class.queue_opts || {}) queue_name = self.class.queue_name opts = Sneakers::CONFIG.merge(opts) @@ -61,32 +61,30 @@ def do_work(delivery_info, metadata, msg, handler) end rescue Timeout::Error res = :timeout - logger.error("timeout") + worker_error('timeout') rescue => ex res = :error error = ex - logger.error(ex) - ex.backtrace.each {|line| logger.error(line)} + worker_error('unexpected error', ex) end if @should_ack - delivery_tag = delivery_info.delivery_tag if res == :ack # note to future-self. never acknowledge multiple (multiple=true) messages under threads. - handler.acknowledge(delivery_tag) + handler.acknowledge(delivery_info, metadata, msg) elsif res == :timeout - handler.timeout(delivery_tag) + handler.timeout(delivery_info, metadata, msg) elsif res == :error - handler.error(delivery_tag, error) + handler.error(delivery_info, metadata, msg, error) elsif res == :reject - handler.reject(delivery_tag) + handler.reject(delivery_info, metadata, msg) elsif res == :requeue - handler.reject(delivery_tag, true) + handler.reject(delivery_info, metadata, msg, true) else - handler.noop(delivery_tag) + handler.noop(delivery_info, metadata, msg) end - metrics.increment("work.#{self.class.name}.handled.#{res || 'reject'}") + metrics.increment("work.#{self.class.name}.handled.#{res || 'noop'}") end metrics.increment("work.#{self.class.name}.ended") @@ -105,8 +103,24 @@ def run worker_trace "New worker: I'm alive." end + # Construct a log message with some standard prefix for this worker + def log_msg(msg) + "[#{@id}][#{Thread.current}][#{@queue.name}][#{@queue.opts}] #{msg}" + end + + # Helper to log an error message with an optional exception + def worker_error(msg, exception = nil) + s = log_msg(msg) + if exception + s += " [Exception error=#{exception.message.inspect} error_class=#{exception.class}" + s += " backtrace=#{exception.backtrace.take(50).join(',')}" unless exception.backtrace.nil? + s += "]" + end + logger.error(s) + end + def worker_trace(msg) - logger.debug "[#{@id}][#{Thread.current}][#{@queue.name}][#{@queue.opts}] #{msg}" + logger.debug(log_msg(msg)) end def self.included(base) diff --git a/spec/sneakers/queue_spec.rb b/spec/sneakers/queue_spec.rb index 15f79123..72150fec 100644 --- a/spec/sneakers/queue_spec.rb +++ b/spec/sneakers/queue_spec.rb @@ -26,6 +26,8 @@ @mkchan = Object.new @mkex = Object.new @mkqueue = Object.new + @mkqueue_nondurable = Object.new + @mkworker = Object.new mock(@mkbunny).start {} mock(@mkbunny).create_channel{ @mkchan } @@ -33,19 +35,22 @@ mock(@mkchan).prefetch(25) mock(@mkchan).exchange("sneakers", :type => :direct, :durable => true){ @mkex } - mock(@mkchan).queue("downloads", :durable => true){ @mkqueue } + + stub(@mkworker).opts { { :exchange => 'test-exchange' } } end it "should setup a bunny queue according to configuration values" do + mock(@mkchan).queue("downloads", :durable => true) { @mkqueue } q = Sneakers::Queue.new("downloads", queue_vars) mock(@mkqueue).bind(@mkex, :routing_key => "downloads") mock(@mkqueue).subscribe(:block => false, :ack => true) - q.subscribe(Object.new) + q.subscribe(@mkworker) end it "supports multiple routing_keys" do + mock(@mkchan).queue("downloads", :durable => true) { @mkqueue } q = Sneakers::Queue.new("downloads", queue_vars.merge(:routing_key => ["alpha", "beta"])) @@ -53,10 +58,33 @@ mock(@mkqueue).bind(@mkex, :routing_key => "beta") mock(@mkqueue).subscribe(:block => false, :ack => true) - q.subscribe(Object.new) + q.subscribe(@mkworker) end - end + it "will use whatever handler the worker specifies" do + mock(@mkchan).queue("downloads", :durable => true) { @mkqueue } + @handler = Object.new + worker_opts = { :handler => @handler } + stub(@mkworker).opts { worker_opts } + mock(@handler).new(@mkchan, @mkqueue, worker_opts).once + + stub(@mkqueue).bind + stub(@mkqueue).subscribe + q = Sneakers::Queue.new("downloads", queue_vars) + q.subscribe(@mkworker) + end + it "creates a non-durable queue if :queue_durable => false" do + mock(@mkchan).queue("test_nondurable", :durable => false) { @mkqueue_nondurable } + queue_vars[:queue_durable] = false + q = Sneakers::Queue.new("test_nondurable", queue_vars) + + mock(@mkqueue_nondurable).bind(@mkex, :routing_key => "test_nondurable") + mock(@mkqueue_nondurable).subscribe(:block => false, :ack => true) + + q.subscribe(@mkworker) + myqueue = q.instance_variable_get(:@queue) + end + end end diff --git a/spec/sneakers/sneakers_spec.rb b/spec/sneakers/sneakers_spec.rb index 29fbdd1e..7a962215 100644 --- a/spec/sneakers/sneakers_spec.rb +++ b/spec/sneakers/sneakers_spec.rb @@ -23,6 +23,7 @@ def work(msg) it 'should configure itself' do Sneakers.configure Sneakers.logger.wont_be_nil + Sneakers.configured?.must_equal(true) end end diff --git a/spec/sneakers/worker_handlers_spec.rb b/spec/sneakers/worker_handlers_spec.rb new file mode 100644 index 00000000..0c8f0450 --- /dev/null +++ b/spec/sneakers/worker_handlers_spec.rb @@ -0,0 +1,350 @@ +require 'spec_helper' +require 'sneakers' +require 'timeout' +require 'sneakers/handlers/oneshot' +require 'sneakers/handlers/maxretry' +require 'json' + + +# Specific tests of the Handler implementations you can use to deal with job +# results. These tests only make sense with a worker that requires acking. + +class HandlerTestWorker + include Sneakers::Worker + from_queue 'defaults', + :ack => true + + def work(msg) + if msg.is_a?(StandardError) + raise msg + elsif msg.is_a?(String) + hash = maybe_json(msg) + if hash.is_a?(Hash) + hash['response'].to_sym + else + hash + end + else + msg + end + end + + def maybe_json(string) + JSON.parse(string) + rescue + string + end +end + +class TestPool + def process(*args,&block) + block.call + end +end + + +describe 'Handlers' do + let(:channel) { Object.new } + let(:queue) { Object.new } + let(:worker) { HandlerTestWorker.new(@queue, TestPool.new) } + + before(:each) do + Sneakers.configure(:daemonize => true, :log => 'sneakers.log') + Sneakers::Worker.configure_logger(Logger.new('/dev/null')) + Sneakers::Worker.configure_metrics + end + + describe 'Oneshot' do + before(:each) do + @opts = Object.new + @handler = Sneakers::Handlers::Oneshot.new(channel, queue, @opts) + + @header = Object.new + stub(@header).delivery_tag { 37 } + end + + describe '#do_work' do + it 'should work and handle acks' do + mock(channel).acknowledge(37, false) + + worker.do_work(@header, nil, :ack, @handler) + end + + it 'should work and handle rejects' do + mock(channel).reject(37, false) + + worker.do_work(@header, nil, :reject, @handler) + end + + it 'should work and handle requeues' do + mock(channel).reject(37, true) + + worker.do_work(@header, nil, :requeue, @handler) + end + + it 'should work and handle user-land timeouts' do + mock(channel).reject(37, false) + + worker.do_work(@header, nil, :timeout, @handler) + end + + it 'should work and handle user-land error' do + mock(channel).reject(37, false) + + worker.do_work(@header, nil, StandardError.new('boom!'), @handler) + end + + it 'should work and handle noops' do + worker.do_work(@header, nil, :wait, @handler) + end + end + + end + + describe 'Maxretry' do + let(:max_retries) { nil } + + before(:each) do + @opts = { + :exchange => 'sneakers', + :durable => 'true', + }.tap do |opts| + opts[:retry_max_times] = max_retries unless max_retries.nil? + end + + mock(queue).name { 'downloads' } + + @retry_exchange = Object.new + @error_exchange = Object.new + @requeue_exchange = Object.new + + @retry_queue = Object.new + @error_queue = Object.new + + mock(channel).exchange('downloads-retry', + :type => 'topic', + :durable => 'true').once { @retry_exchange } + mock(channel).exchange('downloads-error', + :type => 'topic', + :durable => 'true').once { @error_exchange } + mock(channel).exchange('downloads-retry-requeue', + :type => 'topic', + :durable => 'true').once { @requeue_exchange } + + mock(channel).queue('downloads-retry', + :durable => 'true', + :arguments => { + :'x-dead-letter-exchange' => 'downloads-retry-requeue', + :'x-message-ttl' => 60000 + } + ).once { @retry_queue } + mock(@retry_queue).bind(@retry_exchange, :routing_key => '#') + + mock(channel).queue('downloads-error', + :durable => 'true').once { @error_queue } + mock(@error_queue).bind(@error_exchange, :routing_key => '#') + + mock(queue).bind(@requeue_exchange, :routing_key => '#') + + @handler = Sneakers::Handlers::Maxretry.new(channel, queue, @opts) + + @header = Object.new + stub(@header).delivery_tag { 37 } + + @props = {} + @props_with_x_death = { + :headers => { + "x-death" => [ + { + "reason" => "expired", + "queue" => "downloads-retry", + "time" => Time.now, + "exchange" => "RawMail-retry", + "routing-keys" => ["RawMail"] + }, + { + "reason" => "rejected", + "queue" => "downloads", + "time" => Time.now, + "exchange" => "", + "routing-keys" => ["RawMail"] + } + ] + }, + :delivery_mode => 1} + end + + # it 'allows overriding the retry exchange name' + # it 'allows overriding the error exchange name' + # it 'allows overriding the retry timeout' + + describe '#do_work' do + before do + @now = Time.now + end + + # Used to stub out the publish method args. Sadly RR doesn't support + # this, only proxying existing methods. + module MockPublish + attr_reader :data, :opts, :called + + def publish(data, opts) + @data = data + @opts = opts + @called = true + end + end + + it 'should work and handle acks' do + mock(channel).acknowledge(37, false) + + worker.do_work(@header, @props, :ack, @handler) + end + + describe 'rejects' do + describe 'more retries ahead' do + it 'should work and handle rejects' do + mock(channel).reject(37, false) + + worker.do_work(@header, @props_with_x_death, :reject, @handler) + end + end + + describe 'no more retries' do + let(:max_retries) { 1 } + + it 'sends the rejection to the error queue' do + mock(@header).routing_key { '#' } + mock(channel).acknowledge(37, false) + + @error_exchange.extend MockPublish + worker.do_work(@header, @props_with_x_death, :reject, @handler) + @error_exchange.called.must_equal(true) + @error_exchange.opts.must_equal({ :routing_key => '#' }) + data = JSON.parse(@error_exchange.data) + data['error'].must_equal('reject') + data['num_attempts'].must_equal(2) + data['payload'].must_equal(Base64.encode64(:reject.to_s)) + Time.parse(data['failed_at']).wont_be_nil + end + + end + end + + describe 'requeues' do + it 'should work and handle requeues' do + mock(channel).reject(37, true) + + worker.do_work(@header, @props_with_x_death, :requeue, @handler) + end + + describe 'no more retries left' do + let(:max_retries) { 1 } + + it 'continues to reject with requeue' do + mock(channel).reject(37, true) + + worker.do_work(@header, @props_with_x_death, :requeue, @handler) + end + end + + end + + describe 'timeouts' do + describe 'more retries ahead' do + it 'should reject the message' do + mock(channel).reject(37, false) + + worker.do_work(@header, @props_with_x_death, :timeout, @handler) + end + end + + describe 'no more retries left' do + let(:max_retries) { 1 } + + it 'sends the rejection to the error queue' do + mock(@header).routing_key { '#' } + mock(channel).acknowledge(37, false) + @error_exchange.extend MockPublish + + worker.do_work(@header, @props_with_x_death, :timeout, @handler) + @error_exchange.called.must_equal(true) + @error_exchange.opts.must_equal({ :routing_key => '#' }) + data = JSON.parse(@error_exchange.data) + data['error'].must_equal('timeout') + data['num_attempts'].must_equal(2) + data['payload'].must_equal(Base64.encode64(:timeout.to_s)) + Time.parse(data['failed_at']).wont_be_nil + end + end + end + + describe 'exceptions' do + describe 'more retries ahead' do + it 'should reject the message' do + mock(channel).reject(37, false) + + worker.do_work(@header, @props_with_x_death, StandardError.new('boom!'), @handler) + end + end + + describe 'no more retries left' do + let(:max_retries) { 1 } + + it 'sends the rejection to the error queue' do + mock(@header).routing_key { '#' } + mock(channel).acknowledge(37, false) + @error_exchange.extend MockPublish + + worker.do_work(@header, @props_with_x_death, StandardError.new('boom!'), @handler) + @error_exchange.called.must_equal(true) + @error_exchange.opts.must_equal({ :routing_key => '#' }) + data = JSON.parse(@error_exchange.data) + data['error'].must_equal('boom!') + data['error_class'].must_equal(StandardError.to_s) + data['backtrace'].wont_be_nil + data['num_attempts'].must_equal(2) + data['payload'].must_equal(Base64.encode64('boom!')) + Time.parse(data['failed_at']).wont_be_nil + end + end + end + + it 'should work and handle user-land error' do + mock(channel).reject(37, false) + + worker.do_work(@header, @props, StandardError.new('boom!'), @handler) + end + + it 'should work and handle noops' do + worker.do_work(@header, @props, :wait, @handler) + end + + # Since we encode in json, we want to make sure if the actual payload is + # json, then it's something you can get back out. + describe 'JSON payloads' do + let(:max_retries) { 1 } + + it 'properly encodes the json payload' do + mock(@header).routing_key { '#' } + mock(channel).acknowledge(37, false) + @error_exchange.extend MockPublish + + payload = { + data: 'hello', + response: :timeout + } + worker.do_work(@header, @props_with_x_death, payload.to_json, @handler) + @error_exchange.called.must_equal(true) + @error_exchange.opts.must_equal({ :routing_key => '#' }) + data = JSON.parse(@error_exchange.data) + data['error'].must_equal('timeout') + data['num_attempts'].must_equal(2) + data['payload'].must_equal(Base64.encode64(payload.to_json)) + end + + end + + end + end +end diff --git a/spec/sneakers/worker_spec.rb b/spec/sneakers/worker_spec.rb index 8517bdeb..3c56c144 100644 --- a/spec/sneakers/worker_spec.rb +++ b/spec/sneakers/worker_spec.rb @@ -108,13 +108,6 @@ def process(*args,&block) end end -class TestHandler - def acknowledge(tag); end - def reject(tag); end - def error(tag, err); end - def timeout(tag); end -end - def with_test_queuefactory(ctx, ack=true, msg=nil, nowork=false) qf = Object.new q = Object.new @@ -139,7 +132,6 @@ def with_test_queuefactory(ctx, ack=true, msg=nil, nowork=false) stub(@queue).exchange { @exchange } Sneakers.configure(:daemonize => true, :log => 'sneakers.log') - Sneakers::Worker.configure_logger(Logger.new('/dev/null')) Sneakers::Worker.configure_metrics end @@ -222,9 +214,19 @@ def with_test_queuefactory(ctx, ack=true, msg=nil, nowork=false) w = AcksWorker.new(@queue, TestPool.new) mock(w).work("msg").once{ raise "foo" } handler = Object.new - mock(handler).error("tag", anything) header = Object.new - stub(header).delivery_tag { "tag" } + mock(handler).error(header, nil, "msg", anything) + mock(w.logger).error(/unexpected error \[Exception error="foo" error_class=RuntimeError backtrace=.*/) + w.do_work(header, nil, "msg", handler) + end + + it "should log exceptions from workers" do + handler = Object.new + header = Object.new + w = AcksWorker.new(@queue, TestPool.new) + mock(w).work("msg").once{ raise "foo" } + mock(w.logger).error(/error="foo" error_class=RuntimeError backtrace=/) + mock(handler).error(header, nil, "msg", anything) w.do_work(header, nil, "msg", handler) end @@ -233,10 +235,10 @@ def with_test_queuefactory(ctx, ack=true, msg=nil, nowork=false) stub(w).work("msg"){ sleep 10 } handler = Object.new - mock(handler).timeout("tag") - header = Object.new - stub(header).delivery_tag { "tag" } + + mock(handler).timeout(header, nil, "msg") + mock(w.logger).error(/timeout/) w.do_work(header, nil, "msg", handler) end @@ -251,35 +253,35 @@ def with_test_queuefactory(ctx, ack=true, msg=nil, nowork=false) it "should work and handle acks" do handler = Object.new - mock(handler).acknowledge("tag") + mock(handler).acknowledge(@delivery_info, nil, :ack) @worker.do_work(@delivery_info, nil, :ack, handler) end it "should work and handle rejects" do handler = Object.new - mock(handler).reject("tag") + mock(handler).reject(@delivery_info, nil, :reject) @worker.do_work(@delivery_info, nil, :reject, handler) end it "should work and handle requeues" do handler = Object.new - mock(handler).reject("tag", true) + mock(handler).reject(@delivery_info, nil, :requeue, true) @worker.do_work(@delivery_info, nil, :requeue, handler) end it "should work and handle user-land timeouts" do handler = Object.new - mock(handler).timeout("tag") + mock(handler).timeout(@delivery_info, nil, :timeout) @worker.do_work(@delivery_info, nil, :timeout, handler) end it "should work and handle user-land error" do handler = Object.new - mock(handler).error("tag",anything) + mock(handler).error(@delivery_info, nil, :error, anything) @worker.do_work(@delivery_info, nil, :error, handler) end @@ -323,17 +325,36 @@ def with_test_queuefactory(ctx, ack=true, msg=nil, nowork=false) w = LoggingWorker.new(@queue, TestPool.new) w.do_work(nil,nil,'msg',nil) end + + it 'has a helper to constuct log prefix values' do + w = DummyWorker.new(@queue, TestPool.new) + w.instance_variable_set(:@id, 'worker-id') + m = w.log_msg('foo') + w.log_msg('foo').must_match(/\[worker-id\]\[#\]\[test-queue\]\[\{\}\] foo/) + end + + describe '#worker_error' do + it 'only logs backtraces if present' do + w = DummyWorker.new(@queue, TestPool.new) + mock(w.logger).error(/cuz \[Exception error="boom!" error_class=RuntimeError\]/) + w.worker_error('cuz', RuntimeError.new('boom!')) + end + end + end describe 'Metrics' do before do @handler = Object.new - stub(@handler).acknowledge("tag") - stub(@handler).reject("tag") - stub(@handler).timeout("tag") - stub(@handler).error("tag", anything) - stub(@handler).noop("tag") + @header = Object.new + + # We don't care how these are called, we're focusing on metrics here. + stub(@handler).acknowledge + stub(@handler).reject + stub(@handler).timeout + stub(@handler).error + stub(@handler).noop @delivery_info = Object.new stub(@delivery_info).delivery_tag { "tag" } @@ -353,7 +374,13 @@ def with_test_queuefactory(ctx, ack=true, msg=nil, nowork=false) it 'should be able to meter rejects' do mock(@w.metrics).increment("foobar").once mock(@w.metrics).increment("work.MetricsWorker.handled.reject").once - @w.do_work(@delivery_info, nil, nil, @handler) + @w.do_work(@header, nil, :reject, @handler) + end + + it 'should be able to meter requeue' do + mock(@w.metrics).increment("foobar").once + mock(@w.metrics).increment("work.MetricsWorker.handled.requeue").once + @w.do_work(@header, nil, :requeue, @handler) end it 'should be able to meter errors' do @@ -367,21 +394,25 @@ def with_test_queuefactory(ctx, ack=true, msg=nil, nowork=false) mock(@w).work('msg'){ sleep 10 } @w.do_work(@delivery_info, nil, 'msg', @handler) end + + it 'defaults to noop when no response is specified' do + mock(@w.metrics).increment("foobar").once + mock(@w.metrics).increment("work.MetricsWorker.handled.noop").once + @w.do_work(@header, nil, nil, @handler) + end end describe 'With Params' do before do + @props = { :foo => 1 } @handler = Object.new - stub(@handler).acknowledge("tag") - stub(@handler).reject("tag") - stub(@handler).timeout("tag") - stub(@handler).error("tag", anything) - stub(@handler).noop("tag") + @header = Object.new @delivery_info = Object.new - stub(@delivery_info).delivery_tag { "tag" } + + stub(@handler).noop(@delivery_info, {:foo => 1}, :ack) @w = WithParamsWorker.new(@queue, TestPool.new) mock(@w.metrics).timing("work.WithParamsWorker.time").yields.once diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index c17513e2..e9e26a31 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -1,5 +1,7 @@ require 'simplecov' -SimpleCov.start +SimpleCov.start do + add_filter "/spec/" +end require 'minitest/autorun'