# Permalink
# 924 lines (784 sloc) 27 KB
require 'time'
require 'set'
require 'redis/distributed'
module Resque
# A Resque Worker processes jobs. On platforms that support fork(2),
# the worker will fork off a child to process each job. This ensures
# a clean slate when beginning the next job and cuts down on gradual
# memory growth as well as low level failures.
#
# It also ensures workers are always listening to signals from you,
# their master, and can react accordingly.
class Worker
include Resque::Helpers
extend Resque::Helpers
include Resque::Logging
# Registry of heartbeat threads; start_heartbeat appends each new thread here.
@@all_heartbeat_threads = []

# Kills every registered heartbeat thread, waits for each of them to
# finish, and then resets the registry to an empty list.
def self.kill_all_heartbeat_threads
  threads = @@all_heartbeat_threads
  threads.each { |thread| thread.kill }
  threads.each { |thread| thread.join }
  @@all_heartbeat_threads = []
end
# Returns the data store used by this worker; delegates to Resque.redis.
def redis
Resque.redis
end
# `data_store` is an instance-level alias of `redis`.
alias :data_store :redis
# Class-level counterpart of #redis; delegates to Resque.redis.
def self.redis
Resque.redis
end
# Class-level data store accessor; returns whatever Worker.redis returns.
def self.data_store
self.redis
end
# Given a Ruby object, returns a string suitable for storage in a
# queue. Delegates to Resque.encode.
def encode(object)
Resque.encode(object)
end
# Given a string, returns a Ruby object. Delegates to Resque.decode.
def decode(object)
Resque.decode(object)
end
# Seconds new_kill_child waits after sending TERM to the child before it
# sends KILL. Set in #initialize from ENV['RESQUE_TERM_TIMEOUT'] (default 4.0).
attr_accessor :term_timeout
# Seconds new_kill_child gives the child to exit on its own before TERM is
# sent. Set in #initialize from ENV['RESQUE_PRE_SHUTDOWN_TIMEOUT'] (default 0.0).
attr_accessor :pre_shutdown_timeout
# NOTE(review): not read anywhere in the visible part of this file —
# presumably consumed by code outside it; confirm.
attr_accessor :term_child_signal
# decide whether to use new_kill_child logic
attr_accessor :term_child
# should term kill workers gracefully (vs. immediately)
# Makes SIGTERM work like SIGQUIT
attr_accessor :graceful_term
# When set to true, forked workers will exit with `exit`, calling any `at_exit` code handlers that have been
# registered in the application. Otherwise, forked workers exit with `exit!`
attr_accessor :run_at_exit_hooks
# Overrides the value that #fork_per_job? would otherwise derive lazily.
attr_writer :fork_per_job
# Writers used by Worker.find to rebuild a worker from its stored id.
attr_writer :hostname
attr_writer :to_s
attr_writer :pid
# Builds one worker object per id known to the data store (without
# re-checking that each id exists) and returns them as an array, nils removed.
def self.all
  workers = data_store.worker_ids.map do |worker_id|
    find(worker_id, :skip_exists => true)
  end
  workers.compact
end
# Returns the worker objects that currently have a job payload recorded
# in the data store. Each returned worker has its `job` set to the decoded
# payload.
def self.working
  names = all
  return [] unless names.any?

  reportedly_working = {}
  begin
    reportedly_working = data_store.workers_map(names).reject do |_key, payload|
      payload.nil? || payload.empty?
    end
  rescue Redis::Distributed::CannotDistribute
    # The bulk lookup is not available; fall back to one lookup per worker.
    names.each do |name|
      payload = data_store.get_worker_payload(name)
      next if payload.nil? || payload.empty?
      reportedly_working[name] = payload
    end
  end

  busy_workers = reportedly_working.keys.map do |key|
    worker_id = key.sub("worker:", '')
    worker = find(worker_id, :skip_exists => true)
    decoded_job = worker.decode(reportedly_working[key])
    worker.job = decoded_job
    worker
  end
  busy_workers.compact
end
# Returns a single worker object for the given string id, or nil when the
# id is not known to the data store.
#
# worker_id - String of the form "hostname:pid:queue1,queue2,..." (the
#             format produced by #to_s).
# options   - Hash; when :skip_exists is truthy the existence check is
#             skipped and a worker is always built.
#
# The id is split into at most three fields so that queue names which
# themselves contain ':' (e.g. "ns:high") are kept intact rather than
# being cut off at their first colon.
def self.find(worker_id, options = {})
  return nil unless options[:skip_exists] || exists?(worker_id)

  host, pid, queues_raw = worker_id.split(':', 3)
  worker = new(*queues_raw.split(','))
  worker.hostname = host
  worker.to_s = worker_id
  worker.pid = pid.to_i
  worker
