Skip to content

Commit

Permalink
Merge branch 'separate-retry-queue' of https://github.com/onyx/resque…
Browse files Browse the repository at this point in the history
…-retry into onyx-separate-retry-queue
  • Loading branch information
lantins committed Mar 9, 2012
2 parents ca84d24 + 0e206a6 commit b0b202b
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 12 deletions.
2 changes: 1 addition & 1 deletion Gemfile
@@ -1,2 +1,2 @@
source :rubygems
gemspec
gemspec
28 changes: 23 additions & 5 deletions lib/resque/plugins/retry.rb
Expand Up @@ -149,7 +149,7 @@ def retry_criteria_valid?(exception, *args)

# call user retry criteria check blocks.
retry_criteria_checks.each do |criteria_check|
should_retry ||= !!criteria_check.call(exception, *args)
should_retry ||= !!instance_exec(exception, *args, &criteria_check)
end

should_retry
Expand Down Expand Up @@ -206,13 +206,16 @@ def try_again(exception, *args)
# we'll just check here to see whether it takes the additional exception class argument or not
temp_retry_delay = ([-1, 1].include?(method(:retry_delay).arity) ? retry_delay(exception.class) : retry_delay)

retry_in_queue = @retry_job_class ? @retry_job_class : self
if temp_retry_delay <= 0
# If the delay is 0, no point passing it through the scheduler
Resque.enqueue(self, *args_for_retry(*args))
Resque.enqueue(retry_in_queue, *args_for_retry(*args))
else
Resque.enqueue_in(temp_retry_delay, self, *args_for_retry(*args))
Resque.enqueue_in(temp_retry_delay, retry_in_queue, *args_for_retry(*args))
end
sleep(sleep_after_requeue) if sleep_after_requeue > 0

clean_retry_key(*args) if @retry_job_class
end

# Resque before_perform hook.
Expand All @@ -228,7 +231,7 @@ def before_perform_retry(*args)
#
# Deletes retry attempt count from Redis.
def after_perform_retry(*args)
Resque.redis.del(redis_retry_key(*args))
clean_retry_key(*args)
end

# Resque on_failure hook.
Expand All @@ -239,8 +242,23 @@ def on_failure_retry(exception, *args)
if retry_criteria_valid?(exception, *args)
try_again(exception, *args)
else
Resque.redis.del(redis_retry_key(*args))
clean_retry_key(*args)
end
end

def instance_exec(*args, &block)
mname = "__instance_exec_#{Thread.current.object_id.abs}"
class << self; self end.class_eval{ define_method(mname, &block) }
begin
ret = send(mname, *args)
ensure
class << self; self end.class_eval{ undef_method(mname) } rescue nil
end
ret
end

def clean_retry_key(*args)
Resque.redis.del(redis_retry_key(*args))
end

end
Expand Down
3 changes: 2 additions & 1 deletion resque-retry.gemspec
Expand Up @@ -20,6 +20,7 @@ spec = Gem::Specification.new do |s|
s.add_development_dependency('yard')
s.add_development_dependency('json')
s.add_development_dependency('simplecov', '>= 0.3.0')
s.add_development_dependency('mocha')

s.description = <<-EOL
resque-retry provides retry, delay and exponential backoff support for
Expand All @@ -33,4 +34,4 @@ spec = Gem::Specification.new do |s|
* Multiple failure backend with retry suppression & resque-web tab.
* Small & Extendable - plenty of places to override retry logic/settings.
EOL
end
end
16 changes: 16 additions & 0 deletions test/multiple_failure_test.rb
Expand Up @@ -26,6 +26,22 @@ def test_last_failure_is_saved_in_redis_if_delay
assert Resque.redis.exists(key)
end

def test_retry_key_splatting_args
RetryDefaultsJob.expects(:redis_retry_key).with({"a" => 1, "b" => 2}).times(3)
Resque.enqueue(RetryDefaultsJob, {"a" => 1, "b" => 2})

perform_next_job(@worker)
end

def test_last_failure_removed_from_redis_after_error_limit
Resque.enqueue(LimitThreeJob)
3.times do
perform_next_job(@worker)
end

key = failure_key_for(LimitThreeJob)
assert Resque.redis.exists(key)
end

def test_last_failure_has_double_delay_redis_expiry_if_delay
Resque.enqueue(LimitThreeJobDelay1Hour)
Expand Down
12 changes: 12 additions & 0 deletions test/retry_inheriting_checks_test.rb
Expand Up @@ -30,4 +30,16 @@ def test_extending_with_resque_retry_then_defining_inherited_does_not_override_p
assert_equal 1, klass.retry_criteria_checks.size
assert_equal 'test', klass.test_value
end

