Skip to content

Commit

Permalink
Merge branch 'master' of git://github.com/defunkt/delayed_job
Browse files Browse the repository at this point in the history
Conflicts:

	lib/delayed/job.rb
	spec/database.rb
	spec/job_spec.rb
  • Loading branch information
Tobias Lütke committed Nov 12, 2008
2 parents e7e0a56 + 757f585 commit e81577c
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 56 deletions.
1 change: 1 addition & 0 deletions README.textile
Expand Up @@ -29,6 +29,7 @@ The library revolves around a delayed_jobs table which looks as follows:
table.string :last_error
table.datetime :run_at
table.datetime :locked_at
table.datetime :failed_at
table.string :locked_by
table.timestamps
end
Expand Down
101 changes: 61 additions & 40 deletions lib/delayed/job.rb
Expand Up @@ -6,16 +6,28 @@ class DeserializationError < StandardError

class Job < ActiveRecord::Base
MAX_ATTEMPTS = 25
MAX_RUN_TIME = 4.hours
set_table_name :delayed_jobs

cattr_accessor :worker_name, :min_priority, :max_priority
self.worker_name = "pid:#{Process.pid}"
self.min_priority = nil
self.max_priority = nil

NextTaskSQL = '(`locked_by` = ?) OR (`run_at` <= ? AND (`locked_at` IS NULL OR `locked_at` < ?))'
# By default failed jobs are destroyed after too many attempts.
# If you want to keep them around (perhaps to inspect the reason
# for the failure), set this to false.
cattr_accessor :destroy_failed_jobs
self.destroy_failed_jobs = true

# Every worker has a unique name which by default is the pid of the process.
# There are some advantages to overriding this with something which survives worker restarts:
# Workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.
cattr_accessor :worker_name
self.worker_name = "pid:#{Process.pid}"

NextTaskSQL = '(`run_at` <= ? AND (`locked_at` IS NULL OR `locked_at` < ?) OR (`locked_by` = ?)) AND `failed_at` IS NULL'
NextTaskOrder = 'priority DESC, run_at ASC'
ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/

cattr_accessor :min_priority, :max_priority
self.min_priority = nil
self.max_priority = nil

class LockError < StandardError
end
Expand All @@ -24,33 +36,38 @@ def self.clear_locks!
connection.execute "UPDATE #{table_name} SET `locked_by`=NULL, `locked_at`=NULL WHERE `locked_by`=#{quote_value(worker_name)}"
end

# Reports whether this job has been marked as failed.
# Returns the raw failed_at value (nil while unset), so the result is
# truthy/falsy rather than strictly true/false.
def failed?
failed_at
end
# Expose the same check under the non-predicate name `failed`.
alias_method :failed, :failed?

def payload_object
@payload_object ||= deserialize(self['handler'])
end
end

def name
text = handler.gsub(/\n/, ' ')
"#{id} (#{text.length > 40 ? "#{text[0..40]}..." : text})"
"#{id} (#{text.length > 40 ? "#{text[0..40]}..." : text})"
end

# Stores the given object as this job's payload by serializing it to YAML
# and writing the result into the 'handler' attribute.
def payload_object=(object)
self['handler'] = object.to_yaml
end

def reschedule(message, time = nil)
def reschedule(message, backtrace = [], time = nil)
if self.attempts < MAX_ATTEMPTS
time ||= Job.db_time_now + (attempts ** 4) + 5

self.attempts += 1
self.run_at = time
self.last_error = message
self.last_error = message + "\n" + backtrace.join("\n")
self.unlock
save!
else
else
logger.info "* [JOB] PERMANENTLY removing #{self.name} because of #{attempts} consequetive failures."
destroy
destroy_failed_jobs ? destroy : update_attribute(:failed_at, Time.now)
end
end
end

def self.enqueue(object, priority = 0)
unless object.respond_to?(:perform)
Expand All @@ -60,12 +77,12 @@ def self.enqueue(object, priority = 0)
Job.create(:payload_object => object, :priority => priority.to_i)
end

def self.find_available(limit = 5)
def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME)

time_now = db_time_now

sql = NextTaskSQL.dup
conditions = [time_now, time_now, worker_name]
conditions = [time_now, time_now - max_run_time, worker_name]

if self.min_priority
sql << ' AND (`priority` >= ?)'
Expand All @@ -85,29 +102,33 @@ def self.find_available(limit = 5)
end


