Skip to content

Commit

Permalink
[Core] Better propogate node death information (#45128)
Browse files Browse the repository at this point in the history
- Add reason_message to NodeDeathInfo so that we track more verbose node death info
- Introduce UnregisterNode PRC in GCS and use it instead of DrainNode RPC wherever appropriate

Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
  • Loading branch information
ruisearch42 committed May 15, 2024
1 parent cf7a09d commit a475ffe
Show file tree
Hide file tree
Showing 40 changed files with 439 additions and 209 deletions.
1 change: 0 additions & 1 deletion BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,6 @@ ray_cc_library(
),
hdrs = glob(
[
"src/ray/raylet/raylet_util.h",
"src/ray/raylet/scheduling/**/*.h",
"src/ray/core_worker/common.h",
],
Expand Down
22 changes: 20 additions & 2 deletions dashboard/tests/test_dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
wait_until_server_available,
wait_until_succeeded_without_exception,
)
from ray.core.generated import gcs_pb2
import ray.scripts.scripts as scripts
from ray.dashboard import dashboard
from ray.dashboard.head import DashboardHead
Expand Down Expand Up @@ -197,7 +198,7 @@ def test_raylet_and_agent_share_fate(shutdown_only):

ray.shutdown()

ray.init()
ray_context = ray.init()
all_processes = ray._private.worker._global_node.all_processes
raylet_proc_info = all_processes[ray_constants.PROCESS_TYPE_RAYLET][0]
raylet_proc = psutil.Process(raylet_proc_info.process.pid)
Expand All @@ -212,6 +213,19 @@ def test_raylet_and_agent_share_fate(shutdown_only):
agent_proc.wait()
raylet_proc.wait(15)

worker_node_id = ray_context.address_info["node_id"]
worker_node_info = [
node for node in ray.nodes() if node["NodeID"] == worker_node_id
][0]
assert not worker_node_info["Alive"]
assert worker_node_info["DeathReason"] == gcs_pb2.NodeDeathInfo.Reason.Value(
"UNEXPECTED_TERMINATION"
)
assert (
"failed and raylet fate-shares with it."
in worker_node_info["DeathReasonMessage"]
)


@pytest.mark.parametrize("parent_health_check_by_pipe", [True, False])
def test_agent_report_unexpected_raylet_death(
Expand Down Expand Up @@ -940,7 +954,11 @@ def test_dashboard_port_conflict(ray_start_with_dashboard):
os.environ.get("RAY_MINIMAL") == "1",
reason="This test is not supposed to work for minimal installation.",
)
def test_gcs_check_alive(fast_gcs_failure_detection, ray_start_with_dashboard):
def test_gcs_check_alive(
fast_gcs_failure_detection, ray_start_with_dashboard, call_ray_stop_only
):
# call_ray_stop_only is used to ensure a clean environment (especially
# killing dashboard agent in time) before the next test runs.
assert wait_until_server_available(ray_start_with_dashboard["webui_url"]) is True

all_processes = ray._private.worker._global_node.all_processes
Expand Down
5 changes: 5 additions & 0 deletions python/ray/includes/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,10 @@ cdef extern from "src/ray/protobuf/gcs.pb.h" nogil:
c_string ray_namespace() const
const c_string &SerializeAsString()

cdef cppclass CNodeDeathInfo "ray::rpc::NodeDeathInfo":
int reason() const
c_string reason_message() const

cdef cppclass CGcsNodeInfo "ray::rpc::GcsNodeInfo":
c_string node_id() const
c_string node_name() const
Expand All @@ -520,6 +524,7 @@ cdef extern from "src/ray/protobuf/gcs.pb.h" nogil:
c_string raylet_socket_name() const
int metrics_export_port() const
int runtime_env_agent_port() const
CNodeDeathInfo death_info() const
void ParseFromString(const c_string &serialized)

cdef enum CGcsNodeState "ray::rpc::GcsNodeInfo_GcsNodeState":
Expand Down
3 changes: 3 additions & 0 deletions python/ray/includes/global_state_accessor.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ cdef class GlobalStateAccessor:
"MetricsExportPort": c_node_info.metrics_export_port(),
"NodeName": c_node_info.node_name().decode(),
"RuntimeEnvAgentPort": c_node_info.runtime_env_agent_port(),
"DeathReason": c_node_info.death_info().reason(),
"DeathReasonMessage":
c_node_info.death_info().reason_message().decode(),
}
node_info["alive"] = node_info["Alive"]
c_resources = PythonGetResourcesTotal(c_node_info)
Expand Down
1 change: 1 addition & 0 deletions python/ray/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ py_test_module_list(
"test_metrics_agent_2.py",
"test_microbenchmarks.py",
"test_mini.py",
"test_node_death.py",
"test_numba.py",
"test_raylet_output.py",
"test_top_level_api.py",
Expand Down
14 changes: 13 additions & 1 deletion python/ray/tests/test_draining.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import ray
import time
from ray._raylet import GcsClient
from ray.core.generated import autoscaler_pb2
from ray.core.generated import autoscaler_pb2, gcs_pb2
from ray._private.test_utils import wait_for_condition
from ray.util.scheduling_strategies import (
NodeAffinitySchedulingStrategy,
Expand Down Expand Up @@ -72,6 +72,12 @@ def drain_until_accept():
== {head_node_id}
)

worker_node = [node for node in ray.nodes() if node["NodeID"] == worker_node_id][0]
assert worker_node["DeathReason"] == gcs_pb2.NodeDeathInfo.Reason.Value(
"AUTOSCALER_DRAIN_IDLE"
)
assert worker_node["DeathReasonMessage"] == "idle for long enough"

# Draining a dead node is always accepted.
is_accepted, _ = gcs_client.drain_node(
worker_node_id,
Expand Down Expand Up @@ -138,6 +144,12 @@ def ping(self):
== {head_node_id}
)

worker_node = [node for node in ray.nodes() if node["NodeID"] == worker_node_id][0]
assert worker_node["DeathReason"] == gcs_pb2.NodeDeathInfo.Reason.Value(
"AUTOSCALER_DRAIN_PREEMPTED"
)
assert worker_node["DeathReasonMessage"] == "preemption"


def test_scheduling_placement_groups_during_draining(ray_start_cluster):
"""Test that the draining node is unschedulable for new pgs."""
Expand Down
29 changes: 29 additions & 0 deletions python/ray/tests/test_node_death.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import sys
import pytest

import ray

from ray.core.generated import gcs_pb2


def test_normal_termination(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(resources={"head": 1})
ray.init(address=cluster.address)
worker_node = cluster.add_node(resources={"worker": 1})
cluster.wait_for_nodes()
worker_node_id = worker_node.node_id
cluster.remove_node(worker_node)

worker_node_info = [
node for node in ray.nodes() if node["NodeID"] == worker_node_id
][0]
assert not worker_node_info["Alive"]
assert worker_node_info["DeathReason"] == gcs_pb2.NodeDeathInfo.Reason.Value(
"EXPECTED_TERMINATION"
)
assert worker_node_info["DeathReasonMessage"] == "Received SIGTERM"


if __name__ == "__main__":
sys.exit(pytest.main(["-sv", __file__]))
16 changes: 15 additions & 1 deletion python/ray/tests/test_runtime_env_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
init_error_pubsub,
wait_for_condition,
)
from ray.core.generated import gcs_pb2
from ray.runtime_env import RuntimeEnv
import psutil

Expand Down Expand Up @@ -139,7 +140,7 @@ def test_raylet_and_agent_share_fate(shutdown_only):

ray.shutdown()

ray.init()
ray_context = ray.init()
all_processes = ray._private.worker._global_node.all_processes
raylet_proc_info = all_processes[ray_constants.PROCESS_TYPE_RAYLET][0]
raylet_proc = psutil.Process(raylet_proc_info.process.pid)
Expand All @@ -154,6 +155,19 @@ def test_raylet_and_agent_share_fate(shutdown_only):
agent_proc.wait()
raylet_proc.wait(15)

worker_node_id = ray_context.address_info["node_id"]
worker_node_info = [
node for node in ray.nodes() if node["NodeID"] == worker_node_id
][0]
assert not worker_node_info["Alive"]
assert worker_node_info["DeathReason"] == gcs_pb2.NodeDeathInfo.Reason.Value(
"UNEXPECTED_TERMINATION"
)
assert (
"failed and raylet fate-shares with it."
in worker_node_info["DeathReasonMessage"]
)


@pytest.mark.skipif(sys.platform == "win32", reason="no fate sharing for windows")
def test_agent_report_unexpected_raylet_death(shutdown_only):
Expand Down
1 change: 0 additions & 1 deletion src/mock/ray/gcs/gcs_client/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ class MockNodeInfoAccessor : public NodeInfoAccessor {
RegisterSelf,
(const rpc::GcsNodeInfo &local_node_info, const StatusCallback &callback),
(override));
MOCK_METHOD(Status, DrainSelf, (), (override));
MOCK_METHOD(const NodeID &, GetSelfId, (), (const, override));
MOCK_METHOD(const rpc::GcsNodeInfo &, GetSelfInfo, (), (const, override));
MOCK_METHOD(Status,
Expand Down
27 changes: 15 additions & 12 deletions src/ray/gcs/gcs_client/accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -441,27 +441,30 @@ Status NodeInfoAccessor::RegisterSelf(const GcsNodeInfo &local_node_info,
return Status::OK();
}

Status NodeInfoAccessor::DrainSelf() {
void NodeInfoAccessor::UnregisterSelf(const rpc::NodeDeathInfo &node_death_info,
std::function<void()> unregister_done_callback) {
if (local_node_id_.IsNil()) {
RAY_LOG(INFO) << "The node is already drained.";
// This node is already drained.
return Status::OK();
RAY_LOG(INFO) << "The node is already unregistered.";
return;
}
NodeID node_id = NodeID::FromBinary(local_node_info_.node_id());
RAY_LOG(INFO) << "Unregistering node info, node id = " << node_id;
rpc::DrainNodeRequest request;
auto draining_request = request.add_drain_node_data();
draining_request->set_node_id(local_node_info_.node_id());
client_impl_->GetGcsRpcClient().DrainNode(
request, [this, node_id](const Status &status, const rpc::DrainNodeReply &reply) {
auto node_id = NodeID::FromBinary(local_node_info_.node_id());
RAY_LOG(INFO) << "Unregistering node, node id = " << node_id;

rpc::UnregisterNodeRequest request;
request.set_node_id(local_node_info_.node_id());
request.mutable_node_death_info()->CopyFrom(node_death_info);
client_impl_->GetGcsRpcClient().UnregisterNode(
request,
[this, node_id, unregister_done_callback](const Status &status,
const rpc::UnregisterNodeReply &reply) {
if (status.ok()) {
local_node_info_.set_state(GcsNodeInfo::DEAD);
local_node_id_ = NodeID::Nil();
}
RAY_LOG(INFO) << "Finished unregistering node info, status = " << status
<< ", node id = " << node_id;
unregister_done_callback();
});
return Status::OK();
}

const NodeID &NodeInfoAccessor::GetSelfId() const { return local_node_id_; }
Expand Down
12 changes: 6 additions & 6 deletions src/ray/gcs/gcs_client/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -286,13 +286,13 @@ class NodeInfoAccessor {
virtual Status RegisterSelf(const rpc::GcsNodeInfo &local_node_info,
const StatusCallback &callback);

/// Drain (remove the information of the node from the cluster) the local node from GCS
/// synchronously.
/// Unregister local node to GCS asynchronously.
///
/// Once the RPC is replied, it is guaranteed that GCS drains the information of the
/// local node, and all the nodes in the cluster will "eventually" be informed that the
/// node is drained. \return Status
virtual Status DrainSelf();
/// \param node_death_info The death information regarding why to unregister from GCS.
/// \param unregister_done_callback Callback that will be called when unregistration is
/// done.
virtual void UnregisterSelf(const rpc::NodeDeathInfo &node_death_info,
std::function<void()> unregister_done_callback);

/// Get id of local node which was registered by 'RegisterSelf'.
///
Expand Down
52 changes: 39 additions & 13 deletions src/ray/gcs/gcs_client/test/gcs_client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -321,18 +321,18 @@ class GcsClientTest : public ::testing::TestWithParam<bool> {
return status.ok();
}

bool DrainSelf() {
Status status = gcs_client_->Nodes().DrainSelf();
return status.ok();
}

bool RegisterNode(const rpc::GcsNodeInfo &node_info) {
std::promise<bool> promise;
RAY_CHECK_OK(gcs_client_->Nodes().AsyncRegister(
node_info, [&promise](Status status) { promise.set_value(status.ok()); }));
return WaitReady(promise.get_future(), timeout_ms_);
}

void UnregisterSelf(const rpc::NodeDeathInfo &node_death_info,
std::function<void()> unregister_done_callback) {
gcs_client_->Nodes().UnregisterSelf(node_death_info, unregister_done_callback);
}

std::vector<rpc::GcsNodeInfo> GetNodeInfoList() {
std::promise<bool> promise;
std::vector<rpc::GcsNodeInfo> nodes;
Expand Down Expand Up @@ -552,23 +552,49 @@ TEST_P(GcsClientTest, TestNodeInfo) {
ASSERT_TRUE(gcs_client_->Nodes().Get(node1_id));
EXPECT_EQ(gcs_client_->Nodes().GetAll().size(), 2);

// Cancel registration of local node to GCS.
ASSERT_TRUE(DrainSelf());

// Cancel registration of a node to GCS.
// Cancel registration of both nodes to GCS.
ASSERT_TRUE(DrainNode(node1_id));
ASSERT_TRUE(DrainNode(node2_id));
WaitForExpectedCount(unregister_count, 2);

// Get information of all nodes from GCS.
node_list = GetNodeInfoList();
EXPECT_EQ(node_list.size(), 2);
EXPECT_EQ(node_list[0].state(),
rpc::GcsNodeInfo_GcsNodeState::GcsNodeInfo_GcsNodeState_DEAD);
EXPECT_EQ(node_list[1].state(),
rpc::GcsNodeInfo_GcsNodeState::GcsNodeInfo_GcsNodeState_DEAD);
EXPECT_EQ(node_list[0].state(), rpc::GcsNodeInfo::DEAD);
EXPECT_EQ(node_list[1].state(), rpc::GcsNodeInfo::DEAD);
ASSERT_TRUE(gcs_client_->Nodes().IsRemoved(node1_id));
ASSERT_TRUE(gcs_client_->Nodes().IsRemoved(node2_id));
}

TEST_P(GcsClientTest, TestUnregisterNode) {
// Create gcs node info.
auto gcs_node_info = Mocker::GenNodeInfo();
NodeID node_id = NodeID::FromBinary(gcs_node_info->node_id());

// Register local node to GCS.
ASSERT_TRUE(RegisterSelf(*gcs_node_info));
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
EXPECT_EQ(gcs_client_->Nodes().GetSelfId(), node_id);
EXPECT_EQ(gcs_client_->Nodes().GetSelfInfo().node_id(), gcs_node_info->node_id());
EXPECT_EQ(gcs_client_->Nodes().GetSelfInfo().state(), gcs_node_info->state());

// Unregister local node from GCS.
rpc::NodeDeathInfo node_death_info;
node_death_info.set_reason(rpc::NodeDeathInfo::EXPECTED_TERMINATION);
auto reason_message = "Testing unregister node from GCS.";
node_death_info.set_reason_message(reason_message);

std::promise<bool> promise;
UnregisterSelf(node_death_info, [&promise]() { promise.set_value(true); });
WaitReady(promise.get_future(), timeout_ms_);

auto node_list = GetNodeInfoList();
EXPECT_EQ(node_list.size(), 1);
EXPECT_EQ(node_list[0].state(), rpc::GcsNodeInfo::DEAD);
EXPECT_EQ(node_list[0].death_info().reason(), rpc::NodeDeathInfo::EXPECTED_TERMINATION);
EXPECT_EQ(node_list[0].death_info().reason_message(), reason_message);
}

TEST_P(GcsClientTest, TestGetAllAvailableResources) {
// Register node.
auto node_info = Mocker::GenNodeInfo();
Expand Down
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ void GcsAutoscalerStateManager::HandleDrainNode(
} else {
death_info->set_reason(rpc::NodeDeathInfo::AUTOSCALER_DRAIN_IDLE);
}
death_info->set_reason_message(request.reason_message());
if (RayConfig::instance().enable_reap_actor_death()) {
gcs_actor_manager_.SetPreemptedAndPublish(node_id);
}
Expand Down
Loading

0 comments on commit a475ffe

Please sign in to comment.