From 2f9b70685f67dc7171eba1672ad46715adcaeb32 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Thu, 20 Feb 2025 11:50:38 +1300 Subject: [PATCH 1/3] Pass through options to `Group`. --- lib/async/container/generic.rb | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/async/container/generic.rb b/lib/async/container/generic.rb index 180ca7d..0e57d70 100644 --- a/lib/async/container/generic.rb +++ b/lib/async/container/generic.rb @@ -39,7 +39,7 @@ def self.run(*arguments, **options, &block) UNNAMED = "Unnamed" def initialize(**options) - @group = Group.new + @group = Group.new(**options) @running = true @state = {} @@ -48,8 +48,10 @@ def initialize(**options) @keyed = {} end + # @attribute [Group] The group of running children instances. attr :group + # @attribute [Hash(Child, Hash)] The state of each child instance. attr :state # A human readable representation of the container. From 9f316b982c14dfb4ae5941d28b9c72492a8b0bba Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Thu, 20 Feb 2025 11:49:50 +1300 Subject: [PATCH 2/3] Fix Hybrid health check handling. --- fixtures/async/container/a_container.rb | 33 +++++++++++++++++++++++++ lib/async/container/hybrid.rb | 6 +++-- 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/fixtures/async/container/a_container.rb b/fixtures/async/container/a_container.rb index 5e98148..4ff911c 100644 --- a/fixtures/async/container/a_container.rb +++ b/fixtures/async/container/a_container.rb @@ -176,6 +176,39 @@ module Container expect(container).not.to be(:running?) end end + + with "health_check_timeout:" do + let(:container) {subject.new(health_check_interval: 0.01)} + + it "should not terminate a child process if it updates its state within the specified time" do + # We use #run here to hit the Hybrid container code path: + container.run(count: 1, health_check_timeout: 0.02) do |instance| + instance.ready! + + 10.times do + instance.ready! + sleep(0.01) + end + end + + container.wait + + expect(container.statistics).to have_attributes(failures: be == 0) + end + + it "can terminate a child process if it does not update its state within the specified time" do + container.spawn(health_check_timeout: 0.01) do |instance| + instance.ready! + + # This should trigger the health check - since restart is false, the process will be terminated: + sleep(1) + end + + container.wait + + expect(container.statistics).to have_attributes(failures: be > 0) + end + end end end end diff --git a/lib/async/container/hybrid.rb b/lib/async/container/hybrid.rb index ea42f60..537eaf4 100644 --- a/lib/async/container/hybrid.rb +++ b/lib/async/container/hybrid.rb @@ -15,7 +15,8 @@ class Hybrid < Forked # @parameter count [Integer] The number of instances to start. # @parameter forks [Integer] The number of processes to fork. # @parameter threads [Integer] the number of threads to start. - def run(count: nil, forks: nil, threads: nil, **options, &block) + # @parameter health_check_timeout [Numeric] The timeout for health checks, in seconds. Passed into the child {Threaded} containers. + def run(count: nil, forks: nil, threads: nil, health_check_timeout: nil, **options, &block) processor_count = Container.processor_count count ||= processor_count ** 2 forks ||= [processor_count, count].min @@ -25,7 +26,7 @@ def run(count: nil, forks: nil, threads: nil, **options, &block) self.spawn(**options) do |instance| container = Threaded.new - container.run(count: threads, **options, &block) + container.run(count: threads, health_check_timeout: health_check_timeout, **options, &block) container.wait_until_ready instance.ready! @@ -34,6 +35,7 @@ def run(count: nil, forks: nil, threads: nil, **options, &block) rescue Async::Container::Terminate # Stop it immediately: container.stop(false) + raise ensure # Stop it gracefully (also code path for Interrupt): container.stop From 316bc9e6ed08110dd6e500af2e5e0870334e8f54 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Thu, 20 Feb 2025 12:31:57 +1300 Subject: [PATCH 3/3] Relax timing. --- fixtures/async/container/a_container.rb | 10 +++++----- test/async/container/controller.rb | 6 +++--- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/fixtures/async/container/a_container.rb b/fixtures/async/container/a_container.rb index 4ff911c..d28a339 100644 --- a/fixtures/async/container/a_container.rb +++ b/fixtures/async/container/a_container.rb @@ -178,16 +178,16 @@ module Container end with "health_check_timeout:" do - let(:container) {subject.new(health_check_interval: 0.01)} + let(:container) {subject.new(health_check_interval: 1.0)} it "should not terminate a child process if it updates its state within the specified time" do # We use #run here to hit the Hybrid container code path: - container.run(count: 1, health_check_timeout: 0.02) do |instance| + container.run(count: 1, health_check_timeout: 1.0) do |instance| instance.ready! 10.times do instance.ready! - sleep(0.01) + sleep(0.5) end end @@ -197,11 +197,11 @@ module Container end it "can terminate a child process if it does not update its state within the specified time" do - container.spawn(health_check_timeout: 0.01) do |instance| + container.spawn(health_check_timeout: 1.0) do |instance| instance.ready! # This should trigger the health check - since restart is false, the process will be terminated: - sleep(1) + sleep(2.0) end container.wait diff --git a/test/async/container/controller.rb b/test/async/container/controller.rb index 2f59a37..c5a0353 100644 --- a/test/async/container/controller.rb +++ b/test/async/container/controller.rb @@ -25,18 +25,18 @@ def controller.setup(container) container.spawn(key: "test") do |instance| instance.ready! - sleep(0.02) + sleep(0.2) @output.write(".") @output.flush - sleep(0.04) + sleep(0.4) end container.spawn do |instance| instance.ready! - sleep(0.03) + sleep(0.3) @output.write(",") @output.flush