[autoscaler] Handle node type key change/deletion (#16691)
DmitriGekhtman committed Jul 6, 2021
1 parent 25666ff commit a27a817
Showing 7 changed files with 266 additions and 55 deletions.
33 changes: 17 additions & 16 deletions doc/source/cluster/config.rst
@@ -22,8 +22,6 @@ Syntax
:ref:`auth <cluster-configuration-auth-type>`
:ref:`available_node_types <cluster-configuration-available-node-types>`:
:ref:`node_types <cluster-configuration-node-types-type>`
:ref:`worker_nodes <cluster-configuration-worker-nodes>`:
:ref:`node_config <cluster-configuration-node-config-type>`
:ref:`head_node_type <cluster-configuration-head-node-type>`: str
:ref:`file_mounts <cluster-configuration-file-mounts>`:
:ref:`file_mounts <cluster-configuration-file-mounts-type>`
@@ -137,7 +135,12 @@ Provider
Node types
~~~~~~~~~~

The nodes types object's keys represent the names of the different node types.
The ``available_node_types`` object's keys represent the names of the different node types.

Deleting a node type from ``available_node_types`` and updating with :ref:`ray up<ray-up-doc>` will cause the autoscaler to scale down all nodes of that type.
In particular, changing the key of a node type object will
result in removal of nodes corresponding to the old key; nodes with the new key name will then be
created according to cluster configuration and Ray resource demands.
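For illustration, consider a hypothetical node type (the key names and AWS instance type below are made up). Renaming the key leaves the node's configuration unchanged, but the autoscaler treats the renamed entry as a brand-new node type: workers tagged with the old key are scaled down, and workers for the new key are launched as needed.

# Hypothetical before/after: the same node type under a renamed key.
# After `ray up`, workers tagged "gpu_workers" are terminated and
# replacements are launched under "gpu_workers_v2" as demand requires.
before = {
    "gpu_workers": {
        "node_config": {"InstanceType": "p2.xlarge"},
        "resources": {"CPU": 4, "GPU": 1},
        "min_workers": 1,
        "max_workers": 5,
    }
}
after = {
    "gpu_workers_v2": {  # same configuration, new key
        "node_config": {"InstanceType": "p2.xlarge"},
        "resources": {"CPU": 4, "GPU": 1},
        "min_workers": 1,
        "max_workers": 5,
    }
}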

.. parsed-literal::
<node_type_1_name>:
@@ -160,6 +163,11 @@
Node config
~~~~~~~~~~~

Cloud-specific configuration for nodes of a given node type.

Modifying the ``node_config`` and updating with :ref:`ray up<ray-up-doc>` will cause the autoscaler to scale down all existing nodes of the node type;
nodes with the newly applied ``node_config`` will then be created according to cluster configuration and Ray resource demands.
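Under the hood, the autoscaler compares a hash of each node's launch configuration against the configuration you submit; a mismatch marks the node as outdated. A minimal sketch of the idea (not Ray's actual hashing code; the field values are hypothetical):

import hashlib
import json

def launch_config_hash(node_config: dict, auth: dict) -> str:
    # Serialize deterministically so any change to node_config
    # (e.g. a different InstanceType) yields a different hash.
    payload = json.dumps({"node_config": node_config, "auth": auth}, sort_keys=True)
    return hashlib.sha1(payload.encode("utf-8")).hexdigest()

old = launch_config_hash({"InstanceType": "m4.large"}, {"ssh_user": "ubuntu"})
new = launch_config_hash({"InstanceType": "m4.4xlarge"}, {"ssh_user": "ubuntu"})
assert old != new  # nodes carrying the old hash are scaled down and replaced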

.. tabs::
.. group-tab:: AWS

@@ -320,7 +328,7 @@ Authentication credentials that Ray will use to launch nodes.
~~~~~~~~~~~~~~~~~~~~~~~~

Tells the autoscaler the allowed node types and the resources they provide.
The key is the name of the node type, which is just for debugging purposes.
Each node type is identified by a user-specified key.

* **Required:** No
* **Importance:** High
@@ -358,24 +366,17 @@ The key is the name of the node type, which is just for debugging purposes.

The key for one of the node types in :ref:`available_node_types <cluster-configuration-available-node-types>`. This node type will be used to launch the head node.

If the field ``head_node_type`` is changed and an update is executed with :ref:`ray up<ray-up-doc>`, the currently running head node will
be considered outdated. The user will receive a prompt asking to confirm scale-down of the outdated head node, and the cluster will restart with a new
head node. Changing the :ref:`node_config<cluster-configuration-node-config>` of the :ref:`node_type<cluster-configuration-node-types-type>` with key ``head_node_type`` will also result in cluster restart after a user prompt.



* **Required:** Yes
* **Importance:** High
* **Type:** String
* **Pattern:** ``[a-zA-Z0-9_]+``

.. _cluster-configuration-worker-nodes:

``worker_nodes``
~~~~~~~~~~~~~~~~

The configuration to be used to launch worker nodes on the cloud service provider. Generally, node configs are set in the :ref:`node config of each node type <cluster-configuration-node-config>`. Setting this property allows propagation of a default value to all the node types when they launch as workers (e.g., using spot instances across all workers can be configured here so that it doesn't have to be set across all instance types).

* **Required:** No
* **Importance:** Low
* **Type:** :ref:`Node config <cluster-configuration-node-config-type>`
* **Default:** ``{}``

.. _cluster-configuration-file-mounts:

``file_mounts``
51 changes: 34 additions & 17 deletions python/ray/autoscaler/_private/autoscaler.py
@@ -1,6 +1,6 @@
from collections import defaultdict, namedtuple, Counter
from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics
from typing import Any, Optional, Dict, List
from typing import Any, Optional, Dict, List, Tuple
from urllib3.exceptions import MaxRetryError
import copy
import logging
@@ -197,9 +197,11 @@ def _update(self):
# Make sure to not kill idle node types if the number of workers
# of that type is lower/equal to the min_workers of that type
# or it is needed for request_resources().
if (self._keep_min_worker_of_node_type(node_id, node_type_counts)
or not nodes_allowed_to_terminate.get(
node_id, True)) and self.launch_config_ok(node_id):
should_keep_worker_of_node_type, node_type_counts = \
self._keep_worker_of_node_type(node_id, node_type_counts)
if ((should_keep_worker_of_node_type
or not nodes_allowed_to_terminate.get(node_id, True))
and self.launch_config_ok(node_id)):
continue

node_ip = self.provider.internal_ip(node_id)
@@ -449,34 +451,45 @@ def _get_nodes_allowed_to_terminate(
nodes_allowed_to_terminate[node_id] = False
return nodes_allowed_to_terminate

def _keep_min_worker_of_node_type(
self, node_id: NodeID,
node_type_counts: Dict[NodeType, int]) -> bool:
"""Returns if workers of node_type can be terminated.
The worker cannot be terminated to respect min_workers constraint.
def _keep_worker_of_node_type(self, node_id: NodeID,
node_type_counts: Dict[NodeType, int]
) -> Tuple[bool, Dict[NodeType, int]]:
"""Determines if a worker should be kept based on the min_workers
constraint of the worker's node_type.
Receives the counters of running nodes so far and determines if idle
node_id should be terminated or not. It also updates the counters
(node_type_counts), which is returned by reference.
Returns True exactly when both of the following hold:
(a) The worker's node_type is present among the keys of the current
config's available_node_types dict.
(b) Deleting the node would violate the min_workers constraint for that
worker's node_type.
Also updates and returns the dictionary of node type counts.
Args:
node_type_counts(Dict[NodeType, int]): The non_terminated node
types counted so far.
Returns:
bool: if workers of node_types can be terminated or not.
bool: True if the node should be kept. False otherwise.
Dict[NodeType, int]: Updated node type counts
"""
new_node_type_counts = copy.deepcopy(node_type_counts)
tags = self.provider.node_tags(node_id)
if TAG_RAY_USER_NODE_TYPE in tags:
node_type = tags[TAG_RAY_USER_NODE_TYPE]
node_type_counts[node_type] += 1
if node_type not in self.available_node_types:
# The node type has been deleted from the cluster config.
# Don't keep the node.
return False, new_node_type_counts
new_node_type_counts[node_type] += 1
min_workers = self.available_node_types[node_type].get(
"min_workers", 0)
max_workers = self.available_node_types[node_type].get(
"max_workers", 0)
if node_type_counts[node_type] <= min(min_workers, max_workers):
return True
if new_node_type_counts[node_type] <= min(min_workers,
max_workers):
return True, new_node_type_counts

return False
return False, new_node_type_counts

def _node_resources(self, node_id):
node_type = self.provider.node_tags(node_id).get(
@@ -579,6 +592,10 @@ def launch_config_ok(self, node_id):
node_tags = self.provider.node_tags(node_id)
tag_launch_conf = node_tags.get(TAG_RAY_LAUNCH_CONFIG)
node_type = node_tags.get(TAG_RAY_USER_NODE_TYPE)
if node_type not in self.available_node_types:
# The node type has been deleted from the cluster config.
# Don't keep the node.
return False

# The `worker_nodes` field is deprecated in favor of per-node-type
# node_configs. We allow it for backwards-compatibility.
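Putting the two changes in autoscaler.py together: an idle worker is kept only if its node type still exists in the config and terminating it would violate that type's min_workers constraint. A standalone sketch of that decision (simplified; not the actual class method):

from typing import Dict, Tuple
import copy

def keep_worker_of_node_type(node_type: str,
                             node_type_counts: Dict[str, int],
                             available_node_types: Dict[str, dict]
                             ) -> Tuple[bool, Dict[str, int]]:
    """Simplified sketch of StandardAutoscaler._keep_worker_of_node_type."""
    counts = copy.deepcopy(node_type_counts)
    if node_type not in available_node_types:
        # The node type was deleted (or its key renamed) in the cluster
        # config, so this node should not be kept.
        return False, counts
    counts[node_type] = counts.get(node_type, 0) + 1
    spec = available_node_types[node_type]
    floor = min(spec.get("min_workers", 0), spec.get("max_workers", 0))
    return counts[node_type] <= floor, counts

# "gpu_workers" was removed from the config, so its idle worker is not kept.
keep, _ = keep_worker_of_node_type(
    "gpu_workers", {}, {"cpu_workers": {"min_workers": 1, "max_workers": 2}})
assert keep is False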
80 changes: 63 additions & 17 deletions python/ray/autoscaler/_private/commands.py
@@ -602,8 +602,8 @@ def get_or_create_head_node(config: Dict[str, Any],
# The above `head_node` field is deprecated in favor of per-node-type
# node_configs. We allow it for backwards-compatibility.
head_node_resources = None
if "head_node_type" in config:
head_node_type = config["head_node_type"]
head_node_type = config.get("head_node_type")
if head_node_type:
head_node_tags[TAG_RAY_USER_NODE_TYPE] = head_node_type
head_config = config["available_node_types"][head_node_type]
head_node_config.update(head_config["node_config"])
@@ -613,23 +613,15 @@
head_node_resources = head_config.get("resources")

launch_hash = hash_launch_conf(head_node_config, config["auth"])
launching_new_head = False
if head_node is None or provider.node_tags(head_node).get(
TAG_RAY_LAUNCH_CONFIG) != launch_hash:
launching_new_head = True
creating_new_head = _should_create_new_head(head_node, launch_hash,
head_node_type, provider)
if creating_new_head:
with cli_logger.group("Acquiring an up-to-date head node"):
global_event_system.execute_callback(
CreateClusterEvent.acquiring_new_head_node)
if head_node is not None:
cli_logger.print(
"Currently running head node is out-of-date with "
"cluster configuration")
cli_logger.print(
"hash is {}, expected {}",
cf.bold(
provider.node_tags(head_node)
.get(TAG_RAY_LAUNCH_CONFIG)), cf.bold(launch_hash))
cli_logger.confirm(yes, "Relaunching it.", _abort=True)
cli_logger.confirm(
yes, "Relaunching the head node.", _abort=True)

provider.terminate_node(head_node)
cli_logger.print("Terminated head node {}", head_node)
@@ -685,9 +677,9 @@ def get_or_create_head_node(config: Dict[str, Any],
else:
setup_commands = []
ray_start_commands = config["head_start_ray_commands"]
# If user passed in --no-restart and we're not launching a new head,
# If user passed in --no-restart and we're not creating a new head,
# omit start commands.
elif no_restart and not launching_new_head:
elif no_restart and not creating_new_head:
setup_commands = config["head_setup_commands"]
ray_start_commands = []
else:
@@ -758,6 +750,60 @@ def get_or_create_head_node(config: Dict[str, Any],
cli_logger.print(" {}", remote_shell_str.strip())


def _should_create_new_head(head_node_id: Optional[str], new_launch_hash: str,
new_head_node_type: str,
provider: NodeProvider) -> bool:
"""Decides whether a new head node needs to be created.
We need a new head if at least one of the following holds:
(a) There isn't an existing head node
(b) The user-submitted head node_config differs from the existing head
node's node_config.
(c) The user-submitted head node_type key differs from the existing head
node's node_type.
Args:
head_node_id (Optional[str]): head node id if a head exists, else None
new_launch_hash (str): hash of current user-submitted head config
new_head_node_type (str): current user-submitted head node-type key
Returns:
bool: True if a new Ray head node should be launched, False otherwise
"""
if not head_node_id:
# No head node exists, need to create it.
return True

# Pull existing head's data.
head_tags = provider.node_tags(head_node_id)
current_launch_hash = head_tags.get(TAG_RAY_LAUNCH_CONFIG)
current_head_type = head_tags.get(TAG_RAY_USER_NODE_TYPE)

# Compare to current head
hashes_mismatch = new_launch_hash != current_launch_hash
types_mismatch = new_head_node_type != current_head_type

new_head_required = hashes_mismatch or types_mismatch

# Warn user
if new_head_required:
with cli_logger.group(
"Currently running head node is out-of-date with cluster "
"configuration"):

if hashes_mismatch:
cli_logger.print("Current hash is {}, expected {}",
cf.bold(current_launch_hash),
cf.bold(new_launch_hash))

if types_mismatch:
cli_logger.print("Current head node type is {}, expected {}",
cf.bold(current_head_type),
cf.bold(new_head_node_type))

return new_head_required


def _set_up_config_for_head_node(config: Dict[str, Any],
provider: NodeProvider,
no_restart: bool) ->\
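The same idea, reduced to a standalone sketch (the tag keys below are illustrative placeholders, not Ray's actual tag constants): the head is relaunched when there is no head yet, when the launch-config hash changes (e.g. a modified ``node_config``), or when the ``head_node_type`` key changes.

from typing import Dict, Optional

def should_create_new_head(head_node_id: Optional[str],
                           new_launch_hash: str,
                           new_head_node_type: str,
                           current_head_tags: Dict[str, str]) -> bool:
    """Simplified sketch of _should_create_new_head; current_head_tags
    stands in for provider.node_tags(head_node_id)."""
    if not head_node_id:
        return True  # no head node exists yet
    hashes_mismatch = new_launch_hash != current_head_tags.get("launch-config")
    types_mismatch = new_head_node_type != current_head_tags.get("user-node-type")
    return hashes_mismatch or types_mismatch

tags = {"launch-config": "abc123", "user-node-type": "head_default"}
assert should_create_new_head(None, "abc123", "head_default", tags)          # no head yet
assert not should_create_new_head("head-1", "abc123", "head_default", tags)  # unchanged
assert should_create_new_head("head-1", "def456", "head_default", tags)      # node_config changed
assert should_create_new_head("head-1", "abc123", "head_gpu", tags)          # head_node_type changed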
13 changes: 8 additions & 5 deletions python/ray/autoscaler/_private/resource_demand_scheduler.py
@@ -459,13 +459,16 @@ def calculate_node_resources(

def add_node(node_type, available_resources=None):
if node_type not in self.node_types:
logger.warn(
# We should not get here, but if for some reason we do, log an
# error and skip the errant node_type.
logger.error(
f"Missing entry for node_type {node_type} in "
f"cluster config: {self.node_types} under entry "
f"available_node_types. This node's resources will be "
f"ignored. If you are using an unmanaged node, manually "
f"set the user_node_type tag to \"{NODE_KIND_UNMANAGED}\""
f"in your cloud provider's management console.")
"available_node_types. This node's resources will be "
"ignored. If you are using an unmanaged node, manually "
f"set the {TAG_RAY_NODE_KIND} tag to "
f"\"{NODE_KIND_UNMANAGED}\" in your cloud provider's "
"management console.")
return None
# Careful not to include the same dict object multiple times.
available = copy.deepcopy(self.node_types[node_type]["resources"])
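The guard itself is simple: if a node's recorded type no longer appears under ``available_node_types``, the scheduler contributes no resources for it and logs an error. A minimal sketch (not the actual ``add_node`` closure):

import copy
import logging

logger = logging.getLogger(__name__)

def resources_for_node(node_type: str, node_types: dict):
    """Sketch of the guard: unknown node types contribute no resources."""
    if node_type not in node_types:
        logger.error("Missing entry for node_type %s under available_node_types; "
                     "this node's resources will be ignored.", node_type)
        return None
    # Deep-copy so the same resources dict is never shared between nodes.
    return copy.deepcopy(node_types[node_type]["resources"])

assert resources_for_node("deleted_type", {"cpu": {"resources": {"CPU": 4}}}) is None
assert resources_for_node("cpu", {"cpu": {"resources": {"CPU": 4}}}) == {"CPU": 4}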
1 change: 1 addition & 0 deletions python/ray/autoscaler/ray-schema.json
@@ -337,6 +337,7 @@
"min_workers": {"type": "integer"},
"max_workers": {"type": "integer"},
"resources": {
"type": "object",
"patternProperties": {
".*":{
"type": "integer",
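The added ``"type": "object"`` line makes the schema reject a ``resources`` field that is not a mapping of resource names to integers. A quick check of that fragment with the ``jsonschema`` package (sub-schema reconstructed from the hunk above, not the full Ray schema):

import jsonschema

resources_schema = {
    "type": "object",
    "patternProperties": {
        ".*": {"type": "integer"},
    },
}

jsonschema.validate({"CPU": 4, "GPU": 1}, resources_schema)  # accepted
try:
    jsonschema.validate(["CPU", 4], resources_schema)  # a list, not an object
except jsonschema.ValidationError as err:
    print("rejected:", err.message)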