diff --git a/examples/container.rb b/examples/container.rb index 55e9217..6089852 100755 --- a/examples/container.rb +++ b/examples/container.rb @@ -6,22 +6,33 @@ # Copyright, 2019, by Yuji Yaginuma. # Copyright, 2022, by Anton Sozontov. -require "../lib/async/container" +require_relative "../lib/async/container" Console.logger.debug! container = Async::Container.new -Console.debug "Spawning 2 containers..." +Console.debug "Spawning 2 children..." 2.times do - container.spawn do |task| - Console.debug task, "Sleeping..." - sleep(2) - Console.debug task, "Waking up!" + container.spawn do |instance| + Signal.trap(:INT) {} + Signal.trap(:TERM) {} + + Console.debug instance, "Sleeping..." + while true + sleep + end + Console.debug instance, "Waking up!" end end Console.debug "Waiting for container..." -container.wait +begin + container.wait +rescue Interrupt + # Okay, done. +ensure + container.stop(true) +end Console.debug "Finished." diff --git a/fixtures/async/container/a_container.rb b/fixtures/async/container/a_container.rb index 389f82f..d8b604c 100644 --- a/fixtures/async/container/a_container.rb +++ b/fixtures/async/container/a_container.rb @@ -246,6 +246,91 @@ module Container expect(container.statistics).to have_attributes(failures: be > 0) end end + + with "broken children" do + it "can handle children that ignore termination with SIGKILL fallback" do + # Test behavior that works for both processes (signals) and threads (exceptions) + container.spawn(restart: false) do |instance| + instance.ready! + + # Ignore termination attempts in a way appropriate to the container type + if container.class.multiprocess? + # For multiprocess containers - ignore signals + Signal.trap(:INT) {} + Signal.trap(:TERM) {} + while true + sleep(0.1) + end + else + # For threaded containers - ignore exceptions + while true + begin + sleep(0.1) + rescue Async::Container::Interrupt, Async::Container::Terminate + # Ignore termination attempts + end + end + end + end + + container.wait_until_ready + + # Try to stop with a very short timeout to force escalation + start_time = Time.now + container.stop(0.1) # Very short timeout + end_time = Time.now + + # Should stop successfully via SIGKILL/thread termination + expect(container.size).to be == 0 + + # Should not hang - escalation should work + expect(end_time - start_time).to be < 2.0 + end + + it "can handle unresponsive children that close pipes but don't exit" do + container.spawn(restart: false) do |instance| + instance.ready! + + # Close communication pipe to simulate hung process: + begin + if instance.respond_to?(:out) + instance.out.close if instance.out && !instance.out.closed? + end + rescue + # Ignore close errors. + end + + # Become unresponsive: + if container.class.multiprocess? + # For multiprocess containers - ignore signals and close file descriptors: + Signal.trap(:INT) {} + Signal.trap(:TERM) {} + (4..256).each do |fd| + begin + IO.for_fd(fd).close + rescue + # Ignore errors + end + end + loop {} # Tight loop + else + # For threaded containers - just become unresponsive + loop {} # Tight loop, no exception handling + end + end + + container.wait_until_ready + + # Should not hang even with unresponsive children + start_time = Time.now + container.stop(1.0) + end_time = Time.now + + expect(container.size).to be == 0 + # Should complete reasonably quickly via hang prevention + expect(end_time - start_time).to be < 5.0 + end + end end end end diff --git a/guides/getting-started/readme.md b/guides/getting-started/readme.md index 32fa693..a2b4fa4 100644 --- a/guides/getting-started/readme.md +++ b/guides/getting-started/readme.md @@ -39,6 +39,25 @@ container.wait Console.debug "Finished." ``` +### Stopping Child Processes + +Containers provide three approaches for stopping child processes (or threads). When you call `container.stop()`, a progressive approach is used: + +- **Interrupt** means **"Please start shutting down gracefully"**. This is the gentlest shutdown request, giving applications maximum time to finish current work and cleanup resources. + +- **Terminate** means **"Shut down now"**. This is more urgent - the process should stop what it's doing and terminate promptly, but still has a chance to cleanup. + +- **Kill** means **"Die immediately"**. This forcefully terminates the process with no cleanup opportunity. This is the method of last resort. + +The escalation sequence follows this pattern: +1. interrupt → wait for timeout → still running? +2. terminate → wait for timeout → still running? +3. kill → process terminated. + +This gives well-behaved processes multiple opportunities to shut down gracefully, while ensuring that unresponsive processes are eventually killed. + +**Implementation Note:** For forked containers, these methods send Unix signals (`SIGINT`, `SIGTERM`, `SIGKILL`). For threaded containers, they use different mechanisms appropriate to threads. The container abstraction hides these implementation details. + ## Controllers The controller provides the life-cycle management for one or more containers of processes. It provides behaviour like starting, restarting, reloading and stopping. You can see some [example implementations in Falcon](https://github.com/socketry/falcon/blob/master/lib/falcon/controller/). If the process running the controller receives `SIGHUP` it will recreate the container gracefully. @@ -72,16 +91,12 @@ controller.run # If you send SIGHUP to this process, it will recreate the container. ``` -## Signal Handling - -`SIGINT` is the reload signal. You may send this to a program to request that it reload its configuration. The default behavior is to gracefully reload the container. - -`SIGINT` is the interrupt signal. The terminal sends it to the foreground process when the user presses **ctrl-c**. The default behavior is to terminate the process, but it can be caught or ignored. The intention is to provide a mechanism for an orderly, graceful shutdown. - -`SIGQUIT` is the dump core signal. The terminal sends it to the foreground process when the user presses **ctrl-\\**. The default behavior is to terminate the process and dump core, but it can be caught or ignored. The intention is to provide a mechanism for the user to abort the process. You can look at `SIGINT` as "user-initiated happy termination" and `SIGQUIT` as "user-initiated unhappy termination." +### Controller Signal Handling -`SIGTERM` is the termination signal. The default behavior is to terminate the process, but it also can be caught or ignored. The intention is to kill the process, gracefully or not, but to first allow it a chance to cleanup. +Controllers are designed to run at the process level and are therefore responsible for processing signals. When your controller process receives these signals: -`SIGKILL` is the kill signal. The only behavior is to kill the process, immediately. As the process cannot catch the signal, it cannot cleanup, and thus this is a signal of last resort. +- `SIGHUP` → Gracefully reload the container (restart with new configuration). +- `SIGINT` → Begin graceful shutdown of the entire controller and all children. +- `SIGTERM` → Begin immediate shutdown of the controller and all children. -`SIGSTOP` is the pause signal. The only behavior is to pause the process; the signal cannot be caught or ignored. The shell uses pausing (and its counterpart, resuming via `SIGCONT`) to implement job control. +Ideally, do not send `SIGKILL` to a controller, as it will immediately terminate the controller without giving it a chance to gracefully shut down child processes. This can leave orphaned processes running and prevent proper cleanup. diff --git a/lib/async/container/error.rb b/lib/async/container/error.rb index 0758cae..299058d 100644 --- a/lib/async/container/error.rb +++ b/lib/async/container/error.rb @@ -21,6 +21,16 @@ def initialize end end + # Similar to {Terminate}, but represents `SIGKILL`. + class Kill < SignalException + SIGKILL = Signal.list["KILL"] + + # Create a new kill error. + def initialize + super(SIGKILL) + end + end + # Similar to {Interrupt}, but represents `SIGHUP`. class Restart < SignalException SIGHUP = Signal.list["HUP"] diff --git a/lib/async/container/forked.rb b/lib/async/container/forked.rb index c94f8f1..26ea627 100644 --- a/lib/async/container/forked.rb +++ b/lib/async/container/forked.rb @@ -231,20 +231,25 @@ def restart! # Wait for the child process to exit. # @asynchronous This method may block. # + # @parameter timeout [Numeric | Nil] Maximum time to wait before forceful termination. # @returns [::Process::Status] The process exit status. - def wait + def wait(timeout = 0.1) if @pid && @status.nil? Console.debug(self, "Waiting for process to exit...", pid: @pid) _, @status = ::Process.wait2(@pid, ::Process::WNOHANG) - while @status.nil? - sleep(0.1) + if @status.nil? + sleep(timeout) if timeout _, @status = ::Process.wait2(@pid, ::Process::WNOHANG) if @status.nil? - Console.warn(self) {"Process #{@pid} is blocking, has it exited?"} + Console.warn(self) {"Process #{@pid} is blocking, sending kill signal..."} + self.kill! + + # Wait for the process to exit: + _, @status = ::Process.wait2(@pid) end end end diff --git a/lib/async/container/group.rb b/lib/async/container/group.rb index d7c34f8..aa5836c 100644 --- a/lib/async/container/group.rb +++ b/lib/async/container/group.rb @@ -119,36 +119,78 @@ def terminate end end - # Stop all child processes using {#terminate}. - # @parameter timeout [Boolean | Numeric | Nil] If specified, invoke a graceful shutdown using {#interrupt} first. - def stop(timeout = 1) - Console.debug(self, "Stopping all processes...", timeout: timeout) - # Use a default timeout if not specified: - timeout = 1 if timeout == true + # Kill all running processes. + # This resumes the controlling fiber with an instance of {Kill}. + def kill + Console.info(self, "Sending kill to #{@running.size} running processes...") + @running.each_value do |fiber| + fiber.resume(Kill) + end + end + + private def wait_for_exit(clock, timeout) + while self.any? + duration = timeout - clock.total + + if duration >= 0 + self.wait_for_children(duration) + else + self.wait_for_children(0) + break + end + end + end + + # Stop all child processes with a multi-phase shutdown sequence. + # + # A graceful shutdown performs the following sequence: + # 1. Send SIGINT and wait up to `interrupt_timeout` seconds + # 2. Send SIGTERM and wait up to `terminate_timeout` seconds + # 3. Send SIGKILL and wait indefinitely for process cleanup + # + # If `graceful` is false, skips the SIGINT phase and goes directly to SIGTERM → SIGKILL. + # + # @parameter graceful [Boolean] Whether to send SIGINT first or skip directly to SIGTERM. + # @parameter interrupt_timeout [Numeric | Nil] Time to wait after SIGINT before escalating to SIGTERM. + # @parameter terminate_timeout [Numeric | Nil] Time to wait after SIGTERM before escalating to SIGKILL. + def stop(graceful = true, interrupt_timeout: 1, terminate_timeout: 1) + case graceful + when true + # Use defaults. + when false + interrupt_timeout = nil + when Numeric + interrupt_timeout = graceful + terminate_timeout = graceful + end - if timeout - start_time = Async::Clock.now + Console.debug(self, "Stopping all processes...", interrupt_timeout: interrupt_timeout, terminate_timeout: terminate_timeout) + + # If a timeout is specified, interrupt the children first: + if interrupt_timeout + clock = Async::Clock.start + # Interrupt the children: self.interrupt - while self.any? - duration = Async::Clock.now - start_time - remaining = timeout - duration - - if remaining >= 0 - self.wait_for_children(duration) - else - self.wait_for_children(0) - break - end - end + # Wait for the children to exit: + self.wait_for_exit(clock, interrupt_timeout) end - # Terminate all children: - self.terminate if any? + if terminate_timeout + clock = Async::Clock.start + + # If the children are still running, terminate them: + self.terminate + + # Wait for the children to exit: + self.wait_for_exit(clock, terminate_timeout) + end - # Wait for all children to exit: - self.wait + if any? + self.kill + self.wait + end end # Wait for a message in the specified {Channel}. @@ -165,6 +207,8 @@ def wait_for(channel) channel.interrupt! elsif result == Terminate channel.terminate! + elsif result == Kill + channel.kill! elsif result yield result elsif message = channel.receive @@ -184,7 +228,7 @@ def wait_for_children(duration = nil) # This log is a big noisy and doesn't really provide a lot of useful information. # Console.debug(self, "Waiting for children...", duration: duration, running: @running) - if !@running.empty? + unless @running.empty? # Maybe consider using a proper event loop here: if ready = self.select(duration) ready.each do |io| diff --git a/lib/async/container/threaded.rb b/lib/async/container/threaded.rb index 3b9595b..7c3f2f6 100644 --- a/lib/async/container/threaded.rb +++ b/lib/async/container/threaded.rb @@ -216,10 +216,20 @@ def restart! end # Wait for the thread to exit and return he exit status. + # @asynchronous This method may block. + # + # @parameter timeout [Numeric | Nil] Maximum time to wait before forceful termination. # @returns [Status] - def wait + def wait(timeout = 0.1) if @waiter - @waiter.join + Console.debug(self, "Waiting for thread to exit...", timeout: timeout) + + unless @waiter.join(timeout) + Console.warn(self) {"Thread #{@thread} is blocking, sending kill signal..."} + self.kill! + @waiter.join + end + @waiter = nil end diff --git a/releases.md b/releases.md index 06f5e86..939e16a 100644 --- a/releases.md +++ b/releases.md @@ -1,5 +1,17 @@ # Releases +## Unreleased + +### Production Reliability Improvements + +This release significantly improves container reliability by eliminating production hangs caused by unresponsive child processes. + +**SIGKILL Fallback Support**: Containers now automatically escalate to SIGKILL when child processes ignore SIGINT and SIGTERM signals. This prevents the critical production issue where containers would hang indefinitely waiting for uncooperative processes to exit. + +**Hang Prevention**: Individual child processes now have timeout-based hang prevention. If a process closes its notification pipe but doesn't actually exit, the container will detect this and escalate to SIGKILL after a reasonable timeout instead of hanging forever. + +**Improved Three-Phase Shutdown**: The `Group#stop()` method now uses a cleaner interrupt → terminate → kill escalation sequence with configurable timeouts for each phase, giving well-behaved processes multiple opportunities to shut down gracefully while ensuring unresponsive processes are eventually terminated. + ## v0.25.0 - Introduce `async:container:notify:log:ready?` task for detecting process readiness.