diff --git a/README.textile b/README.textile index cc78253b4..f6c3c5c6c 100644 --- a/README.textile +++ b/README.textile @@ -13,7 +13,8 @@ It is a direct extraction from Shopify where the job table is responsible for a * spam checks h2. Changes - + +* 1.7 Added failed_at column which can optionally be set after a certain number of failed job attempts. By default failed jobs are destroyed after about a month. * 1.6 Renamed locked_until to locked_at. We now store when we start a given job instead of how long it will be locked by the worker. This allows us to get a reading on how long a job took to execute. * 1.5 Job runners can now be run in parallel. Two new database columns are needed: locked_until and locked_by. This allows us to use pessimistic locking, which enables us to run as many worker processes as we need to speed up queue processing. * 1.0 Initial release @@ -29,6 +30,7 @@ The library evolves around a delayed_jobs table which looks as follows: table.string :last_error table.datetime :run_at table.datetime :locked_at + table.datetime :failed_at table.string :locked_by table.timestamps end diff --git a/lib/delayed/job.rb b/lib/delayed/job.rb index a03e44162..c5ceab030 100644 --- a/lib/delayed/job.rb +++ b/lib/delayed/job.rb @@ -6,18 +6,29 @@ class DeserializationError < StandardError class Job < ActiveRecord::Base MAX_ATTEMPTS = 25 + MAX_RUN_TIME = 4.hours set_table_name :delayed_jobs - cattr_accessor :worker_name, :min_priority, :max_priority - self.worker_name = "pid:#{Process.pid}" - self.min_priority = nil - self.max_priority = nil - - # Conditions to find tasks that are locked by this process or one that has - # been created before now and is not currently locked. - NextTaskSQL = "(locked_by = ?) or (run_at <= ? and (locked_at is null or locked_at < ?))" - NextTaskOrder = "priority DESC, run_at ASC" + # By default failed jobs are destroyed after too many attempts. 
+ # If you want to keep them around (perhaps to inspect the reason + # for the failure), set this to false. + cattr_accessor :destroy_failed_jobs + self.destroy_failed_jobs = true + + # Every worker has a unique name which by default is the pid of the process. + # There are some advantages to overriding this with something which survives worker restarts: + # Workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before. + cattr_accessor :worker_name + self.worker_name = "pid:#{Process.pid}" + + NextTaskSQL = '(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR (locked_by = ?)) AND failed_at IS NULL' + NextTaskOrder = 'priority DESC, run_at ASC' + ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/ + + cattr_accessor :min_priority, :max_priority + self.min_priority = nil + self.max_priority = nil class LockError < StandardError end @@ -26,33 +37,38 @@ def self.clear_locks! update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name]) end + def failed? + failed_at + end + alias_method :failed, :failed? + def payload_object @payload_object ||= deserialize(self['handler']) - end - + end + def name text = handler.gsub(/\n/, ' ') - "#{id} (#{text.length > 40 ? "#{text[0..40]}..." : text})" + "#{id} (#{text.length > 40 ? "#{text[0..40]}..." : text})" end def payload_object=(object) self['handler'] = object.to_yaml end - def reschedule(message, time = nil) + def reschedule(message, backtrace = [], time = nil) if self.attempts < MAX_ATTEMPTS time ||= Job.db_time_now + (attempts ** 4) + 5 self.attempts += 1 self.run_at = time - self.last_error = message + self.last_error = message + "\n" + backtrace.join("\n") self.unlock save! - else + else logger.info "* [JOB] PERMANENTLY removing #{self.name} because of #{attempts} consequetive failures." - destroy + destroy_failed_jobs ? 
destroy : update_attribute(:failed_at, Time.now) end - end + end def self.enqueue(object, priority = 0) unless object.respond_to?(:perform) @@ -62,12 +78,13 @@ def self.enqueue(object, priority = 0) Job.create(:payload_object => object, :priority => priority.to_i) end - def self.find_available(limit = 5) + def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME) time_now = db_time_now sql = NextTaskSQL.dup - conditions = [worker_name, time_now, time_now] + + conditions = [time_now, time_now - max_run_time, worker_name] if self.min_priority sql << ' AND (priority >= ?)' @@ -78,38 +95,37 @@ def self.find_available(limit = 5) sql << ' AND (priority <= ?)' conditions << max_priority end - + conditions.unshift(sql) - + ActiveRecord::Base.silence do find(:all, :conditions => conditions, :order => NextTaskOrder, :limit => limit) end end - - # Get the payload of the next job we can get an exclusive lock on. + + # Get the payload of the next job we can get an exclusive lock on. # If no jobs are left we return nil - def self.reserve(max_run_time = 4.hours) - - # We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next. - # this leads to a more even distribution of jobs across the worker processes - find_available(5).each do |job| - begin + def self.reserve(max_run_time = MAX_RUN_TIME) + + # We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next. 
+ # this leads to a more even distribution of jobs across the worker processes + find_available(5, max_run_time).each do |job| + begin logger.info "* [JOB] aquiring lock on #{job.name}" job.lock_exclusively!(max_run_time, worker_name) - runtime = Benchmark.realtime do - yield job.payload_object + runtime = Benchmark.realtime do + yield job.payload_object job.destroy end logger.info "* [JOB] #{job.name} completed after %.4f" % runtime - - return job + + return job rescue LockError # We did not get the lock, some other worker process must have logger.warn "* [JOB] failed to aquire exclusive lock for #{job.name}" rescue StandardError => e - job.reschedule e.message - logger.error "* [JOB] #{job.name} failed with #{e.class.name}: #{e.message} - #{job.attempts} failed attempts" - logger.error(e) + job.reschedule e.message, e.backtrace + log_exception(job, e) return job end end @@ -118,7 +134,7 @@ def self.reserve(max_run_time = 4.hours) end # This method is used internally by reserve method to ensure exclusive access - # to the given job. It will rise a LockError if it cannot get this lock. + # to the given job. It will rise a LockError if it cannot get this lock. def lock_exclusively!(max_run_time, worker = worker_name) now = self.class.db_time_now affected_rows = if locked_by != worker @@ -140,12 +156,18 @@ def unlock self.locked_by = nil end + # This is a good hook if you need to report job processing errors in additional or different ways + def self.log_exception(job, error) + logger.error "* [JOB] #{job.name} failed with #{error.class.name}: #{error.message} - #{job.attempts} failed attempts" + logger.error(error) + end + def self.work_off(num = 100) success, failure = 0, 0 num.times do - job = self.reserve do |j| + job = self.reserve do |j| begin j.perform success += 1 @@ -173,7 +195,7 @@ def deserialize(source) if handler.nil? 
if source =~ ParseObjectFromYaml - # Constantize the object so that ActiveSupport can attempt + # Constantize the object so that ActiveSupport can attempt # its auto loading magic. Will raise LoadError if not successful. attempt_to_load($1) @@ -185,18 +207,18 @@ def deserialize(source) if handler.is_a?(YAML::Object) - # Constantize the object so that ActiveSupport can attempt + # Constantize the object so that ActiveSupport can attempt # its auto loading magic. Will raise LoadError if not successful. attempt_to_load(handler.class) # If successful, retry the yaml.load handler = YAML.load(source) - return handler if handler.respond_to?(:perform) + return handler if handler.respond_to?(:perform) end raise DeserializationError, 'Job failed to load: Unknown handler. Try to manually require the appropiate file.' - rescue TypeError, LoadError, NameError => e + rescue TypeError, LoadError, NameError => e raise DeserializationError, "Job failed to load: #{e.message}. Try to manually require the required file." end @@ -217,4 +239,4 @@ def before_save end end -end \ No newline at end of file +end diff --git a/spec/database.rb b/spec/database.rb index 675c56efb..ea7fa3e35 100644 --- a/spec/database.rb +++ b/spec/database.rb @@ -21,6 +21,7 @@ table.datetime :run_at table.datetime :locked_at table.string :locked_by + table.datetime :failed_at table.timestamps end @@ -34,4 +35,4 @@ # Purely useful for test cases... 
class Story < ActiveRecord::Base def tell; text; end -end \ No newline at end of file +end diff --git a/spec/job_spec.rb b/spec/job_spec.rb index 06897b0c7..baa37d384 100644 --- a/spec/job_spec.rb +++ b/spec/job_spec.rb @@ -43,14 +43,15 @@ def perform; raise 'did not work'; end SimpleJob.runs.should == 1 end - - it "should re-schedule by about 1 second at first and increment this more and more minutes when it fails to execute properly" do + + it "should re-schedule by about 1 second at first and increment this more and more minutes when it fails to execute properly" do Delayed::Job.enqueue ErrorJob.new Delayed::Job.work_off(1) job = Delayed::Job.find(:first) - - job.last_error.should == 'did not work' + + job.last_error.should =~ /did not work/ + job.last_error.should =~ /job_spec.rb:10:in `perform'/ job.attempts.should == 1 job.run_at.should > Delayed::Job.db_time_now - 10.minutes @@ -59,7 +60,7 @@ def perform; raise 'did not work'; end it "should raise an DeserializationError when the job class is totally unknown" do - job = Delayed::Job.new + job = Delayed::Job.new job['handler'] = "--- !ruby/object:JobThatDoesNotExist {}" lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError) @@ -96,12 +97,34 @@ def perform; raise 'did not work'; end job.should_receive(:attempt_to_load).with('Delayed::JobThatDoesNotExist').and_return(true) lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError) - end + end + + it "should be failed if it failed more than MAX_ATTEMPTS times and we don't want to destroy jobs" do + default = Delayed::Job.destroy_failed_jobs + Delayed::Job.destroy_failed_jobs = false + + @job = Delayed::Job.create :payload_object => SimpleJob.new, :attempts => 50 + @job.reload.failed_at.should == nil + @job.reschedule 'FAIL' + @job.reload.failed_at.should_not == nil + + Delayed::Job.destroy_failed_jobs = default + end + + it "should be destroyed if it failed more than MAX_ATTEMPTS times and we want to 
destroy jobs" do + default = Delayed::Job.destroy_failed_jobs + Delayed::Job.destroy_failed_jobs = true - it "should be removed if it failed more than MAX_ATTEMPTS times" do @job = Delayed::Job.create :payload_object => SimpleJob.new, :attempts => 50 @job.should_receive(:destroy) @job.reschedule 'FAIL' + + Delayed::Job.destroy_failed_jobs = default + end + + it "should never find failed jobs" do + @job = Delayed::Job.create :payload_object => SimpleJob.new, :attempts => 50, :failed_at => Time.now + Delayed::Job.find_available(1).length.should == 0 end context "when another worker is already performing an task, it" do @@ -114,7 +137,7 @@ def perform; raise 'did not work'; end it "should not allow a second worker to get exclusive access" do lambda { @job.lock_exclusively! 4.hours, 'worker2' }.should raise_error(Delayed::Job::LockError) end - + it "should allow a second worker to get exclusive access if the timeout has passed" do lambda { @job.lock_exclusively! 1.minute, 'worker2' }.should_not raise_error(Delayed::Job::LockError) end @@ -129,7 +152,20 @@ def perform; raise 'did not work'; end @job.locked_at.should > 1.minute.ago end - it "should be able to get exclusive access again when the worker name is the same" do + it "should not be found by another worker" do + Delayed::Job.worker_name = 'worker2' + + Delayed::Job.find_available(1, 6.minutes).length.should == 0 + end + + it "should be found by another worker if the time has expired" do + Delayed::Job.worker_name = 'worker2' + + Delayed::Job.find_available(1, 4.minutes).length.should == 1 + end + + + it "should be able to get exclusive access again when the worker name is the same" do @job.lock_exclusively! 5.minutes, 'worker1' @job.lock_exclusively! 5.minutes, 'worker1' @job.lock_exclusively! 
5.minutes, 'worker1' @@ -199,16 +235,16 @@ def perform; raise 'did not work'; end context "while running alongside other workers with enqueued jobs, it" do before(:each) do Delayed::Job.worker_name = 'worker1' - Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => 5.hours.ago) - Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker2', :locked_at => 3.hours.ago) - Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => 2.hours.ago) + Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => (Delayed::Job.db_time_now - 3.minutes)) + Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker2', :locked_at => (Delayed::Job.db_time_now - 11.minutes)) + Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => (Delayed::Job.db_time_now - 2.minutes)) end - it "should remove only jobs created by the current worker" do + it "should only find jobs if the lock has expired reguardless of the worker" do SimpleJob.runs.should == 0 - Delayed::Job.work_off(3) - SimpleJob.runs.should == 2 - Delayed::Job.find_available.length.should == 1 + Delayed::Job.work_off(5) + SimpleJob.runs.should == 2 + Delayed::Job.find_available(5, 10.minutes).length.should == 1 end end