merging tobi/master in and fixing spec logic
Rob Ares committed Nov 13, 2008
2 parents be23e26 + d3a4b0a commit 146642f
Showing 4 changed files with 122 additions and 61 deletions.
4 changes: 3 additions & 1 deletion README.textile
@@ -13,7 +13,8 @@ It is a direct extraction from Shopify where the job table is responsible for a
* spam checks

h2. Changes


* 1.7 Added failed_at column which can optionally be set after a certain number of failed job attempts. By default failed job attempts are destroyed after about a month.
* 1.6 Renamed locked_until to locked_at. We now store when we start a given job instead of how long it will be locked by the worker. This allows us to get a reading on how long a job took to execute.
* 1.5 Job runners can now be run in parallel. Two new database columns are needed: locked_until and locked_by. This allows us to use pessimistic locking, which enables us to run as many worker processes as we need to speed up queue processing.
* 1.0 Initial release
@@ -29,6 +30,7 @@ The library evolves around a delayed_jobs table which looks as follows:
table.string :last_error
table.datetime :run_at
table.datetime :locked_at
table.datetime :failed_at
table.string :locked_by
table.timestamps
end
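
If you are upgrading an existing delayed_jobs table rather than creating it from scratch, a migration along these lines adds the new column (an illustrative sketch, not part of this commit; the migration class name is arbitrary):

class AddFailedAtToDelayedJobs < ActiveRecord::Migration
  def self.up
    add_column :delayed_jobs, :failed_at, :datetime
  end

  def self.down
    remove_column :delayed_jobs, :failed_at
  end
end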
108 changes: 65 additions & 43 deletions lib/delayed/job.rb
@@ -6,18 +6,29 @@ class DeserializationError < StandardError

class Job < ActiveRecord::Base
MAX_ATTEMPTS = 25
MAX_RUN_TIME = 4.hours
set_table_name :delayed_jobs

cattr_accessor :worker_name, :min_priority, :max_priority
self.worker_name = "pid:#{Process.pid}"
self.min_priority = nil
self.max_priority = nil

# Conditions to find tasks that are locked by this process or one that has
# been created before now and is not currently locked.
NextTaskSQL = "(locked_by = ?) or (run_at <= ? and (locked_at is null or locked_at < ?))"
NextTaskOrder = "priority DESC, run_at ASC"
# By default failed jobs are destroyed after too many attempts.
# If you want to keep them around (perhaps to inspect the reason
# for the failure), set this to false.
cattr_accessor :destroy_failed_jobs
self.destroy_failed_jobs = true
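# For example, an application that wants to inspect failures later could set
# the following, e.g. in config/environment.rb (illustrative placement, not
# part of this commit):
#
#   Delayed::Job.destroy_failed_jobs = false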

# Every worker has a unique name which by default is the pid of the process.
# There are some advantages to overriding this with something which survives worker restarts:
# Workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.
cattr_accessor :worker_name
self.worker_name = "pid:#{Process.pid}"
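# For example (illustrative, not part of this commit), a name that survives
# restarts on a given host could be built from the hostname:
#
#   require 'socket'
#   Delayed::Job.worker_name = "host:#{Socket.gethostname} worker:1"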

NextTaskSQL = '(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR (locked_by = ?)) AND failed_at IS NULL'
NextTaskOrder = 'priority DESC, run_at ASC'
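# The placeholders above are bound in find_available in the order
# [now, now - max_run_time, worker_name]: a job is available when it is due
# and either unlocked or its lock has expired, or when it is already locked
# by this worker; jobs with failed_at set are always skipped.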

ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/

cattr_accessor :min_priority, :max_priority
self.min_priority = nil
self.max_priority = nil

class LockError < StandardError
end
@@ -26,33 +37,38 @@ def self.clear_locks!
update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name])
end

def failed?
failed_at
end
alias_method :failed, :failed?

def payload_object
@payload_object ||= deserialize(self['handler'])
end

def name
text = handler.gsub(/\n/, ' ')
"#{id} (#{text.length > 40 ? "#{text[0..40]}..." : text})"
"#{id} (#{text.length > 40 ? "#{text[0..40]}..." : text})"
end

def payload_object=(object)
self['handler'] = object.to_yaml
end

def reschedule(message, time = nil)
def reschedule(message, backtrace = [], time = nil)
if self.attempts < MAX_ATTEMPTS
time ||= Job.db_time_now + (attempts ** 4) + 5
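# The polynomial backoff above works out to, for example:
#   attempts = 0  ->  0**4 + 5 =      5 seconds until the next run
#   attempts = 5  ->  5**4 + 5 =    630 seconds (~10 minutes)
#   attempts = 24 -> 24**4 + 5 = 331781 seconds (~3.8 days)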

self.attempts += 1
self.run_at = time
self.last_error = message
self.last_error = message + "\n" + backtrace.join("\n")
self.unlock
save!
else
logger.info "* [JOB] PERMANENTLY removing #{self.name} because of #{attempts} consequetive failures."
destroy
destroy_failed_jobs ? destroy : update_attribute(:failed_at, Time.now)
end
end

def self.enqueue(object, priority = 0)
unless object.respond_to?(:perform)
@@ -62,12 +78,13 @@ def self.enqueue(object, priority = 0)
Job.create(:payload_object => object, :priority => priority.to_i)
end
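# Typical usage (illustrative; MailerJob is a hypothetical class whose
# instances respond to #perform):
#
#   Delayed::Job.enqueue MailerJob.new(account.id), 1
#
# A higher priority number is picked up sooner, since jobs are ordered by
# priority DESC, run_at ASC.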

def self.find_available(limit = 5)
def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME)

time_now = db_time_now

sql = NextTaskSQL.dup
conditions = [worker_name, time_now, time_now]

conditions = [time_now, time_now - max_run_time, worker_name]

if self.min_priority
sql << ' AND (priority >= ?)'
@@ -78,38 +95,37 @@ def self.find_available(limit = 5)
sql << ' AND (priority <= ?)'
conditions << max_priority
end

conditions.unshift(sql)

ActiveRecord::Base.silence do
find(:all, :conditions => conditions, :order => NextTaskOrder, :limit => limit)
end
end

# Get the payload of the next job we can get an exclusive lock on.
# If no jobs are left we return nil
def self.reserve(max_run_time = 4.hours)
# We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next.
# this leads to a more even distribution of jobs across the worker processes
find_available(5).each do |job|
begin
def self.reserve(max_run_time = MAX_RUN_TIME)

# We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next.
# this leads to a more even distribution of jobs across the worker processes
find_available(5, max_run_time).each do |job|
begin
logger.info "* [JOB] aquiring lock on #{job.name}"
job.lock_exclusively!(max_run_time, worker_name)
runtime = Benchmark.realtime do
yield job.payload_object
job.destroy
end
logger.info "* [JOB] #{job.name} completed after %.4f" % runtime

return job
rescue LockError
# We did not get the lock, some other worker process must have
logger.warn "* [JOB] failed to acquire exclusive lock for #{job.name}"
rescue StandardError => e
job.reschedule e.message
logger.error "* [JOB] #{job.name} failed with #{e.class.name}: #{e.message} - #{job.attempts} failed attempts"
logger.error(e)
job.reschedule e.message, e.backtrace
log_exception(job, e)
return job
end
end
@@ -118,7 +134,7 @@ def self.reserve(max_run_time = 4.hours)
end

# This method is used internally by reserve method to ensure exclusive access
# to the given job. It will raise a LockError if it cannot get this lock.
def lock_exclusively!(max_run_time, worker = worker_name)
now = self.class.db_time_now
affected_rows = if locked_by != worker
@@ -140,12 +156,18 @@ def unlock
self.locked_by = nil
end

# This is a good hook if you need to report job processing errors in additional or different ways
def self.log_exception(job, error)
logger.error "* [JOB] #{job.name} failed with #{error.class.name}: #{error.message} - #{job.attempts} failed attempts"
logger.error(error)
end
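# An application could reopen the class to add its own reporting, for example
# (illustrative only; MyNotifier is a hypothetical error-reporting class):
#
#   def Delayed::Job.log_exception(job, error)
#     MyNotifier.notify(error)
#     logger.error "* [JOB] #{job.name} failed with #{error.class.name}: #{error.message}"
#   end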

def self.work_off(num = 100)
success, failure = 0, 0

num.times do

job = self.reserve do |j|
job = self.reserve do |j|
begin
j.perform
success += 1
@@ -173,7 +195,7 @@ def deserialize(source)
if handler.nil?
if source =~ ParseObjectFromYaml

# Constantize the object so that ActiveSupport can attempt
# its auto loading magic. Will raise LoadError if not successful.
attempt_to_load($1)

@@ -185,18 +207,18 @@ def deserialize(source)

if handler.is_a?(YAML::Object)

# Constantize the object so that ActiveSupport can attempt
# its auto loading magic. Will raise LoadError if not successful.
attempt_to_load(handler.class)

# If successful, retry the yaml.load
handler = YAML.load(source)
return handler if handler.respond_to?(:perform)
end

raise DeserializationError, 'Job failed to load: Unknown handler. Try to manually require the appropriate file.'

rescue TypeError, LoadError, NameError => e

raise DeserializationError, "Job failed to load: #{e.message}. Try to manually require the required file."
end
@@ -217,4 +239,4 @@ def before_save
end

end
end
3 changes: 2 additions & 1 deletion spec/database.rb
@@ -21,6 +21,7 @@
table.datetime :run_at
table.datetime :locked_at
table.string :locked_by
table.datetime :failed_at
table.timestamps
end

@@ -34,4 +35,4 @@
# Purely useful for test cases...
class Story < ActiveRecord::Base
def tell; text; end
end
68 changes: 52 additions & 16 deletions spec/job_spec.rb
@@ -43,14 +43,15 @@ def perform; raise 'did not work'; end

SimpleJob.runs.should == 1
end
it "should re-schedule by about 1 second at first and increment this more and more minutes when it fails to execute properly" do

it "should re-schedule by about 1 second at first and increment this more and more minutes when it fails to execute properly" do
Delayed::Job.enqueue ErrorJob.new
Delayed::Job.work_off(1)

job = Delayed::Job.find(:first)

job.last_error.should == 'did not work'

job.last_error.should =~ /did not work/
job.last_error.should =~ /job_spec.rb:10:in `perform'/
job.attempts.should == 1

job.run_at.should > Delayed::Job.db_time_now - 10.minutes
@@ -59,7 +60,7 @@ def perform; raise 'did not work'; end

it "should raise an DeserializationError when the job class is totally unknown" do

job = Delayed::Job.new
job['handler'] = "--- !ruby/object:JobThatDoesNotExist {}"

lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
@@ -96,12 +97,34 @@ def perform; raise 'did not work'; end

job.should_receive(:attempt_to_load).with('Delayed::JobThatDoesNotExist').and_return(true)
lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
end

it "should be failed if it failed more than MAX_ATTEMPTS times and we don't want to destroy jobs" do
default = Delayed::Job.destroy_failed_jobs
Delayed::Job.destroy_failed_jobs = false

@job = Delayed::Job.create :payload_object => SimpleJob.new, :attempts => 50
@job.reload.failed_at.should == nil
@job.reschedule 'FAIL'
@job.reload.failed_at.should_not == nil

Delayed::Job.destroy_failed_jobs = default
end

it "should be destroyed if it failed more than MAX_ATTEMPTS times and we want to destroy jobs" do
default = Delayed::Job.destroy_failed_jobs
Delayed::Job.destroy_failed_jobs = true

it "should be removed if it failed more than MAX_ATTEMPTS times" do
@job = Delayed::Job.create :payload_object => SimpleJob.new, :attempts => 50
@job.should_receive(:destroy)
@job.reschedule 'FAIL'

Delayed::Job.destroy_failed_jobs = default
end

it "should never find failed jobs" do
@job = Delayed::Job.create :payload_object => SimpleJob.new, :attempts => 50, :failed_at => Time.now
Delayed::Job.find_available(1).length.should == 0
end

context "when another worker is already performing an task, it" do
@@ -114,7 +137,7 @@ def perform; raise 'did not work'; end
it "should not allow a second worker to get exclusive access" do
lambda { @job.lock_exclusively! 4.hours, 'worker2' }.should raise_error(Delayed::Job::LockError)
end

it "should allow a second worker to get exclusive access if the timeout has passed" do
lambda { @job.lock_exclusively! 1.minute, 'worker2' }.should_not raise_error(Delayed::Job::LockError)
end
@@ -129,7 +152,20 @@ def perform; raise 'did not work'; end
@job.locked_at.should > 1.minute.ago
end

it "should be able to get exclusive access again when the worker name is the same" do
it "should not be found by another worker" do
Delayed::Job.worker_name = 'worker2'

Delayed::Job.find_available(1, 6.minutes).length.should == 0
end

it "should be found by another worker if the time has expired" do
Delayed::Job.worker_name = 'worker2'

Delayed::Job.find_available(1, 4.minutes).length.should == 1
end


it "should be able to get exclusive access again when the worker name is the same" do
@job.lock_exclusively! 5.minutes, 'worker1'
@job.lock_exclusively! 5.minutes, 'worker1'
@job.lock_exclusively! 5.minutes, 'worker1'
@@ -199,16 +235,16 @@ def perform; raise 'did not work'; end
context "while running alongside other workers with enqueued jobs, it" do
before(:each) do
Delayed::Job.worker_name = 'worker1'
Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => 5.hours.ago)
Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker2', :locked_at => 3.hours.ago)
Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => 2.hours.ago)
Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => (Delayed::Job.db_time_now - 3.minutes))
Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker2', :locked_at => (Delayed::Job.db_time_now - 11.minutes))
Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => (Delayed::Job.db_time_now - 2.minutes))
end

it "should remove only jobs created by the current worker" do
it "should only find jobs if the lock has expired reguardless of the worker" do
SimpleJob.runs.should == 0
Delayed::Job.work_off(3)
SimpleJob.runs.should == 2
Delayed::Job.find_available.length.should == 1
Delayed::Job.work_off(5)
SimpleJob.runs.should == 2
Delayed::Job.find_available(5, 10.minutes).length.should == 1
end

end
