Skip to content

Commit

Permalink
adding new un-finished specs
Browse files Browse the repository at this point in the history
  • Loading branch information
rares committed Nov 10, 2008
1 parent 55a89af commit 149f45f
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 7 deletions.
10 changes: 5 additions & 5 deletions lib/delayed/job.rb
Expand Up @@ -15,8 +15,8 @@ class Job < ActiveRecord::Base

# Conditions to find tasks that are locked by this process or one that has
# been created before now and is not currently locked.
NextTaskSQL = '(locked_by = ?) OR (run_at <= ? AND (locked_at IS NULL OR locked_at < ?))'
NextTaskOrder = 'priority DESC, run_at ASC'
NextTaskSQL = "(locked_by = ?) OR (run_at <= ? AND (locked_at IS NULL OR locked_at < ?))"
NextTaskOrder = "priority DESC, run_at ASC"
ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/

class LockError < StandardError
Expand Down Expand Up @@ -122,13 +122,13 @@ def self.reserve(max_run_time = 4.hours)
def lock_exclusively!(max_run_time, worker = worker_name)
now = self.class.db_time_now
transaction do
if locked_by != worker
affected_rows = if locked_by != worker
# We don't own this job so we will update the locked_by name and the locked_at
affected_rows = self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)])
self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)])
else
# We already own this job, this may happen if the job queue crashes.
# Simply resume and update the locked_at
affected_rows = self.class.update_all(["locked_at = ?", now], ["id = ? and (locked_by = ?)", id, worker])
self.class.update_all(["locked_at = ?", now], ["id = ? and (locked_by = ?)", id, worker])
end
raise LockError.new("Attempted to aquire exclusive lock failed") unless affected_rows == 1
end
Expand Down
17 changes: 15 additions & 2 deletions spec/job_spec.rb
Expand Up @@ -189,15 +189,28 @@ def perform; raise 'did not work'; end
Delayed::Job.find_available.length.should == 0
end

# TODO: verify that the datetime fields are not changed (test the Tx work).
it "should leave the queue in a consistent state if failure occurs trying to aquire a lock" do
SimpleJob.runs.should == 0
@job.stub!(:lock_exclusively!).with(:any_args).once.and_raise(Delayed::Job::LockError)
Delayed::Job.should_receive(:find_available).at_least(:once).and_return([@job])
Delayed::Job.should_receive(:find_available).once.and_return([@job])
Delayed::Job.work_off(1)
SimpleJob.runs.should == 0
Delayed::Job.find_available(5).length.should == 1
end

end

context "when clearing the queue of jobs" do
before(:each) do
# create some jobs here.
end

it "should remove only jobs created by this worker"

after(:each) do
# delete
end

end

end

0 comments on commit 149f45f

Please sign in to comment.