Skip to content

Commit

Permalink
Moved worker_name to Delayed::Worker
Browse files Browse the repository at this point in the history
  • Loading branch information
David Genord II authored and bkeepers committed Oct 16, 2009
1 parent 15a8f50 commit 747fb3d
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 58 deletions.
6 changes: 3 additions & 3 deletions lib/delayed/command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ def run(worker_name = nil)
Delayed::Worker.logger = Rails.logger
ActiveRecord::Base.connection.reconnect!

Delayed::Job.worker_name = "#{worker_name} #{Delayed::Job.worker_name}"

Delayed::Worker.new(@options).start
worker = Delayed::Worker.new(@options)
worker.name = "#{worker_name} #{worker.name}"
worker.start
rescue => e
Rails.logger.fatal e
STDERR.puts e.message
Expand Down
28 changes: 3 additions & 25 deletions lib/delayed/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,6 @@ class Job < ActiveRecord::Base
cattr_accessor :destroy_failed_jobs
self.destroy_failed_jobs = true

# Every worker has a unique name which by default is the pid of the process.
# There are some advantages to overriding this with something which survives worker retarts:
# Workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.
@@worker_name = nil

def self.worker_name
return @@worker_name unless @@worker_name.nil?
"host:#{Socket.gethostname} pid:#{Process.pid}" rescue "pid:#{Process.pid}"
end

def self.worker_name=(val)
@@worker_name = val
end

def worker_name
self.class.worker_name
end

def worker_name=(val)
@@worker_name = val
end

NextTaskSQL = '(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR (locked_by = ?)) AND failed_at IS NULL'
NextTaskOrder = 'priority DESC, run_at ASC'

Expand All @@ -53,7 +31,7 @@ def worker_name=(val)
self.max_priority = nil

# When a worker is exiting, make sure we don't have any locked jobs.
def self.clear_locks!
def self.clear_locks!(worker_name)
update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name])
end

Expand Down Expand Up @@ -140,7 +118,7 @@ def self.enqueue(*args, &block)
end

# Find a few candidate jobs to run (in case some immediately get locked by others).
def self.find_available(limit = 5, max_run_time = max_run_time)
def self.find_available(worker_name, limit = 5, max_run_time = max_run_time)

time_now = db_time_now

Expand All @@ -167,7 +145,7 @@ def self.find_available(limit = 5, max_run_time = max_run_time)

# Lock this job for this worker.
# Returns true if we have the lock, false otherwise.
def lock_exclusively!(max_run_time, worker = worker_name)
def lock_exclusively!(max_run_time, worker)
now = self.class.db_time_now
affected_rows = if locked_by != worker
# We don't own this job so we will update the locked_by name and the locked_at
Expand Down
16 changes: 12 additions & 4 deletions lib/delayed/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,16 @@ def job_max_run_time
Delayed::Job.max_run_time
end

# Every worker has a unique name which by default is the pid of the process.
# There are some advantages to overriding this with something which survives worker retarts:
# Workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.
def name
Delayed::Job.worker_name
return @name unless @name.nil?
"host:#{Socket.gethostname} pid:#{Process.pid}" rescue "pid:#{Process.pid}"
end

def name=(val)
@name = val
end

def initialize(options={})
Expand All @@ -26,7 +34,7 @@ def initialize(options={})
end

def start
say "*** Starting job worker #{Delayed::Job.worker_name}"
say "*** Starting job worker #{name}"

trap('TERM') { say 'Exiting...'; $exit = true }
trap('INT') { say 'Exiting...'; $exit = true }
Expand All @@ -52,7 +60,7 @@ def start
end

ensure
Delayed::Job.clear_locks!
Delayed::Job.clear_locks!(name)
end

def say(text)
Expand All @@ -68,7 +76,7 @@ def reserve_and_run_one_job(max_run_time = job_max_run_time)

# We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next.
# this leads to a more even distribution of jobs across the worker processes
Delayed::Job.find_available(5, max_run_time).each do |job|
Delayed::Job.find_available(name, 5, max_run_time).each do |job|
t = job.run_with_lock(max_run_time, name)
return t unless t == nil # return if we did work (good or bad)
end
Expand Down
2 changes: 1 addition & 1 deletion spec/delayed_method_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def read(story)

Delayed::Job.count.should == 1

job.run_with_lock(Delayed::Job.max_run_time, Delayed::Job.worker_name)
job.run_with_lock(Delayed::Job.max_run_time, 'worker')

Delayed::Job.count.should == 0

Expand Down
32 changes: 13 additions & 19 deletions spec/job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
SimpleJob.runs.should == 0

job = Delayed::Job.enqueue SimpleJob.new
job.run_with_lock(Delayed::Job.max_run_time, Delayed::Job.worker_name)
job.run_with_lock(Delayed::Job.max_run_time, 'worker')

SimpleJob.runs.should == 1
end
Expand All @@ -62,7 +62,7 @@
JOB
end

job.run_with_lock(Delayed::Job.max_run_time, Delayed::Job.worker_name)
job.run_with_lock(Delayed::Job.max_run_time, 'worker')

$eval_job_ran.should == true
end
Expand All @@ -71,14 +71,14 @@
M::ModuleJob.runs.should == 0

job = Delayed::Job.enqueue M::ModuleJob.new
job.run_with_lock(Delayed::Job.max_run_time, Delayed::Job.worker_name)
job.run_with_lock(Delayed::Job.max_run_time, 'worker')

M::ModuleJob.runs.should == 1
end

