Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Move #run from Job to Worker. Refs #26

  • Loading branch information...
commit 02d561ab5b5c2f244fb1cc996ab183fd94de1fdb 1 parent 86a638b
Brandon Keepers authored December 19, 2009
34  lib/delayed/job.rb
@@ -69,34 +69,6 @@ def reschedule(message, backtrace = [], time = nil)
69 69
       end
70 70
     end
71 71
 
72  
-
73  
-    # Try to lock and run job. Returns true/false (work done/work failed) or nil if job can't be locked.
74  
-    def run_with_lock(max_run_time, worker_name)
75  
-      logger.info "* [JOB] acquiring lock on #{name}"
76  
-      if lock_exclusively!(max_run_time, worker_name)
77  
-        run(max_run_time)
78  
-      else
79  
-        # We did not get the lock, some other worker process must have
80  
-        logger.warn "* [JOB] failed to acquire exclusive lock for #{name}"
81  
-        nil # no work done
82  
-      end
83  
-    end
84  
-
85  
-    # Try to run job. Returns true/false (work done/work failed)
86  
-    def run(max_run_time)
87  
-      runtime =  Benchmark.realtime do
88  
-        Timeout.timeout(max_run_time.to_i) { invoke_job }
89  
-        destroy
90  
-      end
91  
-      # TODO: warn if runtime > max_run_time ?
92  
-      logger.info "* [JOB] #{name} completed after %.4f" % runtime
93  
-      return true  # did work
94  
-    rescue Exception => e
95  
-      reschedule e.message, e.backtrace
96  
-      log_exception(e)
97  
-      return false  # work failed
98  
-    end
99  
-
100 72
     # Add a job to the queue
101 73
     def self.enqueue(*args, &block)
102 74
       object = block_given? ? EvaledJob.new(&block) : args.shift
@@ -149,12 +121,6 @@ def unlock
149 121
       self.locked_by    = nil
150 122
     end
151 123
 
152  
-    # This is a good hook if you need to report job processing errors in additional or different ways
153  
-    def log_exception(error)
154  
-      logger.error "* [JOB] #{name} failed with #{error.class.name}: #{error.message} - #{attempts} failed attempts"
155  
-      logger.error(error)
156  
-    end
157  
-
158 124
     # Moved into its own method so that new_relic can trace it.
159 125
     def invoke_job
160 126
       payload_object.perform
30  lib/delayed/worker.rb
@@ -70,15 +70,33 @@ def say(text, level = Logger::INFO)
70 70
     end
71 71
 
72 72
     protected
  73
+    
  74
+    def run(job)
  75
+      runtime =  Benchmark.realtime do
  76
+        Timeout.timeout(self.class.max_run_time.to_i) { job.invoke_job }
  77
+        job.destroy
  78
+      end
  79
+      # TODO: warn if runtime > max_run_time ?
  80
+      say "* [JOB] #{name} completed after %.4f" % runtime
  81
+      return true  # did work
  82
+    rescue Exception => e
  83
+      handle_failed_job(job, e)
  84
+      return false  # work failed
  85
+    end
  86
+    
  87
+    def handle_failed_job(job, error)
  88
+      job.reschedule error.message, error.backtrace
  89
+      say "* [JOB] #{name} failed with #{error.class.name}: #{error.message} - #{job.attempts} failed attempts", Logger::ERROR
  90
+    end
73 91
 
74 92
     # Run the next job we can get an exclusive lock on.
75 93
     # If no jobs are left we return nil
76  
-    def reserve_and_run_one_job(max_run_time = self.class.max_run_time)
  94
+    def reserve_and_run_one_job
77 95
 
78 96
       # We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next.
79 97
       # this leads to a more even distribution of jobs across the worker processes
80  
-      job = Delayed::Job.find_available(name, 5, max_run_time).detect do |job|
81  
-        if job.lock_exclusively!(max_run_time, name)
  98
+      job = Delayed::Job.find_available(name, 5, self.class.max_run_time).detect do |job|
  99
+        if job.lock_exclusively!(self.class.max_run_time, name)
82 100
           say "* [Worker(#{name})] acquired lock on #{job.name}"
