From 4c5991e8ef77e470b2cbb00bf1d94606a8a02129 Mon Sep 17 00:00:00 2001 From: Otto Hilska Date: Fri, 3 Dec 2010 16:39:54 +0200 Subject: [PATCH] Support unique jobs: don't queue jobs that are already in the queue. --- README.markdown | 10 ++++++++++ lib/resque/scheduler.rb | 27 +++++++++++++++++++++------ test/scheduler_test.rb | 20 ++++++++++++++++++++ 3 files changed, 51 insertions(+), 6 deletions(-) diff --git a/README.markdown b/README.markdown index 19f3a4a3..2af573ab 100644 --- a/README.markdown +++ b/README.markdown @@ -171,6 +171,16 @@ If your extension doesn't support scheduled job, you would need to extend the cu end end +### Support for unique jobs + +Sometimes it doesn't make sense to queue a job if it's already there (with identical arguments). It would just lead to queue pollution. Unique jobs to the rescue: + + create_fake_leaderboards: + cron: "30 6 * * 1" + args: + unique_job: true + description: "This job will be queued only if the previous job is not in the queue anymore." + Resque-web additions -------------------- diff --git a/lib/resque/scheduler.rb b/lib/resque/scheduler.rb index fe6b5ffe..7dc8e442 100644 --- a/lib/resque/scheduler.rb +++ b/lib/resque/scheduler.rb @@ -130,13 +130,28 @@ def enqueue_from_config(config) klass_name = config['class'] || config[:class] params = args.is_a?(Hash) ? [args] : Array(args) queue = config['queue'] || config[:queue] || Resque.queue_from_class(constantize(klass_name)) - # Support custom job classes like job with status - if (job_klass = config['custom_job_class']) && (job_klass != 'Resque::Job') - # custom job classes not supporting the same API calls must implement the #schedule method - constantize(job_klass).scheduled(queue, klass_name, *params) + unique_job = config['unique_job'] || config[:unique_job] + + if (unique_job.to_s == 'true' || unique_job.to_s == 'yes') && job_exists_in_queue?(queue, klass_name, params) + log "Not adding unique job to queue #{queue} with #{params.inspect}" else - Resque::Job.create(queue, klass_name, *params) - end + # Support custom job classes like job with status + if (job_klass = config['custom_job_class']) && (job_klass != 'Resque::Job') + # custom job classes not supporting the same API calls must implement the #schedule method + constantize(job_klass).scheduled(queue, klass_name, *params) + else + Resque::Job.create(queue, klass_name, *params) + end + end + end + + # Test if a job exists for the given queue with the same arugments. + def job_exists_in_queue?(queue, klass_name, args) + queue_redis_name = "queue:#{queue}" + Resque.redis.lrange(queue_redis_name, 0, -1).any? do |queue_item| + parsed = Resque.decode(queue_item) + parsed["args"] == args + end end def rufus_scheduler diff --git a/test/scheduler_test.rb b/test/scheduler_test.rb index 1074093c..fe2e06e0 100644 --- a/test/scheduler_test.rb +++ b/test/scheduler_test.rb @@ -30,6 +30,26 @@ def test_enqueue_from_config_with_custom_class_job_in_the_resque_queue Resque::Scheduler.enqueue_from_config('cron' => "* * * * *", 'class' => 'SomeIvarJob', 'custom_job_class' => 'Resque::SchedulerTest::FakeJob', 'args' => "/tmp") end + def test_enqueue_from_config_doesnt_requeue_unique_jobs + # First queue "test" is empty. + assert_equal(0, Resque.redis.lrange("queue:test", 0, -1).size) + + # Then, we add an unique job with argument /tmp/ + Resque::Scheduler.enqueue_from_config('class' => 'SomeIvarJob', 'queue' => 'test', + 'args' => '/tmp/', 'unique_job' => 'true') + assert_equal(1, Resque.redis.lrange("queue:test", 0, -1).size) + + # Then, we try to add it again - but it won't be added. + Resque::Scheduler.enqueue_from_config('class' => 'SomeIvarJob', 'queue' => 'test', + 'args' => '/tmp/', 'unique_job' => 'true') + assert_equal(1, Resque.redis.lrange("queue:test", 0, -1).size) + + # Finally, we add a job with different arguments, and it will be added. + Resque::Scheduler.enqueue_from_config('class' => 'SomeIvarJob', 'queue' => 'test', + 'args' => '/home/', 'unique_job' => 'true') + assert_equal(2, Resque.redis.lrange("queue:test", 0, -1).size) + end + def test_enqueue_from_config_puts_stuff_in_the_resque_queue_when_env_match # The job should be loaded : its rails_env config matches the RAILS_ENV variable: ENV['RAILS_ENV'] = 'production'