it "should re-schedule by about 1 second at first and increment this more and more minutes when it fails to execute properly" do
job = Delayed::Job.enqueue ErrorJob.new
job.run_with_lock(Delayed::Job.max_run_time, Delayed::Job.worker_name)
job.run_with_lock(Delayed::Job.max_run_time, 'worker')

job = Delayed::Job.find(:first)

Expand Down Expand Up @@ -173,20 +173,19 @@

it "should fail after Job::max_run_time" do
@job = Delayed::Job.create :payload_object => LongRunningJob.new
@job.run_with_lock(1.second, Delayed::Job.worker_name)
@job.run_with_lock(1.second, 'worker')
@job.reload.last_error.should =~ /expired/
@job.attempts.should == 1
end

it "should never find failed jobs" do
@job = Delayed::Job.create :payload_object => SimpleJob.new, :attempts => 50, :failed_at => Delayed::Job.db_time_now
Delayed::Job.find_available(1).length.should == 0
Delayed::Job.find_available('worker', 1).length.should == 0
end

context "when another worker is already performing an task, it" do

before :each do
Delayed::Job.worker_name = 'worker1'
@job = Delayed::Job.create :payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => Delayed::Job.db_time_now - 5.minutes
end

Expand All @@ -209,28 +208,23 @@
end

it "should not be found by another worker" do
Delayed::Job.worker_name = 'worker2'

Delayed::Job.find_available(1, 6.minutes).length.should == 0
Delayed::Job.find_available('worker2', 1, 6.minutes).length.should == 0
end

it "should be found by another worker if the time has expired" do
Delayed::Job.worker_name = 'worker2'

Delayed::Job.find_available(1, 4.minutes).length.should == 1
Delayed::Job.find_available('worker2', 1, 4.minutes).length.should == 1
end

it "should be able to get exclusive access again when the worker name is the same" do
@job.lock_exclusively! 5.minutes, 'worker1'
@job.lock_exclusively! 5.minutes, 'worker1'
@job.lock_exclusively! 5.minutes, 'worker1'
@job.lock_exclusively!(5.minutes, 'worker1').should be_true
@job.lock_exclusively!(5.minutes, 'worker1').should be_true
@job.lock_exclusively!(5.minutes, 'worker1').should be_true
end
end

context "when another worker has worked on a task since the job was found to be available, it" do

before :each do
Delayed::Job.worker_name = 'worker1'
@job = Delayed::Job.create :payload_object => SimpleJob.new
@job_copy_for_worker_2 = Delayed::Job.find(@job.id)
end
Expand Down Expand Up @@ -275,7 +269,7 @@
it "should fetch jobs ordered by priority" do
number_of_jobs = 10
number_of_jobs.times { Delayed::Job.enqueue SimpleJob.new, rand(10) }
jobs = Delayed::Job.find_available(10)
jobs = Delayed::Job.find_available('worker', 10)
ordered = true
jobs[1..-1].each_index{ |i|
if (jobs[i].priority < jobs[i+1].priority)
Expand All @@ -299,7 +293,7 @@
it "should leave the queue in a consistent state and not run the job if locking fails" do
SimpleJob.runs.should == 0
@job.stub!(:lock_exclusively!).with(any_args).once.and_return(false)
@job.run_with_lock(Delayed::Job.max_run_time, Delayed::Job.worker_name)
@job.run_with_lock(Delayed::Job.max_run_time, 'worker')
SimpleJob.runs.should == 0
end

Expand Down
12 changes: 6 additions & 6 deletions spec/worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,22 @@ def job_create(opts = {})

context "while running alongside other workers that locked jobs, it" do
before(:each) do
Delayed::Job.worker_name = 'worker1'
@worker.name = 'worker1'
job_create(:locked_by => 'worker1', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
job_create(:locked_by => 'worker2', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
job_create
job_create(:locked_by => 'worker1', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
end

it "should ingore locked jobs from other workers" do
Delayed::Job.worker_name = 'worker3'
@worker.name = 'worker3'
SimpleJob.runs.should == 0
@worker.work_off
SimpleJob.runs.should == 1 # runs the one open job
end

it "should find our own jobs regardless of locks" do
Delayed::Job.worker_name = 'worker1'
@worker.name = 'worker1'
SimpleJob.runs.should == 0
@worker.work_off
SimpleJob.runs.should == 3 # runs open job plus worker1 jobs that were already locked
Expand All @@ -71,7 +71,7 @@ def job_create(opts = {})

context "while running with locked and expired jobs, it" do
before(:each) do
Delayed::Job.worker_name = 'worker1'
@worker.name = 'worker1'
exp_time = Delayed::Job.db_time_now - (1.minutes + Delayed::Job::max_run_time)
job_create(:locked_by => 'worker1', :locked_at => exp_time)
job_create(:locked_by => 'worker2', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
Expand All @@ -80,14 +80,14 @@ def job_create(opts = {})
end

it "should only find unlocked and expired jobs" do
Delayed::Job.worker_name = 'worker3'
@worker.name = 'worker3'
SimpleJob.runs.should == 0
@worker.work_off
SimpleJob.runs.should == 2 # runs the one open job and one expired job
end

it "should ignore locks when finding our own jobs" do
Delayed::Job.worker_name = 'worker1'
@worker.name = 'worker1'
SimpleJob.runs.should == 0
@worker.work_off
SimpleJob.runs.should == 3 # runs open job plus worker1 jobs
Expand Down

0 comments on commit 747fb3d

Please sign in to comment.