Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graceful shutdown #31

Merged
merged 8 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
72 changes: 72 additions & 0 deletions examples/grace/server.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#!/usr/bin/env ruby
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2024, by Samuel Williams.

require '../../lib/async/container'
require 'async/io/host_endpoint'

Console.logger.debug!

module SignalWrapper
def self.trap(signal, &block)
signal = signal

original = Signal.trap(signal) do
::Signal.trap(signal, original)
block.call
end
end
end

class Controller < Async::Container::Controller
def initialize(...)
super

@endpoint = Async::IO::Endpoint.tcp("localhost", 8080)
@bound_endpoint = nil
end

def start
Console.debug(self) {"Binding to #{@endpoint}"}
@bound_endpoint = Sync{@endpoint.bound}

super
end

def setup(container)
container.run count: 2, restart: true do |instance|
SignalWrapper.trap(:INT) do
Console.debug(self) {"Closing bound instance..."}
@bound_endpoint.close
end

Sync do |task|
Console.info(self) {"Starting bound instance..."}

instance.ready!

@bound_endpoint.accept do |peer|
while true
peer.write("#{Time.now.to_s.rjust(32)}: Hello World\n")
sleep 1
end
end
end
end
end

def stop(graceful = true)
super

if @bound_endpoint
@bound_endpoint.close
@bound_endpoint = nil
end
end
end

controller = Controller.new

controller.run
19 changes: 13 additions & 6 deletions lib/async/container/controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class Controller

# Initialize the controller.
# @parameter notify [Notify::Client] A client used for process readiness notifications.
def initialize(notify: Notify.open!, container_class: Container)
def initialize(notify: Notify.open!, container_class: Container, graceful_stop: true)
@container = nil
@container_class = container_class

Expand All @@ -35,6 +35,8 @@ def initialize(notify: Notify.open!, container_class: Container)
trap(SIGHUP) do
self.restart
end

@graceful_stop = graceful_stop
end

# The state of the controller.
Expand Down Expand Up @@ -96,7 +98,7 @@ def start

# Stop the container if it's running.
# @parameter graceful [Boolean] Whether to give the children instances time to shut down or to kill them immediately.
def stop(graceful = true)
def stop(graceful = @graceful_stop)
@container&.stop(graceful)
@container = nil
end
Expand Down Expand Up @@ -130,7 +132,7 @@ def restart
if container.failed?
@notify&.error!("Container failed to start!")

container.stop
container.stop(false)

raise SetupError, container
end
Expand All @@ -142,7 +144,7 @@ def restart

if old_container
Console.logger.debug(self, "Stopping old container...")
old_container&.stop
old_container&.stop(@graceful_stop)
end

@notify&.ready!
Expand All @@ -165,7 +167,9 @@ def reload

# Wait for all child processes to enter the ready state.
Console.logger.debug(self, "Waiting for startup...")

@container.wait_until_ready

Console.logger.debug(self, "Finished startup.")

if @container.failed?
Expand All @@ -182,14 +186,17 @@ def run
# I thought this was the default... but it doesn't always raise an exception unless you do this explicitly.
# We use `Thread.current.raise(...)` so that exceptions are filtered through `Thread.handle_interrupt` correctly.
interrupt_action = Signal.trap(:INT) do
# $stderr.puts "Received INT signal, terminating...", caller
::Thread.current.raise(Interrupt)
end

terminate_action = Signal.trap(:TERM) do
# $stderr.puts "Received TERM signal, terminating...", caller
::Thread.current.raise(Terminate)
end

hangup_action = Signal.trap(:HUP) do
# $stderr.puts "Received HUP signal, restarting...", caller
::Thread.current.raise(Hangup)
end

Expand All @@ -211,11 +218,11 @@ def run
end
end
rescue Interrupt
self.stop(true)
self.stop
rescue Terminate
self.stop(false)
ensure
self.stop(true)
self.stop(false)

# Restore the interrupt handler:
Signal.trap(:INT, interrupt_action)
Expand Down
5 changes: 5 additions & 0 deletions lib/async/container/group.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ def initialize
@queue = nil
end

