Skip to content

Commit

Permalink
Moved work_off to Delayed::Worker
Browse files Browse the repository at this point in the history
  • Loading branch information
David Genord II authored and bkeepers committed Oct 16, 2009
1 parent 15ec587 commit 31cbb07
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 131 deletions.
20 changes: 0 additions & 20 deletions lib/delayed/job.rb
Expand Up @@ -209,26 +209,6 @@ def log_exception(error)
logger.error(error)
end

# Do num jobs and return stats on success/failure.
# Exit early if interrupted.
def self.work_off(num = 100)
success, failure = 0, 0

num.times do
case self.reserve_and_run_one_job
when true
success += 1
when false
failure += 1
else
break # leave if no work could be done
end
break if $exit # leave if we're exiting
end

return [success, failure]
end

# Moved into its own method so that new_relic can trace it.
def invoke_job
payload_object.perform
Expand Down
22 changes: 21 additions & 1 deletion lib/delayed/worker.rb
Expand Up @@ -27,7 +27,7 @@ def start
result = nil

realtime = Benchmark.realtime do
result = Delayed::Job.work_off
result = work_off
end

count = result.sum
Expand All @@ -47,6 +47,26 @@ def start
Delayed::Job.clear_locks!
end

# Do num jobs and return stats on success/failure.
# Exit early if interrupted.
def work_off(num = 100)
success, failure = 0, 0

num.times do
case Delayed::Job.reserve_and_run_one_job
when true
success += 1
when false
failure += 1
else
break # leave if no work could be done
end
break if $exit # leave if we're exiting
end

return [success, failure]
end

def say(text)
puts text unless @quiet
logger.info text if logger
Expand Down
122 changes: 12 additions & 110 deletions spec/job_spec.rb
@@ -1,26 +1,5 @@
require File.dirname(__FILE__) + '/database'

class SimpleJob
cattr_accessor :runs; self.runs = 0
def perform; @@runs += 1; end
end

class ErrorJob
cattr_accessor :runs; self.runs = 0
def perform; raise 'did not work'; end
end

class LongRunningJob
def perform; sleep 250; end
end

module M
class ModuleJob
cattr_accessor :runs; self.runs = 0
def perform; @@runs += 1; end
end

end
require File.dirname(__FILE__) + '/sample_jobs'

describe Delayed::Job do
before do
Expand Down Expand Up @@ -65,11 +44,11 @@ def perform; @@runs += 1; end
Delayed::Job.first.run_at.should be_close(later, 1)
end

it "should call perform on jobs when running work_off" do
it "should call perform on jobs when running run_with_lock" do
SimpleJob.runs.should == 0

Delayed::Job.enqueue SimpleJob.new
Delayed::Job.work_off
job = Delayed::Job.enqueue SimpleJob.new
job.run_with_lock(Delayed::Job.max_run_time, Delayed::Job.worker_name)

SimpleJob.runs.should == 1
end
Expand All @@ -78,33 +57,33 @@ def perform; @@runs += 1; end
it "should work with eval jobs" do
$eval_job_ran = false

Delayed::Job.enqueue do <<-JOB
job = Delayed::Job.enqueue do <<-JOB
$eval_job_ran = true
JOB
end

Delayed::Job.work_off
job.run_with_lock(Delayed::Job.max_run_time, Delayed::Job.worker_name)

$eval_job_ran.should == true
end

it "should work with jobs in modules" do
M::ModuleJob.runs.should == 0

Delayed::Job.enqueue M::ModuleJob.new
Delayed::Job.work_off
job = Delayed::Job.enqueue M::ModuleJob.new
job.run_with_lock(Delayed::Job.max_run_time, Delayed::Job.worker_name)

M::ModuleJob.runs.should == 1
end

it "should re-schedule by about 1 second at first and increment this more and more minutes when it fails to execute properly" do
Delayed::Job.enqueue ErrorJob.new
Delayed::Job.work_off(1)
job = Delayed::Job.enqueue ErrorJob.new
job.run_with_lock(Delayed::Job.max_run_time, Delayed::Job.worker_name)

job = Delayed::Job.find(:first)

job.last_error.should =~ /did not work/
job.last_error.should =~ /job_spec.rb:10:in `perform'/
job.last_error.should =~ /sample_jobs.rb:8:in `perform'/
job.attempts.should == 1

job.run_at.should > Delayed::Job.db_time_now - 10.minutes
Expand Down Expand Up @@ -292,32 +271,7 @@ def perform; @@runs += 1; end
Delayed::Job.max_priority = nil
Delayed::Job.min_priority = nil
end

it "should only work_off jobs that are >= min_priority" do
Delayed::Job.min_priority = -5
Delayed::Job.max_priority = 5
SimpleJob.runs.should == 0

Delayed::Job.enqueue SimpleJob.new, -10
Delayed::Job.enqueue SimpleJob.new, 0
Delayed::Job.work_off

SimpleJob.runs.should == 1
end

it "should only work_off jobs that are <= max_priority" do
Delayed::Job.min_priority = -5
Delayed::Job.max_priority = 5
SimpleJob.runs.should == 0

Delayed::Job.enqueue SimpleJob.new, 10
Delayed::Job.enqueue SimpleJob.new, 0

Delayed::Job.work_off

SimpleJob.runs.should == 1
end

