diff --git a/lib/async/container/group.rb b/lib/async/container/group.rb index 1cb9140..81466e8 100644 --- a/lib/async/container/group.rb +++ b/lib/async/container/group.rb @@ -100,9 +100,14 @@ def wait end end + private def each_running(&block) + # We create a copy of the values here, in case the block modifies the running set: + @running.values.each(&block) + end + # Perform a health check on all running processes. def health_check! - @running.each_value do |fiber| + each_running do |fiber| fiber.resume(:health_check!) end end @@ -111,7 +116,7 @@ def health_check! # This resumes the controlling fiber with an instance of {Interrupt}. def interrupt Console.info(self, "Sending interrupt to #{@running.size} running processes...") - @running.each_value do |fiber| + each_running do |fiber| fiber.resume(Interrupt) end end @@ -120,7 +125,7 @@ def interrupt # This resumes the controlling fiber with an instance of {Terminate}. def terminate Console.info(self, "Sending terminate to #{@running.size} running processes...") - @running.each_value do |fiber| + each_running do |fiber| fiber.resume(Terminate) end end @@ -129,7 +134,7 @@ def terminate # This resumes the controlling fiber with an instance of {Kill}. def kill Console.info(self, "Sending kill to #{@running.size} running processes...") - @running.each_value do |fiber| + each_running do |fiber| fiber.resume(Kill) end end diff --git a/releases.md b/releases.md index 7b049b1..8dc28f7 100644 --- a/releases.md +++ b/releases.md @@ -1,5 +1,9 @@ # Releases +## Unreleased + + - Fix race condition where `wait_for` could modify `@running` while it was being iterated over (`each_value`) during health checks. + ## v0.27.3 - Add log for starting child, including container statistics. diff --git a/test/async/container/group.rb b/test/async/container/group.rb new file mode 100644 index 0000000..896f0a7 --- /dev/null +++ b/test/async/container/group.rb @@ -0,0 +1,275 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2025, by Samuel Williams. + +require "async/container/group" +require "async/container/channel" + +describe Async::Container::Group do + let(:group) {Async::Container::Group.new} + + with "#size" do + it "returns zero for empty group" do + expect(group.size).to be == 0 + end + + it "returns the number of running processes" do + channel1 = Async::Container::Channel.new + channel2 = Async::Container::Channel.new + + fiber1 = Fiber.new {Fiber.yield} + fiber2 = Fiber.new {Fiber.yield} + + fiber1.resume + fiber2.resume + + group.running[channel1.in] = fiber1 + group.running[channel2.in] = fiber2 + + expect(group.size).to be == 2 + end + end + + with "#running?" do + it "returns false for empty group" do + expect(group).not.to be(:running?) + end + + it "returns true when processes are running" do + channel = Async::Container::Channel.new + fiber = Fiber.new {Fiber.yield} + fiber.resume + + group.running[channel.in] = fiber + + expect(group).to be(:running?) + end + end + + with "#any?" do + it "returns false for empty group" do + expect(group).not.to be(:any?) + end + + it "returns true when processes are running" do + channel = Async::Container::Channel.new + fiber = Fiber.new {Fiber.yield} + fiber.resume + + group.running[channel.in] = fiber + + expect(group).to be(:any?) + end + end + + with "#empty?" do + it "returns true for empty group" do + expect(group).to be(:empty?) + end + + it "returns false when processes are running" do + channel = Async::Container::Channel.new + fiber = Fiber.new {Fiber.yield} + fiber.resume + + group.running[channel.in] = fiber + + expect(group).not.to be(:empty?) + end + end + + with "#inspect" do + it "provides human-readable representation" do + expect(group.inspect).to be =~ /Async::Container::Group/ + expect(group.inspect).to be =~ /running=0/ + end + + it "shows the number of running processes" do + channel = Async::Container::Channel.new + fiber = Fiber.new {Fiber.yield} + fiber.resume + + group.running[channel.in] = fiber + + expect(group.inspect).to be =~ /running=1/ + end + end + + with "#health_check!" do + it "resumes all fibers with :health_check! message" do + messages = [] + + 2.times do + channel = Async::Container::Channel.new + fiber = Fiber.new do + result = Fiber.yield + messages << result + end + + fiber.resume + group.running[channel.in] = fiber + end + + group.health_check! + + expect(messages).to be == [:health_check!, :health_check!] + end + + it "does nothing for empty group" do + expect do + group.health_check! + end.not.to raise_exception + end + end + + with "#interrupt" do + it "resumes all fibers with Interrupt" do + messages = [] + + 2.times do + channel = Async::Container::Channel.new + fiber = Fiber.new do + result = Fiber.yield + messages << result + end + + fiber.resume + group.running[channel.in] = fiber + end + + group.interrupt + + expect(messages).to be == [Async::Container::Interrupt, Async::Container::Interrupt] + end + end + + with "#terminate" do + it "resumes all fibers with Terminate" do + messages = [] + + 2.times do + channel = Async::Container::Channel.new + fiber = Fiber.new do + result = Fiber.yield + messages << result + end + + fiber.resume + group.running[channel.in] = fiber + end + + group.terminate + + expect(messages).to be == [Async::Container::Terminate, Async::Container::Terminate] + end + end + + with "#kill" do + it "resumes all fibers with Kill" do + messages = [] + + 2.times do + channel = Async::Container::Channel.new + fiber = Fiber.new do + result = Fiber.yield + messages << result + end + + fiber.resume + group.running[channel.in] = fiber + end + + group.kill + + expect(messages).to be == [Async::Container::Kill, Async::Container::Kill] + end + end + + # Regression test for a bug where restarting a child during health check caused + # "RuntimeError: can't add a new key into hash during iteration" + # + # The scenario: + # - A container spawns children with `restart: true` and `health_check_timeout: N` + # - health_check! calls @running.each_value { |fiber| fiber.resume(:health_check!) } + # - A resumed fiber detects health check failure and kills the child + # - The spawn fiber's while loop continues (restart: true) and calls wait_for with a new child + # - wait_for tries to add to @running while health_check! is still iterating + # - This used to cause: RuntimeError: can't add a new key into hash during iteration + it "can restart child during health_check! iteration without error" do + channel1 = Async::Container::Channel.new + channel2 = Async::Container::Channel.new + + # Simulate the spawn fiber that restarts on health check failure + restart = true + fiber = Fiber.new do + while restart + result = Fiber.yield # Wait to be resumed + + if result == :health_check! + # Health check failed! Simulate the restart logic: + # The wait_for will return (simulated by breaking from this iteration) + # and the while loop continues, creating a new child + + # Simulate: child.kill! happens, wait_for returns + # Now the while loop continues and calls wait_for with new child + Fiber.new do + group.wait_for(channel2) do |msg| + # New child waiting + end + end.resume + + restart = false # Only do this once for the test + end + end + end + + # Start the fiber and add it to @running (simulating first wait_for call) + fiber.resume + group.running[channel1.in] = fiber + + # The fix ensures this doesn't raise RuntimeError during iteration + expect do + group.health_check! + end.not.to raise_exception + end + + # Regression test with multiple children where one restarts during health check + it "can handle one of multiple children restarting during health check" do + # Create two children, both with restart capability + 2.times do |i| + channel = Async::Container::Channel.new + + fiber = Fiber.new do + iteration = 0 + loop do + iteration += 1 + result = Fiber.yield + + # First child fails health check on first iteration + if i == 0 && iteration == 1 && result == :health_check! + # Simulate health check failure and restart: + # Kill the old child, create new one + new_channel = Async::Container::Channel.new + + # This mimics what happens in spawn's while @running loop + # after wait_for returns due to child being killed + group.wait_for(new_channel) do |msg| + # New child process + end + + break # Exit this child's loop + end + end + end + + fiber.resume + group.running[channel.in] = fiber + end + + # The fix ensures this doesn't raise RuntimeError when the first fiber restarts + expect do + group.health_check! + end.not.to raise_exception + end +end