end
# Alias of `find` called without options, so the existence check always
# runs and nil is returned for an unknown id.
def self.attach(worker_id)
find(worker_id)
end
# Given a string worker id, return a boolean indicating whether the
# worker exists. The answer comes from data_store.worker_exists?.
def self.exists?(worker_id)
data_store.worker_exists?(worker_id)
end
# Workers should be initialized with an array of string queue
# names. The order is important: a Worker will check the first
# queue given for a job. If none is found, it will check the
# second queue name given. If a job is found, it will be
# processed. Upon completion, the Worker will again check the
# first queue given, and so forth. In this way the queue list
# passed to a Worker on startup defines the priorities of queues.
#
# If passed a single "*", this Worker will operate on all queues
# in alphabetical order. Queues can be dynamically added or
# removed without needing to restart workers using this method.
#
# Workers should have `#prepare` called after they are initialized
# if you are running work on the worker.
#
# Configuration read from the environment here:
#   LOGGING / VERBOSE           - passed to verbose= when set
#   VVERBOSE                    - passed to very_verbose= when set
#   RESQUE_PRE_SHUTDOWN_TIMEOUT - pre_shutdown_timeout as a Float (default 0.0)
#   RESQUE_TERM_TIMEOUT         - term_timeout as a Float (default 4.0)
#   TERM_CHILD, GRACEFUL_TERM, RUN_AT_EXIT_HOOKS - stored as the raw
#     environment value (nil when unset)
#
# Assigning the queues last runs validate_queues, which raises
# NoQueueError when no queue was given and none is configured.
def initialize(*queues)
@shutdown = nil
@paused = nil
@before_first_fork_hook_ran = false
@heartbeat_thread = nil
@heartbeat_thread_signal = nil
verbose_value = ENV['LOGGING'] || ENV['VERBOSE']
self.verbose = verbose_value if verbose_value
self.very_verbose = ENV['VVERBOSE'] if ENV['VVERBOSE']
self.pre_shutdown_timeout = (ENV['RESQUE_PRE_SHUTDOWN_TIMEOUT'] || 0.0).to_f
self.term_timeout = (ENV['RESQUE_TERM_TIMEOUT'] || 4.0).to_f
self.term_child = ENV['TERM_CHILD']
self.graceful_term = ENV['GRACEFUL_TERM']
self.run_at_exit_hooks = ENV['RUN_AT_EXIT_HOOKS']
self.queues = queues
end
# Daemonizes the process when ENV['BACKGROUND'] is set, writes the worker
# pid to the file named by ENV['PIDFILE'] when that is set, and finally
# reconnects to the data store if the process was daemonized. Should only
# be called once per worker.
def prepare
  background = ENV['BACKGROUND']
  Process.daemon(true) if background

  pidfile = ENV['PIDFILE']
  File.open(pidfile, 'w') { |file| file << pid } if pidfile

  reconnect if background
end
# Characters that mark a queue name as a glob pattern.
WILDCARDS = ['*', '?', '{', '}', '[', ']'].freeze

# Sets the queue list. An empty list falls back to the comma-separated
# ENV["QUEUES"] (or ENV['QUEUE']). Names are stringified and stripped, the
# list is flagged as dynamic when any name contains a wildcard character,
# and the result is validated.
def queues=(queues)
  if queues.empty?
    queues = (ENV["QUEUES"] || ENV['QUEUE']).to_s.split(',')
  end
  @queues = queues.map { |name| name.to_s.strip }
  joined_names = @queues.join
  @has_dynamic_queues = WILDCARDS.any? { |wildcard| joined_names.include?(wildcard) }
  validate_queues
end
# Raises NoQueueError unless this worker has at least one queue; a worker
# without queues would have nothing to do.
#
# You probably never need to call this.
def validate_queues
  return unless @queues.nil? || @queues.empty?

  raise NoQueueError.new("Please give each worker at least one queue.")
end
# Returns the list of queues to search for a job. When any configured
# name is a glob pattern, every pattern is expanded against the queues
# that currently exist (each expansion sorted, duplicates dropped), so
# newly created queues are picked up without a restart.
def queues
  return @queues unless @has_dynamic_queues

  existing_queues = Resque.queues
  @queues.flat_map { |pattern| glob_match(existing_queues, pattern) }.uniq
end
# Returns the entries of `list` that match the shell-style glob `pattern`
# (via File.fnmatch?), sorted.
def glob_match(list, pattern)
  matches = list.select { |queue| File.fnmatch?(pattern, queue) }
  matches.sort