83 101
           true
84 102
         else
@@ -87,11 +105,7 @@ def reserve_and_run_one_job(max_run_time = self.class.max_run_time)
87 105
         end
88 106
       end
89 107
 
90  
-      if job.nil?
91  
-        nil # we didn't do any work, all 5 were not lockable
92  
-      else
93  
-        job.run(max_run_time)
94  
-      end
  108
+      run(job) if job
95 109
     end
96 110
 
97 111
     # Do num jobs and return stats on success/failure.
9  spec/delayed_method_spec.rb
@@ -73,15 +73,8 @@ def read(story)
73 73
   end
74 74
 
75 75
   it "should ignore ActiveRecord::RecordNotFound errors because they are permanent" do
76  
-
77 76
     job = ErrorObject.new.send_later(:throw)
78  
-
79  
-    Delayed::Job.count.should == 1
80  
-
81  
-    job.run_with_lock(Delayed::Worker.max_run_time, 'worker')
82  
-
83  
-    Delayed::Job.count.should == 0
84  
-
  77
+    lambda { job.invoke_job }.should_not raise_error
85 78
   end
86 79
 
87 80
   it "should store the object as string if its an active record" do
69  spec/job_spec.rb
@@ -43,16 +43,6 @@
43 43
     Delayed::Job.first.run_at.should be_close(later, 1)
44 44
   end
45 45
 
46  
-  it "should call perform on jobs when running run_with_lock" do
47  
-    SimpleJob.runs.should == 0
48  
-
49  
-    job = Delayed::Job.enqueue SimpleJob.new
50  
-    job.run_with_lock(Delayed::Worker.max_run_time, 'worker')
51  
-
52  
-    SimpleJob.runs.should == 1
53  
-  end
54  
-                     
55  
-                     
56 46
   it "should work with eval jobs" do
57 47
     $eval_job_ran = false
58 48
 
@@ -61,47 +51,16 @@
61 51
     JOB
62 52
     end
63 53
 
64  
-    job.run_with_lock(Delayed::Worker.max_run_time, 'worker')
  54
+    job.invoke_job
65 55
 
66 56
     $eval_job_ran.should == true
67 57
   end
68 58
                    
69 59
   it "should work with jobs in modules" do
70  
-    M::ModuleJob.runs.should == 0
71  
-
72 60
     job = Delayed::Job.enqueue M::ModuleJob.new
73  
-    job.run_with_lock(Delayed::Worker.max_run_time, 'worker')
74  
-
75  
-    M::ModuleJob.runs.should == 1
  61
+    lambda { job.invoke_job }.should change { M::ModuleJob.runs }.from(0).to(1)
76 62
   end
77 63
                    
