Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions fixtures/async/container/a_container.rb
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,39 @@ module Container
expect(container).not.to be(:running?)
end
end

with "health_check_timeout:" do
let(:container) {subject.new(health_check_interval: 1.0)}

it "should not terminate a child process if it updates its state within the specified time" do
# We use #run here to hit the Hybrid container code path:
container.run(count: 1, health_check_timeout: 1.0) do |instance|
instance.ready!

10.times do
instance.ready!
sleep(0.5)
end
end

container.wait

expect(container.statistics).to have_attributes(failures: be == 0)
end

it "can terminate a child process if it does not update its state within the specified time" do
container.spawn(health_check_timeout: 1.0) do |instance|
instance.ready!

# This should trigger the health check - since restart is false, the process will be terminated:
sleep(2.0)
end

container.wait

expect(container.statistics).to have_attributes(failures: be > 0)
end
end
end
end
end
4 changes: 3 additions & 1 deletion lib/async/container/generic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def self.run(*arguments, **options, &block)
UNNAMED = "Unnamed"

def initialize(**options)
@group = Group.new
@group = Group.new(**options)
@running = true

@state = {}
Expand All @@ -48,8 +48,10 @@ def initialize(**options)
@keyed = {}
end

# @attribute [Group] The group of running children instances.
attr :group

# @attribute [Hash(Child, Hash)] The state of each child instance.
attr :state

# A human readable representation of the container.
Expand Down
6 changes: 4 additions & 2 deletions lib/async/container/hybrid.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ class Hybrid < Forked
# @parameter count [Integer] The number of instances to start.
# @parameter forks [Integer] The number of processes to fork.
# @parameter threads [Integer] the number of threads to start.
def run(count: nil, forks: nil, threads: nil, **options, &block)
# @parameter health_check_timeout [Numeric] The timeout for health checks, in seconds. Passed into the child {Threaded} containers.
def run(count: nil, forks: nil, threads: nil, health_check_timeout: nil, **options, &block)
processor_count = Container.processor_count
count ||= processor_count ** 2
forks ||= [processor_count, count].min
Expand All @@ -25,7 +26,7 @@ def run(count: nil, forks: nil, threads: nil, **options, &block)
self.spawn(**options) do |instance|
container = Threaded.new

container.run(count: threads, **options, &block)
container.run(count: threads, health_check_timeout: health_check_timeout, **options, &block)

container.wait_until_ready
instance.ready!
Expand All @@ -34,6 +35,7 @@ def run(count: nil, forks: nil, threads: nil, **options, &block)
rescue Async::Container::Terminate
# Stop it immediately:
container.stop(false)
raise
ensure
# Stop it gracefully (also code path for Interrupt):
container.stop
Expand Down
6 changes: 3 additions & 3 deletions test/async/container/controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,18 @@ def controller.setup(container)
container.spawn(key: "test") do |instance|
instance.ready!

sleep(0.02)
sleep(0.2)

@output.write(".")
@output.flush

sleep(0.04)
sleep(0.4)
end

container.spawn do |instance|
instance.ready!

sleep(0.03)
sleep(0.3)

@output.write(",")
@output.flush
Expand Down
Loading