Move #reschedule from Job to Worker. Refs collectiveidea#26
bkeepers committed Dec 19, 2009
1 parent 02d561a commit 7a5c8f4
Showing 5 changed files with 88 additions and 91 deletions.
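
In effect, the failure-handling API moves from the job to the worker. A minimal sketch of the call-site change, with `job`, `worker`, and `error` as stand-in variables:

<pre>
# Before this commit: the job rescheduled itself and recorded its own error
job.reschedule(error.message, error.backtrace)

# After this commit: the worker records the error and reschedules the job
job.last_error = error.message + "\n" + error.backtrace.join("\n")
worker.reschedule(job)
</pre>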
12 changes: 5 additions & 7 deletions README.textile
@@ -134,18 +134,16 @@ The default Worker.max_run_time is 4.hours. If your job takes longer than that,
 make sure your job doesn't exceed this time. You should set this to the longest time you think the job could take.
 
 By default, it will delete failed jobs (and it always deletes successful jobs). If you want to keep failed jobs, set
-Delayed::Job.destroy_failed_jobs = false. The failed jobs will be marked with non-null failed_at.
+Delayed::Worker.destroy_failed_jobs = false. The failed jobs will be marked with non-null failed_at.
 
 Here is an example of changing job parameters in Rails:
 
 <pre>
 # config/initializers/delayed_job_config.rb
-Delayed::Job.destroy_failed_jobs = false
-silence_warnings do
-  Delayed::Worker.sleep_delay = 60
-  Delayed::Worker.max_attempts = 3
-  Delayed::Worker.max_run_time = 5.minutes
-end
+Delayed::Worker.destroy_failed_jobs = false
+Delayed::Worker.sleep_delay = 60
+Delayed::Worker.max_attempts = 3
+Delayed::Worker.max_run_time = 5.minutes
 </pre>
 
 h3. Cleaning up
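As the README change above notes, jobs that exhaust their attempts are marked rather than deleted when destroy_failed_jobs is false. A hypothetical inspection snippet, using Rails 2.x-era finder syntax (not part of this commit):

<pre>
# Failed jobs carry a non-null failed_at; last_error holds the message and backtrace.
failed = Delayed::Job.find(:all, :conditions => 'failed_at IS NOT NULL')
failed.each { |job| puts "job #{job.id} failed: #{job.last_error}" }
</pre>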
23 changes: 0 additions & 23 deletions lib/delayed/job.rb
@@ -10,12 +10,6 @@ class DeserializationError < StandardError
   class Job < ActiveRecord::Base
     set_table_name :delayed_jobs
 
-    # By default failed jobs are destroyed after too many attempts.
-    # If you want to keep them around (perhaps to inspect the reason
-    # for the failure), set this to false.
-    cattr_accessor :destroy_failed_jobs
-    self.destroy_failed_jobs = true
-
     named_scope :ready_to_run, lambda {|worker_name, max_run_time|
       {:conditions => ['(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR locked_by = ?) AND failed_at IS NULL', db_time_now, db_time_now - max_run_time, worker_name]}
     }
@@ -52,23 +46,6 @@ def payload_object=(object)
       self['handler'] = object.to_yaml
     end
 
-    # Reschedule the job in the future (when a job fails).
-    # Uses an exponential scale depending on the number of failed attempts.
-    def reschedule(message, backtrace = [], time = nil)
-      self.last_error = message + "\n" + backtrace.join("\n")
-
-      if (self.attempts += 1) < Worker.max_attempts
-        time ||= Job.db_time_now + (attempts ** 4) + 5
-
-        self.run_at = time
-        self.unlock
-        save!
-      else
-        logger.info "* [JOB] PERMANENTLY removing #{self.name} because of #{attempts} consecutive failures."
-        destroy_failed_jobs ? destroy : update_attribute(:failed_at, Delayed::Job.db_time_now)
-      end
-    end
-
     # Add a job to the queue
     def self.enqueue(*args, &block)
       object = block_given? ? EvaledJob.new(&block) : args.shift
57 changes: 39 additions & 18 deletions lib/delayed/worker.rb
@@ -5,6 +5,11 @@ class Worker
     self.max_attempts = 25
     self.max_run_time = 4.hours
 
+    # By default failed jobs are destroyed after too many attempts. If you want to keep them around
+    # (perhaps to inspect the reason for the failure), set this to false.
+    cattr_accessor :destroy_failed_jobs
+    self.destroy_failed_jobs = true
+
     self.logger = if defined?(Merb::Logger)
       Merb.logger
     elsif defined?(RAILS_DEFAULT_LOGGER)
@@ -14,9 +19,16 @@ class Worker
     # name_prefix is ignored if name is set directly
     attr_accessor :name_prefix
 
-    # Every worker has a unique name which by default is the pid of the process.
-    # There are some advantages to overriding this with something which survives worker restarts:
-    # Workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.
+    def initialize(options={})
+      @quiet = options[:quiet]
+      self.class.min_priority = options[:min_priority] if options.has_key?(:min_priority)
+      self.class.max_priority = options[:max_priority] if options.has_key?(:max_priority)
+    end
+
+    # Every worker has a unique name which by default is the pid of the process. There are some
+    # advantages to overriding this with something which survives worker restarts: Workers can
+    # safely resume working on tasks which are locked by themselves. The worker will assume that
+    # it crashed before.
     def name
       return @name unless @name.nil?
       "#{@name_prefix}host:#{Socket.gethostname} pid:#{Process.pid}" rescue "#{@name_prefix}pid:#{Process.pid}"
@@ -28,12 +40,6 @@ def name=(val)
       @name = val
     end
 
-    def initialize(options={})
-      @quiet = options[:quiet]
-      self.class.min_priority = options[:min_priority] if options.has_key?(:min_priority)
-      self.class.max_priority = options[:max_priority] if options.has_key?(:max_priority)
-    end
-
     def start
       say "*** Starting job worker #{name}"
 
@@ -63,13 +69,6 @@ def start
     ensure
       Delayed::Job.clear_locks!(name)
     end
-
-    def say(text, level = Logger::INFO)
-      puts text unless @quiet
-      logger.add level, text if logger
-    end
-
-  protected
 
     def run(job)
       runtime = Benchmark.realtime do
@@ -84,11 +83,33 @@ def run(job)
       return false # work failed
     end
 
+    # Reschedule the job in the future (when a job fails).
+    # Uses an exponential scale depending on the number of failed attempts.
+    def reschedule(job, time = nil)
+      if (job.attempts += 1) < self.class.max_attempts
+        time ||= Job.db_time_now + (job.attempts ** 4) + 5
+        job.run_at = time
+        job.unlock
+        job.save!
+      else
+        say "* [JOB] PERMANENTLY removing #{job.name} because of #{job.attempts} consecutive failures.", Logger::INFO
+        self.class.destroy_failed_jobs ? job.destroy : job.update_attribute(:failed_at, Delayed::Job.db_time_now)
+      end
+    end
+
+    def say(text, level = Logger::INFO)
+      puts text unless @quiet
+      logger.add level, text if logger
+    end
+
+  protected
+
     def handle_failed_job(job, error)
-      job.reschedule error.message, error.backtrace
+      job.last_error = error.message + "\n" + error.backtrace.join("\n")
       say "* [JOB] #{name} failed with #{error.class.name}: #{error.message} - #{job.attempts} failed attempts", Logger::ERROR
+      reschedule(job)
     end
 
     # Run the next job we can get an exclusive lock on.
     # If no jobs are left we return nil
     def reserve_and_run_one_job
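The retry delay in Worker#reschedule above works out to `(attempts ** 4) + 5` seconds; a quick sketch of the schedule that formula produces:

<pre>
# Seconds until the next retry after each failed attempt:
#   attempt 1  ->      6s
#   attempt 2  ->     21s
#   attempt 3  ->     86s
#   attempt 5  ->    630s  (~10 minutes)
#   attempt 10 ->  10005s  (~3 hours)
#   attempt 20 -> 160005s  (~2 days)
(1..25).each { |n| puts "attempt #{n}: #{(n ** 4) + 5}s" }
</pre>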
40 changes: 0 additions & 40 deletions spec/job_spec.rb
@@ -102,46 +102,6 @@
     lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
   end
 
-  context "reschedule" do
-    before do
-      @job = Delayed::Job.create :payload_object => SimpleJob.new
-    end
-
-    context "and we want to destroy jobs" do
-      before do
-        Delayed::Job.destroy_failed_jobs = true
-      end
-
-      it "should be destroyed if it failed more than Worker.max_attempts times" do
-        @job.should_receive(:destroy)
-        Delayed::Worker.max_attempts.times { @job.reschedule 'FAIL' }
-      end
-
-      it "should not be destroyed if failed fewer than Worker.max_attempts times" do
-        @job.should_not_receive(:destroy)
-        (Delayed::Worker.max_attempts - 1).times { @job.reschedule 'FAIL' }
-      end
-    end
-
-    context "and we don't want to destroy jobs" do
-      before do
-        Delayed::Job.destroy_failed_jobs = false
-      end
-
-      it "should be failed if it failed more than Worker.max_attempts times" do
-        @job.reload.failed_at.should == nil
-        Delayed::Worker.max_attempts.times { @job.reschedule 'FAIL' }
-        @job.reload.failed_at.should_not == nil
-      end
-
-      it "should not be failed if it failed fewer than Worker.max_attempts times" do
-        (Delayed::Worker.max_attempts - 1).times { @job.reschedule 'FAIL' }
-        @job.reload.failed_at.should == nil
-      end
-
-    end
-  end
-
   it "should never find failed jobs" do
     @job = Delayed::Job.create :payload_object => SimpleJob.new, :attempts => 50, :failed_at => Delayed::Job.db_time_now
     Delayed::Job.find_available('worker', 1).length.should == 0
47 changes: 44 additions & 3 deletions spec/worker_spec.rb
@@ -7,7 +7,6 @@ def job_create(opts = {})
 
   before(:all) do
     Delayed::Worker.send :public, :work_off
-    Delayed::Worker.send :public, :run
   end
 
   before(:each) do
@@ -90,14 +89,14 @@ def job_create(opts = {})
   describe "failed jobs" do
     before do
       # reset defaults
-      Delayed::Job.destroy_failed_jobs = true
+      Delayed::Worker.destroy_failed_jobs = true
      Delayed::Worker.max_attempts = 25
 
       @job = Delayed::Job.enqueue ErrorJob.new
     end
 
     it "should record last_error when destroy_failed_jobs = false, max_attempts = 1" do
-      Delayed::Job.destroy_failed_jobs = false
+      Delayed::Worker.destroy_failed_jobs = false
       Delayed::Worker.max_attempts = 1
       @worker.run(@job)
       @job.reload
@@ -117,4 +116,46 @@ def job_create(opts = {})
       @job.run_at.should < Delayed::Job.db_time_now + 10.minutes
     end
   end
+
+  context "reschedule" do
+    before do
+      @job = Delayed::Job.create :payload_object => SimpleJob.new
+    end
+
+    context "and we want to destroy jobs" do
+      before do
+        Delayed::Worker.destroy_failed_jobs = true
+      end
+
+      it "should be destroyed if it failed more than Worker.max_attempts times" do
+        @job.should_receive(:destroy)
+        Delayed::Worker.max_attempts.times { @worker.reschedule(@job) }
+      end
+
+      it "should not be destroyed if failed fewer than Worker.max_attempts times" do
+        @job.should_not_receive(:destroy)
+        (Delayed::Worker.max_attempts - 1).times { @worker.reschedule(@job) }
+      end
+    end
+
+    context "and we don't want to destroy jobs" do
+      before do
+        Delayed::Worker.destroy_failed_jobs = false
+      end
+
+      it "should be failed if it failed more than Worker.max_attempts times" do
+        @job.reload.failed_at.should == nil
+        Delayed::Worker.max_attempts.times { @worker.reschedule(@job) }
+        @job.reload.failed_at.should_not == nil
+      end
+
+      it "should not be failed if it failed fewer than Worker.max_attempts times" do
+        (Delayed::Worker.max_attempts - 1).times { @worker.reschedule(@job) }
+        @job.reload.failed_at.should == nil
+      end
+    end
+  end
+
 end
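
For context, the constructor options exercised by these specs map onto starting a worker by hand. A minimal sketch, assuming the delayed_job environment is already loaded ('worker-1' is a hypothetical name):

<pre>
# Only work jobs with priority between 0 and 10, and keep stdout quiet.
worker = Delayed::Worker.new(:quiet => true, :min_priority => 0, :max_priority => 10)
# A stable name (instead of the default pid-based one) lets a restarted
# worker resume jobs it had locked before crashing.
worker.name = 'worker-1'
worker.start
</pre>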
