Skip to content

Commit

Permalink
Added min/max priority levels for workers. This allows you to have de…
Browse files Browse the repository at this point in the history
…dicated workers to high priority task

This is useful when you need to DJ a lot of tasks that need to be run quickly such as things that affect the UI like image transformations.
Dedicate some workers to high priority jobs only when you have a lot of long running low priority tasks that could otherwise clog all the workers.
  • Loading branch information
Tobias Lütke committed Nov 1, 2008
1 parent d76d34a commit 249c5a9
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 27 deletions.
34 changes: 27 additions & 7 deletions lib/delayed/job.rb
Expand Up @@ -8,10 +8,12 @@ class Job < ActiveRecord::Base
MAX_ATTEMPTS = 25
set_table_name :delayed_jobs

cattr_accessor :worker_name
self.worker_name = "pid:#{Process.pid}"
cattr_accessor :worker_name, :min_priority, :max_priority
self.worker_name = "pid:#{Process.pid}"
self.min_priority = nil
self.max_priority = nil

NextTaskSQL = '`run_at` <= ? AND (`locked_at` IS NULL OR `locked_at` < ?) OR (`locked_by` = ?)'
NextTaskSQL = '(`locked_by` = ?) OR (`run_at` <= ? AND (`locked_at` IS NULL OR `locked_at` < ?))'
NextTaskOrder = 'priority DESC, run_at ASC'
ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/

Expand Down Expand Up @@ -55,15 +57,33 @@ def self.enqueue(object, priority = 0)
raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
end

Job.create(:payload_object => object, :priority => priority)
Job.create(:payload_object => object, :priority => priority.to_i)
end

def self.find_available(limit = 5)
time_now = db_time_now

time_now = db_time_now

sql = NextTaskSQL.dup
conditions = [time_now, time_now, worker_name]

if self.min_priority
sql << ' AND (`priority` >= ?)'
conditions << min_priority
end

if self.max_priority
sql << ' AND (`priority` <= ?)'
conditions << max_priority
end

conditions.unshift(sql)

ActiveRecord::Base.silence do
find(:all, :conditions => [NextTaskSQL, time_now, time_now, worker_name], :order => NextTaskOrder, :limit => limit)
find(:all, :conditions => conditions, :order => NextTaskOrder, :limit => limit)
end
end
end


# Get the payload of the next job we can get an exclusive lock on.
# If no jobs are left we return nil
Expand Down
37 changes: 19 additions & 18 deletions spec/database.rb
Expand Up @@ -3,32 +3,33 @@

require 'rubygems'
require 'active_record'

require File.dirname(__FILE__) + '/../init'
require 'spec'

ActiveRecord::Base.logger = Logger.new(nil)
ActiveRecord::Base.logger = Logger.new('/tmp/dj.log')
ActiveRecord::Base.establish_connection(:adapter => 'sqlite3', :database => '/tmp/jobs.sqlite')
ActiveRecord::Migration.verbose = false

ActiveRecord::Schema.define do

create_table :delayed_jobs, :force => true do |table|
table.integer :priority, :default => 0
table.integer :attempts, :default => 0
table.text :handler
table.string :last_error
table.datetime :run_at
table.datetime :locked_at
table.string :locked_by
table.timestamps
end

ActiveRecord::Schema.define do

create_table :delayed_jobs, :force => true do |table|
table.integer :priority, :default => 0
table.integer :attempts, :default => 0
table.text :handler
table.string :last_error
table.datetime :run_at
table.datetime :locked_at
table.string :locked_by
table.timestamps
end
create_table :stories, :force => true do |table|
table.string :text
end

create_table :stories, :force => true do |table|
table.string :text
end
end

end

# Purely useful for test cases...
class Story < ActiveRecord::Base
Expand Down
46 changes: 45 additions & 1 deletion spec/job_spec.rb
Expand Up @@ -11,7 +11,16 @@ def perform; raise 'did not work'; end
end

describe Delayed::Job do
before { Delayed::Job.delete_all }
before do
Delayed::Job.max_priority = nil
Delayed::Job.min_priority = nil

Delayed::Job.delete_all
end

before(:each) do
SimpleJob.runs = 0
end

it "should set run_at automatically" do
Delayed::Job.create(:payload_object => ErrorJob.new ).run_at.should_not == nil
Expand Down Expand Up @@ -128,5 +137,40 @@ def perform; raise 'did not work'; end
@job.lock_exclusively! 5.minutes, 'worker1'
@job.lock_exclusively! 5.minutes, 'worker1'
end
end

context "worker prioritization" do

before(:each) do
Delayed::Job.max_priority = nil
Delayed::Job.min_priority = nil
end

it "should only work_off jobs that are >= min_priority" do
Delayed::Job.min_priority = -5
Delayed::Job.max_priority = 5
SimpleJob.runs.should == 0

Delayed::Job.enqueue SimpleJob.new, -10
Delayed::Job.enqueue SimpleJob.new, 0
Delayed::Job.work_off

SimpleJob.runs.should == 1
end

it "should only work_off jobs that are <= max_priority" do
Delayed::Job.min_priority = -5
Delayed::Job.max_priority = 5
SimpleJob.runs.should == 0

Delayed::Job.enqueue SimpleJob.new, 10
Delayed::Job.enqueue SimpleJob.new, 0

Delayed::Job.work_off

SimpleJob.runs.should == 1
end

end

end
5 changes: 4 additions & 1 deletion tasks/jobs.rake
@@ -1,6 +1,6 @@
namespace :jobs do


task :work => :environment do

puts "*** Starting job worker #{Delayed::Job.worker_name}"
Expand All @@ -9,6 +9,9 @@ namespace :jobs do

trap('TERM') { puts 'Exiting...'; $exit = true }
trap('INT') { puts 'Exiting...'; $exit = true }

Delayed::Job.min_priority = ENV['MIN_PRIORITY']
Delayed::Job.max_priority = ENV['MAX_PRIORITY']

loop do
result = nil
Expand Down

0 comments on commit 249c5a9

Please sign in to comment.