end
# This is the main workhorse method. Called on a Worker instance,
# it begins the worker life cycle.
#
# The following events occur during a worker's life cycle:
#
# 1. Startup: Signals are registered, dead workers are pruned,
# and this worker is registered.
# 2. Work loop: Jobs are pulled from a queue and processed.
# 3. Teardown: This worker is unregistered.
#
# Can be passed a float representing the polling frequency.
# The default is 5 seconds, but for a semi-active site you may
# want to use a smaller value. An interval of zero makes the loop
# stop as soon as no job could be worked.
#
# Also accepts a block which will be passed the job as soon as it
# has completed processing. Useful for testing.
#
# Raises ArgumentError/TypeError from Float() when `interval` is not
# numeric; that conversion happens before the rescue below is in effect
# for nothing else, so it is also reported through the rescue clause.
def work(interval = 5.0, &block)
interval = Float(interval)
startup
loop do
break if shutdown?
unless work_one_job(&block)
# Nothing was processed (paused or no job available).
break if interval.zero?
log_with_severity :debug, "Sleeping for #{interval} seconds"
procline paused? ? "Paused" : "Waiting for #{queues.join(',')}"
sleep interval
end
end
unregister_worker
rescue Exception => exception
# A SystemExit seen while no child pid is recorded and at_exit hooks are
# enabled is let through silently.
# NOTE(review): presumably this is the forked child leaving via `exit` — confirm.
return if exception.class == SystemExit && !@child && run_at_exit_hooks
log_with_severity :error, "Failed to start worker : #{exception.inspect}"
# Passing the exception makes any in-flight job fail with it.
unregister_worker(exception)
end
# Works a single job and reports whether one was processed.
#
# job - optional job to process; when nil, one is obtained via #reserve.
#
# Returns false when the worker is paused or no job is available, true
# after a job has been performed and recorded as done. The block, if any,
# is forwarded to the perform step.
def work_one_job(job = nil, &block)
return false if paused?
return false unless job ||= reserve
# Record the job in the data store before running it.
working_on job
procline "Processing #{job.queue} since #{Time.now.to_i} [#{job.payload_class_name}]"
log_with_severity :info, "got: #{job.inspect}"
job.worker = self
if fork_per_job?
perform_with_fork(job, &block)
else
perform(job, &block)
end
done_working
true
end
# DEPRECATED. Processes a single job. If none is given, it will
# try to produce one. Usually run in the child.
#
# Because of the `ensure`, done_working runs on every exit path,
# including when no job could be reserved and when perform raises.
def process(job = nil, &block)
return unless job ||= reserve
job.worker = self
working_on job
perform(job, &block)
ensure
done_working
end
# Logs the failure, marks the job as failed with the given exception and
# bumps the failed-jobs counter. Errors raised by either of the two
# bookkeeping steps are logged and swallowed so that one failing step
# does not prevent the other.
def report_failed_job(job,exception)
  log_with_severity :error, "#{job.inspect} failed: #{exception.inspect}"

  begin
    job.fail(exception)
  rescue Object => reporting_error
    log_with_severity :error, "Received exception when reporting failure: #{reporting_error.inspect}"
  end

  begin
    failed!
  rescue Object => counter_error
    log_with_severity :error, "Received exception when increasing failed jobs counter (redis issue) : #{counter_error.inspect}"
  end
end
# Processes a given job. When fork-per-job is enabled (i.e. this is
# expected to be running in the forked child) it first reconnects to the
# data store and runs the :after_fork hooks.
#
# Anything raised while preparing or performing the job is reported via
# report_failed_job instead of propagating. If a block is given it is
# yielded the job afterwards, whether the job succeeded or failed.
def perform(job)
begin
if fork_per_job?
reconnect
run_hook :after_fork, job
end
job.perform
rescue Object => e
report_failed_job(job,e)
else
# Only reached when nothing was raised above.
log_with_severity :info, "done: #{job.inspect}"
ensure
yield job if block_given?
end
end
# Walks the worker's queues in priority order and returns the first job
# that can be reserved, or nil when every queue is empty. Any exception
# raised while reserving is logged (message and backtrace) and re-raised.
def reserve
  queues.each do |queue_name|
    log_with_severity :debug, "Checking #{queue_name}"
    job = Resque.reserve(queue_name)
    next unless job

    log_with_severity :debug, "Found job on #{queue_name}"
    return job
  end

  nil
rescue Exception => error
  log_with_severity :error, "Error reserving job: #{error.inspect}"
  log_with_severity :error, error.backtrace.join("\n")
  raise error
