[Core] Release PG CPU resources for blocked worker #43270

Merged · 4 commits · Feb 26, 2024
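This PR makes a worker that is blocked in `ray.get` release its placement group (PG) CPU resources in addition to its plain CPU resource, covering both the wildcard and the bundle-indexed PG resource. As a rough sketch of the user-visible effect, mirroring the new test below (this snippet is not part of the diff itself): before this change the parent task holds the bundle's only CPU while it waits, so the child cannot be scheduled inside the same one-CPU bundle and a script like this would hang; with this change it returns 3.

```python
import ray
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init(num_cpus=1)
pg = placement_group(bundles=[{"CPU": 1}])
ray.get(pg.ready())


@ray.remote(num_cpus=1)
def child():
    return 3


@ray.remote(num_cpus=1)
def parent():
    # Parent blocks here; its PG CPU must be released so child can run.
    return ray.get(child.remote())


strategy = PlacementGroupSchedulingStrategy(
    placement_group=pg, placement_group_capture_child_tasks=True
)
print(ray.get(parent.options(scheduling_strategy=strategy).remote()))  # 3
```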
python/ray/tests/test_basic.py: 42 additions, 0 deletions
@@ -16,6 +16,7 @@
get_error_message,
run_string_as_driver,
)
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

logger = logging.getLogger(__name__)

@@ -46,6 +47,47 @@ def f():
run_string_as_driver(script, dict(os.environ, **env))


def test_release_cpu_resources(shutdown_only):
ray.init(num_cpus=1)

@ray.remote(num_cpus=1)
def child():
return 3

@ray.remote(num_cpus=1)
def parent():
# Parent should release the CPU resource
# to run child.
return ray.get(child.remote())

assert ray.get(parent.remote()) == 3

# Make sure CPU resource inside PG can also be released properly.
pg = ray.util.placement_group(bundles=[{"CPU": 1}])
assert (
ray.get(
parent.options(
scheduling_strategy=PlacementGroupSchedulingStrategy(
placement_group=pg, placement_group_capture_child_tasks=True
)
).remote()
)
== 3
)
assert (
ray.get(
parent.options(
scheduling_strategy=PlacementGroupSchedulingStrategy(
placement_group=pg,
placement_group_bundle_index=0,
placement_group_capture_child_tasks=True,
)
).remote()
)
== 3
)


# https://github.com/ray-project/ray/issues/16025
def test_release_resources_race(shutdown_only):
# This test fails with the flag set to false.
src/ray/raylet/local_task_manager.cc: 84 additions, 37 deletions
@@ -24,6 +24,23 @@
namespace ray {
namespace raylet {

bool IsCPUOrPlacementGroupCPUResource(ResourceID resource_id) {
// Check whether the resource is CPU resource or CPU resource inside PG.
if (resource_id == ResourceID::CPU()) {
return true;
}

auto possible_pg_resource = ParsePgFormattedResource(resource_id.Binary(),
/*for_wildcard_resource*/ true,
/*for_indexed_resource*/ true);
if (possible_pg_resource.has_value() &&
possible_pg_resource->original_resource == ResourceID::CPU().Binary()) {
return true;
}

return false;
}
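
For context on what this helper matches: a placement group bundle's CPU is tracked both as a wildcard resource and as a bundle-indexed resource, which surface in Python as `CPU_group_...` custom resources. The sketch below is for illustration only (the exact resource-name format is an internal detail; the `CPU_group_` prefix is assumed from current Ray behavior) and prints both forms for a one-bundle PG.

```python
import ray
from ray.util.placement_group import placement_group

ray.init(num_cpus=1)
pg = placement_group(bundles=[{"CPU": 1}])
ray.get(pg.ready())

# Expect two entries: the wildcard form (CPU_group_<pg id>) and the
# bundle-indexed form (CPU_group_0_<pg id>), matching the two cases
# IsCPUOrPlacementGroupCPUResource handles.
for name, amount in ray.available_resources().items():
    if name.startswith("CPU_group_"):
        print(name, amount)
```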

LocalTaskManager::LocalTaskManager(
const NodeID &self_node_id,
std::shared_ptr<ClusterResourceScheduler> cluster_resource_scheduler,
@@ -943,30 +960,34 @@ int64_t LocalTaskManager::TotalBacklogSize(SchedulingClass scheduling_class) {

void LocalTaskManager::ReleaseWorkerResources(std::shared_ptr<WorkerInterface> worker) {
RAY_CHECK(worker != nullptr);
auto allocated_instances = worker->GetAllocatedInstances();
if (allocated_instances != nullptr) {
if (worker->IsBlocked()) {
// If the worker is blocked, its CPU instances have already been released. We clear
// the CPU instances to avoid double freeing.
allocated_instances->Remove(ResourceID::CPU());
}
cluster_resource_scheduler_->GetLocalResourceManager().ReleaseWorkerResources(
worker->GetAllocatedInstances());
worker->ClearAllocatedInstances();
auto allocated_instances = worker->GetAllocatedInstances()
? worker->GetAllocatedInstances()
: worker->GetLifetimeAllocatedInstances();
if (allocated_instances == nullptr) {
return;
}

auto lifetime_allocated_instances = worker->GetLifetimeAllocatedInstances();
if (lifetime_allocated_instances != nullptr) {
if (worker->IsBlocked()) {
// If the worker is blocked, its CPU instances have already been released. We clear
// the CPU instances to avoid double freeing.
lifetime_allocated_instances->Remove(ResourceID::CPU());
if (worker->IsBlocked()) {
// If the worker is blocked, its CPU instances have already been released. We clear
// the CPU instances to avoid double freeing.

// For PG, there may be two cpu resources: wildcard and indexed.
std::vector<ResourceID> cpu_resource_ids;
for (const auto &resource_id : allocated_instances->ResourceIds()) {
if (IsCPUOrPlacementGroupCPUResource(resource_id)) {
cpu_resource_ids.emplace_back(resource_id);
}
}

for (const auto cpu_resource_id : cpu_resource_ids) {
allocated_instances->Remove(cpu_resource_id);
}
cluster_resource_scheduler_->GetLocalResourceManager().ReleaseWorkerResources(
worker->GetLifetimeAllocatedInstances());
worker->ClearLifetimeAllocatedInstances();
}

cluster_resource_scheduler_->GetLocalResourceManager().ReleaseWorkerResources(
allocated_instances);
worker->ClearAllocatedInstances();
Contributor: This is theoretically different from the previous logic (we only called one of these two APIs). Is it guaranteed that if allocated_instances is not null, lifetime_allocated_instances is null?

Contributor Author: Yup.

worker->ClearLifetimeAllocatedInstances();
}

bool LocalTaskManager::ReleaseCpuResourcesFromBlockedWorker(
@@ -975,37 +996,59 @@ bool LocalTaskManager::ReleaseCpuResourcesFromBlockedWorker(
return false;
}

bool cpu_resources_released = false;
if (worker->GetAllocatedInstances() != nullptr) {
if (worker->GetAllocatedInstances()->Has(ResourceID::CPU())) {
auto cpu_instances = worker->GetAllocatedInstances()->GetDouble(ResourceID::CPU());
cluster_resource_scheduler_->GetLocalResourceManager().AddResourceInstances(
ResourceID::CPU(), cpu_instances);
worker->MarkBlocked();
return true;
for (const auto resource_id : worker->GetAllocatedInstances()->ResourceIds()) {
if (IsCPUOrPlacementGroupCPUResource(resource_id)) {
auto cpu_instances = worker->GetAllocatedInstances()->GetDouble(resource_id);
cluster_resource_scheduler_->GetLocalResourceManager().AddResourceInstances(
resource_id, cpu_instances);
cpu_resources_released = true;

// Cannot break since we need to release
// both PG wildcard and indexed CPU resources.
}
}
}

return false;
if (cpu_resources_released) {
worker->MarkBlocked();
return true;
} else {
return false;
}
}

bool LocalTaskManager::ReturnCpuResourcesToUnblockedWorker(
std::shared_ptr<WorkerInterface> worker) {
if (!worker || !worker->IsBlocked()) {
return false;
}

bool cpu_resources_returned = false;
if (worker->GetAllocatedInstances() != nullptr) {
if (worker->GetAllocatedInstances()->Has(ResourceID::CPU())) {
auto cpu_instances = worker->GetAllocatedInstances()->GetDouble(ResourceID::CPU());
// Important: we allow going negative here, since otherwise you can use infinite
// CPU resources by repeatedly blocking / unblocking a task. By allowing it to go
// negative, at most one task can "borrow" this worker's resources.
cluster_resource_scheduler_->GetLocalResourceManager().SubtractResourceInstances(
ResourceID::CPU(), cpu_instances, /*allow_going_negative=*/true);
worker->MarkUnblocked();
return true;
for (const auto resource_id : worker->GetAllocatedInstances()->ResourceIds()) {
if (IsCPUOrPlacementGroupCPUResource(resource_id)) {
auto cpu_instances = worker->GetAllocatedInstances()->GetDouble(resource_id);
// Important: we allow going negative here, since otherwise you can use infinite
// CPU resources by repeatedly blocking / unblocking a task. By allowing it to go
// negative, at most one task can "borrow" this worker's resources.
cluster_resource_scheduler_->GetLocalResourceManager().SubtractResourceInstances(
resource_id, cpu_instances, /*allow_going_negative=*/true);
cpu_resources_returned = true;

// Cannot break since we need to return
// both PG wildcard and indexed CPU resources.
}
}
}
return false;

if (cpu_resources_returned) {
worker->MarkUnblocked();
return true;
} else {
return false;
}
}

ResourceSet LocalTaskManager::CalcNormalTaskResources() const {
@@ -1028,7 +1071,11 @@ ResourceSet LocalTaskManager::CalcNormalTaskResources() const {
auto resource_set = allocated_instances->ToResourceSet();
// Blocked normal task workers have temporarily released its allocated CPU.
if (worker->IsBlocked()) {
resource_set.Set(ResourceID::CPU(), 0);
for (const auto resource_id : allocated_instances->ResourceIds()) {
if (IsCPUOrPlacementGroupCPUResource(resource_id)) {
resource_set.Set(resource_id, 0);
}
}
}
total_normal_task_resources += resource_set;
}