Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions examples/container.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,33 @@
# Copyright, 2019, by Yuji Yaginuma.
# Copyright, 2022, by Anton Sozontov.

require "../lib/async/container"
require_relative "../lib/async/container"

Console.logger.debug!

container = Async::Container.new

Console.debug "Spawning 2 containers..."
Console.debug "Spawning 2 children..."

2.times do
container.spawn do |task|
Console.debug task, "Sleeping..."
sleep(2)
Console.debug task, "Waking up!"
container.spawn do |instance|
Signal.trap(:INT) {}
Signal.trap(:TERM) {}

Console.debug instance, "Sleeping..."
while true
sleep
end
Console.debug instance, "Waking up!"
end
end

Console.debug "Waiting for container..."
container.wait
begin
container.wait
rescue Interrupt
# Okay, done.
ensure
container.stop(true)
end
Console.debug "Finished."
85 changes: 85 additions & 0 deletions fixtures/async/container/a_container.rb
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,91 @@ module Container
expect(container.statistics).to have_attributes(failures: be > 0)
end
end

with "broken children" do
it "can handle children that ignore termination with SIGKILL fallback" do
# Test behavior that works for both processes (signals) and threads (exceptions)
container.spawn(restart: false) do |instance|
instance.ready!

# Ignore termination attempts in a way appropriate to the container type
if container.class.multiprocess?
# For multiprocess containers - ignore signals
Signal.trap(:INT) {}
Signal.trap(:TERM) {}
while true
sleep(0.1)
end
else
# For threaded containers - ignore exceptions
while true
begin
sleep(0.1)
rescue Async::Container::Interrupt, Async::Container::Terminate
# Ignore termination attempts
end
end
end
end

container.wait_until_ready

# Try to stop with a very short timeout to force escalation
start_time = Time.now
container.stop(0.1) # Very short timeout
end_time = Time.now

# Should stop successfully via SIGKILL/thread termination
expect(container.size).to be == 0

# Should not hang - escalation should work
expect(end_time - start_time).to be < 2.0
end

it "can handle unresponsive children that close pipes but don't exit" do
container.spawn(restart: false) do |instance|
instance.ready!

# Close communication pipe to simulate hung process:
begin
if instance.respond_to?(:out)
instance.out.close if instance.out && !instance.out.closed?
end
rescue
# Ignore close errors.
end

# Become unresponsive:
if container.class.multiprocess?
# For multiprocess containers - ignore signals and close file descriptors:
Signal.trap(:INT) {}
Signal.trap(:TERM) {}
(4..256).each do |fd|
begin
IO.for_fd(fd).close
rescue
# Ignore errors
end
end
loop {} # Tight loop
else
# For threaded containers - just become unresponsive
loop {} # Tight loop, no exception handling
end
end

container.wait_until_ready

# Should not hang even with unresponsive children
start_time = Time.now
container.stop(1.0)
end_time = Time.now

expect(container.size).to be == 0
# Should complete reasonably quickly via hang prevention
expect(end_time - start_time).to be < 5.0
end
end
end
end
end
35 changes: 25 additions & 10 deletions guides/getting-started/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,25 @@ container.wait
Console.debug "Finished."
```

### Stopping Child Processes

Containers provide three approaches for stopping child processes (or threads). When you call `container.stop()`, a progressive approach is used:

- **Interrupt** means **"Please start shutting down gracefully"**. This is the gentlest shutdown request, giving applications maximum time to finish current work and cleanup resources.

- **Terminate** means **"Shut down now"**. This is more urgent - the process should stop what it's doing and terminate promptly, but still has a chance to cleanup.

- **Kill** means **"Die immediately"**. This forcefully terminates the process with no cleanup opportunity. This is the method of last resort.

The escalation sequence follows this pattern:
1. interrupt → wait for timeout → still running?
2. terminate → wait for timeout → still running?
3. kill → process terminated.

This gives well-behaved processes multiple opportunities to shut down gracefully, while ensuring that unresponsive processes are eventually killed.

**Implementation Note:** For forked containers, these methods send Unix signals (`SIGINT`, `SIGTERM`, `SIGKILL`). For threaded containers, they use different mechanisms appropriate to threads. The container abstraction hides these implementation details.

## Controllers

The controller provides the life-cycle management for one or more containers of processes. It provides behaviour like starting, restarting, reloading and stopping. You can see some [example implementations in Falcon](https://github.com/socketry/falcon/blob/master/lib/falcon/controller/). If the process running the controller receives `SIGHUP` it will recreate the container gracefully.
Expand Down Expand Up @@ -72,16 +91,12 @@ controller.run
# If you send SIGHUP to this process, it will recreate the container.
```