end
# Reconnect to Redis to avoid sharing a connection with the parent,
# retry up to 3 times with increasing delay before giving up.
#
# After the initial attempt fails, it sleeps 1, 2 and then 3 seconds
# before the respective retries. If the third retry also fails, the
# Redis::BaseConnectionError is re-raised.
def reconnect
tries = 0
begin
data_store.reconnect
rescue Redis::BaseConnectionError
if (tries += 1) <= 3
log_with_severity :error, "Error reconnecting to Redis; retrying"
# The delay grows with the attempt number.
sleep(tries)
retry
else
log_with_severity :error, "Error reconnecting to Redis; quitting"
raise
end
end
end
# Runs all the methods needed when a worker begins its lifecycle:
# sets the process title, enables GC optimizations, registers signal
# handlers, starts the heartbeat thread, prunes dead workers, runs the
# :before_first_fork hooks and registers this worker.
def startup
$0 = "resque: Starting"
enable_gc_optimizations
register_signal_handlers
# The heartbeat is started before pruning and before registration.
start_heartbeat
prune_dead_workers
run_hook :before_first_fork
register_worker
# Fix buffering so we can `rake resque:work > resque.log` and
# get output from the child in there.
$stdout.sync = true
end
# Turns on copy-on-write friendly garbage collection when the running
# interpreter offers it (Ruby Enterprise Edition); does nothing otherwise.
# http://www.rubyenterpriseedition.com/faq.html#adapt_apps_for_cow
def enable_gc_optimizations
  GC.copy_on_write_friendly = true if GC.respond_to?(:copy_on_write_friendly=)
end
# Registers the various signal handlers a worker responds to.
#
# TERM: Shutdown immediately, stop processing jobs; when graceful_term is
#       set, shutdown after the current job instead (like QUIT).
# INT: Shutdown immediately, stop processing jobs.
# QUIT: Shutdown after the current job has finished processing.
# USR1: Kill the forked child immediately, continue processing jobs
#       (via new_kill_child when term_child is set, kill_child otherwise).
# USR2: Don't process any new jobs
# CONT: Start processing jobs again after a USR2
def register_signal_handlers
trap('TERM') { graceful_term ? shutdown : shutdown! }
trap('INT') { shutdown! }
# The remaining signals may be rejected with ArgumentError; in that case
# a warning is logged and startup continues.
begin
trap('QUIT') { shutdown }
if term_child
trap('USR1') { new_kill_child }
else
trap('USR1') { kill_child }
end
trap('USR2') { pause_processing }
trap('CONT') { unpause_processing }
rescue ArgumentError
log_with_severity :warn, "Signals QUIT, USR1, USR2, and/or CONT not supported."
end
log_with_severity :debug, "Registered signals"
end
# Replaces the worker's signal handlers; called in the forked child when
# term_child is enabled (see perform_with_fork).
#
# TERM raises TermException once; the handler first re-traps TERM with an
# empty block so later TERM signals are ignored. INT, QUIT, USR1 and USR2
# go back to their default handlers.
# NOTE(review): the CONT handler installed by register_signal_handlers is
# not reset here — confirm whether that is intentional.
def unregister_signal_handlers
trap('TERM') do
trap('TERM') do
# Ignore subsequent term signals
end
raise TermException.new("SIGTERM")
end
trap('INT', 'DEFAULT')
begin
trap('QUIT', 'DEFAULT')
trap('USR1', 'DEFAULT')
trap('USR2', 'DEFAULT')
rescue ArgumentError
# Signals that cannot be trapped are silently skipped.
end
end
# Schedule this worker for shutdown. Will finish processing the
# current job: this only sets the flag that the work loop checks
# through #shutdown?.
def shutdown
log_with_severity :info, 'Exiting...'
@shutdown = true
end
# Kill the child and shutdown immediately.
# If not forking, abort this process.
#
# With term_child set: a forking worker stops its child through
# new_kill_child; a non-forking worker raises TermException in this very
# process (after making further TERM signals no-ops).
# Without term_child: the child is killed through kill_child.
def shutdown!
shutdown
if term_child
if fork_per_job?
new_kill_child
else
# Raise TermException in the same process
trap('TERM') do
# ignore subsequent terms
end
raise TermException.new("SIGTERM")
end
else
kill_child
end
end
# Should this worker shutdown as soon as current job is finished?
# Returns the raw flag: nil until #shutdown has been called, true after.
def shutdown?
@shutdown
end
# Kills the forked child immediately, without remorse. The job it
# is processing will not be completed.
#
# Does nothing when no child pid is recorded. Sending the signal is best
# effort: an error from Process.kill (for example because the child has
# already gone away) is ignored.
#
# The previous implementation shelled out to `ps` and branched on the
# result, but a backtick command always returns a String, which is truthy,
# so its "child not found" branch could never run. The pointless `ps`
# invocation and the dead branch have been removed; the effective
# behaviour (log, then send KILL) is unchanged.
def kill_child
  return unless @child

  log_with_severity :debug, "Killing child at #{@child}"
  begin
    Process.kill("KILL", @child)
  rescue StandardError
    nil
  end
