Permalink
Browse files

Add failure messages to job/worker retry logic

  • Loading branch information...
1 parent 4cfcbc6 commit 5d8b1b4bb1ea4288b6c20f625e522a477ab9d657 @benkirzhner benkirzhner committed Feb 2, 2013
Showing with 33 additions and 11 deletions.
  1. +13 −2 lib/qless/job.rb
  2. +1 −1 lib/qless/qless-core
  3. +11 −4 lib/qless/worker.rb
  4. +7 −3 spec/integration/worker_spec.rb
  5. +1 −1 spec/unit/worker_spec.rb
View
@@ -193,9 +193,20 @@ def untag(*tags)
@client._tag.call([], ['remove', @jid, Time.now.to_f] + tags)
end
- def retry(delay=0)
+ def retry(args = {})
+ if args.is_a?(Numeric)
+ delay = args
+ message = nil
+ else
+ delay = args.fetch(:delay) { 0 }
+ message = args[:message]
+ end
+
+ non_key_args = [@jid, @queue_name, @worker_name, Time.now.to_f, delay]
+ non_key_args << message if message
+
note_state_change do
- results = @client._retry.call([], [@jid, @queue_name, @worker_name, Time.now.to_f, delay])
+ results = @client._retry.call([], non_key_args)
results.nil? ? false : results
end
end
Submodule qless-core updated 1 files
+9 −1 retry.lua
View
@@ -107,8 +107,8 @@ def work(interval = 5.0)
def perform(job)
around_perform(job)
- rescue *retryable_exception_classes(job)
- job.retry
+ rescue *retryable_exception_classes(job) => error
+ retry_job(job, error)
rescue Exception => error
fail_job(job, error)
else
@@ -176,9 +176,16 @@ def around_perform(job)
def fail_job(job, error)
group = "#{job.klass_name}:#{error.class}"
- message = "#{error.message}\n\n#{error.backtrace.join("\n")}"
log "Got #{group} failure from #{job.inspect}"
- job.fail(group, message)
+ job.fail(group, failure_message_for(job, error))
+ end
+
+ def retry_job(job, error)
+ job.retry(message: "#{error.class}\n\n" + failure_message_for(job, error))
+ end
+
+ def failure_message_for(job, error)
+ "#{error.message}\n\n#{error.backtrace.join("\n")}"
end
def procline(value)
@@ -18,8 +18,8 @@ class RetryIntegrationJob
retry_on Kaboom
def self.perform(job)
- Redis.connect(url: job['redis_url']).incr('retry_integration_job_count')
- raise Kaboom
+ count = Redis.connect(url: job['redis_url']).incr('retry_integration_job_count')
+ raise Kaboom, "Failure number #{count}!"
end
end
@@ -58,7 +58,7 @@ def start_worker(run_as_single_process)
it_behaves_like 'a running worker', '1'
- it 'will retry and eventually fail a repeatedly failing job' do
+ it "will retry and eventually fail a repeatedly failing job, recording the last error in the job's message" do
queue = client.queues["main"]
jid = queue.put(RetryIntegrationJob, {"redis_url" => client.redis.client.id}, retries: 10)
Qless::Worker.new(
@@ -72,6 +72,10 @@ def start_worker(run_as_single_process)
job.retries_left.should eq(-1)
job.original_retries.should eq(10)
client.redis.get('retry_integration_job_count').should eq('11')
+
+ job.failure['message'].should match(/Kaboom/)
+ job.failure['message'].should match(/Failure number 11/)
+ job.failure['message'].should_not match(/Failure number 5/)
end
end
@@ -82,7 +82,7 @@ class MyJobClass; end
MyJobClass.stub(:retryable_exception_classes).and_return([ArgumentError])
MyJobClass.stub(:perform) { raise ArgumentError.new("boom") }
- job.should_receive(:retry).with(no_args)
+ job.should_receive(:retry)
worker.perform(job)
end

0 comments on commit 5d8b1b4

Please sign in to comment.