[autoscaler] Make placement groups bypass max launch limit #13089
@@ -159,11 +159,15 @@ def get_nodes_to_launch(
             node_resources, node_type_counts, self.node_types,
             self.max_workers, self.head_node_type, ensure_min_cluster_size)
 
-        # Step 3: add nodes for strict spread groups
-        logger.debug(f"Placement group demands: {pending_placement_groups}")
+        # Step 3: get resource demands of placement groups and return the
+        # groups that should be strictly spread.
+        logger.info(f"Placement group demands: {pending_placement_groups}")
         placement_group_demand_vector, strict_spreads = \
             placement_groups_to_resource_demands(pending_placement_groups)
-        resource_demands.extend(placement_group_demand_vector)
+        # Place placement groups demand vector at the beginning of the resource
+        # demands vector to make it consistent (results in the same types of
+        # nodes to add) with pg_demands_nodes_max_launch_limit calculated later
+        resource_demands = placement_group_demand_vector + resource_demands
 
         if self.is_legacy_yaml() and \
                 not self.node_types[NODE_TYPE_LEGACY_WORKER]["resources"]:
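The ordering comment above is the crux of this hunk: bin packing is order sensitive, so putting the placement group demands first keeps the node types chosen here consistent with the placement-group-only residual computed later. The following is a minimal sketch of that order sensitivity using a toy first-fit packer (not Ray's actual get_bin_pack_residual; the numbers are hypothetical):

from typing import Dict, List


def first_fit(free: List[Dict[str, float]],
              demands: List[Dict[str, float]]) -> List[Dict[str, float]]:
    """Toy first-fit packer: return the demands that do not fit."""
    unfulfilled = []
    for demand in demands:
        for node in free:
            if all(node.get(k, 0) >= v for k, v in demand.items()):
                for k, v in demand.items():
                    node[k] -= v
                break
        else:
            unfulfilled.append(demand)
    return unfulfilled


# One free node with 4 CPUs; a placement group bundle and a task each
# ask for 4 CPUs, but only one of them can fit.
pg_demands = [{"CPU": 4}]
task_demands = [{"CPU": 4}]

# With the pg demands first, the pg bundle lands on the existing node and
# the *task* becomes the unfulfilled demand, which is consistent with the
# pg-only residual used for the launch-limit bypass later in the function.
residual = first_fit([{"CPU": 4}], pg_demands + task_demands)
assert residual == [{"CPU": 4}]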
@@ -179,18 +183,32 @@ def get_nodes_to_launch(
             return self._legacy_worker_node_to_launch(
                 nodes, launching_nodes, node_resources,
                 resource_demands + request_resources_demands)
-        placement_group_nodes_to_add, node_resources, node_type_counts = \
+        spread_pg_nodes_to_add, node_resources, node_type_counts = \
             self.reserve_and_allocate_spread(
                 strict_spreads, node_resources, node_type_counts)
 
+        # Calculate the nodes to add for bypassing max launch limit for
+        # placement groups and spreads.
+        unfulfilled_placement_groups_demands, _ = get_bin_pack_residual(
+            node_resources, placement_group_demand_vector)
+        # Add 1 to account for the head node.
+        max_to_add = self.max_workers + 1 - sum(node_type_counts.values())
+        pg_demands_nodes_max_launch_limit = get_nodes_for(
+            self.node_types, node_type_counts, self.head_node_type, max_to_add,
+            unfulfilled_placement_groups_demands)
+        placement_groups_nodes_max_limit = {
+            node_type: spread_pg_nodes_to_add.get(node_type, 0) +
+            pg_demands_nodes_max_launch_limit.get(node_type, 0)
+            for node_type in self.node_types
+        }
         # Step 4/5: add nodes for pending tasks, actors, and non-strict spread
         # groups
         unfulfilled, _ = get_bin_pack_residual(node_resources,
                                                resource_demands)
         logger.debug("Resource demands: {}".format(resource_demands))
         logger.debug("Unfulfilled demands: {}".format(unfulfilled))
         # Add 1 to account for the head node.
         max_to_add = self.max_workers + 1 - sum(node_type_counts.values())
         nodes_to_add_based_on_demand = get_nodes_for(
             self.node_types, node_type_counts, self.head_node_type, max_to_add,
             unfulfilled)

Review thread on placement_groups_nodes_max_limit:

Reviewer: Why do we add them together here? Shouldn't it be a …

Author: See the comment above. This is why I split it into spread_pg_nodes_to_add and pg_demands_nodes_max_launch_limit, which should be added together. The first counts the spreads and the second counts the demands.
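To make the addition the review thread discusses concrete, here is a hedged sketch with made-up counts (the variable names mirror the diff; the numbers are hypothetical):

node_types = ["p2.8xlarge", "m4.large"]

# Hypothetical outputs of the two computations above: nodes reserved for
# STRICT_SPREAD groups, and nodes needed for the remaining (demand-based)
# placement group bundles.
spread_pg_nodes_to_add = {"p2.8xlarge": 25}
pg_demands_nodes_max_launch_limit = {"p2.8xlarge": 5}

# Mirrors the dict comprehension in the diff: the per-type bypass limit
# is the sum of the spread-based and demand-based node counts.
placement_groups_nodes_max_limit = {
    node_type: spread_pg_nodes_to_add.get(node_type, 0) +
    pg_demands_nodes_max_launch_limit.get(node_type, 0)
    for node_type in node_types
}
assert placement_groups_nodes_max_limit == {"p2.8xlarge": 30, "m4.large": 0}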
@@ -201,15 +219,16 @@ def get_nodes_to_launch(
 
         for node_type in self.node_types:
             nodes_to_add = (adjusted_min_workers.get(
-                node_type, 0) + placement_group_nodes_to_add.get(node_type, 0)
-                            + nodes_to_add_based_on_demand.get(node_type, 0))
+                node_type, 0) + spread_pg_nodes_to_add.get(node_type, 0) +
+                nodes_to_add_based_on_demand.get(node_type, 0))
             if nodes_to_add > 0:
                 total_nodes_to_add[node_type] = nodes_to_add
 
         # Limit the number of concurrent launches
         total_nodes_to_add = self._get_concurrent_resource_demand_to_launch(
             total_nodes_to_add, unused_resources_by_ip.keys(), nodes,
-            launching_nodes, adjusted_min_workers)
+            launching_nodes, adjusted_min_workers,
+            placement_groups_nodes_max_limit)
 
         logger.debug("Node requests: {}".format(total_nodes_to_add))
         return total_nodes_to_add
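For completeness, a small sketch of the per-type aggregation in this hunk, again with hypothetical counts (only the variable names come from the diff):

node_types = ["p2.8xlarge", "m4.large"]
adjusted_min_workers = {"m4.large": 2}
spread_pg_nodes_to_add = {"p2.8xlarge": 25}
nodes_to_add_based_on_demand = {"p2.8xlarge": 5, "m4.large": 1}

total_nodes_to_add = {}
for node_type in node_types:
    # Same three-way sum as in the diff: min-worker adjustments, strict
    # spread reservations, and demand-based nodes.
    nodes_to_add = (adjusted_min_workers.get(node_type, 0) +
                    spread_pg_nodes_to_add.get(node_type, 0) +
                    nodes_to_add_based_on_demand.get(node_type, 0))
    if nodes_to_add > 0:
        total_nodes_to_add[node_type] = nodes_to_add

assert total_nodes_to_add == {"p2.8xlarge": 30, "m4.large": 3}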
@@ -288,6 +307,7 @@ def _get_concurrent_resource_demand_to_launch( | |
non_terminated_nodes: List[NodeID], | ||
pending_launches_nodes: Dict[NodeType, int], | ||
adjusted_min_workers: Dict[NodeType, int], | ||
placement_group_nodes: Dict[NodeType, int], | ||
) -> Dict[NodeType, int]: | ||
"""Updates the max concurrent resources to launch for each node type. | ||
|
||
|
@@ -311,6 +331,8 @@ def _get_concurrent_resource_demand_to_launch(
                 min_workers and request_resources(). This overrides the launch
                 limits since the user is hinting to immediately scale up to
                 this size.
+            placement_group_nodes: Nodes to launch for placement groups.
+                This overrides the launch concurrency limits.
         Returns:
             Dict[NodeType, int]: Maximum number of nodes to launch for each
                 node type.
@@ -333,8 +355,9 @@ def _get_concurrent_resource_demand_to_launch(
                 max_allowed_pending_nodes - total_pending_nodes,
 
                 # Allow more nodes if this is to respect min_workers or
-                # request_resources().
-                adjusted_min_workers.get(node_type, 0))
+                # request_resources() or placement groups.
+                adjusted_min_workers.get(node_type, 0) +
+                placement_group_nodes.get(node_type, 0))
 
             if upper_bound > 0:
                 updated_nodes_to_launch[node_type] = min(
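A self-contained sketch of the bound arithmetic in this hunk, under assumed numbers (in the scheduler the real values come from its node accounting):

# Hypothetical state for one node type: the concurrency cap is already
# exhausted by pending launches, but a placement group still needs 40
# m4.large nodes.
max_allowed_pending_nodes = 5   # assumed concurrency cap for this type
total_pending_nodes = 5         # already pending/launching

adjusted_min_workers = {"m4.large": 0}
placement_group_nodes = {"m4.large": 40}

# Mirrors the diff: placement group nodes, like adjusted min_workers,
# can lift the bound above the (exhausted) concurrency limit.
upper_bound = max(
    max_allowed_pending_nodes - total_pending_nodes,
    adjusted_min_workers.get("m4.large", 0) +
    placement_group_nodes.get("m4.large", 0))
assert upper_bound == 40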
@@ -766,7 +766,8 @@ def test_get_concurrent_resource_demand_to_launch():
 
     # Sanity check.
     updated_to_launch = \
-        scheduler._get_concurrent_resource_demand_to_launch({}, [], [], {}, {})
+        scheduler._get_concurrent_resource_demand_to_launch(
+            {}, [], [], {}, {}, {})
     assert updated_to_launch == {}
 
     provider.create_node({}, {
@@ -785,11 +786,38 @@ def test_get_concurrent_resource_demand_to_launch():
     connected_nodes = []  # All the non_terminated_nodes are not connected yet.
     updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch(
         to_launch, connected_nodes, non_terminated_nodes,
-        pending_launches_nodes, {})
+        pending_launches_nodes, {}, {})
     # Note: we have 2 pending/launching gpus, 3 pending/launching cpus,
     # 0 running gpu, and 0 running cpus.
     assert updated_to_launch == {"p2.8xlarge": 3, "m4.large": 2}
 
+    # Test min_workers bypass max launch limit.
+    updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch(
+        to_launch,
+        connected_nodes,
+        non_terminated_nodes,
+        pending_launches_nodes,
+        adjusted_min_workers={"m4.large": 40},
+        placement_group_nodes={})
+    assert updated_to_launch == {"p2.8xlarge": 3, "m4.large": 40}
+    # Test placement groups bypass max launch limit.
+    updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch(
+        to_launch,
+        connected_nodes,
+        non_terminated_nodes,
+        pending_launches_nodes, {},
+        placement_group_nodes={"m4.large": 40})
+    assert updated_to_launch == {"p2.8xlarge": 3, "m4.large": 40}
+    # Test combining min_workers and placement groups bypass max launch limit.
+    updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch(
+        to_launch,
+        connected_nodes,
+        non_terminated_nodes,
+        pending_launches_nodes,
+        adjusted_min_workers={"m4.large": 25},
+        placement_group_nodes={"m4.large": 15})
+    assert updated_to_launch == {"p2.8xlarge": 3, "m4.large": 40}
+
     # This starts the min workers only, so we have no more pending workers.
     # The workers here are either running (connected) or in
     # pending_launches_nodes (i.e., launching).
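Note how the two bypass sources compose in the last case above: adjusted_min_workers contributes 25 and placement_group_nodes contributes 15, and because _get_concurrent_resource_demand_to_launch sums them, the allowance is the same 40 m4.large nodes as when either argument alone carries 40.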
@@ -798,7 +826,7 @@ def test_get_concurrent_resource_demand_to_launch():
     ]
     updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch(
         to_launch, connected_nodes, non_terminated_nodes,
-        pending_launches_nodes, {})
+        pending_launches_nodes, {}, {})
     # Note that here we have 1 launching gpu, 1 launching cpu,
     # 1 running gpu, and 2 running cpus.
     assert updated_to_launch == {"p2.8xlarge": 4, "m4.large": 4}
@@ -819,7 +847,7 @@ def test_get_concurrent_resource_demand_to_launch():
     pending_launches_nodes = {}  # No pending launches
     updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch(
         to_launch, connected_nodes, non_terminated_nodes,
-        pending_launches_nodes, {})
+        pending_launches_nodes, {}, {})
     # Note: we have 5 pending cpus. So we are not allowed to start any.
     # Still only 2 running cpus.
     assert updated_to_launch == {}
@@ -830,7 +858,7 @@ def test_get_concurrent_resource_demand_to_launch():
     ]
     updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch(
         to_launch, connected_nodes, non_terminated_nodes,
-        pending_launches_nodes, {})
+        pending_launches_nodes, {}, {})
     # Note: that here we have 7 running cpus and nothing pending/launching.
     assert updated_to_launch == {"m4.large": 7}
 
@@ -846,7 +874,7 @@ def test_get_concurrent_resource_demand_to_launch():
     pending_launches_nodes = {"m4.large": 1}
     updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch(
         to_launch, connected_nodes, non_terminated_nodes,
-        pending_launches_nodes, {})
+        pending_launches_nodes, {}, {})
     # Note: we have 8 pending/launching cpus and only 7 running.
     # So we should not launch anything (8 < 7).
     assert updated_to_launch == {}
@@ -857,24 +885,90 @@ def test_get_nodes_to_launch_max_launch_concurrency():
     ]
     updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch(
         to_launch, connected_nodes, non_terminated_nodes,
-        pending_launches_nodes, {})
+        pending_launches_nodes, {}, {})
     # Note: that here we have 14 running cpus and 1 launching.
     assert updated_to_launch == {"m4.large": 13}
 
 
+def test_get_nodes_to_launch_max_launch_concurrency_placement_groups():
+    provider = MockProvider()
+    new_types = copy.deepcopy(TYPES_A)
+    new_types["p2.8xlarge"]["min_workers"] = 10
+    new_types["p2.8xlarge"]["max_workers"] = 40
+
+    scheduler = ResourceDemandScheduler(
+        provider, new_types, 50, head_node_type=None)
+
+    pending_placement_groups = [
+        PlacementGroupTableData(
+            state=PlacementGroupTableData.RESCHEDULING,
+            strategy=PlacementStrategy.PACK,
+            bundles=([Bundle(unit_resources={"GPU": 8})] * 25))
+    ]
+    # Placement groups should bypass max launch limit.
+    # Note that 25 = max(placement group resources=25, min_workers=10).
+    to_launch = scheduler.get_nodes_to_launch([], {}, [], {},
+                                              pending_placement_groups, {})
+    assert to_launch == {"p2.8xlarge": 25}
+
+    pending_placement_groups = [
+        # Requires 25 p2.8xlarge nodes.
+        PlacementGroupTableData(
+            state=PlacementGroupTableData.RESCHEDULING,
+            strategy=PlacementStrategy.STRICT_SPREAD,
+            bundles=([Bundle(unit_resources={"GPU": 2})] * 25)),
+        # Requires 5 additional nodes (total 30).
+        PlacementGroupTableData(
+            state=PlacementGroupTableData.RESCHEDULING,
+            strategy=PlacementStrategy.PACK,
+            bundles=([Bundle(unit_resources={"GPU": 6})] * 30))
+    ]
+
+    to_launch = scheduler.get_nodes_to_launch([], {}, [], {},
+                                              pending_placement_groups, {})
+    # Test that combining spreads and normal placement group demands
+    # bypasses the launch limit.
+    assert to_launch == {"p2.8xlarge": 30}
+
+    pending_placement_groups = [
+        # Requires 25 p2.8xlarge nodes.
+        PlacementGroupTableData(
+            state=PlacementGroupTableData.RESCHEDULING,
+            strategy=PlacementStrategy.STRICT_SPREAD,
+            bundles=([Bundle(unit_resources={"GPU": 2})] * 25)),
+        # Requires 35 additional nodes (total 60).
+        PlacementGroupTableData(
+            state=PlacementGroupTableData.RESCHEDULING,
+            strategy=PlacementStrategy.PACK,
+            bundles=([Bundle(unit_resources={"GPU": 6})] * 60))
+    ]
+
+    to_launch = scheduler.get_nodes_to_launch([], {}, [], {},
+                                              pending_placement_groups, {})
+    # Make sure it still respects max_workers of p2.8xlarge.
+    assert to_launch == {"p2.8xlarge": 40}
+
+    scheduler.node_types["p2.8xlarge"]["max_workers"] = 60
+    to_launch = scheduler.get_nodes_to_launch([], {}, [], {},
+                                              pending_placement_groups, {})
+    # Make sure it still respects the global max_workers constraint.
+    # 50 + 1 is global max_workers + head node.
+    assert to_launch == {"p2.8xlarge": 51}
+
+
 def test_get_nodes_to_launch_max_launch_concurrency():
     provider = MockProvider()
     new_types = copy.deepcopy(TYPES_A)
-    new_types["p2.8xlarge"]["min_workers"] = 4
+    new_types["p2.8xlarge"]["min_workers"] = 10
     new_types["p2.8xlarge"]["max_workers"] = 40
 
     scheduler = ResourceDemandScheduler(
         provider, new_types, 30, head_node_type=None)
 
     to_launch = scheduler.get_nodes_to_launch([], {}, [], {}, [], {})
-    # Respects min_workers despite concurrency limitation.
-    assert to_launch == {"p2.8xlarge": 4}
+    # Respects min_workers despite max launch limit.
+    assert to_launch == {"p2.8xlarge": 10}
+    scheduler.node_types["p2.8xlarge"]["min_workers"] = 4
 
     provider.create_node({}, {
         TAG_RAY_USER_NODE_TYPE: "p2.8xlarge",
         TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED

Review thread on this test:

Reviewer: Can we check that it still respects max_workers? It would be ideal to break placement group handling out into a separate test, actually, since this one is getting quite long.

Author: I will separate it into a new test and add the check for max_workers, though I am not sure how this is relevant to the PR, which only lifts the restrictions on upscaling.
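For reference, the packing arithmetic behind the asserts in the new placement group test: a p2.8xlarge in TYPES_A has 8 GPUs, so 25 PACK bundles of {"GPU": 8} need 25 nodes. In the mixed case, the 25 STRICT_SPREAD bundles of {"GPU": 2} reserve 25 nodes and leave 6 GPUs free on each; those absorb 25 of the 6-GPU PACK bundles, and the remaining 5 bundles need 5 fresh nodes (30 total). The larger PACK group is then clipped first by the per-type max_workers of 40 and, once that is raised to 60, by the global limit of 50 workers plus the head node (51).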
Review thread on the spread_pg_nodes_to_add naming:

Reviewer: Should it just be pg_nodes_to_add? Spread is a type of placement group, right?

Author: There are two placement group node counts, and I used the current naming to make that clearer. Originally we counted only spreads in placement_group_nodes_to_add, which is confusing because placement_group_nodes_to_add should cover both (demand based and spread based). The placement group nodes to add are actually the combination of spreads and placement group demands.