[scheduler][autoscaler] Report placement resources for actor creation tasks #26813

Merged
Changes from all commits
31 changes: 31 additions & 0 deletions python/ray/tests/test_autoscaler_fake_multinode.py
@@ -55,6 +55,37 @@ def g():
# __example_end__


def test_zero_cpu_default_actor():
cluster = AutoscalingCluster(
head_resources={"CPU": 0},
worker_node_types={
"cpu_node": {
"resources": {
"CPU": 1,
},
"node_config": {},
"min_workers": 0,
"max_workers": 1,
},
},
)

try:
cluster.start()
ray.init("auto")

@ray.remote
class Actor:
def ping(self):
pass

actor = Actor.remote()
ray.get(actor.ping.remote())
ray.shutdown()
finally:
cluster.shutdown()


if __name__ == "__main__":
import os
import sys
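For context, a rough sketch (not part of this PR): the new test above only checks that the default actor eventually runs, which implicitly requires the autoscaler to see a 1-CPU demand and add the cpu_node worker. One could additionally assert that the node was added, assuming wait_for_condition from ray._private.test_utils and that the autoscaled worker contributes its 1 CPU to ray.cluster_resources():

from ray._private.test_utils import wait_for_condition

def cpu_node_added():
    # Illustrative check: once the actor's placement demand ({"CPU": 1}) is
    # reported, the autoscaler should add the 1-CPU worker node.
    return ray.cluster_resources().get("CPU", 0) >= 1

wait_for_condition(cpu_node_added, timeout=60)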
2 changes: 1 addition & 1 deletion python/ray/tests/test_failure_2.py
@@ -31,7 +31,7 @@ def test_warning_for_too_many_actors(shutdown_only):

p = init_error_pubsub()

@ray.remote
@ray.remote(num_cpus=0)
Contributor Author:
it appears that this test needed to be changed, because with the fixed scheduling class, these actors are getting hit by worker capping now.

class Foo:
def __init__(self):
time.sleep(1000)
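For context, a rough sketch (not part of this PR) of the distinction the review comment above refers to: once an actor creation task's scheduling class is derived from its placement resources, a default actor lands in the {"CPU": 1} class and is subject to worker capping, so the test pins its actors to num_cpus=0 to keep the old behavior. This assumes stock Ray semantics, where a default actor needs 1 CPU to be placed but holds 0 CPU while running:

import ray

ray.init(num_cpus=1)

# Default actor: placement resources are {"CPU": 1}, runtime resources are {}.
# With this PR its creation task is reported and grouped under the 1-CPU shape.
@ray.remote
class DefaultActor:
    pass

# Explicitly zero-CPU actor: placement resources are {} as well, so its
# creation task stays in the zero-CPU scheduling class, as before.
@ray.remote(num_cpus=0)
class ZeroCpuActor:
    pass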
48 changes: 48 additions & 0 deletions python/ray/tests/test_global_state.py
@@ -378,6 +378,54 @@ def backlog_size_set():
global_state_accessor.disconnect()


def test_default_load_reports(shutdown_only):
"""Despite the fact that default actors release their cpu after being
wuisawesome marked this conversation as resolved.
Show resolved Hide resolved
placed, they should still require 1 CPU for laod reporting purposes.
https://github.com/ray-project/ray/issues/26806
"""
cluster = ray.init(
num_cpus=0,
)

global_state_accessor = make_global_state_accessor(cluster)

@ray.remote
def foo():
return None

@ray.remote
class Foo:
pass

def actor_and_task_queued_together():
message = global_state_accessor.get_all_resource_usage()
if message is None:
return False

resource_usage = gcs_utils.ResourceUsageBatchData.FromString(message)
aggregate_resource_load = resource_usage.resource_load_by_shape.resource_demands
print(f"Num shapes {len(aggregate_resource_load)}")
if len(aggregate_resource_load) == 1:
num_infeasible = aggregate_resource_load[0].num_infeasible_requests_queued
print(f"num in shape {num_infeasible}")
            # Ideally we'd want to assert backlog_size == 8, but guaranteeing
            # the order in which submissions occur is too hard/flaky.
return num_infeasible == 2
return False

# Assign to variables to keep the ref counter happy.
handle = Foo.remote()
ref = foo.remote()

wait_for_condition(actor_and_task_queued_together, timeout=2)
global_state_accessor.disconnect()

# Do something with the variables so lint is happy.
del handle
del ref


def test_heartbeat_ip(shutdown_only):
cluster = ray.init(num_cpus=1)
global_state_accessor = make_global_state_accessor(cluster)
7 changes: 7 additions & 0 deletions src/ray/common/ray_config_def.h
@@ -92,6 +92,13 @@ RAY_CONFIG(uint64_t, raylet_get_agent_info_interval_ms, 1)
/// handler is drifting.
RAY_CONFIG(uint64_t, num_resource_report_periods_warning, 5)

/// Whether to report placement or regular resource usage for an actor.
/// Reporting placement resources may cause the autoscaler to overestimate the
/// resources required by the cluster, but reporting regular resources may lead
/// to no autoscaling when an actor can't be placed.
/// https://github.com/ray-project/ray/issues/26806
RAY_CONFIG(bool, report_actor_placement_resources, true)

/// Whether to record the creation sites of object references. This adds more
/// information to `ray memory`, but introduces a little extra overhead when
/// creating object references (e.g. 5~10 microsec per call in Python).
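For context, a rough sketch (not part of this PR): like other RAY_CONFIG entries, this flag can presumably be flipped for debugging via Ray's internal, unstable _system_config argument to ray.init(), e.g. to revert to the old reporting behavior:

import ray

# Illustrative only: report regular (post-placement) resources for actor
# creation tasks again instead of placement resources. `_system_config`
# is an internal, unstable ray.init() argument.
ray.init(
    num_cpus=0,
    _system_config={"report_actor_placement_resources": False},
)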
8 changes: 7 additions & 1 deletion src/ray/common/task/task_spec.cc
@@ -113,7 +113,13 @@ void TaskSpecification::ComputeResources() {
if (!IsActorTask()) {
// There is no need to compute `SchedulingClass` for actor tasks since
// the actor tasks need not be scheduled.
const auto &resource_set = GetRequiredResources();
const bool is_actor_creation_task = IsActorCreationTask();
const bool should_report_placement_resources =
RayConfig::instance().report_actor_placement_resources();
const auto &resource_set =
(is_actor_creation_task && should_report_placement_resources)
? GetRequiredPlacementResources()
: GetRequiredResources();
const auto &function_descriptor = FunctionDescriptor();
auto depth = GetDepth();
auto sched_cls_desc = SchedulingClassDescriptor(
25 changes: 24 additions & 1 deletion src/ray/common/test/task_spec_test.cc
@@ -100,6 +100,29 @@ TEST(TaskSpecTest, TestSchedulingClassDescriptor) {
TaskSpecification::GetSchedulingClass(descriptor9));
}

TEST(TaskSpecTest, TestActorSchedulingClass) {
// This test ensures that an actor's lease request's scheduling class is
// determined by the placement resources, not the regular resources.

const std::unordered_map<std::string, double> one_cpu = {{"CPU", 1}};

rpc::TaskSpec actor_task_spec_proto;
actor_task_spec_proto.set_type(TaskType::ACTOR_CREATION_TASK);
actor_task_spec_proto.mutable_required_placement_resources()->insert(one_cpu.begin(),
one_cpu.end());

TaskSpecification actor_task(actor_task_spec_proto);

rpc::TaskSpec regular_task_spec_proto;
regular_task_spec_proto.set_type(TaskType::NORMAL_TASK);
regular_task_spec_proto.mutable_required_resources()->insert(one_cpu.begin(),
one_cpu.end());

TaskSpecification regular_task(regular_task_spec_proto);

ASSERT_EQ(regular_task.GetSchedulingClass(), actor_task.GetSchedulingClass());
}

TEST(TaskSpecTest, TestTaskSpecification) {
rpc::SchedulingStrategy scheduling_strategy;
NodeID node_id = NodeID::FromRandom();
@@ -118,4 +141,4 @@ TEST(TaskSpecTest, TestTaskSpecification) {
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
}
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_resource_manager.h
@@ -73,7 +73,7 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler,
rpc::GetAllAvailableResourcesReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Handle report resource usage rpc come from raylet.
/// Handle report resource usage rpc from a raylet.
void HandleReportResourceUsage(const rpc::ReportResourceUsageRequest &request,
rpc::ReportResourceUsageReply *reply,
rpc::SendReplyCallback send_reply_callback) override;