def inspect
"#<#{self.class} running=#{@running.size}>"
end

# @attribute [Hash(IO, Fiber)] the running tasks, indexed by IO.
attr :running

Expand Down Expand Up @@ -133,6 +137,7 @@ def wait_for(channel)
protected

def wait_for_children(duration = nil)
Console.debug(self, "Waiting for children...", duration: duration)
if !@running.empty?
# Maybe consider using a proper event loop here:
readable, _, _ = ::IO.select(@running.keys, nil, nil, duration)
Expand Down
2 changes: 1 addition & 1 deletion lib/async/container/notify/console.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ module Notify
# Implements a general process readiness protocol with output to the local console.
class Console < Client
# Open a notification client attached to the current console.
def self.open!(logger = ::Console.logger)
def self.open!(logger = ::Console)
self.new(logger)
end

Expand Down
4 changes: 2 additions & 2 deletions test/async/container/.bad.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@

require_relative '../../../lib/async/container/controller'

$stdout.sync = true

class Bad < Async::Container::Controller
def setup(container)
container.run(name: "bad", count: 1, restart: true) do |instance|
# Deliberately missing call to `instance.ready!`:
# instance.ready!

$stdout.puts "Ready..."
$stdout.flush

sleep
ensure
$stdout.puts "Exiting..."
$stdout.flush
end
end
end
Expand Down
6 changes: 3 additions & 3 deletions test/async/container/.dots.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@

require_relative '../../../lib/async/container/controller'

# Console.logger.debug!
$stdout.sync = true

class Dots < Async::Container::Controller
def setup(container)
container.run(name: "dots", count: 1, restart: true) do |instance|
instance.ready!

sleep 1
# This is to avoid race conditions in the controller in test conditions.
sleep 0.1

$stdout.write "."
$stdout.flush

sleep
rescue Async::Container::Interrupt
Expand Down
45 changes: 45 additions & 0 deletions test/async/container/.graceful.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#!/usr/bin/env ruby
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2020-2022, by Samuel Williams.

require_relative '../../../lib/async/container/controller'

$stdout.sync = true

class Graceful < Async::Container::Controller
def setup(container)
container.run(name: "graceful", count: 1, restart: true) do |instance|
instance.ready!

# This is to avoid race conditions in the controller in test conditions.
sleep 0.1

clock = Async::Clock.start

original_action = Signal.trap(:INT) do
# We ignore the int, but in practical applications you would want start a graceful shutdown.
$stdout.puts "Graceful shutdown...", clock.total

Signal.trap(:INT, original_action)
end

$stdout.puts "Ready...", clock.total

sleep
ensure
$stdout.puts "Exiting...", clock.total
end
end
end

controller = Graceful.new(graceful_stop: 1)

begin
controller.run
rescue Async::Container::Terminate
$stdout.puts "Terminated..."
rescue Interrupt
$stdout.puts "Interrupted..."
end
39 changes: 39 additions & 0 deletions test/async/container/controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,45 @@ def controller.setup(container)
end
end

with 'graceful controller' do
let(:controller_path) {File.expand_path(".graceful.rb", __dir__)}

let(:pipe) {IO.pipe}
let(:input) {pipe.first}
let(:output) {pipe.last}

let(:pid) {@pid}

def before
@pid = Process.spawn("bundle", "exec", controller_path, out: output)
output.close

super
end

def after
Process.kill(:TERM, @pid)
Process.wait(@pid)

super
end

it "has graceful shutdown" do
expect(input.gets).to be == "Ready...\n"
start_time = input.gets.to_f

Process.kill(:INT, @pid)

expect(input.gets).to be == "Graceful shutdown...\n"
graceful_shutdown_time = input.gets.to_f

expect(input.gets).to be == "Exiting...\n"
exit_time = input.gets.to_f

expect(exit_time - graceful_shutdown_time).to be >= 1.0
end
end

with 'bad controller' do
let(:controller_path) {File.expand_path(".bad.rb", __dir__)}

Expand Down