Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Move #reschedule from Job to Worker. Refs #26

  • Loading branch information...
commit 7a5c8f4483cc21d1b1ddacda11e64f55acd03832 1 parent 02d561a
Brandon Keepers bkeepers authored
12 README.textile
Source Rendered
@@ -134,18 +134,16 @@ The default Worker.max_run_time is 4.hours. If your job takes longer than that,
134 134 make sure your job doesn't exceed this time. You should set this to the longest time you think the job could take.
135 135
136 136 By default, it will delete failed jobs (and it always deletes successful jobs). If you want to keep failed jobs, set
137   -Delayed::Job.destroy_failed_jobs = false. The failed jobs will be marked with non-null failed_at.
  137 +Delayed::Worker.destroy_failed_jobs = false. The failed jobs will be marked with non-null failed_at.
138 138
139 139 Here is an example of changing job parameters in Rails:
140 140
141 141 <pre>
142 142 # config/initializers/delayed_job_config.rb
143   -Delayed::Job.destroy_failed_jobs = false
144   -silence_warnings do
145   - Delayed::Worker.sleep_delay = 60
146   - Delayed::Worker.max_attempts = 3
147   - Delayed::Worker.max_run_time = 5.minutes
148   -end
  143 +Delayed::Worker.destroy_failed_jobs = false
  144 +Delayed::Worker.sleep_delay = 60
  145 +Delayed::Worker.max_attempts = 3
  146 +Delayed::Worker.max_run_time = 5.minutes
149 147 </pre>
150 148
151 149 h3. Cleaning up
23 lib/delayed/job.rb
@@ -10,12 +10,6 @@ class DeserializationError < StandardError
10 10 class Job < ActiveRecord::Base
11 11 set_table_name :delayed_jobs
12 12
13   - # By default failed jobs are destroyed after too many attempts.
14   - # If you want to keep them around (perhaps to inspect the reason
15   - # for the failure), set this to false.
16   - cattr_accessor :destroy_failed_jobs
17   - self.destroy_failed_jobs = true
18   -
19 13 named_scope :ready_to_run, lambda {|worker_name, max_run_time|
20 14 {:conditions => ['(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR locked_by = ?) AND failed_at IS NULL', db_time_now, db_time_now - max_run_time, worker_name]}
21 15 }
@@ -52,23 +46,6 @@ def payload_object=(object)
52 46 self['handler'] = object.to_yaml
53 47 end
54 48
55   - # Reschedule the job in the future (when a job fails).
56   - # Uses an exponential scale depending on the number of failed attempts.
57   - def reschedule(message, backtrace = [], time = nil)
58   - self.last_error = message + "\n" + backtrace.join("\n")
59   -
60   - if (self.attempts += 1) < Worker.max_attempts
61   - time ||= Job.db_time_now + (attempts ** 4) + 5
62   -
63   - self.run_at = time
64   - self.unlock
65   - save!
66   - else
67   - logger.info "* [JOB] PERMANENTLY removing #{self.name} because of #{attempts} consecutive failures."
68   - destroy_failed_jobs ? destroy : update_attribute(:failed_at, Delayed::Job.db_time_now)
69   - end
70   - end
71   -
72 49 # Add a job to the queue
73 50 def self.enqueue(*args, &block)
74 51 object = block_given? ? EvaledJob.new(&block) : args.shift
57 lib/delayed/worker.rb
@@ -5,6 +5,11 @@ class Worker
5 5 self.max_attempts = 25
6 6 self.max_run_time = 4.hours
7 7
  8 + # By default failed jobs are destroyed after too many attempts. If you want to keep them around
  9 + # (perhaps to inspect the reason for the failure), set this to false.
  10 + cattr_accessor :destroy_failed_jobs
  11 + self.destroy_failed_jobs = true
  12 +