# Get the payload of the next job we can get an exclusive lock on.
# Get the payload of the next job we can get an exclusive lock on.
# If no jobs are left we return nil
def self.reserve(max_run_time = 4.hours)
# We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next one.
# this leads to a more even distribution of jobs across the worker processes
find_available(5).each do |job|
begin
def self.reserve(max_run_time = MAX_RUN_TIME)

# We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next one.
# this leads to a more even distribution of jobs across the worker processes
find_available(5, max_run_time).each do |job|
begin
logger.info "* [JOB] aquiring lock on #{job.name}"
job.lock_exclusively!(max_run_time, worker_name)
runtime = Benchmark.realtime do
yield job.payload_object
runtime = Benchmark.realtime do
yield job.payload_object
job.destroy
end
logger.info "* [JOB] #{job.name} completed after %.4f" % runtime
return job

return job
rescue LockError
# We did not get the lock, some other worker process must have
logger.warn "* [JOB] failed to aquire exclusive lock for #{job.name}"
rescue StandardError => e
job.reschedule e.message
job.reschedule e.message, e.backtrace
log_exception(job, e)
rescue StandardError => e
job.reschedule e.message, e.backtrace
logger.error "* [JOB] #{job.name} failed with #{e.class.name}: #{e.message} - #{job.attempts} failed attempts"
logger.error(e)
return job
end
end
Expand All @@ -116,26 +137,26 @@ def self.reserve(max_run_time = 4.hours)
end

# This method is used internally by reserve method to ensure exclusive access
# to the given job. It will raise a LockError if it cannot get this lock.
# to the given job. It will raise a LockError if it cannot get this lock.
def lock_exclusively!(max_run_time, worker = worker_name)
now = self.class.db_time_now
affected_rows = if locked_by != worker

affected_rows = if locked_by != worker

# We don't own this job so we will update the locked_by name and the locked_at
connection.update(<<-end_sql, "#{self.class.name} Update to aquire exclusive lock")
UPDATE #{self.class.table_name}
SET `locked_at`=#{quote_value(now)}, `locked_by`=#{quote_value(worker)}
SET `locked_at`=#{quote_value(now)}, `locked_by`=#{quote_value(worker)}
WHERE #{self.class.primary_key} = #{quote_value(id)} AND (`locked_at` IS NULL OR `locked_at` < #{quote_value(now - max_run_time.to_i)})
end_sql

else

# We already own this job, this may happen if the job queue crashes.
# We already own this job, this may happen if the job queue crashes.
# Simply resume and update the locked_at
connection.update(<<-end_sql, "#{self.class.name} Update exclusive lock")
UPDATE #{self.class.table_name}
SET `locked_at`=#{quote_value(now)}
SET `locked_at`=#{quote_value(now)}
WHERE #{self.class.primary_key} = #{quote_value(id)} AND (`locked_by`=#{quote_value(worker)})
end_sql

Expand Down Expand Up @@ -165,7 +186,7 @@ def self.work_off(num = 100)

num.times do

job = self.reserve do |j|
job = self.reserve do |j|
begin
j.perform
success += 1
Expand Down Expand Up @@ -193,7 +214,7 @@ def deserialize(source)
if handler.nil?
if source =~ ParseObjectFromYaml

# Constantize the object so that ActiveSupport can attempt
# Constantize the object so that ActiveSupport can attempt
# its auto loading magic. Will raise LoadError if not successful.
attempt_to_load($1)

Expand All @@ -205,18 +226,18 @@ def deserialize(source)

if handler.is_a?(YAML::Object)

# Constantize the object so that ActiveSupport can attempt
# Constantize the object so that ActiveSupport can attempt
# its auto loading magic. Will raise LoadError if not successful.
attempt_to_load(handler.class)

# If successful, retry the yaml.load
handler = YAML.load(source)
return handler if handler.respond_to?(:perform)
return handler if handler.respond_to?(:perform)
end

raise DeserializationError, 'Job failed to load: Unknown handler. Try to manually require the appropiate file.'

rescue TypeError, LoadError, NameError => e
rescue TypeError, LoadError, NameError => e

raise DeserializationError, "Job failed to load: #{e.message}. Try to manually require the required file."
end
Expand All @@ -237,4 +258,4 @@ def before_save
end

