Retry Handler #73

Merged
merged 22 commits on Nov 9, 2014
Commits (22)
9b73a11
Initial format for message retry handler.
imothee Mar 13, 2014
c4fe049
Commit of example, retry_handler and some other logic to handle the r…
imothee Mar 13, 2014
d484520
Changes to queue to pass opts into handlers, worker to use hdr not hd…
imothee Mar 14, 2014
8c7aaf5
Merge branch 'maxretry' of https://github.com/otherlevels/sneakers in…
May 3, 2014
25939c0
Maxretry Handler
May 4, 2014
1b01c0f
Merge pull request #1 from Yesware/yw-maxretry
May 5, 2014
d4c6388
Make Maxretry Handler only retry per-queue.
May 6, 2014
15d4449
Merge pull request #2 from Yesware/yw-make-maxretry-per-queue
May 7, 2014
009a6cf
Log backtraces if present when jobs fail.
Jun 10, 2014
ab2cdde
Merge pull request #3 from Yesware/log-backtrace
Jun 10, 2014
9b24c8d
Merge branch 'master' into merge-upstream-master
Jun 20, 2014
bf89c86
Merge upstream master and cleanup backtrace logging
Jun 20, 2014
e40eca4
Merge branch 'yw-master' into merge-upstream-master
Jun 20, 2014
23ffb32
Improve logging in Maxretry handler and fix fallthrough metric in worker
Jun 20, 2014
26688e5
Merge pull request #4 from Yesware/merge-upstream-master
Jun 23, 2014
f87cc5b
Merge pull request #5 from Yesware/improve-logging-in-maxretry-handler
Jun 23, 2014
24a25b4
Reject a JSON payload with error details
Jun 23, 2014
5ea4b49
Merge pull request #6 from Yesware/reject-json-with-details
Jun 25, 2014
7e7f8cc
Add option :queue_durable to allow creation of a non-durable queue
Jul 23, 2014
72ccf98
Merge pull request #7 from Yesware/option_for_nondurable_queue
Jul 23, 2014
a2e5df6
Merge branch 'master' into merge-upstream
Nov 5, 2014
5f37a25
Merge pull request #8 from Yesware/merge-upstream
Nov 6, 2014
2 changes: 2 additions & 0 deletions .gitignore
@@ -5,3 +5,5 @@ sneakers.pid
pkg/
coverage/
tmp/
.ruby-version
.ruby-gemset
78 changes: 78 additions & 0 deletions examples/max_retry_handler.rb
@@ -0,0 +1,78 @@
$: << File.expand_path('../lib', File.dirname(__FILE__))
require 'sneakers'
require 'sneakers/runner'
require 'sneakers/handlers/maxretry'
require 'logger'

Sneakers.configure(:handler => Sneakers::Handlers::Maxretry,
                   :workers => 1,
                   :threads => 1,
                   :prefetch => 1,
                   :exchange => 'sneakers',
                   :exchange_type => 'topic',
                   :routing_key => ['#', 'something'],
                   :durable => true,
                  )
Sneakers.logger.level = Logger::DEBUG

WORKER_OPTIONS = {
  :ack => true,
  :threads => 1,
  :prefetch => 1,
  :timeout_job_after => 60,
  :heartbeat => 5,
  :retry_timeout => 5000
}

# Example of how to write a retry worker. If your RabbitMQ broker is empty,
# you must run this twice: once to set up the exchanges, queues and bindings,
# and a second time so the published message ends up on the downloads queue.
#
# Run this via:
#   bundle exec ruby examples/max_retry_handler.rb
#
class MaxRetryWorker
  include Sneakers::Worker
  from_queue 'downloads',
             WORKER_OPTIONS.merge({
               :arguments => {
                 :'x-dead-letter-exchange' => 'downloads-retry'
               },
             })

  def work(msg)
    logger.info("MaxRetryWorker rejecting msg: #{msg.inspect}")

    # We always want to reject to see if we do the proper timeout
    reject!
  end
end

# Example of a worker on the same exchange that does not fail, so it should only
# see the message once.
class SucceedingWorker
  include Sneakers::Worker
  from_queue 'uploads',
             WORKER_OPTIONS.merge({
               :arguments => {
                 :'x-dead-letter-exchange' => 'uploads-retry'
               },
             })

  def work(msg)
    logger.info("SucceedingWorker succeeding on msg: #{msg.inspect}")
    ack!
  end
end

messages = 1
puts "feeding messages in"
messages.times {
  Sneakers.publish(" -- message -- ",
                   :to_queue => 'anywhere',
                   :persistence => true)
}
puts "done"

r = Sneakers::Runner.new([MaxRetryWorker, SucceedingWorker])
r.run
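
Not part of this PR, but a companion sketch for the example above: once the Maxretry handler (added below in lib/sneakers/handlers/maxretry.rb) gives up on a message, it publishes a JSON document describing the failure to the error exchange. Assuming the handler's default naming, the 'downloads' worker queue above gets a 'downloads-error' queue, which could be inspected with plain Bunny roughly like this:

require 'bunny'
require 'base64'
require 'json'

conn = Bunny.new
conn.start
channel = conn.create_channel

# The handler names the error queue "<worker queue>-error" unless
# :retry_error_exchange is overridden.
error_queue = channel.queue('downloads-error', :durable => true)

_delivery_info, _properties, payload = error_queue.pop
if payload
  details = JSON.parse(payload)
  puts "error:     #{details['error']}"
  puts "attempts:  #{details['num_attempts']}"
  puts "failed at: #{details['failed_at']}"
  puts "original:  #{Base64.decode64(details['payload'])}"
else
  puts 'no failed messages yet'
end

conn.close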
183 changes: 183 additions & 0 deletions lib/sneakers/handlers/maxretry.rb
@@ -0,0 +1,183 @@
require 'base64'
require 'json'
require 'time' # defensive: Time#iso8601 (used below) needs the stdlib time extension

module Sneakers
  module Handlers
    #
    # Maxretry uses dead-letter policies on RabbitMQ to requeue and retry
    # messages after failure (rejections, errors and timeouts). When the
    # maximum number of retries is reached it will put the message on an error
    # queue. This handler only retries at the queue level; to accomplish that,
    # the setup is a bit complex.
    #
    # Input:
    #   worker_exchange (eXchange)
    #   worker_queue (Queue)
    # We create:
    #   worker_queue-retry - (X) where we set up the worker queue to dead-letter.
    #   worker_queue-retry - (Q) queue bound to ^ exchange, dead-letters to
    #     worker_queue-retry-requeue.
    #   worker_queue-error - (X) where to send max-retry failures.
    #   worker_queue-error - (Q) bound to worker_queue-error.
    #   worker_queue-retry-requeue - (X) exchange to bind worker_queue to for
    #     requeuing directly to the worker_queue.
    #
    # This requires that you set arguments on the worker queue so that its
    # dead-letter exchange lines up with the retry exchange created here. See
    # the example for more information.
    #
    # Many of these can be overridden with options:
    #   - retry_exchange - sets the retry exchange & queue
    #   - retry_error_exchange - sets the error exchange and queue
    #   - retry_requeue_exchange - sets the exchange created to requeue things
    #     back to the worker queue.
    #
    class Maxretry

      def initialize(channel, queue, opts)
        @worker_queue_name = queue.name
        Sneakers.logger.debug do
          "#{log_prefix} creating handler, opts=#{opts}"
        end

        @channel = channel
        @opts = opts

        # Construct names, defaulting where suitable
        retry_name = @opts[:retry_exchange] || "#{@worker_queue_name}-retry"
        error_name = @opts[:retry_error_exchange] || "#{@worker_queue_name}-error"
        requeue_name = @opts[:retry_requeue_exchange] || "#{@worker_queue_name}-retry-requeue"

        # Create the exchanges
        @retry_exchange, @error_exchange, @requeue_exchange = [retry_name, error_name, requeue_name].map do |name|
          Sneakers.logger.debug { "#{log_prefix} creating exchange=#{name}" }
          @channel.exchange(name,
                            :type => 'topic',
                            :durable => opts[:durable])
        end

        # Create the queues and bindings
        Sneakers.logger.debug do
          "#{log_prefix} creating queue=#{retry_name} x-dead-letter-exchange=#{requeue_name}"
        end
        @retry_queue = @channel.queue(retry_name,
                                      :durable => opts[:durable],
                                      :arguments => {
                                        :'x-dead-letter-exchange' => requeue_name,
                                        :'x-message-ttl' => @opts[:retry_timeout] || 60000
                                      })
        @retry_queue.bind(@retry_exchange, :routing_key => '#')

        Sneakers.logger.debug do
          "#{log_prefix} creating queue=#{error_name}"
        end
        @error_queue = @channel.queue(error_name,
                                      :durable => opts[:durable])
        @error_queue.bind(@error_exchange, :routing_key => '#')

        # Finally, bind the worker queue to our requeue exchange
        queue.bind(@requeue_exchange, :routing_key => '#')

        @max_retries = @opts[:retry_max_times] || 5
      end

      def acknowledge(hdr, props, msg)
        @channel.acknowledge(hdr.delivery_tag, false)
      end

      def reject(hdr, props, msg, requeue = false)
        if requeue
          # This was explicitly rejected with requeue requested, so we do not
          # want it to pass through our retry logic.
          @channel.reject(hdr.delivery_tag, requeue)
        else
          handle_retry(hdr, props, msg, :reject)
        end
      end

      def error(hdr, props, msg, err)
        handle_retry(hdr, props, msg, err)
      end

      def timeout(hdr, props, msg)
        handle_retry(hdr, props, msg, :timeout)
      end

      def noop(hdr, props, msg)
      end

      # Helper logic for retry handling. This will reject the message if there
      # are retries remaining for it; otherwise it will publish it to the
      # error exchange along with the reason.
      # @param hdr [Bunny::DeliveryInfo]
      # @param props [Bunny::MessageProperties]
      # @param msg [String] The message
      # @param reason [String, Symbol, Exception] Reason for the retry, included
      #   in the JSON we put on the error exchange.
      def handle_retry(hdr, props, msg, reason)
        # +1 for the current attempt
        num_attempts = failure_count(props[:headers]) + 1
        if num_attempts <= @max_retries
          # We call reject, which will route the message to the
          # x-dead-letter-exchange (i.e. the retry exchange) on the queue.
          Sneakers.logger.info do
            "#{log_prefix} msg=retrying, count=#{num_attempts}, headers=#{props[:headers]}"
          end
          @channel.reject(hdr.delivery_tag, false)
          # TODO: metrics
        else
          # Retried more than the max number of times, so publish the original
          # message with its routing key to the error exchange.
          Sneakers.logger.info do
            "#{log_prefix} msg=failing, retry_count=#{num_attempts}, reason=#{reason}"
          end
          data = {
            error: reason,
            num_attempts: num_attempts,
            failed_at: Time.now.iso8601,
            payload: Base64.encode64(msg.to_s)
          }.tap do |hash|
            if reason.is_a?(Exception)
              hash[:error_class] = reason.class
              if reason.backtrace
                hash[:backtrace] = reason.backtrace.take(10).join(', ')
              end
            end
          end.to_json
          @error_exchange.publish(data, :routing_key => hdr.routing_key)
          @channel.acknowledge(hdr.delivery_tag, false)
          # TODO: metrics
        end
      end
      private :handle_retry

      # Uses the x-death header to determine the number of failures this job
      # has seen in the past. This does not count the current failure, so the
      # first time the job fails this returns 0, the second time 1, etc.
      # @param headers [Hash] Hash of headers that Rabbit delivers as part of
      #   the message
      # @return [Integer] Count of the number of failures.
      def failure_count(headers)
        if headers.nil? || headers['x-death'].nil?
          0
        else
          headers['x-death'].select do |x_death|
            x_death['queue'] == @worker_queue_name
          end.count
        end
      end
      private :failure_count

      # Prefix all of our log messages so they are easier to find. We don't
      # have the worker, so the next best thing is the queue name.
      def log_prefix
        "Maxretry handler [queue=#{@worker_queue_name}]"
      end
      private :log_prefix

    end
  end
end
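
A quick illustration of the x-death bookkeeping used by failure_count above (the header contents are invented for illustration and trimmed of extra fields, not captured from a broker): RabbitMQ records each dead-lettering of the message in the x-death header, and the handler counts only the entries for the worker queue itself, ignoring the TTL expirations on the retry queue.

worker_queue_name = 'downloads'

# What the worker might see on its third delivery: two rejections on
# 'downloads', plus the two TTL expirations on 'downloads-retry' that routed
# the message back to the worker queue.
headers = {
  'x-death' => [
    { 'queue' => 'downloads-retry', 'reason' => 'expired'  },
    { 'queue' => 'downloads',       'reason' => 'rejected' },
    { 'queue' => 'downloads-retry', 'reason' => 'expired'  },
    { 'queue' => 'downloads',       'reason' => 'rejected' }
  ]
}

# The same selection the handler performs:
prior_failures = headers['x-death'].select { |d| d['queue'] == worker_queue_name }.count
puts prior_failures # => 2, so handle_retry treats the current delivery as attempt 3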
21 changes: 11 additions & 10 deletions lib/sneakers/handlers/oneshot.rb
@@ -1,27 +1,28 @@
 module Sneakers
   module Handlers
     class Oneshot
-      def initialize(channel)
+      def initialize(channel, queue, opts)
         @channel = channel
+        @opts = opts
       end
 
-      def acknowledge(tag)
-        @channel.acknowledge(tag, false)
+      def acknowledge(hdr, props, msg)
+        @channel.acknowledge(hdr.delivery_tag, false)
       end
 
-      def reject(tag, requeue=false)
-        @channel.reject(tag, requeue)
+      def reject(hdr, props, msg, requeue=false)
+        @channel.reject(hdr.delivery_tag, requeue)
       end
 
-      def error(tag, err)
-        reject(tag)
+      def error(hdr, props, msg, err)
+        reject(hdr, props, msg)
       end
 
-      def timeout(tag)
-        reject(tag)
+      def timeout(hdr, props, msg)
+        reject(hdr, props, msg)
       end
 
-      def noop(tag)
+      def noop(hdr, props, msg)
 
       end
     end
15 changes: 12 additions & 3 deletions lib/sneakers/queue.rb
@@ -26,17 +26,26 @@ def subscribe(worker)
                                   :type => @opts[:exchange_type],
                                   :durable => @opts[:durable])
 
-    handler = @handler_klass.new(@channel)
-
     routing_key = @opts[:routing_key] || @name
     routing_keys = [*routing_key]
 
-    queue = @channel.queue(@name, :durable => @opts[:durable], :arguments => @opts[:arguments])
+    # TODO: get the arguments from the handler? Retry handler wants this so you
+    # don't have to line up the queue's dead letter argument with the exchange
+    # you'll create for retry.
+    queue_durable = @opts[:queue_durable].nil? ? @opts[:durable] : @opts[:queue_durable]
+    queue = @channel.queue(@name, :durable => queue_durable, :arguments => @opts[:arguments])
 
     routing_keys.each do |key|
       queue.bind(@exchange, :routing_key => key)
     end
 
+    # NOTE: we are using the worker's options. This is necessary so the handler
+    # has the same configuration as the worker. Also pass along the exchange and
+    # queue in case the handler requires access to them (for things like binding
+    # retry queues, etc).
+    handler_klass = worker.opts[:handler] || Sneakers::CONFIG[:handler]
+    handler = handler_klass.new(@channel, queue, worker.opts)
+
     @consumer = queue.subscribe(:block => false, :ack => @opts[:ack]) do | delivery_info, metadata, msg |
       worker.do_work(delivery_info, metadata, msg, handler)
     end
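
The per-worker handler lookup added above means the retry handler can be enabled for a single worker instead of globally. A minimal sketch of that usage (the OrderWorker class and 'orders' queue names are illustrative, not part of this PR; the options come from the handler and queue changes in this diff):

require 'sneakers'
require 'sneakers/handlers/maxretry'

class OrderWorker
  include Sneakers::Worker
  from_queue 'orders',
             :handler => Sneakers::Handlers::Maxretry,  # overrides the global handler for this worker
             :retry_max_times => 3,                      # hand off to the error queue after 3 attempts
             :retry_timeout => 10_000,                   # wait 10s on the retry queue before requeueing
             :queue_durable => true,                     # new option; falls back to :durable when unset
             :arguments => {
               # must line up with the retry exchange the handler creates ('orders-retry' by default)
               :'x-dead-letter-exchange' => 'orders-retry'
             }

  def work(msg)
    # raise, reject!, or time out to exercise the retry path; ack! on success
    ack!
  end
end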