end
# Returns this worker's heartbeat as stored in the data store.
def heartbeat
data_store.heartbeat(self)
end
# Asks the data store to remove this worker's heartbeat.
def remove_heartbeat
data_store.remove_heartbeat(self)
end
# Records a heartbeat for this worker in the data store.
#
# time - timestamp to record; defaults to the data store's server time.
def heartbeat!(time = data_store.server_time)
data_store.heartbeat!(self, time)
end
# Returns all heartbeats known to the data store. Callers in this file
# index the result by worker id and parse the values with Time.parse.
def self.all_heartbeats
data_store.all_heartbeats
end
# Returns the workers whose most recent heartbeat is older than
# Resque.prune_interval seconds, measured against the data store's server
# time. Workers that have never sent a heartbeat are NOT included.
def self.all_workers_with_expired_heartbeats
  workers = Worker.all
  heartbeats = Worker.all_heartbeats
  now = data_store.server_time

  workers.select do |worker|
    last_heartbeat = heartbeats[worker.to_s]
    next false unless last_heartbeat

    age_in_seconds = (now - Time.parse(last_heartbeat)).to_i
    age_in_seconds > Resque.prune_interval
  end
end
# Starts the background thread that keeps this worker's heartbeat fresh.
#
# Any existing heartbeat is removed first. The thread then records a
# heartbeat, waits up to Resque.heartbeat_interval on the thread signal,
# and repeats until the signal is raised (see kill_background_threads).
# The thread is also added to the class-wide list of heartbeat threads.
def start_heartbeat
remove_heartbeat
@heartbeat_thread_signal = Resque::ThreadSignal.new
@heartbeat_thread = Thread.new do
loop do
heartbeat!
signaled = @heartbeat_thread_signal.wait_for_signal(Resque.heartbeat_interval)
break if signaled
end
end
@@all_heartbeat_threads << @heartbeat_thread
end
# Kills the forked child immediately with minimal remorse. The job it
# is processing will not be completed. Send the child a TERM signal,
# wait <term_timeout> seconds, and then a KILL signal if it has not quit
# If pre_shutdown_timeout has been set to a positive number, it will allow
# the child that many seconds before sending the aforementioned TERM and KILL.
#
# Does nothing when no child pid is recorded. A SystemCallError raised
# along the way (e.g. by Process.kill or the waitpid checks) is logged and
# swallowed.
def new_kill_child
if @child
unless child_already_exited?
if pre_shutdown_timeout && pre_shutdown_timeout > 0.0
log_with_severity :debug, "Waiting #{pre_shutdown_timeout.to_f}s for child process to exit"
# The child finished on its own within the grace period.
return if wait_for_child_exit(pre_shutdown_timeout)
end
log_with_severity :debug, "Sending TERM signal to child #{@child}"
Process.kill("TERM", @child)
if wait_for_child_exit(term_timeout)
return
else
# TERM was not enough within term_timeout; escalate.
log_with_severity :debug, "Sending KILL signal to child #{@child}"
Process.kill("KILL", @child)
end
else
log_with_severity :debug, "Child #{@child} already quit."
end
end
rescue SystemCallError
log_with_severity :error, "Child #{@child} already quit and reaped."
end
# Non-blocking check on the child: returns the result of
# Process.waitpid with WNOHANG, which is truthy (the pid) once the child
# has exited and nil while it is still running.
def child_already_exited?
Process.waitpid(@child, Process::WNOHANG)
end
# Polls every 0.1 seconds, for roughly `timeout` seconds, until the child
# has exited. Returns true as soon as the child is gone, false when the
# timeout elapses first.
def wait_for_child_exit(timeout)
  polls = (timeout * 10).round
  polls.times do
    sleep(0.1)
    return true if child_already_exited?
  end
  false