8 13 self.logger = if defined?(Merb::Logger)
9 14 Merb.logger
10 15 elsif defined?(RAILS_DEFAULT_LOGGER)
@@ -14,9 +19,16 @@ class Worker
14 19 # name_prefix is ignored if name is set directly
15 20 attr_accessor :name_prefix
16 21
17   - # Every worker has a unique name which by default is the pid of the process.
18   - # There are some advantages to overriding this with something which survives worker retarts:
19   - # Workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.
  22 + def initialize(options={})
  23 + @quiet = options[:quiet]
  24 + self.class.min_priority = options[:min_priority] if options.has_key?(:min_priority)
  25 + self.class.max_priority = options[:max_priority] if options.has_key?(:max_priority)
  26 + end
  27 +
  28 + # Every worker has a unique name which by default is the pid of the process. There are some
  29 + # advantages to overriding this with something which survives worker retarts: Workers can#
  30 + # safely resume working on tasks which are locked by themselves. The worker will assume that
  31 + # it crashed before.
20 32 def name
21 33 return @name unless @name.nil?
22 34 "#{@name_prefix}host:#{Socket.gethostname} pid:#{Process.pid}" rescue "#{@name_prefix}pid:#{Process.pid}"
@@ -28,12 +40,6 @@ def name=(val)
28 40 @name = val
29 41 end
30 42
31   - def initialize(options={})
32   - @quiet = options[:quiet]
33   - self.class.min_priority = options[:min_priority] if options.has_key?(:min_priority)
34   - self.class.max_priority = options[:max_priority] if options.has_key?(:max_priority)
35   - end
36   -
37 43 def start
38 44 say "*** Starting job worker #{name}"
39 45
@@ -63,13 +69,6 @@ def start
63 69 ensure
64 70 Delayed::Job.clear_locks!(name)
65 71 end
66   -
67   - def say(text, level = Logger::INFO)
68   - puts text unless @quiet
69   - logger.add level, text if logger
70   - end
71   -
72   - protected
73 72
74 73 def run(job)
75 74 runtime = Benchmark.realtime do
@@ -84,11 +83,33 @@ def run(job)
84 83 return false # work failed
85 84 end
86 85
  86 + # Reschedule the job in the future (when a job fails).
  87 + # Uses an exponential scale depending on the number of failed attempts.
  88 + def reschedule(job, time = nil)
  89 + if (job.attempts += 1) < self.class.max_attempts
  90 + time ||= Job.db_time_now + (job.attempts ** 4) + 5
  91 + job.run_at = time
  92 + job.unlock
  93 + job.save!
  94 + else
  95 + say "* [JOB] PERMANENTLY removing #{job.name} because of #{job.attempts} consecutive failures.", Logger::INFO
  96 + self.class.destroy_failed_jobs ? job.destroy : job.update_attribute(:failed_at, Delayed::Job.db_time_now)
  97 + end
  98 + end
  99 +
  100 + def say(text, level = Logger::INFO)
  101 + puts text unless @quiet
  102 + logger.add level, text if logger
  103 + end
  104 +
  105 + protected
  106 +
87 107 def handle_failed_job(job, error)
88   - job.reschedule error.message, error.backtrace
  108 + job.last_error = error.message + "\n" + error.backtrace.join("\n")
89 109 say "* [JOB] #{name} failed with #{error.class.name}: #{error.message} - #{job.attempts} failed attempts", Logger::ERROR
  110 + reschedule(job)
90 111 end
91   -
  112 +
