Skip to content
This repository
Browse code

merge

  • Loading branch information...
commit 0c084e5aa09f66830cd504e3d7272c55bf02cc79 2 parents 7931ef1 + 3e5c373
risk danger olson authored October 07, 2008
2  README.textile
Source Rendered
@@ -31,7 +31,7 @@ The library evolves around a delayed_jobs table which looks as follows:
31 31
     table.datetime :locked_at
32 32
     table.string   :locked_by
33 33
     table.timestamps
34  
-  end      
  34
+  end
35 35
   
36 36
 h2. Usage
37 37
 
67  lib/delayed/job.rb
... ...
@@ -1,9 +1,11 @@
  1
+
1 2
 module Delayed
2 3
 
3 4
   class DeserializationError < StandardError
4 5
   end
5 6
 
6 7
   class Job < ActiveRecord::Base
  8
+    MAX_ATTEMPTS = 25
7 9
     set_table_name :delayed_jobs
8 10
 
9 11
     cattr_accessor :worker_name
@@ -22,21 +24,31 @@ def self.clear_locks!
22 24
 
23 25
     def payload_object
24 26
       @payload_object ||= deserialize(self['handler'])
  27
+    end                 
  28
+    
  29
+    def name
  30
+      text = handler.gsub(/\n/, ' ')
  31
+      "#{id} (#{text.length > 40 ? "#{text[0..40]}..." : text})" 
25 32
     end
26 33
 
27 34
     def payload_object=(object)
28 35
       self['handler'] = object.to_yaml
29 36
     end
30 37
 
31  
-    def reshedule(message, time = nil)
32  
-      time ||= Job.db_time_now + (attempts ** 4).seconds + 5
33  
-
34  
-      self.attempts    += 1
35  
-      self.run_at       = time
36  
-      self.last_error   = message
37  
-      self.unlock
38  
-      save!
39  
-    end
  38
+    def reschedule(message, time = nil)
  39
+      if self.attempts < MAX_ATTEMPTS
  40
+        time ||= Job.db_time_now + (attempts ** 4) + 5
  41
+
  42
+        self.attempts    += 1
  43
+        self.run_at       = time
  44
+        self.last_error   = message  
  45
+        self.unlock
  46
+        save!
  47
+      else    
  48
+        logger.info "* [JOB] PERMANENTLY removing #{self.name} because of #{attempts} consequetive failures."
  49
+        destroy
  50
+      end
  51
+    end                                  
40 52
 
41 53
     def self.enqueue(object, priority = 0)
42 54
       unless object.respond_to?(:perform)
@@ -55,21 +67,28 @@ def self.find_available(limit = 5)
55 67
 
56 68
     # Get the payload of the next job we can get an exclusive lock on. 
57 69
     # If no jobs are left we return nil
58  
-    def self.reserve(max_run_time = 4.hours)
59  
-
60  
-      # We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next. 
61  
-      # This leads to a more even distribution of jobs across the worker processes 
62  
-      find_available(5).each do |job|
63  
-        begin
  70
+    def self.reserve(max_run_time = 4.hours)                                                                                 
  71
+                    
  72
+      # We get up to 5 jobs from the db. In face we cannot get exclusive access to a job we try the next. 
  73
+      # this leads to a more even distribution of jobs across the worker processes 
  74
+      find_available(5).each do |job|                       
  75
+        begin                                              
  76
+          logger.info "* [JOB] aquiring lock on #{job.name}"
64 77
           job.lock_exclusively!(max_run_time, worker_name)
65  
-          yield job.payload_object
66  
-          job.destroy
67  
-          return job
  78
+          runtime =  Benchmark.realtime do 
  79
+            yield job.payload_object 
  80
+            job.destroy
  81
+          end
  82
+          logger.info "* [JOB] #{job.name} completed after %.4f" % runtime
  83
+          
  84
+          return job                                     
68 85
         rescue LockError
69 86
           # We did not get the lock, some other worker process must have
70  
-          puts "failed to aquire exclusive lock for #{job.id}"
71  
-        rescue StandardError => e
72  
-          job.reshedule e.message
  87
+          logger.warn "* [JOB] failed to aquire exclusive lock for #{job.name}"
  88
+        rescue StandardError => e 
  89
+          job.reschedule e.message        
  90
+          logger.error "* [JOB] #{job.name} failed with #{e.class.name}: #{e.message} - #{job.attempts} failed attempts"
  91
+          logger.error(e)
73 92
           return job
74 93
         end
75 94
       end
@@ -81,14 +100,14 @@ def self.reserve(max_run_time = 4.hours)
81 100
     # to the given job. It will rise a LockError if it cannot get this lock. 
82 101
     def lock_exclusively!(max_run_time, worker = worker_name)
83 102
       now = self.class.db_time_now
84  
-
85  
-      affected_rows = if locked_by != worker
  103
