Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

refactor job object

- Move some class methods to instance methods
- Don't use exception to signal lock failure
- Add more explicit test cases for locking with multiple workers
  • Loading branch information...
commit 266fc15c12953a94a6d052281ce10059ddd5ebc1 1 parent ad27c3e
@dmag dmag authored
Showing with 115 additions and 64 deletions.
  1. +64 −43 lib/delayed/job.rb
  2. +3 −7 spec/delayed_method_spec.rb
  3. +48 −14 spec/job_spec.rb
View
107 lib/delayed/job.rb
@@ -3,6 +3,8 @@ module Delayed
# Error type named for deserialization failures. It adds nothing to StandardError.
# NOTE(review): presumably raised when a stored job handler cannot be loaded;
# the raise sites are not visible in this excerpt, so confirm against the full file.
class DeserializationError < StandardError
end
+ # A job object that is persisted to the database.
+ # Contains the work object as a YAML field.
class Job < ActiveRecord::Base
MAX_ATTEMPTS = 25
MAX_RUN_TIME = 4.hours
@@ -29,9 +31,7 @@ class Job < ActiveRecord::Base
self.min_priority = nil
self.max_priority = nil
- class LockError < StandardError
- end
-
+ # When a worker is exiting, make sure we don't have any locked jobs.
# Releases every lock recorded under the current worker_name by setting
# locked_by and locked_at to NULL on the matching rows.
# Returns whatever update_all returns.
def self.clear_locks!
  release_lock_sql = "locked_by = null, locked_at = null"
  owned_by_this_worker = ["locked_by = ?", worker_name]
  update_all(release_lock_sql, owned_by_this_worker)
end
@@ -60,6 +60,8 @@ def payload_object=(object)
self['handler'] = object.to_yaml
end
+ # Reschedule the job in the future (when a job fails).
+ # Uses an exponential scale depending on the number of failed attempts.
def reschedule(message, backtrace = [], time = nil)
if self.attempts < MAX_ATTEMPTS
time ||= Job.db_time_now + (attempts ** 4) + 5
@@ -75,6 +77,32 @@ def reschedule(message, backtrace = [], time = nil)
end
end
+
+ # Try to run one job. Returns true/false (work done/work failed) or nil if job can't be locked.
+ def run_with_lock(max_run_time, worker_name)
+ logger.info "* [JOB] aquiring lock on #{name}"
+ unless lock_exclusively!(max_run_time, worker_name)
+ # We did not get the lock, some other worker process must have
+ logger.warn "* [JOB] failed to aquire exclusive lock for #{name}"
+ return nil # no work done
+ end
+
+ begin
+ runtime = Benchmark.realtime do
+ invoke_job # TODO: raise error if takes longer than max_run_time
+ destroy
+ end
+ # TODO: warn if runtime > max_run_time ?
+ logger.info "* [JOB] #{name} completed after %.4f" % runtime
+ return true # did work
+ rescue Exception => e
+ reschedule e.message, e.backtrace
+ log_exception(e)
+ return false # work failed
+ end
+ end
+
+ # Add a job to the queue
def self.enqueue(*args, &block)
object = block_given? ? EvaledJob.new(&block) : args.shift
@@ -88,6 +116,8 @@ def self.enqueue(*args, &block)
Job.create(:payload_object => object, :priority => priority.to_i, :run_at => run_at)
end
+ # Find a few candidate jobs to run (in case some immediately get locked by others).
+ # Return them in random order to prevent every worker from trying the same head job at once.
def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME)
time_now = db_time_now
@@ -115,38 +145,22 @@ def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME)
records.sort_by { rand() }
end
- # Get the payload of the next job we can get an exclusive lock on.
+ # Run the next job we can get an exclusive lock on.
# If no jobs are left we return nil
- def self.reserve(max_run_time = MAX_RUN_TIME, &block)
+ def self.reserve_and_run_one_job(max_run_time = MAX_RUN_TIME)
- # We get up to 5 jobs from the db. In face we cannot get exclusive access to a job we try the next.
+ # We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next.
# this leads to a more even distribution of jobs across the worker processes
find_available(5, max_run_time).each do |job|
- begin
- logger.info "* [JOB] aquiring lock on #{job.name}"
- job.lock_exclusively!(max_run_time, worker_name)
- runtime = Benchmark.realtime do
- invoke_job(job.payload_object, &block)
- job.destroy
- end
- logger.info "* [JOB] #{job.name} completed after %.4f" % runtime
-
- return job
- rescue LockError
- # We did not get the lock, some other worker process must have
- logger.warn "* [JOB] failed to aquire exclusive lock for #{job.name}"
- rescue StandardError => e
- job.reschedule e.message, e.backtrace
- log_exception(job, e)
- return job
- end
+ t = job.run_with_lock(max_run_time, worker_name)
+ return t unless t == nil # return if we did work (good or bad)
end
- nil
+ nil # we didn't do any work, all 5 were not lockable
end
- # This method is used internally by reserve method to ensure exclusive access
- # to the given job. It will rise a LockError if it cannot get this lock.
+ # Lock this job for this worker.
+ # Returns true if we have the lock, false otherwise.
def lock_exclusively!(max_run_time, worker = worker_name)
now = self.class.db_time_now
affected_rows = if locked_by != worker
@@ -157,46 +171,50 @@ def lock_exclusively!(max_run_time, worker = worker_name)
# Simply resume and update the locked_at
self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker])
end
- raise LockError.new("Attempted to aquire exclusive lock failed") unless affected_rows == 1
-
- self.locked_at = now
- self.locked_by = worker
+ if affected_rows == 1
+ self.locked_at = now
+ self.locked_by = worker
+ return true
+ else
+ return false
+ end
end
+ # Unlock this job (note: not saved to DB)
# Clears both lock attributes on this in-memory object.
# Only the attributes are assigned here; no save is issued.
def unlock
  self.locked_at = self.locked_by = nil
