Permalink
Browse files

more specs around poping jobs off the queue.

  • Loading branch information...
1 parent 4b9b079 commit 5e8745cfddcf418bffad10d3b731415908d98755 @rares rares committed Nov 9, 2008
Showing with 28 additions and 28 deletions.
  1. +10 −21 lib/delayed/job.rb
  2. +18 −7 spec/job_spec.rb
View
@@ -84,10 +84,8 @@ def self.find_available(limit = 5)
ActiveRecord::Base.silence do
find(:all, :conditions => conditions, :order => NextTaskOrder, :limit => limit)
end
-
end
-
# Get the payload of the next job we can get an exclusive lock on.
# If no jobs are left we return nil
def self.reserve(max_run_time = 4.hours)
@@ -98,7 +96,7 @@ def self.reserve(max_run_time = 4.hours)
begin
logger.info "* [JOB] aquiring lock on #{job.name}"
job.lock_exclusively!(max_run_time, worker_name)
- runtime = Benchmark.realtime do
+ runtime = Benchmark.realtime do
yield job.payload_object
job.destroy
end
@@ -123,26 +121,17 @@ def self.reserve(max_run_time = 4.hours)
# to the given job. It will rise a LockError if it cannot get this lock.
def lock_exclusively!(max_run_time, worker = worker_name)
now = self.class.db_time_now
-
- affected_rows = if locked_by != worker
-
- # We don't own this job so we will update the locked_by name and the locked_at
- transaction do
- Job.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)])
+ transaction do
+ if locked_by != worker
+ # We don't own this job so we will update the locked_by name and the locked_at
+ affected_rows = self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)])
+ else
+ # We already own this job, this may happen if the job queue crashes.
+ # Simply resume and update the locked_at
+ affected_rows = self.class.update_all(["locked_at = ?", now], ["id = ? and (locked_by = ?)", id, worker])
end
- else
-
- # We already own this job, this may happen if the job queue crashes.
- # Simply resume and update the locked_at
- transaction do
- Job.update_all(["locked_at = ?", now], ["id = ? and (locked_by = ?)", id, worker])
- end
- end
-
- unless affected_rows == 1
- raise LockError, "Attempted to aquire exclusive lock failed"
+ raise LockError.new("Attempted to aquire exclusive lock failed") unless affected_rows == 1
end
-
self.locked_at = now
self.locked_by = worker
end
View
@@ -173,18 +173,29 @@ def perform; raise 'did not work'; end
end
- context "when retreiving jobs" do
+ context "when retreiving jobs from the queue" do
before(:each) do
- @simple_job = SimpleJob.new
- @job = Delayed::Job.create :payload_object => @simple_job, :locked_by => 'worker1', :locked_at => Delayed::Job.db_time_now - 5.minutes
+ @job = Delayed::Job.create(
+ :payload_object => SimpleJob.new,
+ :locked_by => 'worker1',
+ :locked_at => Delayed::Job.db_time_now - 5.minutes)
end
- it "should return jobs that haven't been processed yet" do
- SimpleJob.runs.should == 0
- # Delayed::Job.should_receive(:find_available).once.with(5).and_return([@job])
- Delayed::Job.should_receive(:reserve).once.and_yield(@job.payload_object)
+ it "should process jobs that haven't been processed yet and remove them from the queue" do
+ Delayed::Job.find_available.length.should == 1
+ SimpleJob.runs.should == 0
Delayed::Job.work_off(1)
SimpleJob.runs.should == 1
+ Delayed::Job.find_available.length.should == 0
+ end
+
+ it "should leave the queue in a consistent state if failure occurs trying to aquire a lock" do
+ SimpleJob.runs.should == 0
+ @job.stub!(:lock_exclusively!).with(:any_args).once.and_raise(Delayed::Job::LockError)
+ Delayed::Job.should_receive(:find_available).any_number_of_times.at_least(:once).and_return([@job])
+ Delayed::Job.work_off(1)
+ SimpleJob.runs.should == 0
+ Delayed::Job.find_available(5).length.should == 1
end
end

0 comments on commit 5e8745c

Please sign in to comment.