Skip to content

Commit 7059ab3

Browse files
committed
Add job priorities to ActiveJob
1 parent 61f9e47 commit 7059ab3

File tree

11 files changed

+132
-6
lines changed

11 files changed

+132
-6
lines changed

activejob/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
* Add job priorities to Active Job.
2+
3+
*wvengen*
4+
15
* Implement a simple `AsyncJob` processor and associated `AsyncAdapter` that
26
queue jobs to a `concurrent-ruby` thread pool.
37

activejob/lib/active_job/base.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
require 'active_job/core'
22
require 'active_job/queue_adapter'
33
require 'active_job/queue_name'
4+
require 'active_job/queue_priority'
45
require 'active_job/enqueuing'
56
require 'active_job/execution'
67
require 'active_job/callbacks'
@@ -57,6 +58,7 @@ class Base
5758
include Core
5859
include QueueAdapter
5960
include QueueName
61+
include QueuePriority
6062
include Enqueuing
6163
include Execution
6264
include Callbacks

activejob/lib/active_job/core.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ module Core
1818
# Queue in which the job will reside.
1919
attr_writer :queue_name
2020

21+
# Priority that the job will have (lower is more priority).
22+
attr_writer :priority
23+
2124
# ID optionally provided by adapter
2225
attr_accessor :provider_job_id
2326

@@ -43,6 +46,7 @@ def deserialize(job_data)
4346
# * <tt>:wait</tt> - Enqueues the job with the specified delay
4447
# * <tt>:wait_until</tt> - Enqueues the job at the time specified
4548
# * <tt>:queue</tt> - Enqueues the job on the specified queue
49+
# * <tt>:priority</tt> - Enqueues the job with the specified priority
4650
#
4751
# ==== Examples
4852
#
@@ -51,6 +55,7 @@ def deserialize(job_data)
5155
# VideoJob.set(wait_until: Time.now.tomorrow).perform_later(Video.last)
5256
# VideoJob.set(queue: :some_queue, wait: 5.minutes).perform_later(Video.last)
5357
# VideoJob.set(queue: :some_queue, wait_until: Time.now.tomorrow).perform_later(Video.last)
58+
# VideoJob.set(queue: :some_queue, wait: 5.minutes, priority: 10).perform_later(Video.last)
5459
def set(options={})
5560
ConfiguredJob.new(self, options)
5661
end
@@ -62,6 +67,7 @@ def initialize(*arguments)
6267
@arguments = arguments
6368
@job_id = SecureRandom.uuid
6469
@queue_name = self.class.queue_name
70+
@priority = self.class.priority
6571
end
6672

6773
# Returns a hash with the job data that can safely be passed to the
@@ -71,6 +77,7 @@ def serialize
7177
'job_class' => self.class.name,
7278
'job_id' => job_id,
7379
'queue_name' => queue_name,
80+
'priority' => priority,
7481
'arguments' => serialize_arguments(arguments),
7582
'locale' => I18n.locale
7683
}
@@ -99,6 +106,7 @@ def serialize
99106
def deserialize(job_data)
100107
self.job_id = job_data['job_id']
101108
self.queue_name = job_data['queue_name']
109+
self.priority = job_data['priority']
102110
self.serialized_arguments = job_data['arguments']
103111
self.locale = job_data['locale'] || I18n.locale
104112
end

activejob/lib/active_job/enqueuing.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ def job_or_instantiate(*args)
3232
# * <tt>:wait</tt> - Enqueues the job with the specified delay
3333
# * <tt>:wait_until</tt> - Enqueues the job at the time specified
3434
# * <tt>:queue</tt> - Enqueues the job on the specified queue
35+
# * <tt>:priority</tt> - Enqueues the job with the specified priority
3536
#
3637
# ==== Examples
3738
#
@@ -54,17 +55,20 @@ def retry_job(options={})
5455
# * <tt>:wait</tt> - Enqueues the job with the specified delay
5556
# * <tt>:wait_until</tt> - Enqueues the job at the time specified
5657
# * <tt>:queue</tt> - Enqueues the job on the specified queue
58+
# * <tt>:priority</tt> - Enqueues the job with the specified priority
5759
#
5860
# ==== Examples
5961
#
6062
# my_job_instance.enqueue
6163
# my_job_instance.enqueue wait: 5.minutes
6264
# my_job_instance.enqueue queue: :important
6365
# my_job_instance.enqueue wait_until: Date.tomorrow.midnight
66+
# my_job_instance.enqueue priority: 10
6467
def enqueue(options={})
6568
self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait]
6669
self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
6770
self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue]
71+
self.priority = options[:priority].to_i if options[:priority]
6872
run_callbacks :enqueue do
6973
if self.scheduled_at
7074
self.class.queue_adapter.enqueue_at self, self.scheduled_at

activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@ module QueueAdapters
1414
# Rails.application.config.active_job.queue_adapter = :delayed_job
1515
class DelayedJobAdapter
1616
def enqueue(job) #:nodoc:
17-
delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name)
17+
delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority)
1818
job.provider_job_id = delayed_job.id
1919
delayed_job
2020
end
2121