end
# This is a good hook if you need to report job processing errors in additional or different ways
- def self.log_exception(job, error)
- logger.error "* [JOB] #{job.name} failed with #{error.class.name}: #{error.message} - #{job.attempts} failed attempts"
+ def log_exception(error)
+ logger.error "* [JOB] #{name} failed with #{error.class.name}: #{error.message} - #{attempts} failed attempts"
logger.error(error)
end
+ # Do num jobs and return stats on success/failure.
+ # Exit early if interrupted.
def self.work_off(num = 100)
success, failure = 0, 0
num.times do
- job = self.reserve do |j|
- begin
- j.perform
+ case self.reserve_and_run_one_job
+ when true
success += 1
- rescue
+ when false
failure += 1
- raise
- end
+ else
+ break # leave if no work could be done
end
-
- break if job.nil?
+ break if $exit # leave if we're exiting
end
return [success, failure]
end
# Moved into its own method so that new_relic can trace it.
- def self.invoke_job(job, &block)
- block.call(job)
+ def invoke_job
+ payload_object.perform
end
private
@@ -227,6 +245,9 @@ def attempt_to_load(klass)
klass.constantize
end
+ # Get the current time (GMT or local depending on DB)
+ # Note: This does not ping the DB to get the time, so all your clients
+ # must have synchronized clocks.
# Returns the current time from this process's clock (Time.now).
# The result is converted to UTC when ActiveRecord::Base.default_timezone
# is :utc; otherwise the local time is returned unchanged.
def self.db_time_now
  return Time.now.utc if ActiveRecord::Base.default_timezone == :utc

  Time.now
