Add support for retryable errors through Qless::RetryExceptions mixin. #63

Merged
merged 4 commits into from Jan 7, 2013
@@ -0,0 +1,11 @@
+module Qless
+ module RetryExceptions
+ def retryable_exception_classes
+ @retryable_exception_classes ||= []
+ end
+
+ def retry_on(*exception_classes)
+ self.retryable_exception_classes.push(*exception_classes)
+ end
+ end
+end
View
@@ -99,6 +99,8 @@ def work(interval = 5.0)
def perform(job)
around_perform(job)
+ rescue *retryable_exception_classes(job)
+ job.retry
rescue Exception => error
fail_job(job, error)
else
@@ -135,6 +137,11 @@ def unpause_processing
private
+ def retryable_exception_classes(job)
+ return [] unless job.klass.respond_to?(:retryable_exception_classes)
+ job.klass.retryable_exception_classes
+ end
+
# Allow middleware modules to be mixed in and override the
# definition of around_perform while providing a default
# implementation so our code can assume the method is present.
@@ -3,10 +3,23 @@
require 'yaml'
require 'qless/worker'
require 'qless'
+require 'qless/retry_exceptions'
class WorkerIntegrationJob
def self.perform(job)
- Redis.connect(:url => job['redis_url']).rpush('worker_integration_job', job['word'])
+ Redis.connect(url: job['redis_url']).rpush('worker_integration_job', job['word'])
+ end
+end
+
+class RetryIntegrationJob
+ extend Qless::RetryExceptions
+
+ Kaboom = Class.new(StandardError)
+ retry_on Kaboom
+
+ def self.perform(job)
+ Redis.connect(url: job['redis_url']).incr('retry_integration_job_count')
+ raise Kaboom
end
end
@@ -41,5 +54,22 @@ def start_worker(run_as_single_process)
it_behaves_like 'a running worker'
it_behaves_like 'a running worker', '1'
+
+ it 'will retry and eventually fail a repeatedly failing job' do
+ queue = client.queues["main"]
+ jid = queue.put(RetryIntegrationJob, {}, retries: 10)
+ Qless::Worker.new(
+ client,
+ Qless::JobReservers::RoundRobin.new([queue]),
+ run_as_a_single_process: true
+ ).work(0)
+
+ job = client.jobs[jid]
+
+ job.state.should eq('failed')
+ job.retries_left.should eq(-1)
+ job.original_retries.should eq(10)
+ client.redis.get('retry_integration_job_count').should eq('11')
+ end
end
@@ -0,0 +1,22 @@
+require 'qless/retry_exceptions'
+
+module Qless
+ describe RetryExceptions do
+ let(:job_class) { Class.new }
+ let(:exception_class) { Class.new(StandardError) }
+
+ before do
+ job_class.extend(RetryExceptions)
+ end
+
+ it 'defines a retryable_exceptions method that returns an empty array by default' do
+ job_class.retryable_exception_classes.should be_empty
+ end
+
+ it 'defines a retry_on method that makes exception types retryable' do
+ job_class.retry_on(exception_class)
+
+ job_class.retryable_exception_classes.should eq([exception_class])
+ end
+ end
+end
View
@@ -62,6 +62,30 @@ class MyJobClass; end
worker.perform(job)
end
+ it 'fails the job if performing it raises a non-retryable error' do
+ MyJobClass.stub(:retryable_exception_classes).and_return([])
+ MyJobClass.stub(:perform) { raise Exception.new("boom") }
+ expected_line_number = __LINE__ - 1
+ job.should respond_to(:fail).with(2).arguments
+
+ job.should_receive(:fail) do |group, message|
+ group.should eq("Qless::MyJobClass:Exception")
+ message.should include("boom")
+ message.should include("#{__FILE__}:#{expected_line_number}")
+ end
+
+ worker.perform(job)
+ end
+
+ it 'retries the job if performing it raises a retryable error' do
+ MyJobClass.stub(:retryable_exception_classes).and_return([ArgumentError])
+ MyJobClass.stub(:perform) { raise ArgumentError.new("boom") }
+
+ job.should_receive(:retry).with(no_args)
+
+ worker.perform(job)
+ end
+
it 'completes the job if it finishes with no errors' do
MyJobClass.stub(:perform)
job.should respond_to(:complete).with(0).arguments