it "should fetch jobs ordered by priority" do
number_of_jobs = 10
number_of_jobs.times { Delayed::Job.enqueue SimpleJob.new, rand(10) }
Expand Down Expand Up @@ -345,62 +299,10 @@ def perform; @@runs += 1; end
it "should leave the queue in a consistent state and not run the job if locking fails" do
SimpleJob.runs.should == 0
@job.stub!(:lock_exclusively!).with(any_args).once.and_return(false)
Delayed::Job.should_receive(:find_available).once.and_return([@job])
Delayed::Job.work_off(1)
@job.run_with_lock(Delayed::Job.max_run_time, Delayed::Job.worker_name)
SimpleJob.runs.should == 0
end

end

context "while running alongside other workers that locked jobs, it" do
before(:each) do
Delayed::Job.worker_name = 'worker1'
Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker2', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
Delayed::Job.create(:payload_object => SimpleJob.new)
Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
end

it "should ingore locked jobs from other workers" do
Delayed::Job.worker_name = 'worker3'
SimpleJob.runs.should == 0
Delayed::Job.work_off
SimpleJob.runs.should == 1 # runs the one open job
end

it "should find our own jobs regardless of locks" do
Delayed::Job.worker_name = 'worker1'
SimpleJob.runs.should == 0
Delayed::Job.work_off
SimpleJob.runs.should == 3 # runs open job plus worker1 jobs that were already locked
end
end

context "while running with locked and expired jobs, it" do
before(:each) do
Delayed::Job.worker_name = 'worker1'
exp_time = Delayed::Job.db_time_now - (1.minutes + Delayed::Job::max_run_time)
Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => exp_time)
Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker2', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
Delayed::Job.create(:payload_object => SimpleJob.new)
Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
end

it "should only find unlocked and expired jobs" do
Delayed::Job.worker_name = 'worker3'
SimpleJob.runs.should == 0
Delayed::Job.work_off
SimpleJob.runs.should == 2 # runs the one open job and one expired job
end

it "should ignore locks when finding our own jobs" do
Delayed::Job.worker_name = 'worker1'
SimpleJob.runs.should == 0
Delayed::Job.work_off
SimpleJob.runs.should == 3 # runs open job plus worker1 jobs
# This is useful in the case of a crash/restart on worker1, but make sure multiple workers on the same host have unique names!
end

end

end
21 changes: 21 additions & 0 deletions spec/sample_jobs.rb
@@ -0,0 +1,21 @@
class SimpleJob
cattr_accessor :runs; self.runs = 0
def perform; @@runs += 1; end
end

class ErrorJob
cattr_accessor :runs; self.runs = 0
def perform; raise 'did not work'; end
end

class LongRunningJob
def perform; sleep 250; end
end

module M
class ModuleJob
cattr_accessor :runs; self.runs = 0
def perform; @@runs += 1; end
end

end
95 changes: 95 additions & 0 deletions spec/worker_spec.rb
@@ -0,0 +1,95 @@
require File.dirname(__FILE__) + '/database'
require File.dirname(__FILE__) + '/sample_jobs'

describe Delayed::Worker do
def job_create(opts = {})
Delayed::Job.create(opts.merge(:payload_object => SimpleJob.new))
end

before(:each) do
@worker = Delayed::Worker.new(:max_priority => nil, :min_priority => nil)

Delayed::Job.delete_all

SimpleJob.runs = 0
end

context "worker prioritization" do
before(:each) do
@worker = Delayed::Worker.new(:max_priority => 5, :min_priority => -5)
end

it "should only work_off jobs that are >= min_priority" do
SimpleJob.runs.should == 0

job_create(:priority => -10)
job_create(:priority => 0)
@worker.work_off

SimpleJob.runs.should == 1
end

it "should only work_off jobs that are <= max_priority" do
SimpleJob.runs.should == 0

job_create(:priority => 10)
job_create(:priority => 0)

@worker.work_off

SimpleJob.runs.should == 1
end
end

context "while running alongside other workers that locked jobs, it" do
before(:each) do
Delayed::Job.worker_name = 'worker1'
job_create(:locked_by => 'worker1', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
job_create(:locked_by => 'worker2', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
job_create
job_create(:locked_by => 'worker1', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
end

it "should ingore locked jobs from other workers" do
Delayed::Job.worker_name = 'worker3'
SimpleJob.runs.should == 0
@worker.work_off
SimpleJob.runs.should == 1 # runs the one open job
end

it "should find our own jobs regardless of locks" do
Delayed::Job.worker_name = 'worker1'
SimpleJob.runs.should == 0
@worker.work_off
SimpleJob.runs.should == 3 # runs open job plus worker1 jobs that were already locked
end
end

context "while running with locked and expired jobs, it" do
before(:each) do
Delayed::Job.worker_name = 'worker1'
exp_time = Delayed::Job.db_time_now - (1.minutes + Delayed::Job::max_run_time)
job_create(:locked_by => 'worker1', :locked_at => exp_time)
job_create(:locked_by => 'worker2', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
job_create
job_create(:locked_by => 'worker1', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
end

it "should only find unlocked and expired jobs" do
Delayed::Job.worker_name = 'worker3'
SimpleJob.runs.should == 0
@worker.work_off
SimpleJob.runs.should == 2 # runs the one open job and one expired job
end

it "should ignore locks when finding our own jobs" do
Delayed::Job.worker_name = 'worker1'
SimpleJob.runs.should == 0
@worker.work_off
SimpleJob.runs.should == 3 # runs open job plus worker1 jobs
# This is useful in the case of a crash/restart on worker1, but make sure multiple workers on the same host have unique names!
end

end

end

0 comments on commit 31cbb07

Please sign in to comment.