Skip to content

Commit

Permalink
[core] Fix empty resource update to GCS resource manager. (#39648) (#…
Browse files Browse the repository at this point in the history
…39664)

* [core] Fix empty resource update to GCS resource manager.  (#39648)


---------

Signed-off-by: rickyyx <rickyx@anyscale.com>

* update

Signed-off-by: rickyyx <rickyx@anyscale.com>

---------

Signed-off-by: rickyyx <rickyx@anyscale.com>
  • Loading branch information
rickyyx committed Sep 14, 2023
1 parent 8cfb98a commit b4bba47
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 43 deletions.
6 changes: 6 additions & 0 deletions python/ray/autoscaler/v2/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,12 @@ class ClusterStatus:
# Query metrics
stats: Stats = field(default_factory=Stats)

def total_resources(self) -> Dict[str, float]:
    """Return a mapping from resource name to its ``total``.

    One entry is produced per item in ``self.cluster_resource_usage``; if a
    resource name appears more than once, the later item overwrites the
    earlier one.
    """
    totals: Dict[str, float] = {}
    for usage in self.cluster_resource_usage:
        name = usage.resource_name
        totals[name] = usage.total
    return totals

def available_resources(self) -> Dict[str, float]:
    """Return a mapping from resource name to its unused amount.

    For every item in ``self.cluster_resource_usage`` the value is
    ``total - used``; if a resource name appears more than once, the later
    item overwrites the earlier one.
    """
    remaining: Dict[str, float] = {}
    for usage in self.cluster_resource_usage:
        name = usage.resource_name
        remaining[name] = usage.total - usage.used
    return remaining

# TODO(rickyx): we don't show infeasible requests as of now.
# (They will just be pending forever as part of the demands)
# We should show them properly in the future.
74 changes: 74 additions & 0 deletions python/ray/autoscaler/v2/tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,80 @@ def verify():
cluster.shutdown()


def test_non_corrupted_resources():
    """
    Test that when node's local gc happens due to object store pressure,
    the message doesn't corrupt the resource view on the gcs.
    See issue https://github.com/ray-project/ray/issues/39644
    """
    num_worker_nodes = 5
    cluster = AutoscalingCluster(
        head_resources={"CPU": 2, "object_store_memory": 100 * 1024 * 1024},
        worker_node_types={
            "type-1": {
                "resources": {"CPU": 2},
                "node_config": {},
                "min_workers": num_worker_nodes,
                "max_workers": num_worker_nodes,
            },
        },
    )

    # Driver that continuously runs tasks, each of which puts a 50 MiB object.
    # The body of `foo` and of the `while` loop must be indented, otherwise the
    # driver dies with an IndentationError and the checks below pass vacuously.
    driver_script = """
import ray
import time
ray.init("auto")
@ray.remote(num_cpus=1)
def foo():
    ray.put(bytearray(1024*1024* 50))
while True:
    ray.get([foo.remote() for _ in range(50)])
"""

    try:
        # This should trigger many COMMANDS messages from NodeManager.
        cluster.start(
            _system_config={
                "debug_dump_period_milliseconds": 10,
                "raylet_report_resources_period_milliseconds": 10000,
                "global_gc_min_interval_s": 1,
                "local_gc_interval_s": 1,
                "high_plasma_storage_usage": 0.2,
                "raylet_check_gc_period_milliseconds": 10,
            },
        )
        ray.init("auto")

        from ray.autoscaler.v2.sdk import get_cluster_status

        def nodes_up():
            cluster_state = get_cluster_status()
            return len(cluster_state.healthy_nodes) == num_worker_nodes + 1

        # Wait until the head node and every worker node are reported healthy.
        wait_for_condition(nodes_up)

        # Schedule tasks
        run_string_as_driver_nonblocking(driver_script)
        start = time.time()

        # Check the cluster state for 10 seconds
        while time.time() - start < 10:
            cluster_state = get_cluster_status()

            # Verify total cluster resources never change
            assert len(cluster_state.healthy_nodes) == num_worker_nodes + 1
            assert cluster_state.total_resources()["CPU"] == 2 * (num_worker_nodes + 1)

    finally:
        ray.shutdown()
        cluster.shutdown()


if __name__ == "__main__":
if os.environ.get("PARALLEL_CI"):
sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
Expand Down
6 changes: 6 additions & 0 deletions python/ray/cluster_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ def start(self, _system_config=None, override_env: Optional[Dict] = None):
cmd.append("--num-cpus={}".format(self._head_resources.pop("CPU")))
if "GPU" in self._head_resources:
cmd.append("--num-gpus={}".format(self._head_resources.pop("GPU")))
if "object_store_memory" in self._head_resources:
cmd.append(
"--object-store-memory={}".format(
self._head_resources.pop("object_store_memory")
)
)
if self._head_resources:
cmd.append("--resources='{}'".format(json.dumps(self._head_resources)))
if _system_config is not None:
Expand Down
45 changes: 31 additions & 14 deletions src/ray/gcs/gcs_server/gcs_resource_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,13 @@ void GcsResourceManager::ConsumeSyncMessage(
rpc::ResourcesData resources;
resources.ParseFromString(message->sync_message());
resources.set_node_id(message->node_id());
UpdateFromResourceReport(resources);
if (message->message_type() == syncer::MessageType::COMMANDS) {
UpdateFromResourceCommand(resources);
} else if (message->message_type() == syncer::MessageType::RESOURCE_VIEW) {
UpdateFromResourceView(resources);
} else {
RAY_LOG(FATAL) << "Unsupported message type: " << message->message_type();
}
},
"GcsResourceManager::Update");
}
Expand Down Expand Up @@ -124,7 +130,7 @@ void GcsResourceManager::HandleGetAllAvailableResources(
++counts_[CountType::GET_ALL_AVAILABLE_RESOURCES_REQUEST];
}

