diff --git a/fixtures/async/container/a_container.rb b/fixtures/async/container/a_container.rb index 5e98148..d28a339 100644 --- a/fixtures/async/container/a_container.rb +++ b/fixtures/async/container/a_container.rb @@ -176,6 +176,39 @@ module Container expect(container).not.to be(:running?) end end + + with "health_check_timeout:" do + let(:container) {subject.new(health_check_interval: 1.0)} + + it "should not terminate a child process if it updates its state within the specified time" do + # We use #run here to hit the Hybrid container code path: + container.run(count: 1, health_check_timeout: 1.0) do |instance| + instance.ready! + + 10.times do + instance.ready! + sleep(0.5) + end + end + + container.wait + + expect(container.statistics).to have_attributes(failures: be == 0) + end + + it "can terminate a child process if it does not update its state within the specified time" do + container.spawn(health_check_timeout: 1.0) do |instance| + instance.ready! + + # This should trigger the health check - since restart is false, the process will be terminated: + sleep(2.0) + end + + container.wait + + expect(container.statistics).to have_attributes(failures: be > 0) + end + end end end end diff --git a/lib/async/container/generic.rb b/lib/async/container/generic.rb index 180ca7d..0e57d70 100644 --- a/lib/async/container/generic.rb +++ b/lib/async/container/generic.rb @@ -39,7 +39,7 @@ def self.run(*arguments, **options, &block) UNNAMED = "Unnamed" def initialize(**options) - @group = Group.new + @group = Group.new(**options) @running = true @state = {} @@ -48,8 +48,10 @@ def initialize(**options) @keyed = {} end + # @attribute [Group] The group of running children instances. attr :group + # @attribute [Hash(Child, Hash)] The state of each child instance. attr :state # A human readable representation of the container. diff --git a/lib/async/container/hybrid.rb b/lib/async/container/hybrid.rb index ea42f60..537eaf4 100644 --- a/lib/async/container/hybrid.rb +++ b/lib/async/container/hybrid.rb @@ -15,7 +15,8 @@ class Hybrid < Forked # @parameter count [Integer] The number of instances to start. # @parameter forks [Integer] The number of processes to fork. # @parameter threads [Integer] the number of threads to start. - def run(count: nil, forks: nil, threads: nil, **options, &block) + # @parameter health_check_timeout [Numeric] The timeout for health checks, in seconds. Passed into the child {Threaded} containers. + def run(count: nil, forks: nil, threads: nil, health_check_timeout: nil, **options, &block) processor_count = Container.processor_count count ||= processor_count ** 2 forks ||= [processor_count, count].min @@ -25,7 +26,7 @@ def run(count: nil, forks: nil, threads: nil, **options, &block) self.spawn(**options) do |instance| container = Threaded.new - container.run(count: threads, **options, &block) + container.run(count: threads, health_check_timeout: health_check_timeout, **options, &block) container.wait_until_ready instance.ready! @@ -34,6 +35,7 @@ def run(count: nil, forks: nil, threads: nil, **options, &block) rescue Async::Container::Terminate # Stop it immediately: container.stop(false) + raise ensure # Stop it gracefully (also code path for Interrupt): container.stop diff --git a/test/async/container/controller.rb b/test/async/container/controller.rb index 2f59a37..c5a0353 100644 --- a/test/async/container/controller.rb +++ b/test/async/container/controller.rb @@ -25,18 +25,18 @@ def controller.setup(container) container.spawn(key: "test") do |instance| instance.ready! - sleep(0.02) + sleep(0.2) @output.write(".") @output.flush - sleep(0.04) + sleep(0.4) end container.spawn do |instance| instance.ready! - sleep(0.03) + sleep(0.3) @output.write(",") @output.flush