def test_retry_criteria_check_should_be_evaluated_under_child_context
Resque.enqueue(InheritedJob, 'arg')

10.times do
perform_next_job(@worker)
end

assert_equal 0, BaseJob.retry_attempt, "BaseJob retry attempts"
assert_equal 0, InheritedJob.retry_attempt, "InheritedJob retry attempts"
assert_equal 5, InheritedRetryJob.retry_attempt, "InheritedRetryJob retry attempts"
end
end
25 changes: 25 additions & 0 deletions test/retry_test.rb
Expand Up @@ -142,6 +142,31 @@ def test_do_not_retry_if_failed_and_exception_does_not_allow_retry
assert_equal 0, Resque.info[:pending], 'pending jobs'
end

def test_retry_failed_jobs_in_separate_queue
Resque.enqueue(JobWithRetryQueue, 'arg1')

perform_next_job(@worker)

assert job_from_retry_queue = Resque.pop(:testing_retry)
assert_equal ['arg1'], job_from_retry_queue['args']
assert_equal nil, Resque.redis.get(JobWithRetryQueue.redis_retry_key('arg1'))
end

def test_clean_retry_key_should_splat_args
JobWithRetryQueue.expects(:clean_retry_key).once.with({"a" => 1, "b" => 2})

Resque.enqueue(JobWithRetryQueue, {"a" => 1, "b" => 2})

perform_next_job(@worker)
end

def test_retry_delayed_failed_jobs_in_separate_queue
Resque.enqueue(DelayedJobWithRetryQueue, 'arg1')
Resque.expects(:enqueue_in).with(1, JobRetryQueue, 'arg1')

perform_next_job(@worker)
end

def test_delete_redis_key_when_job_is_successful
Resque.enqueue(GoodJob, 'arg1')

Expand Down
7 changes: 2 additions & 5 deletions test/test_helper.rb
Expand Up @@ -7,6 +7,7 @@
require 'minitest/pride'
require 'rack/test'
require 'simplecov'
require 'mocha'

SimpleCov.start do
add_filter "/test/"
Expand All @@ -29,11 +30,7 @@
at_exit do
next if $!

if defined?(MiniTest)
exit_code = MiniTest::Unit.new.run(ARGV)
else
exit_code = Test::Unit::AutoRunner.run
end
exit_code = Test::Unit::AutoRunner.run

pid = `ps -e -o pid,command | grep [r]edis-test`.split(" ")[0]
puts "Killing test redis server..."
Expand Down
81 changes: 81 additions & 0 deletions test/test_jobs.rb
Expand Up @@ -44,6 +44,35 @@ def self.perform(*args)
end
end

class JobRetryQueue
extend Resque::Plugins::Retry
@queue = :testing_retry

def self.perform(*args)
end
end

class JobWithRetryQueue
extend Resque::Plugins::Retry
@queue = :testing
@retry_job_class = JobRetryQueue

def self.perform(*args)
raise
end
end

class DelayedJobWithRetryQueue
extend Resque::Plugins::Retry
@queue = :testing
@retry_delay = 1
@retry_job_class = JobRetryQueue

def self.perform(*args)
raise
end
end

class InheritTestJob < RetryDefaultsJob
end

Expand Down Expand Up @@ -147,6 +176,58 @@ def self.perform(*args)
end
end

class AsyncJob
extend Resque::Plugins::Retry

class << self
def perform(*opts)
process
end

def process
raise "Shouldn't be called"
end
end
end

class BaseJob < AsyncJob
@retry_limit = 0
@auto_retry_limit = 5
@retry_exceptions = []

retry_criteria_check do |exception, *args|
keep_trying?
end

class << self
def keep_trying?
retry_attempt < @auto_retry_limit
end

def inherited(subclass)
super
%w(@retry_exceptions @retry_delay @retry_limit @auto_retry_limit).each do |variable|
value = BaseJob.instance_variable_get(variable)
value = value.dup rescue value
subclass.instance_variable_set(variable, value)
end
end

def process
raise "Got called #{Time.now}"
end
end
end

class InheritedRetryJob < BaseJob
@queue = :testing
end

class InheritedJob < BaseJob
@queue = :testing
@retry_job_class = InheritedRetryJob
end

module RetryModuleCustomRetryCriteriaCheck
extend Resque::Plugins::Retry
@queue = :testing
Expand Down

0 comments on commit b0b202b

Please sign in to comment.