78  
-  it "should re-schedule by about 1 second at first and increment this more and more minutes when it fails to execute properly" do
79  
-    job = Delayed::Job.enqueue ErrorJob.new
80  
-    job.run_with_lock(Delayed::Worker.max_run_time, 'worker')
81  
-
82  
-    job = Delayed::Job.find(:first)
83  
-
84  
-    job.last_error.should =~ /did not work/
85  
-    job.last_error.should =~ /sample_jobs.rb:8:in `perform'/
86  
-    job.attempts.should == 1
87  
-
88  
-    job.run_at.should > Delayed::Job.db_time_now - 10.minutes
89  
-    job.run_at.should < Delayed::Job.db_time_now + 10.minutes
90  
-  end
91  
-
92  
-  it "should record last_error when destroy_failed_jobs = false, max_attempts = 1" do
93  
-    Delayed::Job.destroy_failed_jobs = false
94  
-    Delayed::Worker.max_attempts = 1
95  
-    job = Delayed::Job.enqueue ErrorJob.new
96  
-    job.run(1)
97  
-    job.reload
98  
-    job.last_error.should =~ /did not work/
99  
-    job.last_error.should =~ /job_spec.rb/
100  
-    job.attempts.should == 1
101  
-
102  
-    job.failed_at.should_not == nil
103  
-  end
104  
-
105 64
   it "should raise an DeserializationError when the job class is totally unknown" do
106 65
 
107 66
     job = Delayed::Job.new
@@ -183,13 +142,6 @@
183 142
     end
184 143
   end
185 144
   
186  
-  it "should fail after Worker.max_run_time" do
187  
-    @job = Delayed::Job.create :payload_object => LongRunningJob.new
188  
-    @job.run_with_lock(1.second, 'worker')
189  
-    @job.reload.last_error.should =~ /expired/
190  
-    @job.attempts.should == 1
191  
-  end
192  
-
193 145
   it "should never find failed jobs" do
194 146
     @job = Delayed::Job.create :payload_object => SimpleJob.new, :attempts => 50, :failed_at => Delayed::Job.db_time_now
195 147
     Delayed::Job.find_available('worker', 1).length.should == 0
@@ -294,23 +246,6 @@
294 246
    
295 247
   end
296 248
   
297  
-  context "when pulling jobs off the queue for processing, it" do
298  
-    before(:each) do
299  
-      @job = Delayed::Job.create(
300  
-        :payload_object => SimpleJob.new, 
301  
-        :locked_by => 'worker1', 
302  
-        :locked_at => Delayed::Job.db_time_now - 5.minutes)
303  
-    end
304  
-
305  
-    it "should leave the queue in a consistent state and not run the job if locking fails" do
306  
-      SimpleJob.runs.should == 0     
307  
-      @job.stub!(:lock_exclusively!).with(any_args).once.and_return(false)
308  
-      @job.run_with_lock(Delayed::Worker.max_run_time, 'worker')
309  
-      SimpleJob.runs.should == 0
310  
-    end
311  
-  
312  
-  end
313  
-  
314 249
   context "db_time_now" do
315 250
     it "should return time in current time zone if set" do
316 251
       Time.zone = 'Eastern Time (US & Canada)'
106  spec/worker_spec.rb
@@ -5,8 +5,9 @@ def job_create(opts = {})
5 5
     Delayed::Job.create(opts.merge(:payload_object => SimpleJob.new))
6 6
   end
7 7
 
8  
-  before do
9  
-    Delayed::Worker.class_eval('public :work_off')
  8
+  before(:all) do
  9
+    Delayed::Worker.send :public, :work_off
  10
+    Delayed::Worker.send :public, :run
10 11
   end
11 12
 
12 13
   before(:each) do
@@ -16,7 +17,22 @@ def job_create(opts = {})
16 17
     
17 18
     SimpleJob.runs = 0
18 19
   end
19  
-
  20
+  
  21
+  describe "running a job" do
  22
+    it "should fail after Worker.max_run_time" do
  23
+      begin
  24
+        old_max_run_time = Delayed::Worker.max_run_time
  25
+        Delayed::Worker.max_run_time = 1.second
  26
+        @job = Delayed::Job.create :payload_object => LongRunningJob.new
  27
+        @worker.run(@job)
  28
+        @job.reload.last_error.should =~ /expired/
  29
+        @job.attempts.should == 1
  30
+      ensure
  31
+        Delayed::Worker.max_run_time = old_max_run_time
  32
+      end
  33
+    end
  34
+  end
  35
+  
20 36
   context "worker prioritization" do
21 37
     before(:each) do
22 38
       @worker = Delayed::Worker.new(:max_priority => 5, :min_priority => -5, :quiet => true)
@@ -44,55 +60,61 @@ def job_create(opts = {})
44 60
     end
45 61
   end
46 62
 
47  
-  context "while running alongside other workers that locked jobs, it" do
  63
+  context "while running with locked and expired jobs" do
48 64
     before(:each) do
49 65
       @worker.name = 'worker1'
50  
-      job_create(:locked_by => 'worker1', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
51  
-      job_create(:locked_by => 'worker2', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
  66
+    end
  67
+    
  68
+    it "should not run jobs locked by another worker" do
  69
+      job_create(:locked_by => 'other_worker', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
  70
+      lambda { @worker.work_off }.should_not change { SimpleJob.runs }
  71
+    end
  72
+    
  73
+    it "should run open jobs" do
52 74
       job_create
53  
-      job_create(:locked_by => 'worker1', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
  75
+      lambda { @worker.work_off }.should change { SimpleJob.runs }.from(0).to(1)
54 76
     end
55  
-
56  
-    it "should ingore locked jobs from other workers" do
57  
-      @worker.name = 'worker3'
58  
-      SimpleJob.runs.should == 0
59  
-      @worker.work_off
60  
-      SimpleJob.runs.should == 1 # runs the one open job
  77
+    
  78
+    it "should run expired jobs" do
  79
+      expired_time = Delayed::Job.db_time_now - (1.minutes + Delayed::Worker.max_run_time)
  80
+      job_create(:locked_by => 'other_worker', :locked_at => expired_time)
  81
+      lambda { @worker.work_off }.should change { SimpleJob.runs }.from(0).to(1)
61 82
     end
62  
-
63  
-    it "should find our own jobs regardless of locks" do
64  
-      @worker.name = 'worker1'
65  
-      SimpleJob.runs.should == 0
66  
-      @worker.work_off
67  
-      SimpleJob.runs.should == 3 # runs open job plus worker1 jobs that were already locked
  83
+    
  84
+    it "should run own jobs" do
  85
+      job_create(:locked_by => @worker.name, :locked_at => (Delayed::Job.db_time_now - 1.minutes))
  86
+      lambda { @worker.work_off }.should change { SimpleJob.runs }.from(0).to(1)
68 87
     end
69 88
   end
70  
-
71  
-  context "while running with locked and expired jobs, it" do
72  
-    before(:each) do
73  
-      @worker.name = 'worker1'
74  
-      exp_time = Delayed::Job.db_time_now - (1.minutes + Delayed::Worker.max_run_time)
75  
-      job_create(:locked_by => 'worker1', :locked_at => exp_time)
76  
-      job_create(:locked_by => 'worker2', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
77  
-      job_create
78  
-      job_create(:locked_by => 'worker1', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
  89
+  
  90
+  describe "failed jobs" do
  91
+    before do
  92
+      # reset defaults
  93
+      Delayed::Job.destroy_failed_jobs = true
  94
+      Delayed::Worker.max_attempts = 25
  95
+
  96
+      @job = Delayed::Job.enqueue ErrorJob.new
79 97
     end
80 98
 
81  
-    it "should only find unlocked and expired jobs" do
82  
-      @worker.name = 'worker3'
83  
-      SimpleJob.runs.should == 0
84  
-      @worker.work_off
85  
-      SimpleJob.runs.should == 2 # runs the one open job and one expired job
  99
+    it "should record last_error when destroy_failed_jobs = false, max_attempts = 1" do
  100
+      Delayed::Job.destroy_failed_jobs = false
  101
+      Delayed::Worker.max_attempts = 1
  102
+      @worker.run(@job)
  103
+      @job.reload
  104
+      @job.last_error.should =~ /did not work/
  105
+      @job.last_error.should =~ /worker_spec.rb/
  106
+      @job.attempts.should == 1
  107
+      @job.failed_at.should_not be_nil
86 108
     end
87  
-
88  
-    it "should ignore locks when finding our own jobs" do
89  
-      @worker.name = 'worker1'
90  
-      SimpleJob.runs.should == 0
91  
-      @worker.work_off
92  
-      SimpleJob.runs.should == 3 # runs open job plus worker1 jobs
93  
-      # This is useful in the case of a crash/restart on worker1, but make sure multiple workers on the same host have unique names!
  109
+    
  110
+    it "should re-schedule jobs after failing" do
  111
+      @worker.run(@job)
  112
+      @job.reload
  113
+      @job.last_error.should =~ /did not work/
  114
+      @job.last_error.should =~ /sample_jobs.rb:8:in `perform'/
  115
+      @job.attempts.should == 1
  116
+      @job.run_at.should > Delayed::Job.db_time_now - 10.minutes
  117
+      @job.run_at.should < Delayed::Job.db_time_now + 10.minutes
94 118
     end
95  
-
96 119
   end
97  
-
98 120
 end

0 notes on commit 02d561a

Please sign in to comment.
Something went wrong with that request. Please try again.