Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: extract worker process into separate class [changelog skip] #2374

Merged
merged 6 commits into from
Sep 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
209 changes: 16 additions & 193 deletions lib/puma/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
require 'puma/runner'
require 'puma/util'
require 'puma/plugin'
require 'puma/cluster/worker_handle'
require 'puma/cluster/worker'

require 'time'

Expand All @@ -11,10 +13,6 @@ module Puma
# to boot and serve a Ruby application when puma "workers" are needed
# i.e. when using multi-processes. For example `$ puma -w 5`
#
# At the core of this class is running an instance of `Puma::Server` which
# gets created via the `start_server` method from the `Puma::Runner` class
# that this inherits from.
#
# An instance of this class will spawn the number of processes passed in
# via the `spawn_workers` method call. Each worker will have it's own
# instance of a `Puma::Server`.
Expand Down Expand Up @@ -61,79 +59,6 @@ def redirect_io
@workers.each { |x| x.hup }
end

class Worker
def initialize(idx, pid, phase, options)
@index = idx
@pid = pid
@phase = phase
@stage = :started
@signal = "TERM"
@options = options
@first_term_sent = nil
@started_at = Time.now
@last_checkin = Time.now
@last_status = {}
@term = false
end

attr_reader :index, :pid, :phase, :signal, :last_checkin, :last_status, :started_at

# @version 5.0.0
attr_writer :pid, :phase

def booted?
@stage == :booted
end

def boot!
@last_checkin = Time.now
@stage = :booted
end

def term?
@term
end

def ping!(status)
@last_checkin = Time.now
require 'json'
@last_status = JSON.parse(status, symbolize_names: true)
end

# @see Puma::Cluster#check_workers
# @version 5.0.0
def ping_timeout
@last_checkin +
(booted? ?
@options[:worker_timeout] :
@options[:worker_boot_timeout]
)
end

def term
begin
if @first_term_sent && (Time.now - @first_term_sent) > @options[:worker_shutdown_timeout]
@signal = "KILL"
else
@term ||= true
@first_term_sent ||= Time.now
end
Process.kill @signal, @pid if @pid
rescue Errno::ESRCH
end
end

def kill
@signal = 'KILL'
term
end

def hup
Process.kill "HUP", @pid
rescue Errno::ESRCH
end
end

def spawn_workers
diff = @options[:workers] - @workers.size
return if diff < 1
Expand All @@ -154,7 +79,7 @@ def spawn_workers
end

debug "Spawned worker: #{pid}"
@workers << Worker.new(idx, pid, @phase, @options)
@workers << WorkerHandle.new(idx, pid, @phase, @options)
end

if @options[:fork_worker] &&
Expand Down Expand Up @@ -248,113 +173,23 @@ def wakeup!
end

def worker(index, master)
title = "puma: cluster worker #{index}: #{master}"
title += " [#{@options[:tag]}]" if @options[:tag] && !@options[:tag].empty?
$0 = title

Signal.trap "SIGINT", "IGNORE"
Signal.trap "SIGCHLD", "DEFAULT"

fork_worker = @options[:fork_worker] && index == 0

@workers = []
if !@options[:fork_worker] || fork_worker
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I never realized this conditional was always true 😞

@master_read.close
@suicide_pipe.close
@fork_writer.close
end

Thread.new do
Puma.set_thread_name "worker check pipe"
IO.select [@check_pipe]
log "! Detected parent died, dying"
exit! 1
end
@master_read.close
@suicide_pipe.close
@fork_writer.close

# If we're not running under a Bundler context, then
# report the info about the context we will be using
if !ENV['BUNDLE_GEMFILE']
if File.exist?("Gemfile")
log "+ Gemfile in context: #{File.expand_path("Gemfile")}"
elsif File.exist?("gems.rb")
log "+ Gemfile in context: #{File.expand_path("gems.rb")}"
end
end

# Invoke any worker boot hooks so they can get
# things in shape before booting the app.
@launcher.config.run_hooks :before_worker_boot, index, @launcher.events

server = @server ||= start_server
restart_server = Queue.new << true << false

if fork_worker
restart_server.clear
worker_pids = []
Signal.trap "SIGCHLD" do
wakeup! if worker_pids.reject! do |p|
Process.wait(p, Process::WNOHANG) rescue true
end
end

Thread.new do
Puma.set_thread_name "worker fork pipe"
while (idx = @fork_pipe.gets)
idx = idx.to_i
if idx == -1 # stop server
if restart_server.length > 0
restart_server.clear
server.begin_restart(true)
@launcher.config.run_hooks :before_refork, nil, @launcher.events
nakayoshi_gc
end
elsif idx == 0 # restart server
restart_server << true << false
else # fork worker
worker_pids << pid = spawn_worker(idx, master)
@worker_write << "f#{pid}:#{idx}\n" rescue nil
end
end
end
end

Signal.trap "SIGTERM" do
@worker_write << "e#{Process.pid}\n" rescue nil
server.stop
restart_server << false
end

begin
@worker_write << "b#{Process.pid}:#{index}\n"
rescue SystemCallError, IOError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
STDERR.puts "Master seems to have exited, exiting."
return
end

Thread.new(@worker_write) do |io|
Puma.set_thread_name "stat payload"

while true
sleep Const::WORKER_CHECK_INTERVAL
begin
require 'json'
io << "p#{Process.pid}#{server.stats.to_json}\n"
rescue IOError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
break
end
end
pipes = { check_pipe: @check_pipe, worker_write: @worker_write }
if @options[:fork_worker]
pipes[:fork_pipe] = @fork_pipe
pipes[:wakeup] = @wakeup
end

server.run.join while restart_server.pop

# Invoke any worker shutdown hooks so they can prevent the worker
# exiting until any background operations are completed
@launcher.config.run_hooks :before_worker_shutdown, index, @launcher.events
ensure
@worker_write << "t#{Process.pid}\n" rescue nil
@worker_write.close
new_worker = Worker.new index: index,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty delicious interface here 👍

master: master,
launcher: @launcher,
pipes: pipes
new_worker.run
end

def restart
Expand Down Expand Up @@ -550,7 +385,7 @@ def run
@master_read, @worker_write = read, @wakeup

@launcher.config.run_hooks :before_fork, nil, @launcher.events
nakayoshi_gc
Puma::Util.nakayoshi_gc @events if @options[:nakayoshi_fork]

spawn_workers

Expand Down Expand Up @@ -655,17 +490,5 @@ def timeout_workers
end
end
end

# @version 5.0.0
def nakayoshi_gc
return unless @options[:nakayoshi_fork]
log "! Promoting existing objects to old generation..."
4.times { GC.start(full_mark: false) }
if GC.respond_to?(:compact)
log "! Compacting..."
GC.compact
end
log "! Friendly fork preparation complete."
end
end
end