end
# Are we paused? Returns the raw flag: nil initially, true after
# pause_processing, false after unpause_processing.
def paused?
@paused
end
# Stop processing jobs after the current one has completed (if we're
# currently running one). The :before_pause hooks run, with this worker
# as argument, before the paused flag is set.
def pause_processing
log_with_severity :info, "USR2 received; pausing job processing"
run_hook :before_pause, self
@paused = true
end
# Start processing jobs again after a pause. The paused flag is cleared
# first, then the :after_pause hooks run with this worker as argument.
def unpause_processing
log_with_severity :info, "CONT received; resuming job processing"
@paused = false
run_hook :after_pause, self
end
# Looks for any workers which should be running on this server
# and, if they're not, removes them from Redis.
#
# This is a form of garbage collection. If a server is killed by a
# hard shutdown, power failure, or something else beyond our
# control, the Resque workers will not die gracefully and therefore
# will leave stale state information in Redis.
#
# By checking the current Redis state against the actual
# environment, we can determine if Redis is old and clean it up a bit.
#
# Nothing happens unless the pruning lock can be acquired from the data
# store. Two kinds of workers are pruned:
#
# 1. Workers whose heartbeat has expired, regardless of host or queues.
#    Their in-flight job (if any) fails with PruneDeadWorkerDirtyExit.
# 2. Workers registered for this host and for the same queues as this
#    worker (or any queues when this worker listens on "*") whose pid is
#    not among the worker pids currently running on this machine.
#
# Worker ids are split into at most three fields so that queue names
# containing ':' are compared in full instead of being truncated.
def prune_dead_workers
  return unless data_store.acquire_pruning_dead_worker_lock(self, Resque.heartbeat_interval)

  all_workers = Worker.all

  unless all_workers.empty?
    known_workers = worker_pids
    all_workers_with_expired_heartbeats = Worker.all_workers_with_expired_heartbeats
  end

  all_workers.each do |worker|
    # If the worker hasn't sent a heartbeat, remove it from the registry.
    #
    # If the worker hasn't ever sent a heartbeat, we won't remove it since
    # the first heartbeat is sent before the worker is registered it means
    # that this is a worker that doesn't support heartbeats, e.g., another
    # client library or an older version of Resque. We won't touch these.
    if all_workers_with_expired_heartbeats.include?(worker)
      log_with_severity :info, "Pruning dead worker: #{worker}"

      job_class = worker.job(false)['payload']['class'] rescue nil
      worker.unregister_worker(PruneDeadWorkerDirtyExit.new(worker.to_s, job_class))
      next
    end

    host, pid, worker_queues_raw = worker.id.split(':', 3)
    worker_queues = worker_queues_raw.split(",")
    unless @queues.include?("*") || (worker_queues.to_set == @queues.to_set)
      # If the worker we are trying to prune does not belong to the queues
      # we are listening to, we should not touch it.
      # Attempt to prune a worker from different queues may easily result in
      # an unknown class exception, since that worker could easily be even
      # written in different language.
      next
    end

    next unless host == hostname
    next if known_workers.include?(pid)

    log_with_severity :debug, "Pruning dead worker: #{worker}"
    worker.unregister_worker
  end
end
# Registers ourself as a worker. Useful when entering the worker
# lifecycle on startup. Delegates to data_store.register_worker.
def register_worker
data_store.register_worker(self)
end
# Runs every hook registered under `name` (looked up via Resque.<name>),
# passing along any arguments. Returns early when no hooks are registered.
# The :before_first_fork hooks are only ever run once per worker.
def run_hook(name, *args)
  hooks = Resque.send(name)
  return if hooks.empty?

  first_fork_hook = (name == :before_first_fork)
  return if first_fork_hook && @before_first_fork_hook_ran

  message = "Running #{name} hooks"
  message << " with #{args.inspect}" if args.any?
  log_with_severity :info, message

  hooks.each do |hook|
    if args.any?
      hook.call(*args)
    else
      hook.call
    end
    @before_first_fork_hook_ran = true if first_fork_hook
  end
end
# Stops the heartbeat thread, if one was started: raises its signal so
# the heartbeat loop exits, then waits for the thread to finish.
def kill_background_threads
  return unless @heartbeat_thread

  @heartbeat_thread_signal.signal
  @heartbeat_thread.join
end
# Unregisters ourself as a worker. Useful when shutting down.
#
# exception - optional exception that caused the shutdown. When given it
#             is used to fail any job still recorded as in progress;
#             otherwise such a job fails with DirtyExit.
#
# After handling the in-flight job it stops the background threads and
# asks the data store to unregister this worker, clearing its per-worker
# processed/failed stats.
#
# Raises an exception of the same class as any error hit while
# unregistering, with the original `exception` (class, message and
# backtrace) appended to the message when one was supplied.
def unregister_worker(exception = nil)
# If we're still processing a job, make sure it gets logged as a
# failure.
if (hash = processing) && !hash.empty?
job = Job.new(hash['queue'], hash['payload'])
# Ensure the proper worker is attached to this job, even if
# it's not the precise instance that died.
job.worker = self
begin
job.fail(exception || DirtyExit.new("Job still being processed"))
rescue RuntimeError => e
# A failure to record the failure is only logged.
log_with_severity :error, e.message
end
end
kill_background_threads
data_store.unregister_worker(self) do
Stat.clear("processed:#{self}")
Stat.clear("failed:#{self}")
end
rescue Exception => exception_while_unregistering
message = exception_while_unregistering.message
if exception
message += "\nOriginal Exception (#{exception.class}): #{exception.message}"
message += "\n #{exception.backtrace.join(" \n")}" if exception.backtrace
end
# Re-raise with the same class and backtrace but the enriched message.
fail(exception_while_unregistering.class,
message,
exception_while_unregistering.backtrace)
end
# Records in the data store that this worker is now working on `job`:
# the job's queue, the current UTC time in ISO 8601 form, and the job's
# payload, all encoded into a single string. Useful for seeing what
# workers are doing and since when.
def working_on(job)
  details = {
    :queue => job.queue,
    :run_at => Time.now.utc.iso8601,
    :payload => job.payload
  }
  data = encode(details)
  data_store.set_worker_payload(self,data)
