diff --git a/lib/qless/retry_exceptions.rb b/lib/qless/retry_exceptions.rb new file mode 100644 index 00000000..77e14419 --- /dev/null +++ b/lib/qless/retry_exceptions.rb @@ -0,0 +1,11 @@ +module Qless + module RetryExceptions + def retryable_exception_classes + @retryable_exception_classes ||= [] + end + + def retry_on(*exception_classes) + self.retryable_exception_classes.push(*exception_classes) + end + end +end diff --git a/lib/qless/worker.rb b/lib/qless/worker.rb index e66f2443..8395d668 100644 --- a/lib/qless/worker.rb +++ b/lib/qless/worker.rb @@ -99,6 +99,8 @@ def work(interval = 5.0) def perform(job) around_perform(job) + rescue *retryable_exception_classes(job) + job.retry rescue Exception => error fail_job(job, error) else @@ -135,6 +137,11 @@ def unpause_processing private + def retryable_exception_classes(job) + return [] unless job.klass.respond_to?(:retryable_exception_classes) + job.klass.retryable_exception_classes + end + # Allow middleware modules to be mixed in and override the # definition of around_perform while providing a default # implementation so our code can assume the method is present. diff --git a/spec/integration/worker_spec.rb b/spec/integration/worker_spec.rb index 2aba6bc7..b73a38fe 100644 --- a/spec/integration/worker_spec.rb +++ b/spec/integration/worker_spec.rb @@ -3,10 +3,23 @@ require 'yaml' require 'qless/worker' require 'qless' +require 'qless/retry_exceptions' class WorkerIntegrationJob def self.perform(job) - Redis.connect(:url => job['redis_url']).rpush('worker_integration_job', job['word']) + Redis.connect(url: job['redis_url']).rpush('worker_integration_job', job['word']) + end +end + +class RetryIntegrationJob + extend Qless::RetryExceptions + + Kaboom = Class.new(StandardError) + retry_on Kaboom + + def self.perform(job) + Redis.connect(url: job['redis_url']).incr('retry_integration_job_count') + raise Kaboom end end @@ -41,5 +54,22 @@ def start_worker(run_as_single_process) it_behaves_like 'a running worker' it_behaves_like 'a running worker', '1' + + it 'will retry and eventually fail a repeatedly failing job' do + queue = client.queues["main"] + jid = queue.put(RetryIntegrationJob, {}, retries: 10) + Qless::Worker.new( + client, + Qless::JobReservers::RoundRobin.new([queue]), + run_as_a_single_process: true + ).work(0) + + job = client.jobs[jid] + + job.state.should eq('failed') + job.retries_left.should eq(-1) + job.original_retries.should eq(10) + client.redis.get('retry_integration_job_count').should eq('11') + end end diff --git a/spec/unit/retry_exceptions_spec.rb b/spec/unit/retry_exceptions_spec.rb new file mode 100644 index 00000000..de1ffb2d --- /dev/null +++ b/spec/unit/retry_exceptions_spec.rb @@ -0,0 +1,22 @@ +require 'qless/retry_exceptions' + +module Qless + describe RetryExceptions do + let(:job_class) { Class.new } + let(:exception_class) { Class.new(StandardError) } + + before do + job_class.extend(RetryExceptions) + end + + it 'defines a retryable_exceptions method that returns an empty array by default' do + job_class.retryable_exception_classes.should be_empty + end + + it 'defines a retry_on method that makes exception types retryable' do + job_class.retry_on(exception_class) + + job_class.retryable_exception_classes.should eq([exception_class]) + end + end +end diff --git a/spec/unit/worker_spec.rb b/spec/unit/worker_spec.rb index d1bf75cb..0e4d772b 100644 --- a/spec/unit/worker_spec.rb +++ b/spec/unit/worker_spec.rb @@ -62,6 +62,30 @@ class MyJobClass; end worker.perform(job) end + it 'fails the job if performing it raises a non-retryable error' do + MyJobClass.stub(:retryable_exception_classes).and_return([]) + MyJobClass.stub(:perform) { raise Exception.new("boom") } + expected_line_number = __LINE__ - 1 + job.should respond_to(:fail).with(2).arguments + + job.should_receive(:fail) do |group, message| + group.should eq("Qless::MyJobClass:Exception") + message.should include("boom") + message.should include("#{__FILE__}:#{expected_line_number}") + end + + worker.perform(job) + end + + it 'retries the job if performing it raises a retryable error' do + MyJobClass.stub(:retryable_exception_classes).and_return([ArgumentError]) + MyJobClass.stub(:perform) { raise ArgumentError.new("boom") } + + job.should_receive(:retry).with(no_args) + + worker.perform(job) + end + it 'completes the job if it finishes with no errors' do MyJobClass.stub(:perform) job.should respond_to(:complete).with(0).arguments