From a137def820b981e3308c001e538d029107cca760 Mon Sep 17 00:00:00 2001 From: Chris Wanstrath Date: Fri, 31 Oct 2008 18:43:28 -0700 Subject: [PATCH] Changes extracted from GitHub: - Add failed_at to schema - Rather than destroying a job, set its failed_at when it retries too many times. - Take failed_at into account when finding potential jobs - Include the backtrace in the logged last_error - Make MAX_RUN_TIMES a constract --- README.textile | 1 + lib/delayed/job.rb | 76 +++++++++++++++++++++++++--------------------- spec/database.rb | 3 +- spec/job_spec.rb | 49 +++++++++++++++++------------- 4 files changed, 72 insertions(+), 57 deletions(-) diff --git a/README.textile b/README.textile index 6d7b10c2b..3c7428c20 100644 --- a/README.textile +++ b/README.textile @@ -29,6 +29,7 @@ The library evolves around a delayed_jobs table which looks as follows: table.string :last_error table.datetime :run_at table.datetime :locked_at + table.datetime :failed_at table.string :locked_by table.timestamps end diff --git a/lib/delayed/job.rb b/lib/delayed/job.rb index 524305264..f5feef96f 100644 --- a/lib/delayed/job.rb +++ b/lib/delayed/job.rb @@ -6,12 +6,13 @@ class DeserializationError < StandardError class Job < ActiveRecord::Base MAX_ATTEMPTS = 25 + MAX_RUN_TIME = 4.hours set_table_name :delayed_jobs cattr_accessor :worker_name self.worker_name = "pid:#{Process.pid}" - - NextTaskSQL = '`run_at` <= ? AND (`locked_at` IS NULL OR `locked_at` < ?) OR (`locked_by` = ?)' + + NextTaskSQL = '(`run_at` <= ? AND (`locked_at` IS NULL OR `locked_at` < ?) OR (`locked_by` = ?)) AND `failed_at` IS NULL' NextTaskOrder = 'priority DESC, run_at ASC' ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/ @@ -22,33 +23,38 @@ def self.clear_locks! connection.execute "UPDATE #{table_name} SET `locked_by`=NULL, `locked_at`=NULL WHERE `locked_by`=#{quote_value(worker_name)}" end + def failed? + failed_at + end + alias_method :failed, :failed? + def payload_object @payload_object ||= deserialize(self['handler']) - end - + end + def name text = handler.gsub(/\n/, ' ') - "#{id} (#{text.length > 40 ? "#{text[0..40]}..." : text})" + "#{id} (#{text.length > 40 ? "#{text[0..40]}..." : text})" end def payload_object=(object) self['handler'] = object.to_yaml end - def reschedule(message, time = nil) + def reschedule(message, backtrace = [], time = nil) if self.attempts < MAX_ATTEMPTS time ||= Job.db_time_now + (attempts ** 4) + 5 self.attempts += 1 self.run_at = time - self.last_error = message + self.last_error = message + "\n" + backtrace.join("\n") self.unlock save! - else + else logger.info "* [JOB] PERMANENTLY removing #{self.name} because of #{attempts} consequetive failures." - destroy + update_attribute :failed_at, Time.now end - end + end def self.enqueue(object, priority = 0) unless object.respond_to?(:perform) @@ -58,35 +64,35 @@ def self.enqueue(object, priority = 0) Job.create(:payload_object => object, :priority => priority) end - def self.find_available(limit = 5, max_run_time = 4.hours) + def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME) time_now = db_time_now ActiveRecord::Base.silence do find(:all, :conditions => [NextTaskSQL, time_now, time_now - max_run_time, worker_name], :order => NextTaskOrder, :limit => limit) end end - # Get the payload of the next job we can get an exclusive lock on. + # Get the payload of the next job we can get an exclusive lock on. # If no jobs are left we return nil - def self.reserve(max_run_time = 4.hours) - - # We get up to 5 jobs from the db. In face we cannot get exclusive access to a job we try the next. - # this leads to a more even distribution of jobs across the worker processes + def self.reserve(max_run_time = MAX_RUN_TIME) + + # We get up to 5 jobs from the db. In face we cannot get exclusive access to a job we try the next. + # this leads to a more even distribution of jobs across the worker processes find_available(5, max_run_time).each do |job| - begin + begin logger.info "* [JOB] aquiring lock on #{job.name}" job.lock_exclusively!(max_run_time, worker_name) - runtime = Benchmark.realtime do - yield job.payload_object + runtime = Benchmark.realtime do + yield job.payload_object job.destroy end logger.info "* [JOB] #{job.name} completed after %.4f" % runtime - - return job + + return job rescue LockError # We did not get the lock, some other worker process must have logger.warn "* [JOB] failed to aquire exclusive lock for #{job.name}" - rescue StandardError => e - job.reschedule e.message + rescue StandardError => e + job.reschedule e.message, e.backtrace logger.error "* [JOB] #{job.name} failed with #{e.class.name}: #{e.message} - #{job.attempts} failed attempts" logger.error(e) return job @@ -97,26 +103,26 @@ def self.reserve(max_run_time = 4.hours) end # This method is used internally by reserve method to ensure exclusive access - # to the given job. It will rise a LockError if it cannot get this lock. + # to the given job. It will rise a LockError if it cannot get this lock. def lock_exclusively!(max_run_time, worker = worker_name) now = self.class.db_time_now - - affected_rows = if locked_by != worker + + affected_rows = if locked_by != worker # We don't own this job so we will update the locked_by name and the locked_at connection.update(<<-end_sql, "#{self.class.name} Update to aquire exclusive lock") UPDATE #{self.class.table_name} - SET `locked_at`=#{quote_value(now)}, `locked_by`=#{quote_value(worker)} + SET `locked_at`=#{quote_value(now)}, `locked_by`=#{quote_value(worker)} WHERE #{self.class.primary_key} = #{quote_value(id)} AND (`locked_at` IS NULL OR `locked_at` < #{quote_value(now - max_run_time.to_i)}) end_sql else - # We already own this job, this may happen if the job queue crashes. + # We already own this job, this may happen if the job queue crashes. # Simply resume and update the locked_at connection.update(<<-end_sql, "#{self.class.name} Update exclusive lock") UPDATE #{self.class.table_name} - SET `locked_at`=#{quote_value(now)} + SET `locked_at`=#{quote_value(now)} WHERE #{self.class.primary_key} = #{quote_value(id)} AND (`locked_by`=#{quote_value(worker)}) end_sql @@ -140,7 +146,7 @@ def self.work_off(num = 100) num.times do - job = self.reserve do |j| + job = self.reserve do |j| begin j.perform success += 1 @@ -168,7 +174,7 @@ def deserialize(source) if handler.nil? if source =~ ParseObjectFromYaml - # Constantize the object so that ActiveSupport can attempt + # Constantize the object so that ActiveSupport can attempt # its auto loading magic. Will raise LoadError if not successful. attempt_to_load($1) @@ -180,18 +186,18 @@ def deserialize(source) if handler.is_a?(YAML::Object) - # Constantize the object so that ActiveSupport can attempt + # Constantize the object so that ActiveSupport can attempt # its auto loading magic. Will raise LoadError if not successful. attempt_to_load(handler.class) # If successful, retry the yaml.load handler = YAML.load(source) - return handler if handler.respond_to?(:perform) + return handler if handler.respond_to?(:perform) end raise DeserializationError, 'Job failed to load: Unknown handler. Try to manually require the appropiate file.' - rescue TypeError, LoadError, NameError => e + rescue TypeError, LoadError, NameError => e raise DeserializationError, "Job failed to load: #{e.message}. Try to manually require the required file." end @@ -212,4 +218,4 @@ def before_save end end -end \ No newline at end of file +end diff --git a/spec/database.rb b/spec/database.rb index 74b28295d..096d6c550 100644 --- a/spec/database.rb +++ b/spec/database.rb @@ -20,6 +20,7 @@ table.string :last_error table.datetime :run_at table.datetime :locked_at + table.datetime :failed_at table.string :locked_by table.timestamps end @@ -33,4 +34,4 @@ # Purely useful for test cases... class Story < ActiveRecord::Base def tell; text; end -end \ No newline at end of file +end diff --git a/spec/job_spec.rb b/spec/job_spec.rb index 82d663e24..417bdf206 100644 --- a/spec/job_spec.rb +++ b/spec/job_spec.rb @@ -34,14 +34,15 @@ def perform; raise 'did not work'; end SimpleJob.runs.should == 1 end - - it "should re-schedule by about 1 second at first and increment this more and more minutes when it fails to execute properly" do + + it "should re-schedule by about 1 second at first and increment this more and more minutes when it fails to execute properly" do Delayed::Job.enqueue ErrorJob.new Delayed::Job.work_off(1) job = Delayed::Job.find(:first) - - job.last_error.should == 'did not work' + + job.last_error.should =~ /did not work/ + job.last_error.should =~ /job_spec.rb:10:in `perform'/ job.attempts.should == 1 job.run_at.should > Delayed::Job.db_time_now - 10.minutes @@ -50,7 +51,7 @@ def perform; raise 'did not work'; end it "should raise an DeserializationError when the job class is totally unknown" do - job = Delayed::Job.new + job = Delayed::Job.new job['handler'] = "--- !ruby/object:JobThatDoesNotExist {}" lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError) @@ -87,12 +88,18 @@ def perform; raise 'did not work'; end job.should_receive(:attempt_to_load).with('Delayed::JobThatDoesNotExist').and_return(true) lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError) - end + end - it "should be removed if it failed more than MAX_ATTEMPTS times" do + it "should be failed if it failed more than MAX_ATTEMPTS times" do @job = Delayed::Job.create :payload_object => SimpleJob.new, :attempts => 50 - @job.should_receive(:destroy) + @job.reload.failed.should == nil @job.reschedule 'FAIL' + @job.reload.failed.should_not == nil + end + + it "should never find failed jobs" do + @job = Delayed::Job.create :payload_object => SimpleJob.new, :attempts => 50, :failed_at => Time.now + Delayed::Job.find_available(1).length.should == 0 end describe "when another worker is already performing an task, it" do @@ -104,14 +111,14 @@ def perform; raise 'did not work'; end it "should not allow a second worker to get exclusive access" do lambda { @job.lock_exclusively! 4.hours, 'worker2' }.should raise_error(Delayed::Job::LockError) - end + end it "should not allow a second worker to get exclusive access if the timeout has passed" do - - @job.lock_exclusively! 1.minute, 'worker2' - - end - + + @job.lock_exclusively! 1.minute, 'worker2' + + end + it "should be able to get access to the task if it was started more then max_age ago" do @job.locked_at = 5.hours.ago @job.save @@ -121,24 +128,24 @@ def perform; raise 'did not work'; end @job.locked_by.should == 'worker2' @job.locked_at.should > 1.minute.ago end - + it "should not be found by another worker" do Delayed::Job.worker_name = 'worker2' - + Delayed::Job.find_available(1, 6.minutes).length.should == 0 end - + it "should be found by another worker if the time has expired" do Delayed::Job.worker_name = 'worker2' - + Delayed::Job.find_available(1, 4.minutes).length.should == 1 end - it "should be able to get exclusive access again when the worker name is the same" do + it "should be able to get exclusive access again when the worker name is the same" do @job.lock_exclusively! 5.minutes, 'worker1' @job.lock_exclusively! 5.minutes, 'worker1' @job.lock_exclusively! 5.minutes, 'worker1' - end + end end -end \ No newline at end of file +end