Skip to content

Commit

Permalink
Refactor work loop
Browse files Browse the repository at this point in the history
  • Loading branch information
fw42 committed Jul 8, 2016
1 parent 0691ac6 commit a4ee6bc
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 39 deletions.
92 changes: 53 additions & 39 deletions lib/resque/worker.rb
Expand Up @@ -219,47 +219,12 @@ def glob_match(pattern)
# has completed processing. Useful for testing.
def work(interval = 5.0, &block)
interval = Float(interval)
$0 = "resque: Starting"
startup

loop do
break if shutdown?

if not paused? and job = reserve
log_with_severity :info, "got: #{job.inspect}"
job.worker = self
working_on job

procline "Processing #{job.queue} since #{Time.now.to_i} [#{job.payload_class_name}]"
if fork_per_job?
run_hook :before_fork, job
begin
@child = fork do
unregister_signal_handlers if term_child
perform(job, &block)
exit! unless run_at_exit_hooks
end
rescue NotImplementedError
@fork_per_job = false
end
if @child
srand # Reseeding
procline "Forked #{@child} at #{Time.now.to_i}"
begin
Process.waitpid(@child)
rescue SystemCallError
nil
end
job.fail(DirtyExit.new("Child process received unhandled signal #{$?.stopsig}")) if $?.signaled?
@child = nil
end
end
unless fork_per_job?
perform(job, &block)
end

done_working
else
unless work_one_job(&block)
break if interval.zero?
log_with_severity :debug, "Sleeping for #{interval} seconds"
procline paused? ? "Paused" : "Waiting for #{queues.join(',')}"
Expand All @@ -269,11 +234,29 @@ def work(interval = 5.0, &block)

unregister_worker
rescue Exception => exception
unless exception.class == SystemExit && !@child && run_at_exit_hooks
log_with_severity :error, "Failed to start worker : #{exception.inspect}"
return if exception.class == SystemExit && !@child && run_at_exit_hooks
log_with_severity :error, "Failed to start worker : #{exception.inspect}"
unregister_worker(exception)
end

unregister_worker(exception)
def work_one_job(job = nil, &block)
return false if paused?
return false unless job ||= reserve

working_on job
procline "Processing #{job.queue} since #{Time.now.to_i} [#{job.payload_class_name}]"

log_with_severity :info, "got: #{job.inspect}"
job.worker = self

if fork_per_job?
perform_with_fork(job, &block)
else
perform(job, &block)
end

done_working
true
end

# DEPRECATED. Processes a single job. If none is given, it will
Expand Down Expand Up @@ -303,6 +286,7 @@ def report_failed_job(job,exception)
end
end


# Processes a given job in the child.
def perform(job)
begin
Expand Down Expand Up @@ -358,6 +342,8 @@ def reconnect

# Runs all the methods needed when a worker begins its lifecycle.
def startup
$0 = "resque: Starting"

enable_gc_optimizations
register_signal_handlers
start_heartbeat
Expand Down Expand Up @@ -864,6 +850,34 @@ def very_verbose=(value)

private

def perform_with_fork(job, &block)
run_hook :before_fork, job

begin
@child = fork do
unregister_signal_handlers if term_child
perform(job, &block)
exit! unless run_at_exit_hooks
end
rescue NotImplementedError
@fork_per_job = false
perform(job, &block)
return
end

srand # Reseeding
procline "Forked #{@child} at #{Time.now.to_i}"

begin
Process.waitpid(@child)
rescue SystemCallError
nil
end

job.fail(DirtyExit.new("Child process received unhandled signal #{$?.stopsig}")) if $?.signaled?
@child = nil
end

def log_with_severity(severity, message)
Logging.log(severity, message)
end
Expand Down
22 changes: 22 additions & 0 deletions test/worker_test.rb
Expand Up @@ -318,6 +318,28 @@ def self.perform
assert_equal 0, Resque.size(:high)
end

it "can work off one job" do
Resque::Job.create(:jobs, GoodJob)
assert_equal 2, Resque.size(:jobs)
assert_equal true, @worker.work_one_job
assert_equal 1, Resque.size(:jobs)

job = Resque::Job.new(:jobs, {'class' => 'GoodJob'})
assert_equal 1, Resque.size(:jobs)
assert_equal true, @worker.work_one_job(job)
assert_equal 1, Resque.size(:jobs)

@worker.pause_processing
@worker.work_one_job
assert_equal 1, Resque.size(:jobs)

@worker.unpause_processing
assert_equal true, @worker.work_one_job
assert_equal 0, Resque.size(:jobs)

assert_equal false, @worker.work_one_job
end

it "the queues method avoids unnecessary calls to smembers" do
worker = Resque::Worker.new(:critical, :high)
Resque.redis.expects(:smembers).at_most_once
Expand Down

0 comments on commit a4ee6bc

Please sign in to comment.