end
end
end
3 changes: 2 additions & 1 deletion spec/database.rb
Expand Up @@ -21,6 +21,7 @@
table.datetime :run_at
table.datetime :locked_at
table.string :locked_by
table.datetime :failed_at
table.timestamps
end

Expand All @@ -34,4 +35,4 @@
# Purely useful for test cases...
class Story < ActiveRecord::Base
def tell; text; end
end
end
65 changes: 50 additions & 15 deletions spec/job_spec.rb
Expand Up @@ -43,14 +43,15 @@ def perform; raise 'did not work'; end

SimpleJob.runs.should == 1
end
it "should re-schedule by about 1 second at first and increment this more and more minutes when it fails to execute properly" do

it "should re-schedule by about 1 second at first and increment this more and more minutes when it fails to execute properly" do
Delayed::Job.enqueue ErrorJob.new
Delayed::Job.work_off(1)

job = Delayed::Job.find(:first)

job.last_error.should == 'did not work'

job.last_error.should =~ /did not work/
job.last_error.should =~ /job_spec.rb:10:in `perform'/
job.attempts.should == 1

job.run_at.should > Delayed::Job.db_time_now - 10.minutes
Expand All @@ -59,7 +60,7 @@ def perform; raise 'did not work'; end

it "should raise an DeserializationError when the job class is totally unknown" do

job = Delayed::Job.new
job = Delayed::Job.new
job['handler'] = "--- !ruby/object:JobThatDoesNotExist {}"

lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
Expand Down Expand Up @@ -96,12 +97,34 @@ def perform; raise 'did not work'; end

job.should_receive(:attempt_to_load).with('Delayed::JobThatDoesNotExist').and_return(true)
lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
end
end

it "should be failed if it failed more than MAX_ATTEMPTS times and we don't want to destroy jobs" do
default = Delayed::Job.destroy_failed_jobs
Delayed::Job.destroy_failed_jobs = false

@job = Delayed::Job.create :payload_object => SimpleJob.new, :attempts => 50
@job.reload.failed_at.should == nil
@job.reschedule 'FAIL'
@job.reload.failed_at.should_not == nil

Delayed::Job.destroy_failed_jobs = default
end

it "should be destroyed if it failed more than MAX_ATTEMPTS times and we want to destroy jobs" do
default = Delayed::Job.destroy_failed_jobs
Delayed::Job.destroy_failed_jobs = true

it "should be removed if it failed more than MAX_ATTEMPTS times" do
@job = Delayed::Job.create :payload_object => SimpleJob.new, :attempts => 50
@job.should_receive(:destroy)
@job.reschedule 'FAIL'

Delayed::Job.destroy_failed_jobs = default
end

it "should never find failed jobs" do
@job = Delayed::Job.create :payload_object => SimpleJob.new, :attempts => 50, :failed_at => Time.now
Delayed::Job.find_available(1).length.should == 0
end

describe "when another worker is already performing an task, it" do
Expand All @@ -113,14 +136,14 @@ def perform; raise 'did not work'; end

it "should not allow a second worker to get exclusive access" do
lambda { @job.lock_exclusively! 4.hours, 'worker2' }.should raise_error(Delayed::Job::LockError)
end
end

it "should not allow a second worker to get exclusive access if the timeout has passed" do
@job.lock_exclusively! 1.minute, 'worker2'
end

@job.lock_exclusively! 1.minute, 'worker2'

end

it "should be able to get access to the task if it was started more then max_age ago" do
@job.locked_at = 5.hours.ago
@job.save
Expand All @@ -131,8 +154,20 @@ def perform; raise 'did not work'; end
@job.locked_at.should > 1.minute.ago
end

it "should not be found by another worker" do
Delayed::Job.worker_name = 'worker2'

it "should be able to get exclusive access again when the worker name is the same" do
Delayed::Job.find_available(1, 6.minutes).length.should == 0
end

it "should be found by another worker if the time has expired" do
Delayed::Job.worker_name = 'worker2'

Delayed::Job.find_available(1, 4.minutes).length.should == 1
end


it "should be able to get exclusive access again when the worker name is the same" do
@job.lock_exclusively! 5.minutes, 'worker1'
@job.lock_exclusively! 5.minutes, 'worker1'
@job.lock_exclusively! 5.minutes, 'worker1'
Expand Down Expand Up @@ -173,4 +208,4 @@ def perform; raise 'did not work'; end

end

end
end

0 comments on commit e81577c

Please sign in to comment.