Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] respect bundle index when task is scheduled with pg with no resources. #43448

Merged
merged 5 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 9 additions & 4 deletions python/ray/_private/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2013,8 +2013,9 @@ def pasre_pg_formatted_resources_to_original(
original_resources = {}

for key, value in pg_formatted_resources.items():
result = PLACEMENT_GROUP_WILDCARD_RESOURCE_PATTERN.match(key)
if result and len(result.groups()) == 2:
result = PLACEMENT_GROUP_INDEXED_BUNDLED_RESOURCE_PATTERN.match(key)
if result and len(result.groups()) == 3:
# This should be already skipped from the logic above.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment should be removed

# Filter out resources that have bundle_group_[pg_id] since
# it is an implementation detail.
# This resource is automatically added to the resource
Expand All @@ -2024,8 +2025,12 @@ def pasre_pg_formatted_resources_to_original(

original_resources[result.group(1)] = value
continue
result = PLACEMENT_GROUP_INDEXED_BUNDLED_RESOURCE_PATTERN.match(key)
if result and len(result.groups()) == 3:

result = PLACEMENT_GROUP_WILDCARD_RESOURCE_PATTERN.match(key)
if result and len(result.groups()) == 2:
if result.group(1) == "bundle":
continue

original_resources[result.group(1)] = value
continue
original_resources[key] = value
Expand Down
90 changes: 90 additions & 0 deletions python/ray/tests/test_placement_group_5.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
from ray._private.runtime_env.plugin import RuntimeEnvPlugin
from ray._private.test_utils import wait_for_condition
from click.testing import CliRunner
import ray.scripts.scripts as scripts


@pytest.mark.parametrize("connect_to_client", [False, True])
Expand Down Expand Up @@ -114,6 +116,94 @@ def check_eq(ip1, ip2):
placement_group_assert_no_leak([pg1, pg2])


@pytest.mark.parametrize("connect_to_client", [False, True])
def test_pg_no_resource_bundle_index(ray_start_cluster, connect_to_client):
@ray.remote(num_cpus=0)
class Actor:
def node_id(self):
return ray.get_runtime_context().get_node_id()

cluster = ray_start_cluster
num_nodes = 4
for _ in range(num_nodes):
cluster.add_node(num_cpus=1)
ray.init(address=cluster.address)

with connect_to_client_or_not(connect_to_client):
pg = ray.util.placement_group(
bundles=[{"CPU": 1} for _ in range(num_nodes)],
)
ray.get(pg.ready())
first_bundle_node_id = ray.util.placement_group_table(pg)["bundles_to_node_id"][
0
]

# Iterate 10 times to make sure it is not flaky.
for _ in range(10):
actor = Actor.options(
scheduling_strategy=PlacementGroupSchedulingStrategy(
placement_group=pg, placement_group_bundle_index=0
)
).remote()

assert first_bundle_node_id == ray.get(actor.node_id.remote())

placement_group_assert_no_leak([pg])


# Make sure the task observability API outputs don't contain
# pg related data.
# TODO(sang): Currently, when a task hangs because the bundle
# index doesn't have enough resources, it is not displayed. Fix it.
def test_task_using_pg_observability(ray_start_cluster):
@ray.remote(num_cpus=1)
class Actor:
def get_assigned_resources(self):
return ray.get_runtime_context().get_assigned_resources()

cluster = ray_start_cluster
num_nodes = 1
for _ in range(num_nodes):
cluster.add_node(num_cpus=1)
ray.init(address=cluster.address)

pg = ray.util.placement_group(
bundles=[{"CPU": 1} for _ in range(num_nodes)],
)

# Make sure get_assigned_id doesn't contain formatted resources.
bundle_index = 0
actor1 = Actor.options(
scheduling_strategy=PlacementGroupSchedulingStrategy(
placement_group=pg, placement_group_bundle_index=bundle_index
)
).remote()
r = ray.get(actor1.get_assigned_resources.remote())
assert "bundle_group" not in r
assert f"bundle_group_{bundle_index}" not in r

# Make sure ray status doesn't contain formatted resources.
actor2 = Actor.options( # noqa
scheduling_strategy=PlacementGroupSchedulingStrategy(
placement_group=pg, placement_group_bundle_index=0
)
).remote()

def check_demands():
runner = CliRunner()
result = runner.invoke(scripts.status)
if "No cluster status." in result.stdout:
return False

expected_demand_str = (
"{'CPU': 1.0}: 1+ pending tasks/actors " "(1+ using placement groups)"
)
assert expected_demand_str in result.stdout, result.stdout
return True

wait_for_condition(check_demands)


@pytest.mark.parametrize("connect_to_client", [False, True])
@pytest.mark.parametrize("scheduling_strategy", ["SPREAD", "STRICT_SPREAD", "PACK"])
def test_placement_group_bin_packing_priority(
Expand Down
5 changes: 5 additions & 0 deletions src/ray/common/bundle_spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,11 @@ std::unordered_map<std::string, double> AddPlacementGroupConstraint(
auto bundle_key =
FormatPlacementGroupResource(kBundle_ResourceLabel, placement_group_id, -1);
new_resources[bundle_key] = 0.001;
if (bundle_index >= 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I'm wondering if we should only add bundle resource when normal resources are empty.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add index resource in case users want to use bundle_index

auto bundle_key_with_index = FormatPlacementGroupResource(
kBundle_ResourceLabel, placement_group_id, bundle_index);
new_resources[bundle_key_with_index] = 0.001;
}
return new_resources;
}

Expand Down