Skip to content

Commit

Permalink
Slightly more useful Job#name method that can easily be used for insp…
Browse files Browse the repository at this point in the history
…ection
  • Loading branch information
Tobias Lütke committed Nov 26, 2008
1 parent aa843ea commit 916e9f2
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 11 deletions.
27 changes: 20 additions & 7 deletions lib/delayed/job.rb
Expand Up @@ -19,7 +19,7 @@ class Job < ActiveRecord::Base
# There are some advantages to overriding this with something which survives worker retarts:
# Workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.
cattr_accessor :worker_name
self.worker_name = "pid:#{Process.pid}"
self.worker_name = "host:#{Socket.gethostname} pid:#{Process.pid}" rescue "pid:#{Process.pid}"

NextTaskSQL = '(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR (locked_by = ?)) AND failed_at IS NULL'
NextTaskOrder = 'priority DESC, run_at ASC'
Expand All @@ -45,10 +45,16 @@ def failed?
def payload_object
@payload_object ||= deserialize(self['handler'])
end

def name
text = handler.gsub(/\n/, ' ')
"#{id} (#{text.length > 40 ? "#{text[0..40]}..." : text})"

def name
@name ||= begin
payload = payload_object
if payload.respond_to?(:display_name)
payload.display_name
else
payload.class.name
end
end
end

def payload_object=(object)
Expand Down Expand Up @@ -105,7 +111,7 @@ def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME)

# Get the payload of the next job we can get an exclusive lock on.
# If no jobs are left we return nil
def self.reserve(max_run_time = MAX_RUN_TIME)
def self.reserve(max_run_time = MAX_RUN_TIME, &block)

# We get up to 5 jobs from the db. In face we cannot get exclusive access to a job we try the next.
# this leads to a more even distribution of jobs across the worker processes
Expand All @@ -114,7 +120,7 @@ def self.reserve(max_run_time = MAX_RUN_TIME)
logger.info "* [JOB] aquiring lock on #{job.name}"
job.lock_exclusively!(max_run_time, worker_name)
runtime = Benchmark.realtime do
yield job.payload_object
invoke_job(job.payload_object, &block)
job.destroy
end
logger.info "* [JOB] #{job.name} completed after %.4f" % runtime
Expand Down Expand Up @@ -181,8 +187,15 @@ def self.work_off(num = 100)
end

return [success, failure]
end


# Moved into its own method so that new_relic can trace it.
def self.invoke_job(job, &block)
block.call(job)
end


private

def deserialize(source)
Expand Down
4 changes: 4 additions & 0 deletions lib/delayed/performable_method.rb
Expand Up @@ -10,6 +10,10 @@ def initialize(object, method, args)
self.args = args.map { |a| dump(a) }
self.method = method.to_sym
end

def display_name
"#{object}##{method}"
end

def perform
load(object).send(method, *args.map{|a| load(a)})
Expand Down
10 changes: 7 additions & 3 deletions lib/delayed/worker.rb
Expand Up @@ -3,7 +3,7 @@ class Worker
SLEEP = 5

def initialize(options={})
@quiet = options[:quiet]
@quiet = options[:quiet]
Delayed::Job.min_priority = options[:min_priority] if options.has_key?(:min_priority)
Delayed::Job.max_priority = options[:max_priority] if options.has_key?(:max_priority)
end
Expand All @@ -13,7 +13,8 @@ def start

trap('TERM') { say 'Exiting...'; $exit = true }
trap('INT') { say 'Exiting...'; $exit = true }



loop do
result = nil

Expand All @@ -32,7 +33,10 @@ def start
end

break if $exit
end
end

ensure
Delayed::Job..clear_locks!
end

def say(text)
Expand Down
13 changes: 12 additions & 1 deletion spec/job_spec.rb
Expand Up @@ -187,7 +187,18 @@ def perform; raise 'did not work'; end
@job.lock_exclusively! 5.minutes, 'worker1'
@job.lock_exclusively! 5.minutes, 'worker1'
end
end
end

context "#name" do
it "should be the class name of the job that was enqueued" do
Delayed::Job.create(:payload_object => ErrorJob.new ).name.should == 'ErrorJob'
end

it "should be the method that will be called if its a performable method object" do
Delayed::Job.send_later(:clear_locks!)
Delayed::Job.last.name.should == 'CLASS:Delayed::Job#clear_locks!'
end
end

context "worker prioritization" do

Expand Down

0 comments on commit 916e9f2

Please sign in to comment.