## Signal Handling

`SIGINT` is the reload signal. You may send this to a program to request that it reload its configuration. The default behavior is to gracefully reload the container.

`SIGINT` is the interrupt signal. The terminal sends it to the foreground process when the user presses **ctrl-c**. The default behavior is to terminate the process, but it can be caught or ignored. The intention is to provide a mechanism for an orderly, graceful shutdown.

`SIGQUIT` is the dump core signal. The terminal sends it to the foreground process when the user presses **ctrl-\\**. The default behavior is to terminate the process and dump core, but it can be caught or ignored. The intention is to provide a mechanism for the user to abort the process. You can look at `SIGINT` as "user-initiated happy termination" and `SIGQUIT` as "user-initiated unhappy termination."
### Controller Signal Handling

`SIGTERM` is the termination signal. The default behavior is to terminate the process, but it also can be caught or ignored. The intention is to kill the process, gracefully or not, but to first allow it a chance to cleanup.
Controllers are designed to run at the process level and are therefore responsible for processing signals. When your controller process receives these signals:

`SIGKILL` is the kill signal. The only behavior is to kill the process, immediately. As the process cannot catch the signal, it cannot cleanup, and thus this is a signal of last resort.
- `SIGHUP` → Gracefully reload the container (restart with new configuration).
- `SIGINT` → Begin graceful shutdown of the entire controller and all children.
- `SIGTERM` → Begin immediate shutdown of the controller and all children.

`SIGSTOP` is the pause signal. The only behavior is to pause the process; the signal cannot be caught or ignored. The shell uses pausing (and its counterpart, resuming via `SIGCONT`) to implement job control.
Ideally, do not send `SIGKILL` to a controller, as it will immediately terminate the controller without giving it a chance to gracefully shut down child processes. This can leave orphaned processes running and prevent proper cleanup.
10 changes: 10 additions & 0 deletions lib/async/container/error.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@ def initialize
end
end

# Similar to {Terminate}, but represents `SIGKILL`.
class Kill < SignalException
SIGKILL = Signal.list["KILL"]

# Create a new kill error.
def initialize
super(SIGKILL)
end
end

# Similar to {Interrupt}, but represents `SIGHUP`.
class Restart < SignalException
SIGHUP = Signal.list["HUP"]
Expand Down
13 changes: 9 additions & 4 deletions lib/async/container/forked.rb
Original file line number Diff line number Diff line change
Expand Up @@ -231,20 +231,25 @@ def restart!
# Wait for the child process to exit.
# @asynchronous This method may block.
#
# @parameter timeout [Numeric | Nil] Maximum time to wait before forceful termination.
# @returns [::Process::Status] The process exit status.
def wait
def wait(timeout = 0.1)
if @pid && @status.nil?
Console.debug(self, "Waiting for process to exit...", pid: @pid)

_, @status = ::Process.wait2(@pid, ::Process::WNOHANG)

while @status.nil?
sleep(0.1)
if @status.nil?
sleep(timeout) if timeout

_, @status = ::Process.wait2(@pid, ::Process::WNOHANG)

if @status.nil?
Console.warn(self) {"Process #{@pid} is blocking, has it exited?"}
Console.warn(self) {"Process #{@pid} is blocking, sending kill signal..."}
self.kill!

# Wait for the process to exit:
_, @status = ::Process.wait2(@pid)
end
end
end
Expand Down
92 changes: 68 additions & 24 deletions lib/async/container/group.rb
Original file line number Diff line number Diff line change
Expand Up @@ -119,36 +119,78 @@ def terminate
end
end

# Stop all child processes using {#terminate}.
# @parameter timeout [Boolean | Numeric | Nil] If specified, invoke a graceful shutdown using {#interrupt} first.
def stop(timeout = 1)
Console.debug(self, "Stopping all processes...", timeout: timeout)
# Use a default timeout if not specified:
timeout = 1 if timeout == true
# Kill all running processes.
# This resumes the controlling fiber with an instance of {Kill}.
def kill
Console.info(self, "Sending kill to #{@running.size} running processes...")
@running.each_value do |fiber|
fiber.resume(Kill)
end
end

private def wait_for_exit(clock, timeout)
while self.any?
duration = timeout - clock.total

