Skip to content
This repository
Browse code

more specs around poping jobs off the queue.

  • Loading branch information...
commit 5e8745cfddcf418bffad10d3b731415908d98755 1 parent 4b9b079
Rob Ares authored November 09, 2008
31  lib/delayed/job.rb
@@ -84,10 +84,8 @@ def self.find_available(limit = 5)
84 84
       ActiveRecord::Base.silence do
85 85
         find(:all, :conditions => conditions, :order => NextTaskOrder, :limit => limit)
86 86
       end
87  
-      
88 87
     end                                    
89 88
         
90  
-
91 89
     # Get the payload of the next job we can get an exclusive lock on. 
92 90
     # If no jobs are left we return nil
93 91
     def self.reserve(max_run_time = 4.hours)                                                                                 
@@ -98,7 +96,7 @@ def self.reserve(max_run_time = 4.hours)
98 96
         begin                                              
99 97
           logger.info "* [JOB] aquiring lock on #{job.name}"
100 98
           job.lock_exclusively!(max_run_time, worker_name)
101  
-          runtime =  Benchmark.realtime do 
  99
+          runtime = Benchmark.realtime do 
102 100
             yield job.payload_object 
103 101
             job.destroy
104 102
           end
@@ -123,26 +121,17 @@ def self.reserve(max_run_time = 4.hours)
123 121
     # to the given job. It will rise a LockError if it cannot get this lock. 
124 122
     def lock_exclusively!(max_run_time, worker = worker_name)
125 123
       now = self.class.db_time_now
126  
-      
127  
-      affected_rows = if locked_by != worker 
128  
-
129  
-        # We don't own this job so we will update the locked_by name and the locked_at
130  
-        transaction do
131  
-          Job.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)])
  124
+      transaction do
  125
+        if locked_by != worker
  126
+          # We don't own this job so we will update the locked_by name and the locked_at
  127
+          affected_rows = self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)])
  128
+        else
  129
+          # We already own this job, this may happen if the job queue crashes. 
  130
+          # Simply resume and update the locked_at
  131
+          affected_rows = self.class.update_all(["locked_at = ?", now], ["id = ? and (locked_by = ?)", id, worker])
132 132
         end
133  
-      else
134  
-
135  
-        # We already own this job, this may happen if the job queue crashes. 
136  
-        # Simply resume and update the locked_at
137  
-        transaction do
138  
-          Job.update_all(["locked_at = ?", now], ["id = ? and (locked_by = ?)", id, worker])
139  
-        end
140  
-      end
141  
-
142  
-      unless affected_rows == 1
143  
-        raise LockError, "Attempted to aquire exclusive lock failed"
  133
+        raise LockError.new("Attempted to aquire exclusive lock failed") unless affected_rows == 1
144 134
       end
145  
-
146 135
       self.locked_at    = now
147 136
       self.locked_by    = worker
148 137
     end
25  spec/job_spec.rb
@@ -173,18 +173,29 @@ def perform; raise 'did not work'; end
173 173
    
174 174
   end
175 175
   
176  
-  context "when retreiving jobs" do
  176
+  context "when retreiving jobs from the queue" do
177 177
     before(:each) do
178  
-      @simple_job = SimpleJob.new
179  
-      @job = Delayed::Job.create :payload_object => @simple_job, :locked_by => 'worker1', :locked_at => Delayed::Job.db_time_now - 5.minutes
  178
+      @job = Delayed::Job.create(
  179
+        :payload_object => SimpleJob.new, 
  180
+        :locked_by => 'worker1', 
  181
+        :locked_at => Delayed::Job.db_time_now - 5.minutes)
180 182
     end
181 183
   
182  
-    it "should return jobs that haven't been processed yet" do
183  
-      SimpleJob.runs.should == 0
184  
-      # Delayed::Job.should_receive(:find_available).once.with(5).and_return([@job])
185  
-      Delayed::Job.should_receive(:reserve).once.and_yield(@job.payload_object)            
  184
+    it "should process jobs that haven't been processed yet and remove them from the queue" do
  185
+      Delayed::Job.find_available.length.should == 1
  186
+      SimpleJob.runs.should == 0            
186 187
       Delayed::Job.work_off(1)
187 188
       SimpleJob.runs.should == 1
  189
+      Delayed::Job.find_available.length.should == 0
  190
+    end
  191
+    
  192
+    it "should leave the queue in a consistent state if failure occurs trying to aquire a lock" do
  193
+      SimpleJob.runs.should == 0     
  194
+      @job.stub!(:lock_exclusively!).with(:any_args).once.and_raise(Delayed::Job::LockError)
  195
+      Delayed::Job.should_receive(:find_available).any_number_of_times.at_least(:once).and_return([@job])
  196
+      Delayed::Job.work_off(1)
  197
+      SimpleJob.runs.should == 0
  198
+      Delayed::Job.find_available(5).length.should == 1
188 199
     end
189 200
   
190 201
   end

0 notes on commit 5e8745c

Please sign in to comment.
Something went wrong with that request. Please try again.