end
View
10 spec/delayed_method_spec.rb
@@ -78,13 +78,9 @@ def read(story)
Delayed::Job.count.should == 1
- output = nil
+ Delayed::Job.reserve_and_run_one_job
- Delayed::Job.reserve do |e|
- output = e.perform
- end
-
- output.should == true
+ Delayed::Job.count.should == 0
end
@@ -129,4 +125,4 @@ def read(story)
job.payload_object.perform.should == 'Once upon...'
end
-end
+end
View
62 spec/job_spec.rb
@@ -184,11 +184,11 @@ def perform; @@runs += 1; end
end
it "should not allow a second worker to get exclusive access" do
- lambda { @job.lock_exclusively! 4.hours, 'worker2' }.should raise_error(Delayed::Job::LockError)
+ @job.lock_exclusively!(4.hours, 'worker2').should == false
end
it "should allow a second worker to get exclusive access if the timeout has passed" do
- lambda { @job.lock_exclusively! 1.minute, 'worker2' }.should_not raise_error(Delayed::Job::LockError)
+ @job.lock_exclusively!(1.minute, 'worker2').should == true
end
it "should be able to get access to the task if it was started more then max_age ago" do
@@ -283,7 +283,7 @@ def perform; @@runs += 1; end
it "should leave the queue in a consistent state and not run the job if locking fails" do
SimpleJob.runs.should == 0
- @job.stub!(:lock_exclusively!).with(any_args).once.and_raise(Delayed::Job::LockError)
+ @job.stub!(:lock_exclusively!).with(any_args).once.and_return(false)
Delayed::Job.should_receive(:find_available).once.and_return([@job])
Delayed::Job.work_off(1)
SimpleJob.runs.should == 0
@@ -291,21 +291,55 @@ def perform; @@runs += 1; end
end
- context "while running alongside other workers with enqueued jobs, it" do
+ context "while running alongside other workers that locked jobs, it" do
before(:each) do
Delayed::Job.worker_name = 'worker1'
- Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => (Delayed::Job.db_time_now - 3.minutes))
- Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker2', :locked_at => (Delayed::Job.db_time_now - 11.minutes))
- Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => (Delayed::Job.db_time_now - 2.minutes))
+ Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
+ Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker2', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
+ Delayed::Job.create(:payload_object => SimpleJob.new)
+ Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
end
-
- it "should only find jobs if the lock has expired reguardless of the worker" do
- SimpleJob.runs.should == 0
- Delayed::Job.work_off(5)
- SimpleJob.runs.should == 2
- Delayed::Job.find_available(5, 10.minutes).length.should == 1
+
+ it "should ingore locked jobs from other workers" do
+ Delayed::Job.worker_name = 'worker3'
+ SimpleJob.runs.should == 0
+ Delayed::Job.work_off
+ SimpleJob.runs.should == 1 # runs the one open job
end
-
+
+ it "should find our own jobs regardless of locks" do
+ Delayed::Job.worker_name = 'worker1'
+ SimpleJob.runs.should == 0
+ Delayed::Job.work_off
+ SimpleJob.runs.should == 3 # runs open job plus worker1 jobs that were already locked
+ end
+ end
+
+ context "while running with locked and expired jobs, it" do
+ before(:each) do
+ Delayed::Job.worker_name = 'worker1'
+ exp_time = Delayed::Job.db_time_now - (1.minutes + Delayed::Job::MAX_RUN_TIME)
+ Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => exp_time)
+ Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker2', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
+ Delayed::Job.create(:payload_object => SimpleJob.new)
+ Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
+ end
+
+ it "should only find unlocked and expired jobs" do
+ Delayed::Job.worker_name = 'worker3'
+ SimpleJob.runs.should == 0
+ Delayed::Job.work_off
+ SimpleJob.runs.should == 2 # runs the one open job and one expired job
+ end
+
+ it "should ignore locks when finding our own jobs" do
+ Delayed::Job.worker_name = 'worker1'
+ SimpleJob.runs.should == 0
+ Delayed::Job.work_off
+ SimpleJob.runs.should == 3 # runs open job plus worker1 jobs
+ # This is useful in the case of a crash/restart on worker1, but make sure multiple workers on the same host have unique names!
+ end
+
end
end
Please sign in to comment.
Something went wrong with that request. Please try again.