void GcsResourceManager::UpdateFromResourceReport(const rpc::ResourcesData &data) {
void GcsResourceManager::UpdateFromResourceView(const rpc::ResourcesData &data) {
NodeID node_id = NodeID::FromBinary(data.node_id());
// When gcs detects task pending, we may receive a local update. But it can be ignored
// here because gcs' syncer has already broadcast it.
Expand All @@ -134,10 +140,11 @@ void GcsResourceManager::UpdateFromResourceReport(const rpc::ResourcesData &data
if (RayConfig::instance().gcs_actor_scheduling_enabled()) {
UpdateNodeNormalTaskResources(node_id, data);
} else {
// We will only update the node's resources if it's from resource view reports.
if (!cluster_resource_manager_.UpdateNode(scheduling::NodeID(node_id.Binary()),
data)) {
RAY_LOG(INFO)
<< "[UpdateFromResourceReport]: received resource usage from unknown node id "
<< "[UpdateFromResourceView]: received resource usage from unknown node id "
<< node_id;
}
}
Expand Down Expand Up @@ -167,7 +174,7 @@ void GcsResourceManager::HandleReportResourceUsage(
rpc::ReportResourceUsageRequest request,
rpc::ReportResourceUsageReply *reply,
rpc::SendReplyCallback send_reply_callback) {
UpdateFromResourceReport(request.resources());
UpdateFromResourceView(request.resources());

GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
++counts_[CountType::REPORT_RESOURCE_USAGE_REQUEST];
Expand Down Expand Up @@ -260,6 +267,19 @@ void GcsResourceManager::HandleGetAllResourceUsage(
++counts_[CountType::GET_ALL_RESOURCE_USAGE_REQUEST];
}

/// Apply the data carried by a syncer COMMANDS message to the cached usage of
/// the node that sent it. Messages from nodes without an entry in
/// `node_resource_usages_` are ignored.
void GcsResourceManager::UpdateFromResourceCommand(const rpc::ResourcesData &data) {
  auto usage_it = node_resource_usages_.find(NodeID::FromBinary(data.node_id()));
  if (usage_it != node_resource_usages_.end()) {
    // TODO(rickyx): We should change this to be part of RESOURCE_VIEW.
    // This is being populated from NodeManager as part of COMMANDS
    auto &node_usage = usage_it->second;
    node_usage.set_cluster_full_of_actors_detected(
        data.cluster_full_of_actors_detected());
  }
}

void GcsResourceManager::UpdateNodeResourceUsage(const NodeID &node_id,
const rpc::ResourcesData &resources) {
auto iter = node_resource_usages_.find(node_id);
Expand All @@ -268,18 +288,15 @@ void GcsResourceManager::UpdateNodeResourceUsage(const NodeID &node_id,
// If the node is not registered to GCS,
// we are guaranteed that no resource usage will be reported.
return;
} else {
if (resources.resources_total_size() > 0) {
(*iter->second.mutable_resources_total()) = resources.resources_total();
}
}
if (resources.resources_total_size() > 0) {
(*iter->second.mutable_resources_total()) = resources.resources_total();
}

(*iter->second.mutable_resources_available()) = resources.resources_available();
(*iter->second.mutable_resources_available()) = resources.resources_available();

if (resources.resources_normal_task_changed()) {
(*iter->second.mutable_resources_normal_task()) = resources.resources_normal_task();
}
iter->second.set_cluster_full_of_actors_detected(
resources.cluster_full_of_actors_detected());
if (resources.resources_normal_task_changed()) {
(*iter->second.mutable_resources_normal_task()) = resources.resources_normal_task();
}
}

Expand Down
16 changes: 14 additions & 2 deletions src/ray/gcs/gcs_server/gcs_resource_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,25 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler,
/// Update resource usage of given node.
///
/// \param node_id Node id.
/// \param request Request containing resource usage.
/// \param resources The resource usage of the node.
void UpdateNodeResourceUsage(const NodeID &node_id,
const rpc::ResourcesData &resources);

/// Process a new resource report from a node, independent of the rpc handler it came
/// from.
void UpdateFromResourceReport(const rpc::ResourcesData &data);
///
/// \param data The resource report, e.g. the payload of a
/// syncer::MessageType::RESOURCE_VIEW message.
void UpdateFromResourceView(const rpc::ResourcesData &data);

/// Update the resource usage of a node from syncer COMMANDS
///
/// This is currently used for setting cluster full of actors info from syncer.
/// \param data The resource report.
void UpdateFromResourceCommand(const rpc::ResourcesData &data);

/// Update the placement group load information so that it will be reported through
/// heartbeat.
Expand Down
28 changes: 14 additions & 14 deletions src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class GcsAutoscalerStateManagerTest : public ::testing::Test {
return reply.is_accepted();
}

void UpdateFromResourceReportSync(
void UpdateFromResourceViewSync(
const NodeID &node_id,
const absl::flat_hash_map<std::string, double> &available_resources,
const absl::flat_hash_map<std::string, double> &total_resources,
Expand All @@ -159,7 +159,7 @@ class GcsAutoscalerStateManagerTest : public ::testing::Test {
total_resources,
idle_ms,
is_draining);
gcs_resource_manager_->UpdateFromResourceReport(resources_data);
gcs_resource_manager_->UpdateFromResourceView(resources_data);
}

rpc::autoscaler::GetClusterStatusReply GetClusterStatusSync() {
Expand Down Expand Up @@ -364,9 +364,9 @@ TEST_F(GcsAutoscalerStateManagerTest, TestNodeAddUpdateRemove) {

// Update available resources.
{
UpdateFromResourceReportSync(NodeID::FromBinary(node->node_id()),
{/* available */ {"CPU", 1.75}},
/* total*/ {{"CPU", 2}, {"GPU", 1}});
UpdateFromResourceViewSync(NodeID::FromBinary(node->node_id()),
{/* available */ {"CPU", 1.75}},
/* total*/ {{"CPU", 2}, {"GPU", 1}});

const auto &state = GetClusterResourceStateSync();
ASSERT_EQ(state.node_states_size(), 1);
Expand Down Expand Up @@ -721,11 +721,11 @@ TEST_F(GcsAutoscalerStateManagerTest, TestDrainingStatus) {
}

// Report draining info.
UpdateFromResourceReportSync(NodeID::FromBinary(node->node_id()),
{/* available */ {"CPU", 2}, {"GPU", 1}},
/* total*/ {{"CPU", 2}, {"GPU", 1}},
/* idle_duration_ms */ 10,
/* is_draining */ true);
UpdateFromResourceViewSync(NodeID::FromBinary(node->node_id()),
{/* available */ {"CPU", 2}, {"GPU", 1}},
/* total*/ {{"CPU", 2}, {"GPU", 1}},
/* idle_duration_ms */ 10,
/* is_draining */ true);
{
const auto &state = GetClusterResourceStateSync();
ASSERT_EQ(state.node_states(0).status(), rpc::autoscaler::NodeStatus::DRAINING);
Expand Down Expand Up @@ -758,10 +758,10 @@ TEST_F(GcsAutoscalerStateManagerTest, TestIdleTime) {
}

// Report idle node info.
UpdateFromResourceReportSync(NodeID::FromBinary(node->node_id()),
{/* available */ {"CPU", 2}, {"GPU", 1}},
/* total*/ {{"CPU", 2}, {"GPU", 1}},
/* idle_duration_ms */ 10);
UpdateFromResourceViewSync(NodeID::FromBinary(node->node_id()),
{/* available */ {"CPU", 2}, {"GPU", 1}},
/* total*/ {{"CPU", 2}, {"GPU", 1}},
/* idle_duration_ms */ 10);

// Check report idle time is set.
{
Expand Down

0 comments on commit b4bba47

Please sign in to comment.