if duration >= 0
self.wait_for_children(duration)
else
self.wait_for_children(0)
break
end
end
end

# Stop all child processes with a multi-phase shutdown sequence.
#
# A graceful shutdown performs the following sequence:
# 1. Send SIGINT and wait up to `interrupt_timeout` seconds
# 2. Send SIGTERM and wait up to `terminate_timeout` seconds
# 3. Send SIGKILL and wait indefinitely for process cleanup
#
# If `graceful` is false, skips the SIGINT phase and goes directly to SIGTERM → SIGKILL.
#
# @parameter graceful [Boolean] Whether to send SIGINT first or skip directly to SIGTERM.
# @parameter interrupt_timeout [Numeric | Nil] Time to wait after SIGINT before escalating to SIGTERM.
# @parameter terminate_timeout [Numeric | Nil] Time to wait after SIGTERM before escalating to SIGKILL.
def stop(graceful = true, interrupt_timeout: 1, terminate_timeout: 1)
case graceful
when true
# Use defaults.
when false
interrupt_timeout = nil
when Numeric
interrupt_timeout = graceful
terminate_timeout = graceful
end

if timeout
start_time = Async::Clock.now
Console.debug(self, "Stopping all processes...", interrupt_timeout: interrupt_timeout, terminate_timeout: terminate_timeout)

# If a timeout is specified, interrupt the children first:
if interrupt_timeout
clock = Async::Clock.start

# Interrupt the children:
self.interrupt

while self.any?
duration = Async::Clock.now - start_time
remaining = timeout - duration

if remaining >= 0
self.wait_for_children(duration)
else
self.wait_for_children(0)
break
end
end
# Wait for the children to exit:
self.wait_for_exit(clock, interrupt_timeout)
end

# Terminate all children:
self.terminate if any?
if terminate_timeout
clock = Async::Clock.start

# If the children are still running, terminate them:
self.terminate

# Wait for the children to exit:
self.wait_for_exit(clock, terminate_timeout)
end

# Wait for all children to exit:
self.wait
if any?
self.kill
self.wait
end
end

# Wait for a message in the specified {Channel}.
Expand All @@ -165,6 +207,8 @@ def wait_for(channel)
channel.interrupt!
elsif result == Terminate
channel.terminate!
elsif result == Kill
channel.kill!
elsif result
yield result
elsif message = channel.receive
Expand All @@ -184,7 +228,7 @@ def wait_for_children(duration = nil)
# This log is a big noisy and doesn't really provide a lot of useful information.
# Console.debug(self, "Waiting for children...", duration: duration, running: @running)

if !@running.empty?
unless @running.empty?
# Maybe consider using a proper event loop here:
if ready = self.select(duration)
ready.each do |io|
Expand Down
14 changes: 12 additions & 2 deletions lib/async/container/threaded.rb
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,20 @@ def restart!
end

# Wait for the thread to exit and return he exit status.
# @asynchronous This method may block.
#
# @parameter timeout [Numeric | Nil] Maximum time to wait before forceful termination.
# @returns [Status]
def wait
def wait(timeout = 0.1)
if @waiter
@waiter.join
Console.debug(self, "Waiting for thread to exit...", timeout: timeout)

unless @waiter.join(timeout)
Console.warn(self) {"Thread #{@thread} is blocking, sending kill signal..."}
self.kill!
@waiter.join
end

@waiter = nil
end

Expand Down
12 changes: 12 additions & 0 deletions releases.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
# Releases

## Unreleased

### Production Reliability Improvements

This release significantly improves container reliability by eliminating production hangs caused by unresponsive child processes.

**SIGKILL Fallback Support**: Containers now automatically escalate to SIGKILL when child processes ignore SIGINT and SIGTERM signals. This prevents the critical production issue where containers would hang indefinitely waiting for uncooperative processes to exit.

**Hang Prevention**: Individual child processes now have timeout-based hang prevention. If a process closes its notification pipe but doesn't actually exit, the container will detect this and escalate to SIGKILL after a reasonable timeout instead of hanging forever.

**Improved Three-Phase Shutdown**: The `Group#stop()` method now uses a cleaner interrupt → terminate → kill escalation sequence with configurable timeouts for each phase, giving well-behaved processes multiple opportunities to shut down gracefully while ensuring unresponsive processes are eventually terminated.

## v0.25.0

- Introduce `async:container:notify:log:ready?` task for detecting process readiness.
Expand Down
Loading