Skip to content

Commit

Permalink
Merge pull request #23 from mateusdelbianco/persist-loner-status
Browse files Browse the repository at this point in the history
Don't allow a job to be executed again immediately after execution
  • Loading branch information
jayniz committed Sep 5, 2012
2 parents a88f4db + 82b6d66 commit 6ffc295
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 2 deletions.
16 changes: 14 additions & 2 deletions lib/resque-loner/helpers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,19 @@ def self.mark_loner_as_queued(queue, item)
return unless item_is_a_unique_job?(item)
key = unique_job_queue_key(queue, item)
redis.set(key, 1)
unless(ttl=item_ttl(item)) == -1 # no need to incur overhead for default value
unless(ttl=item_ttl(item)) == -1 # no need to incur overhead for default value
redis.expire(key, ttl)
end
end

def self.mark_loner_as_unqueued(queue, job)
item = job.is_a?(Resque::Job) ? job.payload : job
return unless item_is_a_unique_job?(item)
redis.del(unique_job_queue_key(queue, item))
unless (ttl=loner_lock_after_execution_period(item)) == 0
redis.expire(unique_job_queue_key(queue, item), ttl)
else
redis.del(unique_job_queue_key(queue, item))
end
end

def self.unique_job_queue_key(queue, item)
Expand All @@ -46,6 +50,14 @@ def self.item_ttl(item)
end
end

def self.loner_lock_after_execution_period(item)
begin
constantize(item[:class] || item["class"]).loner_lock_after_execution_period
rescue
0
end
end

def self.job_destroy(queue, klass, *args)
klass = klass.to_s
redis_queue = "queue:#{queue}"
Expand Down
15 changes: 15 additions & 0 deletions lib/resque-loner/unique_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,21 @@ def loner_ttl
@loner_ttl || -1
end

#
# The default ttl of a persisting key is 0, i.e. immediately deleted.
# You can set loner_lock_after_execution_period if you want to block the execution
# of the job for a certain amount of time (in seconds). For example:
#
# class FooJob
# include Resque::Plugins::UniqueJob
# @loner_lock_after_execution_period = 40
# end
# end
#
def loner_lock_after_execution_period
@loner_lock_after_execution_period || 0
end

end # ClassMethods


Expand Down
26 changes: 26 additions & 0 deletions spec/loner_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,13 @@ class UniqueJobWithTtl
include Resque::Plugins::UniqueJob
@queue = :unique_with_ttl
@loner_ttl = 300
def self.perform(*args); end
end

class UniqueJobWithLockAfterExecution
include Resque::Plugins::UniqueJob
@queue = :unique_with_loner_lock_after_execution_period
@loner_lock_after_execution_period = 150
def self.perform(*args); end
end

Expand Down Expand Up @@ -158,5 +164,25 @@ def self.perform(*args); end
k.length.should == 1
Resque.redis.ttl(k[0]).should be_within(2).of(UniqueJobWithTtl.loner_ttl)
end

it "should not allow the same job to be enqueued after execution if loner_lock_after_execution_period is set" do
Resque.enqueue UniqueJobWithLockAfterExecution, "foo"
Resque.enqueue UniqueJobWithLockAfterExecution, "foo"
Resque.size(:unique_with_loner_lock_after_execution_period).should == 1

Resque.reserve(:unique_with_loner_lock_after_execution_period)
Resque.size(:unique_with_loner_lock_after_execution_period).should == 0

Resque.enqueue UniqueJobWithLockAfterExecution, "foo"
Resque.size(:unique_with_loner_lock_after_execution_period).should == 0
end

it 'should honor loner_lock_after_execution_period in the redis key' do
Resque.enqueue UniqueJobWithLockAfterExecution
Resque.reserve(:unique_with_loner_lock_after_execution_period)
k=Resque.redis.keys "loners:queue:unique_with_loner_lock_after_execution_period:job:*"
k.length.should == 1
Resque.redis.ttl(k[0]).should be_within(2).of(UniqueJobWithLockAfterExecution.loner_lock_after_execution_period)
end
end
end

0 comments on commit 6ffc295

Please sign in to comment.