Skip to content

Commit

Permalink
more specs around poping jobs off the queue.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rob Ares committed Nov 9, 2008
1 parent 4b9b079 commit 5e8745c
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 28 deletions.
31 changes: 10 additions & 21 deletions lib/delayed/job.rb
Expand Up @@ -84,10 +84,8 @@ def self.find_available(limit = 5)
ActiveRecord::Base.silence do
find(:all, :conditions => conditions, :order => NextTaskOrder, :limit => limit)
end

end


# Get the payload of the next job we can get an exclusive lock on.
# If no jobs are left we return nil
def self.reserve(max_run_time = 4.hours)
Expand All @@ -98,7 +96,7 @@ def self.reserve(max_run_time = 4.hours)
begin
logger.info "* [JOB] aquiring lock on #{job.name}"
job.lock_exclusively!(max_run_time, worker_name)
runtime = Benchmark.realtime do
runtime = Benchmark.realtime do
yield job.payload_object
job.destroy
end
Expand All @@ -123,26 +121,17 @@ def self.reserve(max_run_time = 4.hours)
# to the given job. It will rise a LockError if it cannot get this lock.
def lock_exclusively!(max_run_time, worker = worker_name)
now = self.class.db_time_now

affected_rows = if locked_by != worker

# We don't own this job so we will update the locked_by name and the locked_at
transaction do
Job.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)])
transaction do
if locked_by != worker
# We don't own this job so we will update the locked_by name and the locked_at
affected_rows = self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)])
else
# We already own this job, this may happen if the job queue crashes.
# Simply resume and update the locked_at
affected_rows = self.class.update_all(["locked_at = ?", now], ["id = ? and (locked_by = ?)", id, worker])
end
else

# We already own this job, this may happen if the job queue crashes.
# Simply resume and update the locked_at
transaction do
Job.update_all(["locked_at = ?", now], ["id = ? and (locked_by = ?)", id, worker])
end
end

unless affected_rows == 1
raise LockError, "Attempted to aquire exclusive lock failed"
raise LockError.new("Attempted to aquire exclusive lock failed") unless affected_rows == 1
end

self.locked_at = now
self.locked_by = worker
end
Expand Down
25 changes: 18 additions & 7 deletions spec/job_spec.rb
Expand Up @@ -173,18 +173,29 @@ def perform; raise 'did not work'; end

end

context "when retreiving jobs" do
context "when retreiving jobs from the queue" do
before(:each) do
@simple_job = SimpleJob.new
@job = Delayed::Job.create :payload_object => @simple_job, :locked_by => 'worker1', :locked_at => Delayed::Job.db_time_now - 5.minutes
@job = Delayed::Job.create(
:payload_object => SimpleJob.new,
:locked_by => 'worker1',
:locked_at => Delayed::Job.db_time_now - 5.minutes)
end

it "should return jobs that haven't been processed yet" do
SimpleJob.runs.should == 0
# Delayed::Job.should_receive(:find_available).once.with(5).and_return([@job])
Delayed::Job.should_receive(:reserve).once.and_yield(@job.payload_object)
it "should process jobs that haven't been processed yet and remove them from the queue" do
Delayed::Job.find_available.length.should == 1
SimpleJob.runs.should == 0
Delayed::Job.work_off(1)
SimpleJob.runs.should == 1
Delayed::Job.find_available.length.should == 0
end

it "should leave the queue in a consistent state if failure occurs trying to aquire a lock" do
SimpleJob.runs.should == 0
@job.stub!(:lock_exclusively!).with(:any_args).once.and_raise(Delayed::Job::LockError)
Delayed::Job.should_receive(:find_available).any_number_of_times.at_least(:once).and_return([@job])
Delayed::Job.work_off(1)
SimpleJob.runs.should == 0
Delayed::Job.find_available(5).length.should == 1
end

end
Expand Down

0 comments on commit 5e8745c

Please sign in to comment.