92 113 # Run the next job we can get an exclusive lock on.
93 114 # If no jobs are left we return nil
94 115 def reserve_and_run_one_job
40 spec/job_spec.rb
@@ -102,46 +102,6 @@
102 102 lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
103 103 end
104 104
105   - context "reschedule" do
106   - before do
107   - @job = Delayed::Job.create :payload_object => SimpleJob.new
108   - end
109   -
110   - context "and we want to destroy jobs" do
111   - before do
112   - Delayed::Job.destroy_failed_jobs = true
113   - end
114   -
115   - it "should be destroyed if it failed more than Worker.max_attempts times" do
116   - @job.should_receive(:destroy)
117   - Delayed::Worker.max_attempts.times { @job.reschedule 'FAIL' }
118   - end
119   -
120   - it "should not be destroyed if failed fewer than Worker.max_attempts times" do
121   - @job.should_not_receive(:destroy)
122   - (Delayed::Worker.max_attempts - 1).times { @job.reschedule 'FAIL' }
123   - end
124   - end
125   -
126   - context "and we don't want to destroy jobs" do
127   - before do
128   - Delayed::Job.destroy_failed_jobs = false
129   - end
130   -
131   - it "should be failed if it failed more than Worker.max_attempts times" do
132   - @job.reload.failed_at.should == nil
133   - Delayed::Worker.max_attempts.times { @job.reschedule 'FAIL' }
134   - @job.reload.failed_at.should_not == nil
135   - end
136   -
137   - it "should not be failed if it failed fewer than Worker.max_attempts times" do
138   - (Delayed::Worker.max_attempts - 1).times { @job.reschedule 'FAIL' }
139   - @job.reload.failed_at.should == nil
140   - end
141   -
142   - end
143   - end
144   -
145 105 it "should never find failed jobs" do
146 106 @job = Delayed::Job.create :payload_object => SimpleJob.new, :attempts => 50, :failed_at => Delayed::Job.db_time_now
147 107 Delayed::Job.find_available('worker', 1).length.should == 0
47 spec/worker_spec.rb
@@ -7,7 +7,6 @@ def job_create(opts = {})
7 7
8 8 before(:all) do
9 9 Delayed::Worker.send :public, :work_off
10   - Delayed::Worker.send :public, :run
11 10 end
12 11
13 12 before(:each) do
@@ -90,14 +89,14 @@ def job_create(opts = {})
90 89 describe "failed jobs" do
91 90 before do
92 91 # reset defaults
93   - Delayed::Job.destroy_failed_jobs = true
  92 + Delayed::Worker.destroy_failed_jobs = true
94 93 Delayed::Worker.max_attempts = 25
95 94
96 95 @job = Delayed::Job.enqueue ErrorJob.new
97 96 end
98 97
99 98 it "should record last_error when destroy_failed_jobs = false, max_attempts = 1" do
100   - Delayed::Job.destroy_failed_jobs = false
  99 + Delayed::Worker.destroy_failed_jobs = false
101 100 Delayed::Worker.max_attempts = 1
102 101 @worker.run(@job)
103 102 @job.reload
@@ -117,4 +116,46 @@ def job_create(opts = {})
117 116 @job.run_at.should < Delayed::Job.db_time_now + 10.minutes
118 117 end
119 118 end
  119 +
  120 + context "reschedule" do
  121 + before do
  122 + @job = Delayed::Job.create :payload_object => SimpleJob.new
  123 + end
  124 +
  125 + context "and we want to destroy jobs" do
  126 + before do
  127 + Delayed::Worker.destroy_failed_jobs = true
  128 + end
  129 +
  130 + it "should be destroyed if it failed more than Worker.max_attempts times" do
  131 + @job.should_receive(:destroy)
  132 + Delayed::Worker.max_attempts.times { @worker.reschedule(@job) }
  133 + end
  134 +
  135 + it "should not be destroyed if failed fewer than Worker.max_attempts times" do
  136 + @job.should_not_receive(:destroy)
  137 + (Delayed::Worker.max_attempts - 1).times { @worker.reschedule(@job) }
  138 + end
  139 + end
  140 +
  141 + context "and we don't want to destroy jobs" do
  142 + before do
  143 + Delayed::Worker.destroy_failed_jobs = false
  144 + end
  145 +
  146 + it "should be failed if it failed more than Worker.max_attempts times" do
  147 + @job.reload.failed_at.should == nil
  148 + Delayed::Worker.max_attempts.times { @worker.reschedule(@job) }
  149 + @job.reload.failed_at.should_not == nil
  150 + end
  151 +
  152 + it "should not be failed if it failed fewer than Worker.max_attempts times" do
  153 + (Delayed::Worker.max_attempts - 1).times { @worker.reschedule(@job) }
  154 + @job.reload.failed_at.should == nil
  155 + end
  156 +
  157 + end
  158 + end
  159 +
  160 +
120 161 end

0 comments on commit 7a5c8f4

Please sign in to comment.
Something went wrong with that request. Please try again.