end
# Called when we are done working - clears our `working_on` state
# and tells Redis we processed a job. The processed counters are bumped
# inside the block handed to data_store.worker_done_working.
def done_working
data_store.worker_done_working(self) do
processed!
end
end
# How many jobs has this worker processed? Returns an int.
# Reads the per-worker stat keyed "processed:<worker id>".
def processed
Stat["processed:#{self}"]
end
# Tell Redis we've processed a job: updates both the global "processed"
# stat and this worker's own "processed:<worker id>" stat.
def processed!
Stat << "processed"
Stat << "processed:#{self}"
end
# How many failed jobs has this worker seen? Returns an int.
# Reads the per-worker stat keyed "failed:<worker id>".
def failed
Stat["failed:#{self}"]
end
# Tells Redis we've failed a job: updates both the global "failed" stat
# and this worker's own "failed:<worker id>" stat.
def failed!
Stat << "failed"
Stat << "failed:#{self}"
end
# What time did this worker start? Returns an instance of `Time`
# NOTE(review): the value comes straight from data_store.worker_start_time;
# confirm that it really is a Time and not a String.
def started
data_store.worker_start_time(self)
end
# Tell Redis we've started. Delegates to data_store.worker_started.
def started!
data_store.worker_started(self)
end
# Returns a hash explaining the Job we're currently processing, if any.
#
# reload - when true (the default) the cached value is discarded and the
#          payload is fetched from the data store again; when false a
#          previously cached or assigned value is reused.
#
# Returns an empty hash when the decoded payload is nil or false.
def job(reload = true)
@job = nil if reload
@job ||= decode(data_store.get_worker_payload(self)) || {}
end
# Lets callers (e.g. Worker.working) pre-populate the cached job.
attr_writer :job
# `processing` is an alias of `job`.
alias_method :processing, :job
# Boolean - true if working, false if not. Based on #state.
def working?
state == :working
end
# Boolean - true if idle, false if not. Based on #state.
def idle?
state == :idle
end
# Whether each job should run in a forked child. An explicitly assigned
# value wins (even nil or false); otherwise the answer is computed once:
# forking is used unless ENV["FORK_PER_JOB"] is 'false' or the platform
# has no fork.
def fork_per_job?
  unless defined?(@fork_per_job)
    @fork_per_job = ENV["FORK_PER_JOB"] != 'false' && Kernel.respond_to?(:fork)
  end
  @fork_per_job
end
# Returns a symbol for the current worker state: :working when the data
# store holds a payload for this worker, :idle otherwise.
def state
  return :working if data_store.get_worker_payload(self)

  :idle
end
# Is this worker the same as another worker? Two workers are equal when
# their string ids (#to_s) are equal.
def ==(other)
to_s == other.to_s
end
# Short debugging representation: "#<Worker " followed by the worker id.
def inspect
"#<Worker #{to_s}>"
end
# The string representation is the same as the id for this worker
# instance. Can be used with `Worker.find`.
#
# Format: "hostname:pid:queue1,queue2,...". The value is memoized, and
# may have been assigned explicitly through the `to_s=` writer.
def to_s
@to_s ||= "#{hostname}:#{pid}:#{@queues.join(',')}"
end
# `id` is an alias of `to_s`.
alias_method :id, :to_s
# Hostname of this worker's machine. Memoized; defaults to
# Socket.gethostname unless a value was assigned through `hostname=`.
def hostname
@hostname ||= Socket.gethostname
end
# Returns Integer PID of running worker. Memoized; defaults to the
# current process id unless a value was assigned through `pid=`.
def pid
@pid ||= Process.pid
end
# Returns an Array of string pids of the workers running on this
# machine, using the lookup that fits the current platform (Solaris,
# Windows/mingw32, or Linux and everything else). Useful when pruning
# dead workers on startup.
def worker_pids
  case RUBY_PLATFORM
  when /solaris/ then solaris_worker_pids
  when /mingw32/ then windows_worker_pids
  else linux_worker_pids
  end
