Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Slightly more useful Job#name method that can easily be used for insp…

…ection
  • Loading branch information...
commit 916e9f2f8e7293f696ca795b1bc096eb04ed8412 1 parent aa843ea
@tobi tobi authored
View
27 lib/delayed/job.rb
@@ -19,7 +19,7 @@ class Job < ActiveRecord::Base
# There are some advantages to overriding this with something which survives worker retarts:
# Workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.
cattr_accessor :worker_name
- self.worker_name = "pid:#{Process.pid}"
+ self.worker_name = "host:#{Socket.gethostname} pid:#{Process.pid}" rescue "pid:#{Process.pid}"
NextTaskSQL = '(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR (locked_by = ?)) AND failed_at IS NULL'
NextTaskOrder = 'priority DESC, run_at ASC'
@@ -45,10 +45,16 @@ def failed?
def payload_object
@payload_object ||= deserialize(self['handler'])
end
-
- def name
- text = handler.gsub(/\n/, ' ')
- "#{id} (#{text.length > 40 ? "#{text[0..40]}..." : text})"
+
+ def name
+ @name ||= begin
+ payload = payload_object
+ if payload.respond_to?(:display_name)
+ payload.display_name
+ else
+ payload.class.name
+ end
+ end
end
def payload_object=(object)
@@ -105,7 +111,7 @@ def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME)
# Get the payload of the next job we can get an exclusive lock on.
# If no jobs are left we return nil
- def self.reserve(max_run_time = MAX_RUN_TIME)
+ def self.reserve(max_run_time = MAX_RUN_TIME, &block)
# We get up to 5 jobs from the db. In face we cannot get exclusive access to a job we try the next.
# this leads to a more even distribution of jobs across the worker processes
@@ -114,7 +120,7 @@ def self.reserve(max_run_time = MAX_RUN_TIME)
logger.info "* [JOB] aquiring lock on #{job.name}"
job.lock_exclusively!(max_run_time, worker_name)
runtime = Benchmark.realtime do
- yield job.payload_object
+ invoke_job(job.payload_object, &block)
job.destroy
end
logger.info "* [JOB] #{job.name} completed after %.4f" % runtime
@@ -181,8 +187,15 @@ def self.work_off(num = 100)
end
return [success, failure]
+ end
+
+
+ # Moved into its own method so that new_relic can trace it.
+ def self.invoke_job(job, &block)
+ block.call(job)
end
+
private
def deserialize(source)
View
4 lib/delayed/performable_method.rb
@@ -10,6 +10,10 @@ def initialize(object, method, args)
self.args = args.map { |a| dump(a) }
self.method = method.to_sym
end
+
+ def display_name
+ "#{object}##{method}"
+ end
def perform
load(object).send(method, *args.map{|a| load(a)})
View
10 lib/delayed/worker.rb
@@ -3,7 +3,7 @@ class Worker
SLEEP = 5
def initialize(options={})
- @quiet = options[:quiet]
+ @quiet = options[:quiet]
Delayed::Job.min_priority = options[:min_priority] if options.has_key?(:min_priority)
Delayed::Job.max_priority = options[:max_priority] if options.has_key?(:max_priority)
end
@@ -13,7 +13,8 @@ def start
trap('TERM') { say 'Exiting...'; $exit = true }
trap('INT') { say 'Exiting...'; $exit = true }
-
+
+
loop do
result = nil
@@ -32,7 +33,10 @@ def start
end
break if $exit
- end
+ end
+
+ ensure
+ Delayed::Job..clear_locks!
end
def say(text)
View
13 spec/job_spec.rb
@@ -187,7 +187,18 @@ def perform; raise 'did not work'; end
@job.lock_exclusively! 5.minutes, 'worker1'
@job.lock_exclusively! 5.minutes, 'worker1'
end
- end
+ end
+
+ context "#name" do
+ it "should be the class name of the job that was enqueued" do
+ Delayed::Job.create(:payload_object => ErrorJob.new ).name.should == 'ErrorJob'
+ end
+
+ it "should be the method that will be called if its a performable method object" do
+ Delayed::Job.send_later(:clear_locks!)
+ Delayed::Job.last.name.should == 'CLASS:Delayed::Job#clear_locks!'
+ end
+ end
context "worker prioritization" do
Please sign in to comment.
Something went wrong with that request. Please try again.