[cluster launcher] Fix ray down not stopping Docker containers on worker nodes for local clusters #62169
Conversation
Code Review
This pull request introduces the get_all_node_ids method to the NodeProvider interface and implements it for LocalNodeProvider to ensure Docker containers are correctly stopped during teardown, even when local state is out of sync. The teardown logic is updated to use this method, and unit tests are added to verify the fix. Reviewer feedback suggests refactoring duplicated logic into a helper, using Pythonic list comprehensions for node filtering, correcting an invalid IP address in tests, and improving test robustness with set-based assertions.
```python
docker_workers = provider.get_all_node_ids(
    {TAG_RAY_NODE_KIND: NODE_KIND_WORKER}
)
if keep_min_workers:
    min_workers = config.get("min_workers", 0)
    if len(docker_workers) > min_workers:
        docker_workers = random.sample(
            docker_workers, len(docker_workers) - min_workers
        )
if workers_only:
    docker_stop_nodes = docker_workers
else:
    docker_heads = provider.get_all_node_ids(
        {TAG_RAY_NODE_KIND: NODE_KIND_HEAD}
    )
    docker_stop_nodes = docker_heads + docker_workers
```
This logic for determining which nodes to stop is very similar to the remaining_nodes function defined earlier in this file. This duplication could be avoided by abstracting the common logic into a helper function that accepts the node-retrieval function (e.g., provider.get_all_node_ids or provider.non_terminated_nodes) as an argument. This would improve maintainability and reduce code duplication. While refactoring remaining_nodes is outside the scope of this change, creating a helper here would be a good first step.
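A minimal sketch of what such a helper could look like, assuming the tag constants from `ray.autoscaler.tags` and a node-retrieval function passed in (the name `_collect_nodes` and its exact signature are illustrative, not the code in this PR):

```python
import random

from ray.autoscaler.tags import NODE_KIND_HEAD, NODE_KIND_WORKER, TAG_RAY_NODE_KIND


def _collect_nodes(get_nodes, config, workers_only, keep_min_workers):
    """Select the head/worker node ids to act on using `get_nodes`.

    `get_nodes` can be provider.non_terminated_nodes (for remaining_nodes)
    or the new provider.get_all_node_ids (for the docker stop phase).
    """
    workers = get_nodes({TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
    if keep_min_workers:
        min_workers = config.get("min_workers", 0)
        if len(workers) > min_workers:
            # Keep min_workers nodes; only the remainder are acted on.
            workers = random.sample(workers, len(workers) - min_workers)
    if workers_only:
        return workers
    heads = get_nodes({TAG_RAY_NODE_KIND: NODE_KIND_HEAD})
    return heads + workers
```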
```python
workers = self.state.get()
matching_ips = []
for worker_ip, info in workers.items():
    ok = True
    for k, v in tag_filters.items():
        if info["tags"].get(k) != v:
            ok = False
            break
    if ok:
        matching_ips.append(worker_ip)
return matching_ips
```
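The review summary above asks for a more Pythonic filter here; a minimal sketch of that rewrite (same behavior as the loop, assuming `self.state.get()` returns the ip-to-info mapping used above):

```python
def get_all_node_ids(self, tag_filters):
    # Equivalent filtering expressed as a list comprehension: keep the ips
    # whose tags match every (key, value) pair in tag_filters.
    workers = self.state.get()
    return [
        worker_ip
        for worker_ip, info in workers.items()
        if all(info["tags"].get(k) == v for k, v in tag_filters.items())
    ]
```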
| "type": "local", | ||
| "head_ip": head_ip, | ||
| "worker_ips": worker_ips, | ||
| "external_head_ip": "0.0.0.0.3", |
```python
assert provider.get_all_node_ids(
    {TAG_RAY_NODE_KIND: NODE_KIND_HEAD}
) == [head_ip]
```
The order of nodes returned by get_all_node_ids is not guaranteed, as it depends on dictionary iteration order. To make this assertion more robust, it's better to compare sets instead of lists.
```diff
-assert provider.get_all_node_ids(
-    {TAG_RAY_NODE_KIND: NODE_KIND_HEAD}
-) == [head_ip]
+assert set(provider.get_all_node_ids(
+    {TAG_RAY_NODE_KIND: NODE_KIND_HEAD}
+)) == {head_ip}
```
Force-pushed from 711a74c to 54d5082.
Force-pushed from 76b4943 to c1c436f.
Force-pushed from c1c436f to af7e115.
Hi, @edoakes

@dev-miro26 thanks for the contribution. @rueian will review the PR shortly.
```python
            matching_ips.append(worker_ip)
        return matching_ips

    def get_all_node_ids(self, tag_filters):
```
I think this should be named something like nodes_for_teardown to make it clear that it is only for the teardown function.
```python
# started them and their Docker containers are still running.
# For cloud providers this adds nothing because get_all_node_ids
# delegates to non_terminated_nodes.
stale_terminated = set(provider.get_all_node_ids({})) - set(
```
Shouldn't we just fix the remaining_nodes function with our new method on the node provider? Do we really need the changes here?
Force-pushed from ca65bd7 to bd568a0.
…orker nodes for local clusters `ray down` on an SSH Docker cluster stops the head container but skips workers. The root cause is that LocalNodeProvider on the invoking machine maintains a separate state file from the head node's autoscaler — workers are never marked as running in the local file, so `non_terminated_nodes` returns an empty list and the `docker stop` loop has nothing to iterate. Add `NodeProvider.get_all_node_ids(tag_filters)` which includes terminated nodes. The base class delegates to `non_terminated_nodes()` (no change for cloud providers). `LocalNodeProvider` overrides it to skip the terminated filter. `teardown_cluster` now uses this for the Docker stop phase. Extract `_collect_nodes(node_retrieval_fn)` helper to deduplicate the worker/head selection logic between `remaining_nodes()` and the Docker stop target list. Made-with: Cursor Signed-off-by: dev-miro26 <devmiro26@gmail.com>
… assertions, avoid double-sampling Signed-off-by: dev-miro26 <devmiro26@gmail.com>
…ress format Signed-off-by: dev-miro26 <devmiro26@gmail.com>
…rkers by setting workers to an empty list. This ensures proper cleanup during cluster teardown. Signed-off-by: dev-miro26 <devmiro26@gmail.com>
Signed-off-by: dev-miro26 <devmiro26@gmail.com>
…Update references in the teardown_cluster function and related tests to reflect this change, ensuring functionality remains intact for node identification during teardown processes. Signed-off-by: dev-miro26 <devmiro26@gmail.com>
…The method now reflects its purpose of including terminated nodes for teardown processes. Signed-off-by: dev-miro26 <devmiro26@gmail.com>
Force-pushed from bd568a0 to 4c107c4.
```python
# Start with A (which already respects --keep-min-workers and
# --workers-only). On top of that, include nodes the provider
# knows about but reports as terminated. This handles
# LocalNodeProvider where the invoking machine's state file
# marks workers as terminated even though the head's autoscaler
# started them and their Docker containers are still running.
# For cloud providers this adds nothing because nodes_for_teardown
# delegates to non_terminated_nodes.
stale_terminated = set(provider.nodes_for_teardown({})) - set(
    provider.non_terminated_nodes({})
)
if workers_only:
    stale_terminated -= set(
        provider.nodes_for_teardown({TAG_RAY_NODE_KIND: NODE_KIND_HEAD})
    )
docker_stop_nodes = list(set(A) | stale_terminated)
```
Are these changes necessary? Can't we use the nodes_for_teardown in the remaining_nodes?
let me check and update asap
remaining_nodes() is also used as the exit condition in the teardown loop
Yes, but can't these be moved into the nodes_for_teardown of the local node provider?
I think we can't move this into nodes_for_teardown because the provider does not know about --keep-min-workers.
Moving it into the provider would break the abstraction.
Also, remaining_nodes() controls a while loop; if it returned terminated nodes, the loop would never stop.
Please let me know your opinion.
Thanks
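To make the exit-condition concern concrete, a simplified sketch of the loop shape being discussed (paraphrased; `provider` and `remaining_nodes` are stand-ins for the objects in `teardown_cluster`, and the poll interval is illustrative):

```python
import time


def _terminate_loop(provider, remaining_nodes, poll_interval_s=1.0):
    # If remaining_nodes() also returned already-terminated nodes, this list
    # would never become empty and the loop below would never exit.
    nodes = remaining_nodes()
    while nodes:
        provider.terminate_nodes(nodes)
        time.sleep(poll_interval_s)
        nodes = remaining_nodes()
```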
Could you also modify terminate_node to add some additional flags so that nodes_for_teardown won't return terminated nodes?
Added teardown_complete flag to prevent re-targeting terminated nodes in nodes_for_teardown
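A minimal sketch of how such a flag could be wired up, assuming the state-file access pattern of the local provider (the flag and method names follow the discussion above, but the surrounding code is illustrative, not the PR diff):

```python
class LocalNodeProviderSketch:
    """Illustrative only; mirrors the shape of LocalNodeProvider.

    self.state is assumed to be the cluster-state wrapper that maps node ip
    to an info dict with "tags" and "state" keys.
    """

    def terminate_node(self, node_id):
        info = self.state.get()[node_id]
        info["state"] = "terminated"
        # Mark the node so later teardown passes do not re-target it.
        info["teardown_complete"] = True
        self.state.put(node_id, info)

    def nodes_for_teardown(self, tag_filters):
        return [
            ip
            for ip, info in self.state.get().items()
            if not info.get("teardown_complete")
            and all(info["tags"].get(k) == v for k, v in tag_filters.items())
        ]
```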
Signed-off-by: dev-miro26 <devmiro26@gmail.com>
Signed-off-by: dev-miro26 <devmiro26@gmail.com>
… teardown Signed-off-by: dev-miro26 <devmiro26@gmail.com>
Please review my last changes again.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit e742286.
… remaining_nodes Signed-off-by: dev-miro26 <devmiro26@gmail.com>
…n nodes_for_teardown Signed-off-by: dev-miro26 <devmiro26@gmail.com>
```diff
-    head = provider.non_terminated_nodes({TAG_RAY_NODE_KIND: NODE_KIND_HEAD})
-
-    return head + workers
+    return _nodes_to_teardown(provider.non_terminated_nodes)
```
Can't this be _nodes_to_teardown(provider.nodes_for_teardown)?
…iltering Signed-off-by: dev-miro26 <devmiro26@gmail.com>
```diff
-def remaining_nodes():
-    workers = provider.non_terminated_nodes({TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
+def _nodes_to_teardown(get_nodes):
```
Can you merge this back to remaining_nodes?
```python
all_teardown = set(_nodes_to_teardown(provider.nodes_for_teardown))
docker_stop_nodes = list(set(A) | all_teardown)
```
…odes Signed-off-by: dev-miro26 <devmiro26@gmail.com>
Hi, @rueian

Thanks

Description
`ray down` on an SSH Docker cluster stops the head container but skips all workers. Their Docker containers keep running indefinitely.

Root cause: Two separate `LocalNodeProvider` instances maintain independent state files — one on the machine invoking `ray down` and one on the head node (managed by the autoscaler). Workers are only ever marked `"running"` by the head's autoscaler in its own `/tmp/ray/cluster-<name>.state` file. The invoking machine's state file initializes workers as `"terminated"` in `ClusterState.__init__` and never receives those updates. When `teardown_cluster` calls `remaining_nodes()` → `provider.non_terminated_nodes()`, all workers are filtered out, so the `docker stop` loop has nothing to iterate.

Fix: Add `NodeProvider.get_all_node_ids(tag_filters)` that returns all known node IDs regardless of state. The base class delegates to `non_terminated_nodes()` (no behavior change for cloud providers that query live infrastructure). `LocalNodeProvider` overrides it to skip the `state == "terminated"` filter. `teardown_cluster` now uses `get_all_node_ids` to build the Docker stop target list, ensuring worker containers are stopped even when the local state file is stale.
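A condensed sketch of the two methods described above (simplified for illustration; the actual diff lives in the files listed under Additional information, and the `self.state` access shown for the local provider is an assumption about its state-file wrapper):

```python
from typing import Dict, List


class NodeProvider:
    def non_terminated_nodes(self, tag_filters: Dict[str, str]) -> List[str]:
        raise NotImplementedError

    def get_all_node_ids(self, tag_filters: Dict[str, str]) -> List[str]:
        # Default: cloud providers query live infrastructure, so "all known
        # nodes" is the same as the non-terminated set.
        return self.non_terminated_nodes(tag_filters)


class LocalNodeProvider(NodeProvider):
    def get_all_node_ids(self, tag_filters: Dict[str, str]) -> List[str]:
        # Include nodes the local state file marks as terminated, since the
        # head node's autoscaler may have started their containers anyway.
        return [
            ip
            for ip, info in self.state.get().items()
            if all(info["tags"].get(k) == v for k, v in tag_filters.items())
        ]
```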
Related issues

Closes: #62058
Additional information
Files changed:
- `python/ray/autoscaler/node_provider.py` — Added `get_all_node_ids()` to the base `NodeProvider` class (defaults to `non_terminated_nodes`)
- `python/ray/autoscaler/_private/local/node_provider.py` — `LocalNodeProvider` override that includes terminated nodes
- `python/ray/autoscaler/_private/commands.py` — `teardown_cluster` Docker stop phase uses `get_all_node_ids` instead of `remaining_nodes()`
- `python/ray/tests/test_coordinator_server.py` — Added `testGetAllNodeIdsIncludesTerminated` with a `_make_local_provider` helper

Backward compatibility: The `terminate_nodes` loop is unchanged (still uses `non_terminated_nodes`). For cloud providers (AWS, GCP, Azure, etc.), `get_all_node_ids` delegates to `non_terminated_nodes`, so behavior is identical. The only change is that Docker stop now targets all configured local nodes during teardown.