Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

fixing spec that was returning a false positive. adding spec for conc…

…urrency.
  • Loading branch information...
commit be23e260759b2a59a97018f3c592184c409e5df2 1 parent 149f45f
@rares rares authored
Showing with 29 additions and 30 deletions.
  1. +10 −11 lib/delayed/job.rb
  2. +19 −19 spec/job_spec.rb
View
21 lib/delayed/job.rb
@@ -15,7 +15,7 @@ class Job < ActiveRecord::Base
# Conditions to find tasks that are locked by this process or one that has
# been created before now and is not currently locked.
- NextTaskSQL = "(locked_by = ?) OR (run_at <= ? AND (locked_at IS NULL OR locked_at < ?))"
+ NextTaskSQL = "(locked_by = ?) or (run_at <= ? and (locked_at is null or locked_at < ?))"
NextTaskOrder = "priority DESC, run_at ASC"
ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/
@@ -121,17 +121,16 @@ def self.reserve(max_run_time = 4.hours)
# to the given job. It will rise a LockError if it cannot get this lock.
def lock_exclusively!(max_run_time, worker = worker_name)
now = self.class.db_time_now
- transaction do
- affected_rows = if locked_by != worker
- # We don't own this job so we will update the locked_by name and the locked_at
- self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)])
- else
- # We already own this job, this may happen if the job queue crashes.
- # Simply resume and update the locked_at
- self.class.update_all(["locked_at = ?", now], ["id = ? and (locked_by = ?)", id, worker])
- end
- raise LockError.new("Attempted to aquire exclusive lock failed") unless affected_rows == 1
+ affected_rows = if locked_by != worker
+ # We don't own this job so we will update the locked_by name and the locked_at
+ self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)])
+ else
+ # We already own this job, this may happen if the job queue crashes.
+ # Simply resume and update the locked_at
+ self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker])
end
+ raise LockError.new("Attempted to aquire exclusive lock failed") unless affected_rows == 1
+
self.locked_at = now
self.locked_by = worker
end
View
38 spec/job_spec.rb
@@ -104,7 +104,7 @@ def perform; raise 'did not work'; end
@job.reschedule 'FAIL'
end
- describe "when another worker is already performing an task, it" do
+ context "when another worker is already performing an task, it" do
before :each do
Delayed::Job.worker_name = 'worker1'
@@ -113,12 +113,10 @@ def perform; raise 'did not work'; end
it "should not allow a second worker to get exclusive access" do
lambda { @job.lock_exclusively! 4.hours, 'worker2' }.should raise_error(Delayed::Job::LockError)
- end
-
- it "should not allow a second worker to get exclusive access if the timeout has passed" do
-
- @job.lock_exclusively! 1.minute, 'worker2'
-
+ end
+
+ it "should allow a second worker to get exclusive access if the timeout has passed" do
+ lambda { @job.lock_exclusively! 1.minute, 'worker2' }.should_not raise_error(Delayed::Job::LockError)
end
it "should be able to get access to the task if it was started more then max_age ago" do
@@ -131,7 +129,6 @@ def perform; raise 'did not work'; end
@job.locked_at.should > 1.minute.ago
end
-
it "should be able to get exclusive access again when the worker name is the same" do
@job.lock_exclusively! 5.minutes, 'worker1'
@job.lock_exclusively! 5.minutes, 'worker1'
@@ -173,7 +170,7 @@ def perform; raise 'did not work'; end
end
- context "when retreiving jobs from the queue" do
+ context "when pulling jobs off the queue, it" do
before(:each) do
@job = Delayed::Job.create(
:payload_object => SimpleJob.new,
@@ -181,16 +178,15 @@ def perform; raise 'did not work'; end
:locked_at => Delayed::Job.db_time_now - 5.minutes)
end
- it "should process jobs that haven't been processed yet and remove them from the queue" do
+ it "should pull jobs off the queue that haven't been completed yet" do
Delayed::Job.find_available.length.should == 1
SimpleJob.runs.should == 0
Delayed::Job.work_off(1)
SimpleJob.runs.should == 1
Delayed::Job.find_available.length.should == 0
end
-
- # TODO: verify that the datetime fields are not changed (test the Tx work).
- it "should leave the queue in a consistent state if failure occurs trying to aquire a lock" do
+
+ it "should leave the queue in a consistent state if locking fails and not run the job" do
SimpleJob.runs.should == 0
@job.stub!(:lock_exclusively!).with(:any_args).once.and_raise(Delayed::Job::LockError)
Delayed::Job.should_receive(:find_available).once.and_return([@job])
@@ -200,15 +196,19 @@ def perform; raise 'did not work'; end
end
- context "when clearing the queue of jobs" do
+ context "while running alongside other workers with enqueued jobs, it" do
before(:each) do
- # create some jobs here.
+ Delayed::Job.worker_name = 'worker1'
+ Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => 5.hours.ago)
+ Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker2', :locked_at => 3.hours.ago)
+ Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => 2.hours.ago)
end
- it "should remove only jobs created by this worker"
-
- after(:each) do
- # delete
+ it "should remove only jobs created by the current worker" do
+ SimpleJob.runs.should == 0
+ Delayed::Job.work_off(3)
+ SimpleJob.runs.should == 2
+ Delayed::Job.find_available.length.should == 1
end
end
Please sign in to comment.
Something went wrong with that request. Please try again.