Permalink
Browse files

Fixed a nasty bug that allowed several job runners to get the exclusi…

…ve lock to the same job
  • Loading branch information...
tobi committed Jul 23, 2008
1 parent 5998931 commit a6e374e0374e316d3030e8b51e2d4e3ac57f04d1
Showing with 68 additions and 34 deletions.
  1. +38 −19 lib/delayed/job.rb
  2. +1 −1 lib/delayed/performable_method.rb
  3. +29 −14 spec/job_spec.rb
View
@@ -1,9 +1,11 @@
+
module Delayed
class DeserializationError < StandardError
end
- class Job < ActiveRecord::Base
+ class Job < ActiveRecord::Base
+ MAX_ATTEMPTS = 25
set_table_name :delayed_jobs
cattr_accessor :worker_name
@@ -23,20 +25,31 @@ def self.clear_locks!
def payload_object
@payload_object ||= deserialize(self['handler'])
+ end
+
+ def name
+ text = handler.gsub(/\n/, ' ')
+ "#{id} (#{text.length > 40 ? "#{text[0..40]}..." : text})"
end
def payload_object=(object)
self['handler'] = object.to_yaml
end
- def reshedule(message, time = nil)
- time ||= Job.db_time_now + (attempts ** 4).seconds + 5
-
- self.attempts += 1
- self.run_at = time
- self.last_error = message
- self.unlock
- save!
+ def reschedule(message, time = nil)
+
+ if self.attempts < MAX_ATTEMPTS
+ time ||= Job.db_time_now + (attempts ** 4) + 5
+
+ self.attempts += 1
+ self.run_at = time
+ self.last_error = message
+ self.unlock
+ save!
+ else
+ logger.info "* [JOB] PERMANENTLY removing #{self.name} because of #{attempts} consequetive failures."
+ destroy
+ end
end
@@ -62,16 +75,23 @@ def self.reserve(max_run_time = 4.hours)
# We get up to 5 jobs from the db. In face we cannot get exclusive access to a job we try the next.
# this leads to a more even distribution of jobs across the worker processes
find_available(5).each do |job|
- begin
+ begin
+ logger.info "* [JOB] aquiring lock on #{job.name}"
job.lock_exclusively!(max_run_time, worker_name)
- yield job.payload_object
- job.destroy
+ runtime = Benchmark.realtime do
+ yield job.payload_object
+ job.destroy
+ end
+ logger.info "* [JOB] #{job.name} completed after %.4f" % runtime
+
return job
rescue LockError
# We did not get the lock, some other worker process must have
- puts "failed to aquire exclusive lock for #{job.id}"
+ logger.warn "* [JOB] failed to aquire exclusive lock for #{job.name}"
rescue StandardError => e
- job.reshedule e.message
+ job.reschedule e.message
+ logger.error "* [JOB] #{job.name} failed with #{e.class.name}: #{e.message} - #{job.attempts} failed attempts"
+ logger.error(e)
return job
end
end
@@ -85,13 +105,12 @@ def lock_exclusively!(max_run_time, worker = worker_name)
now = self.class.db_time_now
affected_rows = if locked_by != worker
-
-
+
# We don't own this job so we will update the locked_by name and the locked_at
connection.update(<<-end_sql, "#{self.class.name} Update to aquire exclusive lock")
UPDATE #{self.class.table_name}
SET `locked_at`=#{quote_value(now)}, `locked_by`=#{quote_value(worker)}
- WHERE #{self.class.primary_key} = #{quote_value(id)} AND (`locked_at` IS NULL OR `locked_at` < #{quote_value(now + max_run_time)})
+ WHERE #{self.class.primary_key} = #{quote_value(id)} AND (`locked_at` IS NULL OR `locked_at` < #{quote_value(now - max_run_time.to_i)})
end_sql
else
@@ -106,7 +125,7 @@ def lock_exclusively!(max_run_time, worker = worker_name)
end
- unless affected_rows == 1
+ unless affected_rows == 1
raise LockError, "Attempted to aquire exclusive lock failed"
end
@@ -186,7 +205,7 @@ def attempt_to_load(klass)
end
def self.db_time_now
- (ActiveRecord::Base.default_timezone == :utc) ? Time.now.utc : Time.now
+ (ActiveRecord::Base.default_timezone == :utc) ? Time.now.utc : Time.now
end
protected
@@ -14,7 +14,7 @@ def perform
load(object).send(method, *args.map{|a| load(a)})
rescue ActiveRecord::RecordNotFound
# We cannot do anything about objects which were deleted in the meantime
- true
+ true
end
private
View
@@ -40,14 +40,18 @@ def perform; raise 'did not work'; end
it "should re-schedule by about 1 second at first and increment this more and more minutes when it fails to execute properly" do
Delayed::Job.enqueue ErrorJob.new
- runner = Delayed::Job.work_off(1)
-
- job = Delayed::Job.find(:first)
- job.last_error.should == 'did not work'
- job.attempts.should == 1
- job.run_at.should > Time.now
- job.run_at.should < Time.now + 6.minutes
- end
+ Delayed::Job.work_off(1)
+
+
+ job = Delayed::Job.find(:first)
+
+ job.last_error.should == 'did not work'
+ job.attempts.should == 1
+
+ job.run_at.should > Delayed::Job.db_time_now - 10.minutes
+ job.run_at.should < Delayed::Job.db_time_now + 10.minutes
+ end
+
it "should raise an DeserializationError when the job class is totally unknown" do
@@ -89,18 +93,29 @@ def perform; raise 'did not work'; end
job.should_receive(:attempt_to_load).with('Delayed::JobThatDoesNotExist').and_return(true)
lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
end
-
+
+ it "should be removed if it failed more than MAX_ATTEMPTS times" do
+ @job = Delayed::Job.create :payload_object => SimpleJob.new, :attempts => 50
+ @job.should_receive(:destroy)
+ @job.reschedule 'FAIL'
+ end
describe "when another worker is already performing an task, it" do
before :each do
Delayed::Job.worker_name = 'worker1'
- @job = Delayed::Job.create :payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => Time.now.utc
+ @job = Delayed::Job.create :payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => Delayed::Job.db_time_now - 5.minutes
end
it "should not allow a second worker to get exclusive access" do
lambda { @job.lock_exclusively! 4.hours, 'worker2' }.should raise_error(Delayed::Job::LockError)
end
+
+ it "should not allow a second worker to get exclusive access if the timeout has passed" do
+
+ @job.lock_exclusively! 1.minute, 'worker2'
+
+ end
it "should be able to get access to the task if it was started more then max_age ago" do
@job.locked_at = 5.hours.ago
@@ -113,12 +128,12 @@ def perform; raise 'did not work'; end
end
it "should be able to get exclusive access again when the worker name is the same" do
- @job.lock_exclusively! Time.now + 20, 'worker1'
- @job.lock_exclusively! Time.now + 21, 'worker1'
- @job.lock_exclusively! Time.now + 22, 'worker1'
+ @job.lock_exclusively! 5.minutes, 'worker1'
+ @job.lock_exclusively! 5.minutes, 'worker1'
+ @job.lock_exclusively! 5.minutes, 'worker1'
end
end
-
+
end

0 comments on commit a6e374e

Please sign in to comment.