Skip to content

Commit

Permalink
Add phased worker upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
evanphx committed Feb 6, 2013
1 parent a3e76b8 commit 804a6e2
Showing 1 changed file with 79 additions and 5 deletions.
84 changes: 79 additions & 5 deletions lib/puma/cli.rb
Expand Up @@ -28,6 +28,7 @@ def initialize(argv, stdout=STDOUT, stderr=STDERR)
@stdout = stdout
@stderr = stderr

@phase = 0
@workers = []

@events = Events.new @stdout, @stderr
Expand All @@ -36,6 +37,7 @@ def initialize(argv, stdout=STDOUT, stderr=STDERR)
@status = nil

@restart = false
@phased_state = :idle

ENV['NEWRELIC_DISPATCHER'] ||= "puma"

Expand Down Expand Up @@ -483,6 +485,7 @@ def worker
$0 = "puma: cluster worker: #{@master_pid}"
Signal.trap "SIGINT", "IGNORE"

@master_read.close
@suicide_pipe.close

Thread.new do
Expand All @@ -503,7 +506,12 @@ def worker
server.stop
end

@worker_write << "b#{Process.pid}\n"

server.run.join

ensure
@worker_write.close
end

def stop_workers
Expand All @@ -519,12 +527,27 @@ def stop_workers
end
end

def start_phased_restart
@phase += 1
log "- Starting phased worker restart, phase: #{@phase}"
end

class Worker
def initialize(pid)
def initialize(pid, phase)
@pid = pid
@phase = phase
@stage = :started
end

attr_reader :pid
attr_reader :pid, :phase

def booted?
@stage == :booted
end

def boot!
@stage = :booted
end

def term
begin
Expand All @@ -540,10 +563,18 @@ def spawn_workers
diff.times do
pid = fork { worker }
debug "Spawned worker: #{pid}"
@workers << Worker.new(pid)
@workers << Worker.new(pid, @phase)
end

if diff > 0
@phased_state = :idle
end
end

def all_workers_booted?
@workers.count { |w| !w.booted? } == 0
end

def check_workers
while true
pid = Process.waitpid(-1, Process::WNOHANG)
Expand All @@ -553,6 +584,20 @@ def check_workers
end

spawn_workers

if @phased_state == :idle && all_workers_booted?
# If we're running at proper capacity, check to see if
# we need to phase any workers out (which will restart
# in the right phase).
#
w = @workers.find { |x| x.phase != @phase }

if w
@phased_state = :waiting
log "- Stopping #{w.pid} for phased upgrade..."
w.term
end
end
end

def run_cluster
Expand Down Expand Up @@ -590,6 +635,16 @@ def run_cluster
rescue Exception
end

phased_restart = false

begin
Signal.trap "SIGUSR1" do
phased_restart = true
write.write "!"
end
rescue Exception
end

# Used by the workers to detect if the master process dies.
# If select says that @check_pipe is ready, it's because the
# master has exited and @suicide_pipe has been automatically
Expand All @@ -607,6 +662,7 @@ def run_cluster

write_state

@master_read, @worker_write = read, write
spawn_workers

Signal.trap "SIGINT" do
Expand All @@ -619,10 +675,28 @@ def run_cluster
begin
res = IO.select([read], nil, nil, 5)

# drain read
read.read_nonblock(255) if res
if res
req = read.read_nonblock(1)

if req == "b"
pid = read.gets.to_i
w = @workers.find { |w| w.pid == pid }
if w
w.boot!
log "- Worker #{pid} booted, phase: #{w.phase}"
else
log "! Out-of-sync worker list, no #{pid} worker"
end
end
end

check_workers

if phased_restart
start_phased_restart
phased_restart = false
end

rescue Interrupt
stop = true
end
Expand Down

0 comments on commit 804a6e2

Please sign in to comment.