Skip to content

Commit

Permalink
Changes extracted from GitHub:
Browse files Browse the repository at this point in the history
- Add failed_at to schema
- Rather than destroying a job, set its failed_at when it retries too many times.
- Take failed_at into account when finding potential jobs
- Include the backtrace in the logged last_error
- Make MAX_RUN_TIMES a constract
  • Loading branch information
defunkt committed Nov 1, 2008
1 parent 7274142 commit a137def
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 57 deletions.
1 change: 1 addition & 0 deletions README.textile
Expand Up @@ -29,6 +29,7 @@ The library evolves around a delayed_jobs table which looks as follows:
table.string :last_error
table.datetime :run_at
table.datetime :locked_at
table.datetime :failed_at
table.string :locked_by
table.timestamps
end
Expand Down
76 changes: 41 additions & 35 deletions lib/delayed/job.rb
Expand Up @@ -6,12 +6,13 @@ class DeserializationError < StandardError

class Job < ActiveRecord::Base
MAX_ATTEMPTS = 25
MAX_RUN_TIME = 4.hours
set_table_name :delayed_jobs

cattr_accessor :worker_name
self.worker_name = "pid:#{Process.pid}"
NextTaskSQL = '`run_at` <= ? AND (`locked_at` IS NULL OR `locked_at` < ?) OR (`locked_by` = ?)'

NextTaskSQL = '(`run_at` <= ? AND (`locked_at` IS NULL OR `locked_at` < ?) OR (`locked_by` = ?)) AND `failed_at` IS NULL'
NextTaskOrder = 'priority DESC, run_at ASC'
ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/

Expand All @@ -22,33 +23,38 @@ def self.clear_locks!
connection.execute "UPDATE #{table_name} SET `locked_by`=NULL, `locked_at`=NULL WHERE `locked_by`=#{quote_value(worker_name)}"
end

def failed?
failed_at
end
alias_method :failed, :failed?

def payload_object
@payload_object ||= deserialize(self['handler'])
end
end

def name
text = handler.gsub(/\n/, ' ')
"#{id} (#{text.length > 40 ? "#{text[0..40]}..." : text})"
"#{id} (#{text.length > 40 ? "#{text[0..40]}..." : text})"
end

def payload_object=(object)
self['handler'] = object.to_yaml
end

def reschedule(message, time = nil)
def reschedule(message, backtrace = [], time = nil)
if self.attempts < MAX_ATTEMPTS
time ||= Job.db_time_now + (attempts ** 4) + 5

self.attempts += 1
self.run_at = time
self.last_error = message
self.last_error = message + "\n" + backtrace.join("\n")
self.unlock
save!
else
else
logger.info "* [JOB] PERMANENTLY removing #{self.name} because of #{attempts} consequetive failures."
destroy
update_attribute :failed_at, Time.now
end
end
end

def self.enqueue(object, priority = 0)
unless object.respond_to?(:perform)
Expand All @@ -58,35 +64,35 @@ def self.enqueue(object, priority = 0)
Job.create(:payload_object => object, :priority => priority)
end

def self.find_available(limit = 5, max_run_time = 4.hours)
def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME)
time_now = db_time_now
ActiveRecord::Base.silence do
find(:all, :conditions => [NextTaskSQL, time_now, time_now - max_run_time, worker_name], :order => NextTaskOrder, :limit => limit)
end
end

# Get the payload of the next job we can get an exclusive lock on.
# Get the payload of the next job we can get an exclusive lock on.
# If no jobs are left we return nil
def self.reserve(max_run_time = 4.hours)
# We get up to 5 jobs from the db. In face we cannot get exclusive access to a job we try the next.
# this leads to a more even distribution of jobs across the worker processes
def self.reserve(max_run_time = MAX_RUN_TIME)

# We get up to 5 jobs from the db. In face we cannot get exclusive access to a job we try the next.
# this leads to a more even distribution of jobs across the worker processes
find_available(5, max_run_time).each do |job|
begin
begin
logger.info "* [JOB] aquiring lock on #{job.name}"
job.lock_exclusively!(max_run_time, worker_name)
runtime = Benchmark.realtime do
yield job.payload_object
runtime = Benchmark.realtime do
yield job.payload_object
job.destroy
end
logger.info "* [JOB] #{job.name} completed after %.4f" % runtime
return job

return job
rescue LockError
# We did not get the lock, some other worker process must have
logger.warn "* [JOB] failed to aquire exclusive lock for #{job.name}"
rescue StandardError => e
job.reschedule e.message
rescue StandardError => e
job.reschedule e.message, e.backtrace
logger.error "* [JOB] #{job.name} failed with #{e.class.name}: #{e.message} - #{job.attempts} failed attempts"
logger.error(e)
return job
Expand All @@ -97,26 +103,26 @@ def self.reserve(max_run_time = 4.hours)
end

# This method is used internally by reserve method to ensure exclusive access
# to the given job. It will rise a LockError if it cannot get this lock.
# to the given job. It will rise a LockError if it cannot get this lock.
def lock_exclusively!(max_run_time, worker = worker_name)
now = self.class.db_time_now
affected_rows = if locked_by != worker

affected_rows = if locked_by != worker

# We don't own this job so we will update the locked_by name and the locked_at
connection.update(<<-end_sql, "#{self.class.name} Update to aquire exclusive lock")
UPDATE #{self.class.table_name}
SET `locked_at`=#{quote_value(now)}, `locked_by`=#{quote_value(worker)}
SET `locked_at`=#{quote_value(now)}, `locked_by`=#{quote_value(worker)}
WHERE #{self.class.primary_key} = #{quote_value(id)} AND (`locked_at` IS NULL OR `locked_at` < #{quote_value(now - max_run_time.to_i)})
end_sql

