Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge branch 'master' into jbarnette/master

Conflicts:

	tasks/jobs.rake
  • Loading branch information...
commit b9ebbb4ba0679bc0b003ab4e85dd2731325d93e2 2 parents 450908d + 249c5a9
@tobi tobi authored
View
8 README.textile
@@ -27,10 +27,10 @@ The library evolves around a delayed_jobs table which looks as follows:
table.integer :attempts, :default => 0
table.text :handler
table.string :last_error
- table.datetime :run_at
- table.datetime :locked_at
- table.string :locked_by
- table.timestamps
+ table.datetime :run_at
+ table.datetime :locked_at
+ table.string :locked_by
+ table.timestamps
end
h2. Usage
View
171 lib/delayed/job.rb
@@ -2,27 +2,28 @@
module Delayed
class DeserializationError < StandardError
- end
+ end
- class Job < ActiveRecord::Base
- MAX_ATTEMPTS = 25
+ class Job < ActiveRecord::Base
+ MAX_ATTEMPTS = 25
set_table_name :delayed_jobs
- cattr_accessor :worker_name
- self.worker_name = "pid:#{Process.pid}"
-
-
- NextTaskSQL = '`run_at` <= ? AND (`locked_at` IS NULL OR `locked_at` < ?) OR (`locked_by` = ?)'
+ cattr_accessor :worker_name, :min_priority, :max_priority
+ self.worker_name = "pid:#{Process.pid}"
+ self.min_priority = nil
+ self.max_priority = nil
+
+ NextTaskSQL = '(`locked_by` = ?) OR (`run_at` <= ? AND (`locked_at` IS NULL OR `locked_at` < ?))'
NextTaskOrder = 'priority DESC, run_at ASC'
ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/
class LockError < StandardError
- end
+ end
def self.clear_locks!
connection.execute "UPDATE #{table_name} SET `locked_by`=NULL, `locked_at`=NULL WHERE `locked_by`=#{quote_value(worker_name)}"
end
-
+
def payload_object
@payload_object ||= deserialize(self['handler'])
end
@@ -31,43 +32,59 @@ def name
text = handler.gsub(/\n/, ' ')
"#{id} (#{text.length > 40 ? "#{text[0..40]}..." : text})"
end
-
+
def payload_object=(object)
self['handler'] = object.to_yaml
end
-
- def reschedule(message, time = nil)
-
+
+ def reschedule(message, time = nil)
if self.attempts < MAX_ATTEMPTS
time ||= Job.db_time_now + (attempts ** 4) + 5
self.attempts += 1
self.run_at = time
self.last_error = message
- self.unlock
+ self.unlock
save!
else
logger.info "* [JOB] PERMANENTLY removing #{self.name} because of #{attempts} consequetive failures."
destroy
end
end
-
-
+
def self.enqueue(object, priority = 0)
unless object.respond_to?(:perform)
- raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
+ raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
end
-
- Job.create(:payload_object => object, :priority => priority)
- end
-
+
+ Job.create(:payload_object => object, :priority => priority.to_i)
+ end
+
def self.find_available(limit = 5)
- time_now = db_time_now
- ActiveRecord::Base.silence do
- find(:all, :conditions => [NextTaskSQL, time_now, time_now, worker_name], :order => NextTaskOrder, :limit => limit)
+
+ time_now = db_time_now
+
+ sql = NextTaskSQL.dup
+ conditions = [time_now, time_now, worker_name]
+
+ if self.min_priority
+ sql << ' AND (`priority` >= ?)'
+ conditions << min_priority
end
- end
-
+
+ if self.max_priority
+ sql << ' AND (`priority` <= ?)'
+ conditions << max_priority
+ end
+
+ conditions.unshift(sql)
+
+ ActiveRecord::Base.silence do
+ find(:all, :conditions => conditions, :order => NextTaskOrder, :limit => limit)
+ end
+ end
+
+
# Get the payload of the next job we can get an exclusive lock on.
# If no jobs are left we return nil
def self.reserve(max_run_time = 4.hours)
@@ -92,20 +109,20 @@ def self.reserve(max_run_time = 4.hours)
job.reschedule e.message
logger.error "* [JOB] #{job.name} failed with #{e.class.name}: #{e.message} - #{job.attempts} failed attempts"
logger.error(e)
- return job
+ return job
end
end
nil
- end
+ end
# This method is used internally by reserve method to ensure exclusive access
# to the given job. It will rise a LockError if it cannot get this lock.
def lock_exclusively!(max_run_time, worker = worker_name)
- now = self.class.db_time_now
+ now = self.class.db_time_now
- affected_rows = if locked_by != worker
-
+ affected_rows = if locked_by != worker
+
# We don't own this job so we will update the locked_by name and the locked_at
connection.update(<<-end_sql, "#{self.class.name} Update to aquire exclusive lock")
UPDATE #{self.class.table_name}
@@ -113,9 +130,9 @@ def lock_exclusively!(max_run_time, worker = worker_name)
WHERE #{self.class.primary_key} = #{quote_value(id)} AND (`locked_at` IS NULL OR `locked_at` < #{quote_value(now - max_run_time.to_i)})
end_sql
- else
-
- # We alrady own this job, this may happen if the job queue crashes.
+ else
+
+ # We already own this job, this may happen if the job queue crashes.
# Simply resume and update the locked_at
connection.update(<<-end_sql, "#{self.class.name} Update exclusive lock")
UPDATE #{self.class.table_name}
@@ -124,95 +141,95 @@ def lock_exclusively!(max_run_time, worker = worker_name)
end_sql
end
-
- unless affected_rows == 1
+
+ unless affected_rows == 1
raise LockError, "Attempted to aquire exclusive lock failed"
- end
-
+ end
+
self.locked_at = now
- self.locked_by = worker
- end
-
+ self.locked_by = worker
+ end
+
def unlock
self.locked_at = nil
self.locked_by = nil
end
-
+
def self.work_off(num = 100)
success, failure = 0, 0
-
+
num.times do
-
+
job = self.reserve do |j|
begin
- j.perform
+ j.perform
success += 1
- rescue
+ rescue
failure += 1
raise
end
end
-
+
break if job.nil?
- end
-
+ end
+
return [success, failure]
end
-
+
private
-
- def deserialize(source)
+
+ def deserialize(source)
attempt_to_load_file = true
-
- begin
- handler = YAML.load(source) rescue nil
- return handler if handler.respond_to?(:perform)
-
+
+ begin
+ handler = YAML.load(source) rescue nil
+ return handler if handler.respond_to?(:perform)
+
if handler.nil?
if source =~ ParseObjectFromYaml
-
+
# Constantize the object so that ActiveSupport can attempt
# its auto loading magic. Will raise LoadError if not successful.
attempt_to_load($1)
-
+
# If successful, retry the yaml.load
handler = YAML.load(source)
- return handler if handler.respond_to?(:perform)
+ return handler if handler.respond_to?(:perform)
end
end
-
+
if handler.is_a?(YAML::Object)
-
+
# Constantize the object so that ActiveSupport can attempt
# its auto loading magic. Will raise LoadError if not successful.
attempt_to_load(handler.class)
-
+
# If successful, retry the yaml.load
handler = YAML.load(source)
return handler if handler.respond_to?(:perform)
end
-
- raise DeserializationError, 'Job failed to load: Unknown handler. Try to manually require the appropiate file.'
-
+
+ raise DeserializationError, 'Job failed to load: Unknown handler. Try to manually require the appropiate file.'
+
rescue TypeError, LoadError, NameError => e
-
- raise DeserializationError, "Job failed to load: #{e.message}. Try to manually require the required file."
+
+ raise DeserializationError, "Job failed to load: #{e.message}. Try to manually require the required file."
end
end
-
+
def attempt_to_load(klass)
- klass.constantize
+ klass.constantize
end
def self.db_time_now
- (ActiveRecord::Base.default_timezone == :utc) ? Time.now.utc : Time.now
+ (ActiveRecord::Base.default_timezone == :utc) ? Time.now.utc : Time.now
end
-
- protected
-
+
+ protected
+
def before_save
self.run_at ||= self.class.db_time_now
- end
-
+ end
+
end
end
View
4 lib/delayed/message_sending.rb
@@ -1,7 +1,7 @@
module Delayed
module MessageSending
- def send_later(method, *args)
+ def send_later(method, *args)
Delayed::Job.enqueue Delayed::PerformableMethod.new(self, method.to_sym, args)
end
- end
+ end
end
View
33 lib/delayed/performable_method.rb
@@ -1,40 +1,47 @@
module Delayed
- class PerformableMethod < Struct.new(:object, :method, :args)
- AR_STRING_FORMAT = /^AR\:([A-Z]\w+)\:(\d+)$/
-
+ class PerformableMethod < Struct.new(:object, :method, :args)
+ CLASS_STRING_FORMAT = /^CLASS\:([A-Z]\w+)$/
+ AR_STRING_FORMAT = /^AR\:([A-Z]\w+)\:(\d+)$/
+
def initialize(object, method, args)
raise NoMethodError, "undefined method `#{method}' for #{self.inspect}" unless object.respond_to?(method)
-
+
self.object = dump(object)
self.args = args.map { |a| dump(a) }
self.method = method.to_sym
end
-
+
def perform
load(object).send(method, *args.map{|a| load(a)})
rescue ActiveRecord::RecordNotFound
# We cannot do anything about objects which were deleted in the meantime
- true
- end
-
+ true
+ end
+
private
def load(arg)
case arg
- when AR_STRING_FORMAT then $1.constantize.find($2)
+ when CLASS_STRING_FORMAT then $1.constantize
+ when AR_STRING_FORMAT then $1.constantize.find($2)
else arg
end
end
-
+
def dump(arg)
case arg
+ when Class then class_to_string(arg)
when ActiveRecord::Base then ar_to_string(arg)
else arg
- end
+ end
end
-
+
def ar_to_string(obj)
"AR:#{obj.class}:#{obj.id}"
- end
+ end
+
+ def class_to_string(obj)
+ "CLASS:#{obj.name}"
+ end
end
end
View
21 lib/delayed/worker.rb
@@ -4,13 +4,15 @@ class Worker
def initialize(options={})
@quiet = options[:quiet]
- end
+ Delayed::Job.min_priority = options[:min_priority] if options.has_key?(:min_priority)
+ Delayed::Job.max_priority = options[:max_priority] if options.has_key?(:max_priority)
+ end
def start
- puts "*** Starting job worker #{Delayed::Job.worker_name}" unless @quiet
+ say "*** Starting job worker #{Delayed::Job.worker_name}"
- trap('TERM') { puts 'Exiting...' unless @quiet; $exit = true }
- trap('INT') { puts 'Exiting...' unless @quiet; $exit = true }
+ trap('TERM') { say 'Exiting...'; $exit = true }
+ trap('INT') { say 'Exiting...'; $exit = true }
loop do
result = nil
@@ -25,15 +27,18 @@ def start
if count.zero?
sleep(SLEEP)
- puts 'Waiting for more jobs...' unless @quiet
else
- status = "#{count} jobs processed at %.4f j/s, %d failed ..." % [count / realtime, result.last]
- RAILS_DEFAULT_LOGGER.info status
- puts status unless @quiet
+ say "#{count} jobs processed at %.4f j/s, %d failed ..." % [count / realtime, result.last]
end
break if $exit
+ end
+
+ def say(text)
+ puts text unless @quiet
+ RAILS_DEFAULT_LOGGER.info text
end
+
end
end
end
View
54 spec/database.rb
@@ -1,35 +1,37 @@
$:.unshift(File.dirname(__FILE__) + '/../lib')
-
-require 'rubygems'
-require 'active_record'
-require File.dirname(__FILE__) + '/../init'
+$:.unshift(File.dirname(__FILE__) + '/../../rspec/lib')
+
+require 'rubygems'
+require 'active_record'
-ActiveRecord::Base.logger = Logger.new(nil)
+require File.dirname(__FILE__) + '/../init'
+require 'spec'
+
+ActiveRecord::Base.logger = Logger.new('/tmp/dj.log')
ActiveRecord::Base.establish_connection(:adapter => 'sqlite3', :database => '/tmp/jobs.sqlite')
ActiveRecord::Migration.verbose = false
-
-def reset_db
- ActiveRecord::Schema.define do
-
- create_table :delayed_jobs, :force => true do |table|
- table.integer :priority, :default => 0
- table.integer :attempts, :default => 0
- table.text :handler
- table.string :last_error
- table.datetime :run_at
- table.datetime :locked_at
- table.string :locked_by
- table.timestamps
- end
-
- create_table :stories, :force => true do |table|
- table.string :text
- end
+ActiveRecord::Schema.define do
+
+ create_table :delayed_jobs, :force => true do |table|
+ table.integer :priority, :default => 0
+ table.integer :attempts, :default => 0
+ table.text :handler
+ table.string :last_error
+ table.datetime :run_at
+ table.datetime :locked_at
+ table.string :locked_by
+ table.timestamps
end
-end
-
+
+ create_table :stories, :force => true do |table|
+ table.string :text
+ end
+
+end
+
+
# Purely useful for test cases...
-class Story < ActiveRecord::Base
+class Story < ActiveRecord::Base
def tell; text; end
end
View
122 spec/delayed_method_spec.rb
@@ -1,119 +1,117 @@
require File.dirname(__FILE__) + '/database'
-if not defined?(:ActiveRecord)
- module ActiveRecord
- class RecordNotFound < StandardError
- end
- end
-end
-
-
class SimpleJob
- cattr_accessor :runs; self.runs = 0
+ cattr_accessor :runs; self.runs = 0
def perform; @@runs += 1; end
end
class RandomRubyObject
def say_hello
'hello'
- end
-end
+ end
+end
class ErrorObject
-
+
def throw
- raise ActiveRecord::RecordNotFound, '...'
+ raise ActiveRecord::RecordNotFound, '...'
false
end
-
-end
+
+end
class StoryReader
-
+
def read(story)
- "Epilog: #{story.tell}"
+ "Epilog: #{story.tell}"
end
-
+
end
class StoryReader
-
+
def read(story)
- "Epilog: #{story.tell}"
+ "Epilog: #{story.tell}"
end
-
+
end
-
describe 'random ruby objects' do
-
- before { reset_db }
+ before { Delayed::Job.delete_all }
it "should respond_to :send_later method" do
-
- RandomRubyObject.new.respond_to?(:send_later)
-
- end
-
+
+ RandomRubyObject.new.respond_to?(:send_later)
+
+ end
+
it "should raise a ArgumentError if send_later is called but the target method doesn't exist" do
lambda { RandomRubyObject.new.send_later(:method_that_deos_not_exist) }.should raise_error(NoMethodError)
end
-
- it "should add a new entry to the job table when send_later is called on it" do
+
+ it "should add a new entry to the job table when send_later is called on it" do
Delayed::Job.count.should == 0
-
+
RandomRubyObject.new.send_later(:to_s)
Delayed::Job.count.should == 1
end
-
+
+ it "should add a new entry to the job table when send_later is called on the class" do
+ Delayed::Job.count.should == 0
+
+ RandomRubyObject.send_later(:to_s)
+
+ Delayed::Job.count.should == 1
+ end
+
it "should run get the original method executed when the job is performed" do
-
+
RandomRubyObject.new.send_later(:say_hello)
-
- Delayed::Job.count.should == 1
- end
+
+ Delayed::Job.count.should == 1
+ end
it "should ignore ActiveRecord::RecordNotFound errors because they are permanent" do
-
- ErrorObject.new.send_later(:throw)
- Delayed::Job.count.should == 1
-
+ ErrorObject.new.send_later(:throw)
+
+ Delayed::Job.count.should == 1
+
output = nil
-
+
Delayed::Job.reserve do |e|
output = e.perform
end
-
+
output.should == true
-
- end
-
- it "should store the object as string if its an active record" do
- story = Story.create :text => 'Once upon...'
- story.send_later(:tell)
-
+
+ end
+
+ it "should store the object as string if its an active record" do
+ story = Story.create :text => 'Once upon...'
+ story.send_later(:tell)
+
job = Delayed::Job.find(:first)
job.payload_object.class.should == Delayed::PerformableMethod
- job.payload_object.object.should == 'AR:Story:1'
+ job.payload_object.object.should == "AR:Story:#{story.id}"
job.payload_object.method.should == :tell
- job.payload_object.args.should == []
+ job.payload_object.args.should == []
job.payload_object.perform.should == 'Once upon...'
- end
-
+ end
+
it "should store arguments as string if they an active record" do
-
- story = Story.create :text => 'Once upon...'
-
- reader = StoryReader.new
+
+ story = Story.create :text => 'Once upon...'
+
+ reader = StoryReader.new
reader.send_later(:read, story)
-
+
job = Delayed::Job.find(:first)
job.payload_object.class.should == Delayed::PerformableMethod
job.payload_object.method.should == :read
- job.payload_object.args.should == ['AR:Story:1']
- job.payload_object.perform.should == 'Epilog: Once upon...'
+ job.payload_object.args.should == ["AR:Story:#{story.id}"]
+ job.payload_object.perform.should == 'Epilog: Once upon...'
end
-
+
end
View
175 spec/job_spec.rb
@@ -1,149 +1,176 @@
require File.dirname(__FILE__) + '/database'
class SimpleJob
- cattr_accessor :runs; self.runs = 0
+ cattr_accessor :runs; self.runs = 0
def perform; @@runs += 1; end
-end
+end
class ErrorJob
- cattr_accessor :runs; self.runs = 0
- def perform; raise 'did not work'; end
+ cattr_accessor :runs; self.runs = 0
+ def perform; raise 'did not work'; end
end
describe Delayed::Job do
+ before do
+ Delayed::Job.max_priority = nil
+ Delayed::Job.min_priority = nil
+
+ Delayed::Job.delete_all
+ end
- before :each do
- reset_db
- end
+ before(:each) do
+ SimpleJob.runs = 0
+ end
it "should set run_at automatically" do
Delayed::Job.create(:payload_object => ErrorJob.new ).run_at.should_not == nil
- end
+ end
it "should raise ArgumentError when handler doesn't respond_to :perform" do
lambda { Delayed::Job.enqueue(Object.new) }.should raise_error(ArgumentError)
end
-
+
it "should increase count after enqueuing items" do
- Delayed::Job.enqueue SimpleJob.new
+ Delayed::Job.enqueue SimpleJob.new
Delayed::Job.count.should == 1
end
-
- it "should call perform on jobs when running work_off" do
+
+ it "should call perform on jobs when running work_off" do
SimpleJob.runs.should == 0
-
- Delayed::Job.enqueue SimpleJob.new
+
+ Delayed::Job.enqueue SimpleJob.new
Delayed::Job.work_off
-
- SimpleJob.runs.should == 1
- end
+
+ SimpleJob.runs.should == 1
+ end
it "should re-schedule by about 1 second at first and increment this more and more minutes when it fails to execute properly" do
- Delayed::Job.enqueue ErrorJob.new
- Delayed::Job.work_off(1)
-
-
- job = Delayed::Job.find(:first)
+ Delayed::Job.enqueue ErrorJob.new
+ Delayed::Job.work_off(1)
+
+ job = Delayed::Job.find(:first)
- job.last_error.should == 'did not work'
- job.attempts.should == 1
-
- job.run_at.should > Delayed::Job.db_time_now - 10.minutes
- job.run_at.should < Delayed::Job.db_time_now + 10.minutes
- end
-
-
+ job.last_error.should == 'did not work'
+ job.attempts.should == 1
+
+ job.run_at.should > Delayed::Job.db_time_now - 10.minutes
+ job.run_at.should < Delayed::Job.db_time_now + 10.minutes
+ end
+
it "should raise an DeserializationError when the job class is totally unknown" do
job = Delayed::Job.new
job['handler'] = "--- !ruby/object:JobThatDoesNotExist {}"
- lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
+ lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
end
it "should try to load the class when it is unknown at the time of the deserialization" do
- job = Delayed::Job.new
+ job = Delayed::Job.new
job['handler'] = "--- !ruby/object:JobThatDoesNotExist {}"
job.should_receive(:attempt_to_load).with('JobThatDoesNotExist').and_return(true)
-
- lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
- end
-
+
+ lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
+ end
+
it "should try include the namespace when loading unknown objects" do
- job = Delayed::Job.new
+ job = Delayed::Job.new
job['handler'] = "--- !ruby/object:Delayed::JobThatDoesNotExist {}"
- job.should_receive(:attempt_to_load).with('Delayed::JobThatDoesNotExist').and_return(true)
- lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
- end
-
-
+ job.should_receive(:attempt_to_load).with('Delayed::JobThatDoesNotExist').and_return(true)
+ lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
+ end
+
it "should also try to load structs when they are unknown (raises TypeError)" do
- job = Delayed::Job.new
+ job = Delayed::Job.new
job['handler'] = "--- !ruby/struct:JobThatDoesNotExist {}"
job.should_receive(:attempt_to_load).with('JobThatDoesNotExist').and_return(true)
-
- lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
- end
-
+
+ lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
+ end
+
it "should try include the namespace when loading unknown structs" do
- job = Delayed::Job.new
+ job = Delayed::Job.new
job['handler'] = "--- !ruby/struct:Delayed::JobThatDoesNotExist {}"
- job.should_receive(:attempt_to_load).with('Delayed::JobThatDoesNotExist').and_return(true)
- lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
+
+ job.should_receive(:attempt_to_load).with('Delayed::JobThatDoesNotExist').and_return(true)
+ lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
end
it "should be removed if it failed more than MAX_ATTEMPTS times" do
@job = Delayed::Job.create :payload_object => SimpleJob.new, :attempts => 50
- @job.should_receive(:destroy)
- @job.reschedule 'FAIL'
+ @job.should_receive(:destroy)
+ @job.reschedule 'FAIL'
end
-
+
describe "when another worker is already performing an task, it" do
-
+
before :each do
Delayed::Job.worker_name = 'worker1'
@job = Delayed::Job.create :payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => Delayed::Job.db_time_now - 5.minutes
end
-
- it "should not allow a second worker to get exclusive access" do
- lambda { @job.lock_exclusively! 4.hours, 'worker2' }.should raise_error(Delayed::Job::LockError)
+
+ it "should not allow a second worker to get exclusive access" do
+ lambda { @job.lock_exclusively! 4.hours, 'worker2' }.should raise_error(Delayed::Job::LockError)
end
- it "should not allow a second worker to get exclusive access if the timeout has passed" do
+ it "should not allow a second worker to get exclusive access if the timeout has passed" do
@job.lock_exclusively! 1.minute, 'worker2'
end
- it "should be able to get access to the task if it was started more then max_age ago" do
+ it "should be able to get access to the task if it was started more then max_age ago" do
@job.locked_at = 5.hours.ago
@job.save
- @job.lock_exclusively! 4.hours, 'worker2'
+ @job.lock_exclusively! 4.hours, 'worker2'
@job.reload
@job.locked_by.should == 'worker2'
@job.locked_at.should > 1.minute.ago
end
+
it "should be able to get exclusive access again when the worker name is the same" do
- @job.lock_exclusively! 5.minutes, 'worker1'
@job.lock_exclusively! 5.minutes, 'worker1'
- @job.lock_exclusively! 5.minutes, 'worker1'
+ @job.lock_exclusively! 5.minutes, 'worker1'
+ @job.lock_exclusively! 5.minutes, 'worker1'
end
- end
+ end
+
+ context "worker prioritization" do
-end
-
-
-
-
-
-
-
-
-
-
+ before(:each) do
+ Delayed::Job.max_priority = nil
+ Delayed::Job.min_priority = nil
+ end
+
+ it "should only work_off jobs that are >= min_priority" do
+ Delayed::Job.min_priority = -5
+ Delayed::Job.max_priority = 5
+ SimpleJob.runs.should == 0
+
+ Delayed::Job.enqueue SimpleJob.new, -10
+ Delayed::Job.enqueue SimpleJob.new, 0
+ Delayed::Job.work_off
+
+ SimpleJob.runs.should == 1
+ end
+
+ it "should only work_off jobs that are <= max_priority" do
+ Delayed::Job.min_priority = -5
+ Delayed::Job.max_priority = 5
+ SimpleJob.runs.should == 0
+
+ Delayed::Job.enqueue SimpleJob.new, 10
+ Delayed::Job.enqueue SimpleJob.new, 0
+ Delayed::Job.work_off
+ SimpleJob.runs.should == 1
+ end
+
+ end
+
+end
View
23 spec/story_spec.rb
@@ -1,18 +1,17 @@
-require File.dirname(__FILE__) + '/database'
+require File.dirname(__FILE__) + '/database'
describe "A story" do
-
- before do
- reset_db
- Story.create :text => "Once upon a time..."
+
+ before(:all) do
+ @story = Story.create :text => "Once upon a time..."
end
-
+
it "should be shared" do
- Story.find(:first).tell.should == 'Once upon a time...'
- end
-
+ @story.tell.should == 'Once upon a time...'
+ end
+
it "should not return its result if it storytelling is delayed" do
- Story.find(:first).send_later(:tell).should_not == 'Once upon a time...'
- end
-
+ @story.send_later(:tell).should_not == 'Once upon a time...'
+ end
+
end
View
2  tasks/jobs.rake
@@ -6,6 +6,6 @@ namespace :jobs do
desc "Start a delayed_job worker."
task :work => :environment do
- Delayed::Worker.new.start
+ Delayed::Worker.new.start(:min_priority => ENV['MIN_PRIORITY'], :max_priority => ENV['MAX_PRIORITY'])
end
end
Please sign in to comment.
Something went wrong with that request. Please try again.