+      
  104
+      affected_rows = if locked_by != worker 
86 105
 
87 106
         # We don't own this job so we will update the locked_by name and the locked_at
88 107
         connection.update(<<-end_sql, "#{self.class.name} Update to aquire exclusive lock")
89 108
           UPDATE #{self.class.table_name}
90 109
           SET `locked_at`=#{quote_value(now)}, `locked_by`=#{quote_value(worker)} 
91  
-          WHERE #{self.class.primary_key} = #{quote_value(id)} AND (`locked_at` IS NULL OR `locked_at` < #{quote_value(now + max_run_time)})
  110
+          WHERE #{self.class.primary_key} = #{quote_value(id)} AND (`locked_at` IS NULL OR `locked_at` < #{quote_value(now - max_run_time.to_i)})
92 111
         end_sql
93 112
 
94 113
       else
43  spec/job_spec.rb
@@ -34,16 +34,18 @@ def perform; raise 'did not work'; end
34 34
 
35 35
     SimpleJob.runs.should == 1
36 36
   end
37  
-
38  
-  it "should re-schedule by about 1 second at first and increment this more and more minutes when it fails to execute properly" do
  37
+  
  38
+  it "should re-schedule by about 1 second at first and increment this more and more minutes when it fails to execute properly" do            
39 39
     Delayed::Job.enqueue ErrorJob.new
40  
-    runner = Delayed::Job.work_off(1)
  40
+    Delayed::Job.work_off(1)
41 41
 
42 42
     job = Delayed::Job.find(:first)
  43
+        
43 44
     job.last_error.should == 'did not work'
44 45
     job.attempts.should == 1
45  
-    job.run_at.should > Time.now
46  
-    job.run_at.should < Time.now + 6.minutes
  46
+
  47
+    job.run_at.should > Delayed::Job.db_time_now - 10.minutes
  48
+    job.run_at.should < Delayed::Job.db_time_now + 10.minutes
47 49
   end
48 50
 
49 51
   it "should raise an DeserializationError when the job class is totally unknown" do
@@ -82,21 +84,34 @@ def perform; raise 'did not work'; end
82 84
   it "should try include the namespace when loading unknown structs" do
83 85
     job = Delayed::Job.new
84 86
     job['handler'] = "--- !ruby/struct:Delayed::JobThatDoesNotExist {}"
  87
+
85 88
     job.should_receive(:attempt_to_load).with('Delayed::JobThatDoesNotExist').and_return(true)
86 89
     lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
  90
+  end                  
  91
+
  92
+  it "should be removed if it failed more than MAX_ATTEMPTS times" do
  93
+    @job = Delayed::Job.create :payload_object => SimpleJob.new, :attempts => 50
  94
+    @job.should_receive(:destroy)
  95
+    @job.reschedule 'FAIL'
87 96
   end
88 97
 
89 98
   describe  "when another worker is already performing an task, it" do
90 99
 
91 100
     before :each do
92 101
       Delayed::Job.worker_name = 'worker1'
93  
-      @job = Delayed::Job.create :payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => Time.now.utc
  102
+      @job = Delayed::Job.create :payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => Delayed::Job.db_time_now - 5.minutes
94 103
     end
95 104
 
96 105
     it "should not allow a second worker to get exclusive access" do
97 106
       lambda { @job.lock_exclusively! 4.hours, 'worker2' }.should raise_error(Delayed::Job::LockError)
98  
-    end
99  
-
  107
+    end      
  108
+
  109
+    it "should not allow a second worker to get exclusive access if the timeout has passed" do
  110
+      
  111
+      @job.lock_exclusively! 1.minute, 'worker2' 
  112
+      
  113
+    end      
  114
+    
100 115
     it "should be able to get access to the task if it was started more then max_age ago" do
101 116
       @job.locked_at = 5.hours.ago
102 117
       @job.save
@@ -107,11 +122,11 @@ def perform; raise 'did not work'; end
107 122
       @job.locked_at.should > 1.minute.ago
108 123
     end
109 124
 
110  
-    it "should be able to get exclusive access again when the worker name is the same" do
111  
-      @job.lock_exclusively! Time.now + 20, 'worker1'
112  
-      @job.lock_exclusively! Time.now + 21, 'worker1'
113  
-      @job.lock_exclusively! Time.now + 22, 'worker1'
114  
-    end
115  
-  end
116 125
 
  126
+    it "should be able to get exclusive access again when the worker name is the same" do      
  127
+      @job.lock_exclusively! 5.minutes, 'worker1'
  128
+      @job.lock_exclusively! 5.minutes, 'worker1'
  129
+      @job.lock_exclusively! 5.minutes, 'worker1'
  130
+    end                                        
  131
+  end
117 132
 end

0 notes on commit 0c084e5

Please sign in to comment.
Something went wrong with that request. Please try again.