Skip to content

Commit

Permalink
Support unique jobs: don't queue jobs that are already in the queue.
Browse files Browse the repository at this point in the history
  • Loading branch information
Otto Hilska committed Dec 3, 2010
1 parent 6d93f13 commit 4c5991e
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 6 deletions.
10 changes: 10 additions & 0 deletions README.markdown
Expand Up @@ -171,6 +171,16 @@ If your extension doesn't support scheduled job, you would need to extend the cu
end
end

### Support for unique jobs

Sometimes it doesn't make sense to queue a job if it's already there (with identical arguments). It would just lead to queue pollution. Unique jobs to the rescue:

create_fake_leaderboards:
cron: "30 6 * * 1"
args:
unique_job: true
description: "This job will be queued only if the previous job is not in the queue anymore."


Resque-web additions
--------------------
Expand Down
27 changes: 21 additions & 6 deletions lib/resque/scheduler.rb
Expand Up @@ -130,13 +130,28 @@ def enqueue_from_config(config)
klass_name = config['class'] || config[:class]
params = args.is_a?(Hash) ? [args] : Array(args)
queue = config['queue'] || config[:queue] || Resque.queue_from_class(constantize(klass_name))
# Support custom job classes like job with status
if (job_klass = config['custom_job_class']) && (job_klass != 'Resque::Job')
# custom job classes not supporting the same API calls must implement the #schedule method
constantize(job_klass).scheduled(queue, klass_name, *params)
unique_job = config['unique_job'] || config[:unique_job]

if (unique_job.to_s == 'true' || unique_job.to_s == 'yes') && job_exists_in_queue?(queue, klass_name, params)
log "Not adding unique job to queue #{queue} with #{params.inspect}"
else
Resque::Job.create(queue, klass_name, *params)
end
# Support custom job classes like job with status
if (job_klass = config['custom_job_class']) && (job_klass != 'Resque::Job')
# custom job classes not supporting the same API calls must implement the #schedule method
constantize(job_klass).scheduled(queue, klass_name, *params)
else
Resque::Job.create(queue, klass_name, *params)
end
end
end

# Test if a job exists for the given queue with the same arugments.
def job_exists_in_queue?(queue, klass_name, args)
queue_redis_name = "queue:#{queue}"
Resque.redis.lrange(queue_redis_name, 0, -1).any? do |queue_item|
parsed = Resque.decode(queue_item)
parsed["args"] == args
end
end

def rufus_scheduler
Expand Down
20 changes: 20 additions & 0 deletions test/scheduler_test.rb
Expand Up @@ -30,6 +30,26 @@ def test_enqueue_from_config_with_custom_class_job_in_the_resque_queue
Resque::Scheduler.enqueue_from_config('cron' => "* * * * *", 'class' => 'SomeIvarJob', 'custom_job_class' => 'Resque::SchedulerTest::FakeJob', 'args' => "/tmp")
end

def test_enqueue_from_config_doesnt_requeue_unique_jobs
# First queue "test" is empty.
assert_equal(0, Resque.redis.lrange("queue:test", 0, -1).size)

# Then, we add an unique job with argument /tmp/
Resque::Scheduler.enqueue_from_config('class' => 'SomeIvarJob', 'queue' => 'test',
'args' => '/tmp/', 'unique_job' => 'true')
assert_equal(1, Resque.redis.lrange("queue:test", 0, -1).size)

# Then, we try to add it again - but it won't be added.
Resque::Scheduler.enqueue_from_config('class' => 'SomeIvarJob', 'queue' => 'test',
'args' => '/tmp/', 'unique_job' => 'true')
assert_equal(1, Resque.redis.lrange("queue:test", 0, -1).size)

# Finally, we add a job with different arguments, and it will be added.
Resque::Scheduler.enqueue_from_config('class' => 'SomeIvarJob', 'queue' => 'test',
'args' => '/home/', 'unique_job' => 'true')
assert_equal(2, Resque.redis.lrange("queue:test", 0, -1).size)
end

def test_enqueue_from_config_puts_stuff_in_the_resque_queue_when_env_match
# The job should be loaded : its rails_env config matches the RAILS_ENV variable:
ENV['RAILS_ENV'] = 'production'
Expand Down

0 comments on commit 4c5991e

Please sign in to comment.