2222
def enqueue_at(job, timestamp) #:nodoc:
23-
delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, run_at: Time.at(timestamp))
23+
delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority, run_at: Time.at(timestamp))
2424
job.provider_job_id = delayed_job.id
2525
delayed_job
2626
end

activejob/lib/active_job/queue_adapters/que_adapter.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@ module QueueAdapters
1616
# Rails.application.config.active_job.queue_adapter = :que
1717
class QueAdapter
1818
def enqueue(job) #:nodoc:
19-
que_job = JobWrapper.enqueue job.serialize
19+
que_job = JobWrapper.enqueue job.serialize, priority: job.priority
2020
job.provider_job_id = que_job.attrs["job_id"]
2121
que_job
2222
end
2323

2424
def enqueue_at(job, timestamp) #:nodoc:
25-
que_job = JobWrapper.enqueue job.serialize, run_at: Time.at(timestamp)
25+
que_job = JobWrapper.enqueue job.serialize, priority: job.priority, run_at: Time.at(timestamp)
2626
job.provider_job_id = que_job.attrs["job_id"]
2727
que_job
2828
end
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
module ActiveJob
2+
module QueuePriority
3+
extend ActiveSupport::Concern
4+
5+
# Includes the ability to override the default queue priority.
6+
module ClassMethods
7+
mattr_accessor(:default_priority)
8+
9+
# Specifies the priority of the queue to create the job with.
10+
#
11+
# class PublishToFeedJob < ActiveJob::Base
12+
# queue_with_priority 50
13+
#
14+
# def perform(post)
15+
# post.to_feed!
16+
# end
17+
# end
18+
#
19+
# Specify either an argument or a block.
20+
def queue_with_priority(priority=nil, &block)
21+
if block_given?
22+
self.priority = block
23+
else
24+
self.priority = priority
25+
end
26+
end
27+
end
28+
29+
included do
30+
class_attribute :priority, instance_accessor: false
31+
32+
self.priority = default_priority
33+
end
34+
35+
# Returns the priority that the job will be created with
36+
def priority
37+
if @priority.is_a?(Proc)
38+
@priority = instance_exec(&@priority)
39+
end
40+
@priority
41+
end
42+
43+
end
44+
end
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
require 'helper'
2+
require 'jobs/hello_job'
3+
4+
class QueuePriorityTest < ActiveSupport::TestCase
5+
test 'priority unset by default' do
6+
assert_equal nil, HelloJob.priority
7+
end
8+
9+
test 'uses given priority' do
10+
original_priority = HelloJob.priority
11+
12+
begin
13+
HelloJob.queue_with_priority 90
14+
assert_equal 90, HelloJob.new.priority
15+
ensure
16+
HelloJob.priority = original_priority
17+
end
18+
end
19+
20+
test 'evals block given to priority to determine priority' do
21+
original_priority = HelloJob.priority
22+
23+
begin
24+
HelloJob.queue_with_priority { 25 }
25+
assert_equal 25, HelloJob.new.priority
26+
ensure
27+
HelloJob.priority = original_priority
28+
end
29+
end
30+
31+
test 'can use arguments to determine priority in priority block' do
32+
original_priority = HelloJob.priority
33+
34+
begin
35+
HelloJob.queue_with_priority { self.arguments.first=='1' ? 99 : 11 }
36+
assert_equal 99, HelloJob.new('1').priority
37+
assert_equal 11, HelloJob.new('3').priority
38+
ensure
39+
HelloJob.priority = original_priority
40+
end
41+
end
42+
43+
test 'uses priority passed to #set' do
44+
job = HelloJob.set(priority: 123).perform_later
45+
assert_equal 123, job.priority
46+
end
47+
end

activejob/test/integration/queuing_test.rb

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,4 +84,16 @@ class QueuingTest < ActiveSupport::TestCase
8484
I18n.locale = :en
8585
end
8686
end
87+
88+
test 'should run job with higher priority first' do
89+
skip unless adapter_is?(:delayed_job, :que)
90+
91+
wait_until = Time.now + 3.seconds
92+
TestJob.set(wait_until: wait_until, priority: 20).perform_later "#{@id}.1"
93+
TestJob.set(wait_until: wait_until, priority: 10).perform_later "#{@id}.2"
94+
wait_for_jobs_to_finish_for(10.seconds)
95+
assert job_executed "#{@id}.1"
96+
assert job_executed "#{@id}.2"
97+
assert job_executed_at("#{@id}.2") < job_executed_at("#{@id}.1")
98+
end
8799
end

activejob/test/support/integration/test_case_helpers.rb

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,12 @@ def wait_for_jobs_to_finish_for(seconds=60)
4242
end
4343
end
4444

45-
def job_executed
46-
Dummy::Application.root.join("tmp/#{@id}").exist?
45+
def job_executed(id=@id)
46+
Dummy::Application.root.join("tmp/#{id}").exist?
47+
end
48+
49+
def job_executed_at(id=@id)
50+
File.new(Dummy::Application.root.join("tmp/#{id}")).ctime
4751
end
4852

4953
def job_output

0 commit comments

Comments
 (0)