Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
tree: aa8e87b371
Fetching contributors…

Cannot retrieve contributors at this time

118 lines (95 sloc) 3.396 kb
module Delayed
class Worker
cattr_accessor :min_priority, :max_priority, :max_attempts, :max_run_time, :sleep_delay, :logger
self.sleep_delay = 5
self.max_attempts = 25
self.max_run_time = 4.hours
self.logger = if defined?(Merb::Logger)
Merb.logger
elsif defined?(RAILS_DEFAULT_LOGGER)
RAILS_DEFAULT_LOGGER
end
# name_prefix is ignored if name is set directly
attr_accessor :name_prefix
# Every worker has a unique name which by default is the pid of the process.
# There are some advantages to overriding this with something which survives worker retarts:
# Workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.
def name
return @name unless @name.nil?
"#{@name_prefix}host:#{Socket.gethostname} pid:#{Process.pid}" rescue "#{@name_prefix}pid:#{Process.pid}"
end
# Sets the name of the worker.
# Setting the name to nil will reset the default worker name
def name=(val)
@name = val
end
def initialize(options={})
@quiet = options[:quiet]
self.class.min_priority = options[:min_priority] if options.has_key?(:min_priority)
self.class.max_priority = options[:max_priority] if options.has_key?(:max_priority)
end
def start
say "*** Starting job worker #{name}"
trap('TERM') { say 'Exiting...'; $exit = true }
trap('INT') { say 'Exiting...'; $exit = true }
loop do
result = nil
realtime = Benchmark.realtime do
result = work_off
end
count = result.sum
break if $exit
if count.zero?
sleep(@@sleep_delay)
else
say "#{count} jobs processed at %.4f j/s, %d failed ..." % [count / realtime, result.last]
end
break if $exit
end
ensure
Delayed::Job.clear_locks!(name)
end
def say(text, level = Logger::INFO)
puts text unless @quiet
logger.add level, text if logger
end
protected
# Run the next job we can get an exclusive lock on.
# If no jobs are left we return nil
def reserve_and_run_one_job(max_run_time = self.class.max_run_time)
# We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next.
# this leads to a more even distribution of jobs across the worker processes
job = Delayed::Job.find_available(name, 5, max_run_time).detect do |job|
if job.lock_exclusively!(max_run_time, name)
say "* [Worker(#{name})] acquired lock on #{job.name}"
true
else
say "* [Worker(#{name})] failed to acquire exclusive lock for #{job.name}", Logger::WARN
false
end
end
if job.nil?
nil # we didn't do any work, all 5 were not lockable
else
job.run(max_run_time)
end
end
# Do num jobs and return stats on success/failure.
# Exit early if interrupted.
def work_off(num = 100)
success, failure = 0, 0
num.times do
case reserve_and_run_one_job
when true
success += 1
when false
failure += 1
else
break # leave if no work could be done
end
break if $exit # leave if we're exiting
end
return [success, failure]
end
end
end
Jump to Line
Something went wrong with that request. Please try again.