end
# Find candidate worker pids on Windows.
#
# Runs `tasklist` filtered to ruby.exe images, re-encodes its output from
# the locale charmap to UTF-8, and returns an Array of string pids taken
# from the "PID:" lines. Note that every ruby.exe process is listed, not
# only Resque workers.
def windows_worker_pids
tasklist_output = `tasklist /FI "IMAGENAME eq ruby.exe" /FO list`.encode("UTF-8", Encoding.locale_charmap)
tasklist_output.split($/).select { |line| line =~ /^PID:/ }.collect { |line| line.gsub(/PID:\s+/, '') }
end
# Find Resque worker pids on Linux and OS X.
#
# Lists all processes with `ps`, keeps the lines whose command looks like
# a Resque worker (the "resque:work" task, the "resque: Starting" title
# set by #startup, or a "resque-<version>" procline) and drops resque-web.
# The bracketed first letter in each pattern keeps the grep process from
# matching itself.
#
# Returns an Array of string pids.
def linux_worker_pids
`ps -A -o pid,command | grep -E "[r]esque:work|[r]esque:\sStarting|[r]esque-[0-9]" | grep -v "resque-web"`.split("\n").map do |line|
# The pid is the first whitespace-separated column.
line.split(' ')[0]
end
end
# Find Resque worker pids on Solaris.
#
# Returns an Array of string pids of all the other workers on this
# machine. Useful when pruning dead workers on startup.
#
# Each ruby process reported by `ps` is inspected with `pargs`; a pid is
# kept only when the text after the first ':' of the matching pargs line
# is exactly " resque-<Resque::Version>".
def solaris_worker_pids
`ps -A -o pid,comm | grep "[r]uby" | grep -v "resque-web"`.split("\n").map do |line|
real_pid = line.split(' ')[0]
pargs_command = `pargs -a #{real_pid} 2>/dev/null | grep [r]esque | grep -v "resque-web"`
if pargs_command.split(':')[1] == " resque-#{Resque::Version}"
real_pid
end
end.compact
end
# Given a string, sets the procline ($0) and logs.
# Procline is always in the format of:
# RESQUE_PROCLINE_PREFIXresque-VERSION: STRING
# where the prefix comes from ENV['RESQUE_PROCLINE_PREFIX'] (empty when
# unset). The resulting procline is logged at debug level.
def procline(string)
$0 = "#{ENV['RESQUE_PROCLINE_PREFIX']}resque-#{Resque::Version}: #{string}"
log_with_severity :debug, $0
end
# Logs `message` through `info`.
# NOTE(review): `info` is not defined in the visible part of this file —
# presumably provided by one of the included modules; confirm.
def log(message)
info(message)
end
# Logs `message` through `debug`.
# NOTE(review): `debug` is not defined in the visible part of this file —
# presumably provided by one of the included modules; confirm.
def log!(message)
debug(message)
end
# Readers for the verbosity flags; the matching writers are defined by
# hand because they also reconfigure Resque.logger.
attr_reader :verbose, :very_verbose
# Sets the verbose flag and adjusts Resque.logger to match: a falsy value
# installs the quiet formatter; a truthy value installs the verbose
# formatter at INFO level, unless very_verbose is on, in which case the
# logger is left untouched.
def verbose=(value)
  if !value
    Resque.logger.formatter = QuietFormatter.new
  elsif !very_verbose
    Resque.logger.formatter = VerboseFormatter.new
    Resque.logger.level = Logger::INFO
  end
  @verbose = value
end
# Sets the very-verbose flag and adjusts Resque.logger to match: a truthy
# value installs the very verbose formatter at DEBUG level; a falsy value
# falls back to the verbose formatter at INFO level when verbose is on,
# and to the quiet formatter otherwise.
def very_verbose=(value)
  logger = Resque.logger
  if value
    logger.formatter = VeryVerboseFormatter.new
    logger.level = Logger::DEBUG
  elsif verbose
    logger.formatter = VerboseFormatter.new
    logger.level = Logger::INFO
  else
    logger.formatter = QuietFormatter.new
  end
  @very_verbose = value
end
private
# Performs `job` in a forked child process and waits for it to finish.
#
# Runs the :before_fork hooks, then forks. The child swaps the signal
# handlers when term_child is set, performs the job, and leaves with
# `exit!` (skipping at_exit handlers) unless run_at_exit_hooks is set.
# If fork turns out to be unimplemented, forking is disabled for this
# worker and the job is performed in-process instead.
#
# In the parent, a child that was terminated by a signal causes the job
# to be failed with DirtyExit. The recorded child pid is cleared at the end.
def perform_with_fork(job, &block)
run_hook :before_fork, job
begin
@child = fork do
unregister_signal_handlers if term_child
perform(job, &block)
exit! unless run_at_exit_hooks
end
rescue NotImplementedError
@fork_per_job = false
perform(job, &block)
return
end
srand # Reseeding
procline "Forked #{@child} at #{Time.now.to_i}"
begin
Process.waitpid(@child)
rescue SystemCallError
# The wait failed; carry on with whatever status $? currently holds.
# NOTE(review): presumably the child was already reaped elsewhere (e.g.
# by the WNOHANG check in child_already_exited?) — confirm.
nil
end
job.fail(DirtyExit.new("Child process received unhandled signal #{$?}", $?)) if $?.signaled?
@child = nil
end
# Single logging entry point for this class: forwards the severity
# (e.g. :debug, :info, :warn, :error) and message to Logging.log.
def log_with_severity(severity, message)
Logging.log(severity, message)
end
end
end