else

# We already own this job, this may happen if the job queue crashes.
# We already own this job, this may happen if the job queue crashes.
# Simply resume and update the locked_at
connection.update(<<-end_sql, "#{self.class.name} Update exclusive lock")
UPDATE #{self.class.table_name}
SET `locked_at`=#{quote_value(now)}
SET `locked_at`=#{quote_value(now)}
WHERE #{self.class.primary_key} = #{quote_value(id)} AND (`locked_by`=#{quote_value(worker)})
end_sql

Expand All @@ -140,7 +146,7 @@ def self.work_off(num = 100)

num.times do

job = self.reserve do |j|
job = self.reserve do |j|
begin
j.perform
success += 1
Expand Down Expand Up @@ -168,7 +174,7 @@ def deserialize(source)
if handler.nil?
if source =~ ParseObjectFromYaml

# Constantize the object so that ActiveSupport can attempt
# Constantize the object so that ActiveSupport can attempt
# its auto loading magic. Will raise LoadError if not successful.
attempt_to_load($1)

Expand All @@ -180,18 +186,18 @@ def deserialize(source)

if handler.is_a?(YAML::Object)

# Constantize the object so that ActiveSupport can attempt
# Constantize the object so that ActiveSupport can attempt
# its auto loading magic. Will raise LoadError if not successful.
attempt_to_load(handler.class)

# If successful, retry the yaml.load
handler = YAML.load(source)
return handler if handler.respond_to?(:perform)
return handler if handler.respond_to?(:perform)
end

raise DeserializationError, 'Job failed to load: Unknown handler. Try to manually require the appropiate file.'

rescue TypeError, LoadError, NameError => e
rescue TypeError, LoadError, NameError => e

raise DeserializationError, "Job failed to load: #{e.message}. Try to manually require the required file."
end
Expand All @@ -212,4 +218,4 @@ def before_save
end

end
end
end
3 changes: 2 additions & 1 deletion spec/database.rb
Expand Up @@ -20,6 +20,7 @@
table.string :last_error
table.datetime :run_at
table.datetime :locked_at
table.datetime :failed_at
table.string :locked_by
table.timestamps
end
Expand All @@ -33,4 +34,4 @@
# Purely useful for test cases...
class Story < ActiveRecord::Base
def tell; text; end
end
end
49 changes: 28 additions & 21 deletions spec/job_spec.rb
Expand Up @@ -34,14 +34,15 @@ def perform; raise 'did not work'; end

SimpleJob.runs.should == 1
end
it "should re-schedule by about 1 second at first and increment this more and more minutes when it fails to execute properly" do

it "should re-schedule by about 1 second at first and increment this more and more minutes when it fails to execute properly" do
Delayed::Job.enqueue ErrorJob.new
Delayed::Job.work_off(1)

job = Delayed::Job.find(:first)

job.last_error.should == 'did not work'

job.last_error.should =~ /did not work/
job.last_error.should =~ /job_spec.rb:10:in `perform'/
job.attempts.should == 1

job.run_at.should > Delayed::Job.db_time_now - 10.minutes
Expand All @@ -50,7 +51,7 @@ def perform; raise 'did not work'; end

it "should raise an DeserializationError when the job class is totally unknown" do

job = Delayed::Job.new
job = Delayed::Job.new
job['handler'] = "--- !ruby/object:JobThatDoesNotExist {}"

lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
Expand Down Expand Up @@ -87,12 +88,18 @@ def perform; raise 'did not work'; end

job.should_receive(:attempt_to_load).with('Delayed::JobThatDoesNotExist').and_return(true)
lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
end
end

it "should be removed if it failed more than MAX_ATTEMPTS times" do
it "should be failed if it failed more than MAX_ATTEMPTS times" do
@job = Delayed::Job.create :payload_object => SimpleJob.new, :attempts => 50
@job.should_receive(:destroy)
@job.reload.failed.should == nil
@job.reschedule 'FAIL'
@job.reload.failed.should_not == nil
end

it "should never find failed jobs" do
@job = Delayed::Job.create :payload_object => SimpleJob.new, :attempts => 50, :failed_at => Time.now
Delayed::Job.find_available(1).length.should == 0
end

describe "when another worker is already performing an task, it" do
Expand All @@ -104,14 +111,14 @@ def perform; raise 'did not work'; end

it "should not allow a second worker to get exclusive access" do
lambda { @job.lock_exclusively! 4.hours, 'worker2' }.should raise_error(Delayed::Job::LockError)
end
end

it "should not allow a second worker to get exclusive access if the timeout has passed" do
@job.lock_exclusively! 1.minute, 'worker2'
end

@job.lock_exclusively! 1.minute, 'worker2'

end

it "should be able to get access to the task if it was started more then max_age ago" do
@job.locked_at = 5.hours.ago
@job.save
Expand All @@ -121,24 +128,24 @@ def perform; raise 'did not work'; end
@job.locked_by.should == 'worker2'
@job.locked_at.should > 1.minute.ago
end

it "should not be found by another worker" do
Delayed::Job.worker_name = 'worker2'

Delayed::Job.find_available(1, 6.minutes).length.should == 0
end

it "should be found by another worker if the time has expired" do
Delayed::Job.worker_name = 'worker2'

Delayed::Job.find_available(1, 4.minutes).length.should == 1
end


it "should be able to get exclusive access again when the worker name is the same" do
it "should be able to get exclusive access again when the worker name is the same" do
@job.lock_exclusively! 5.minutes, 'worker1'
@job.lock_exclusively! 5.minutes, 'worker1'
@job.lock_exclusively! 5.minutes, 'worker1'
end
end
end
end
end

0 comments on commit a137def

Please sign in to comment.