Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Better propogate node death information #45128

Merged
merged 44 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
aef1757
[Core] Better propogate node death information
ruisearch42 May 2, 2024
5d46d45
WIP: to fix compilation error
ruisearch42 May 4, 2024
4bca620
fix compilation
ruisearch42 May 6, 2024
7297706
cleanup
ruisearch42 May 6, 2024
0889057
format
ruisearch42 May 6, 2024
440d319
update
ruisearch42 May 7, 2024
ddc9491
update
ruisearch42 May 7, 2024
81cdd79
Merge branch 'master' into node_death
ruisearch42 May 8, 2024
9f27865
async
ruisearch42 May 8, 2024
bb60589
async shutdown_gracefully
ruisearch42 May 8, 2024
5b5ad48
fix
ruisearch42 May 8, 2024
96ebaa7
unregister callback
ruisearch42 May 9, 2024
78138aa
fix
ruisearch42 May 9, 2024
96f8399
fix
ruisearch42 May 9, 2024
56ff6be
fix
ruisearch42 May 10, 2024
52d91cb
fix
ruisearch42 May 10, 2024
dd3d285
debug
ruisearch42 May 10, 2024
3347b0a
update
ruisearch42 May 10, 2024
7acc550
debug
ruisearch42 May 10, 2024
ef7d6c4
Merge branch 'master' into node_death
ruisearch42 May 10, 2024
6254ff1
fix
ruisearch42 May 10, 2024
eb15a1a
debug
ruisearch42 May 10, 2024
2b95e7e
fix
ruisearch42 May 10, 2024
b562a90
debug
ruisearch42 May 10, 2024
2230189
Revert "debug"
ruisearch42 May 10, 2024
844419e
Revert "debug"
ruisearch42 May 13, 2024
a832557
fix
ruisearch42 May 13, 2024
fec13c2
cleanup
ruisearch42 May 13, 2024
47a5810
address comments
ruisearch42 May 14, 2024
56d72b0
Merge branch 'master' into node_death
ruisearch42 May 14, 2024
b9a3697
unit test
ruisearch42 May 14, 2024
f31440e
address comments
ruisearch42 May 14, 2024
c6da857
cleanup
ruisearch42 May 14, 2024
8ccfa09
rm raylet_util
ruisearch42 May 14, 2024
20aeff8
update GcsNodeManager::DrainNode
ruisearch42 May 14, 2024
abc8262
fix
ruisearch42 May 14, 2024
d615c3f
Merge branch 'master' into node_death
ruisearch42 May 14, 2024
bbbbace
unit tests
ruisearch42 May 14, 2024
0e8d2ec
format
ruisearch42 May 14, 2024
bd4746d
update
ruisearch42 May 14, 2024
f085851
lint
ruisearch42 May 14, 2024
ffbe09e
fix
ruisearch42 May 14, 2024
290a26f
cleanup
ruisearch42 May 14, 2024
42aa253
fix tests
ruisearch42 May 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion dashboard/tests/test_dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,11 @@ def test_dashboard_port_conflict(ray_start_with_dashboard):
os.environ.get("RAY_MINIMAL") == "1",
reason="This test is not supposed to work for minimal installation.",
)
def test_gcs_check_alive(fast_gcs_failure_detection, ray_start_with_dashboard):
def test_gcs_check_alive(
fast_gcs_failure_detection, ray_start_with_dashboard, call_ray_stop_only
):
# call_ray_stop_only is used to ensure a clean environment (especially
# killing dashboard agent in time) before the next test runs.
assert wait_until_server_available(ray_start_with_dashboard["webui_url"]) is True

all_processes = ray._private.worker._global_node.all_processes
Expand Down
26 changes: 26 additions & 0 deletions src/ray/gcs/gcs_client/accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,32 @@ Status NodeInfoAccessor::RegisterSelf(const GcsNodeInfo &local_node_info,
return Status::OK();
}

void NodeInfoAccessor::UnregisterSelf(const rpc::NodeDeathInfo &node_death_info,
std::function<void()> unregister_done_callback) {
if (local_node_id_.IsNil()) {
RAY_LOG(INFO) << "The node is already unregistered.";
return;
}
auto node_id = NodeID::FromBinary(local_node_info_.node_id());
RAY_LOG(INFO) << "Unregistering node, node id = " << node_id;

rpc::UnregisterNodeRequest request;
request.set_node_id(local_node_info_.node_id());
request.mutable_node_death_info()->CopyFrom(node_death_info);
client_impl_->GetGcsRpcClient().UnregisterNode(
request,
[this, node_id, unregister_done_callback](const Status &status,
const rpc::UnregisterNodeReply &reply) {
if (status.ok()) {
local_node_info_.set_state(GcsNodeInfo::DEAD);
local_node_id_ = NodeID::Nil();
}
RAY_LOG(INFO) << "Finished unregistering node info, status = " << status
<< ", node id = " << node_id;
unregister_done_callback();
});
}

Status NodeInfoAccessor::DrainSelf() {
ruisearch42 marked this conversation as resolved.
Show resolved Hide resolved
if (local_node_id_.IsNil()) {
RAY_LOG(INFO) << "The node is already drained.";
Expand Down
9 changes: 9 additions & 0 deletions src/ray/gcs/gcs_client/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,15 @@ class NodeInfoAccessor {
virtual Status RegisterSelf(const rpc::GcsNodeInfo &local_node_info,
const StatusCallback &callback);

/// Unregister local node to GCS synchronously
ruisearch42 marked this conversation as resolved.
Show resolved Hide resolved
///
/// \param node_death_info The death information regarding why to unregister from GCS.
/// \param unregister_done_callback Callback that will be called when unregistration is
/// done.
/// \return Status
ruisearch42 marked this conversation as resolved.
Show resolved Hide resolved
virtual void UnregisterSelf(const rpc::NodeDeathInfo &node_death_info,
std::function<void()> unregister_done_callback);

/// Drain (remove the information of the node from the cluster) the local node from GCS
/// synchronously.
///
Expand Down
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ void GcsAutoscalerStateManager::HandleDrainNode(
} else {
death_info->set_reason(rpc::NodeDeathInfo::AUTOSCALER_DRAIN_IDLE);
}
death_info->set_reason_message(request.reason_message());
if (RayConfig::instance().enable_reap_actor_death()) {
gcs_actor_manager_.SetPreemptedAndPublish(node_id);
}
Expand Down
30 changes: 30 additions & 0 deletions src/ray/gcs/gcs_server/gcs_node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,36 @@ void GcsNodeManager::HandleCheckAlive(rpc::CheckAliveRequest request,
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
}

void GcsNodeManager::HandleUnregisterNode(rpc::UnregisterNodeRequest request,
rpc::UnregisterNodeReply *reply,
rpc::SendReplyCallback send_reply_callback) {
NodeID node_id = NodeID::FromBinary(request.node_id());
RAY_LOG(DEBUG) << "HandleUnregisterNode() for node id = " << node_id;
auto node = RemoveNode(node_id, /* is_intended = */ true);
if (!node) {
RAY_LOG(INFO) << "Node " << node_id << " is already removed";
return;
}

node->set_state(rpc::GcsNodeInfo::DEAD);
jjyao marked this conversation as resolved.
Show resolved Hide resolved
node->mutable_death_info()->CopyFrom(request.node_death_info());
node->set_end_time_ms(current_sys_time_ms());

AddDeadNodeToCache(node);

auto node_info_delta = std::make_shared<rpc::GcsNodeInfo>();
node_info_delta->set_node_id(node->node_id());
node_info_delta->mutable_death_info()->CopyFrom(request.node_death_info());
node_info_delta->set_state(node->state());
node_info_delta->set_end_time_ms(node->end_time_ms());

auto on_put_done = [=](const Status &status) {
RAY_CHECK_OK(gcs_publisher_->PublishNodeInfo(node_id, *node_info_delta, nullptr));
};
RAY_CHECK_OK(gcs_table_storage_->NodeTable().Put(node_id, *node, on_put_done));
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
}

void GcsNodeManager::HandleDrainNode(rpc::DrainNodeRequest request,
ruisearch42 marked this conversation as resolved.
Show resolved Hide resolved
rpc::DrainNodeReply *reply,
rpc::SendReplyCallback send_reply_callback) {
Expand Down
5 changes: 5 additions & 0 deletions src/ray/gcs/gcs_server/gcs_node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
rpc::RegisterNodeReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Handle unregister rpc request come from raylet.
void HandleUnregisterNode(rpc::UnregisterNodeRequest request,
rpc::UnregisterNodeReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Handle unregister rpc request come from raylet.
void HandleDrainNode(rpc::DrainNodeRequest request,
rpc::DrainNodeReply *reply,
Expand Down
2 changes: 2 additions & 0 deletions src/ray/protobuf/gcs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,8 @@ message NodeDeathInfo {
AUTOSCALER_DRAIN_IDLE = 4;
}
Reason reason = 1;
// A message describing the reason for the node death.
string reason_message = 2;
}

message GcsNodeInfo {
Expand Down
12 changes: 12 additions & 0 deletions src/ray/protobuf/gcs_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,16 @@ message RegisterNodeReply {
GcsStatus status = 1;
}

message UnregisterNodeRequest {
// The node id of the node to be unregistered.
bytes node_id = 1;
NodeDeathInfo node_death_info = 2;
}

message UnregisterNodeReply {
GcsStatus status = 1;
}

message GetAllNodeInfoRequest {}

message GetAllNodeInfoReply {
Expand Down Expand Up @@ -661,6 +671,8 @@ service NodeInfoGcsService {
rpc GetClusterId(GetClusterIdRequest) returns (GetClusterIdReply);
// Register a node to GCS Service.
rpc RegisterNode(RegisterNodeRequest) returns (RegisterNodeReply);
// Unregister a node to GCS Service.
rpc UnregisterNode(UnregisterNodeRequest) returns (UnregisterNodeReply);
// Drain a node from GCS Service.
// When the RPC is replied,
// - It is guaranteed that the requested nodes are going to be drained eventually.
Expand Down
6 changes: 5 additions & 1 deletion src/ray/raylet/agent_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,11 @@ void AgentManager::StartAgent() {
"https://docs.ray.io/en/master/ray-observability/user-guides/"
"configure-logging.html#logging-directory-structure.\n"
"- The agent is killed by the OS (e.g., out of memory).";
ShutdownRayletGracefully();
rpc::NodeDeathInfo node_death_info;
node_death_info.set_reason(rpc::NodeDeathInfo::UNEXPECTED_TERMINATION);
node_death_info.set_reason_message(
"Ray agent failed and raylet fate-shares with it.");
ruisearch42 marked this conversation as resolved.
Show resolved Hide resolved
shutdown_raylet_gracefully_(node_death_info);
// If the process is not terminated within 10 seconds, forcefully kill raylet
// itself.
delay_executor_([]() { QuickExit(); }, /*ms*/ 10000);
Expand Down
11 changes: 8 additions & 3 deletions src/ray/raylet/agent_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "ray/common/id.h"
#include "ray/util/process.h"
#include "src/ray/protobuf/gcs.pb.h"

namespace ray {
namespace raylet {
Expand Down Expand Up @@ -54,11 +55,14 @@ class AgentManager {
bool fate_shares;
};

explicit AgentManager(Options options,
DelayExecutorFn delay_executor,
bool start_agent = true /* for test */)
explicit AgentManager(
Options options,
DelayExecutorFn delay_executor,
std::function<void(const rpc::NodeDeathInfo &)> shutdown_raylet_gracefully,
bool start_agent = true /* for test */)
: options_(std::move(options)),
delay_executor_(std::move(delay_executor)),
shutdown_raylet_gracefully_(shutdown_raylet_gracefully),
fate_shares_(options_.fate_shares) {
if (options_.agent_name.empty()) {
RAY_LOG(FATAL) << "AgentManager agent_name must not be empty.";
Expand All @@ -79,6 +83,7 @@ class AgentManager {
const Options options_;
Process process_;
DelayExecutorFn delay_executor_;
std::function<void(const rpc::NodeDeathInfo &)> shutdown_raylet_gracefully_;
// If true, when the agent dies, raylet kills itself.
std::atomic<bool> fate_shares_;
std::unique_ptr<std::thread> monitor_thread_;
Expand Down
74 changes: 51 additions & 23 deletions src/ray/raylet/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "ray/util/event.h"
#include "ray/util/process.h"
#include "ray/util/subreaper.h"
#include "src/ray/protobuf/gcs.pb.h"

using json = nlohmann::json;

Expand Down Expand Up @@ -224,6 +225,47 @@ int main(int argc, char *argv[]) {
#endif
};

auto shutted_down = std::make_shared<std::atomic<bool>>(false);

auto shutdown_raylet_after_unregistration =
[&main_service, &raylet_socket_name, &raylet, &gcs_client]() {
// We should stop the service and remove the local socket file.
raylet->Stop();
gcs_client->Disconnect();
ray::stats::Shutdown();
main_service.stop();
remove(raylet_socket_name.c_str());
};

// Shut down raylet gracefully, in a synchronous fashion.
// This is an internal method and should only be run on the main_service.
auto _shutdown_raylet_gracefully_sync =
ruisearch42 marked this conversation as resolved.
Show resolved Hide resolved
[&raylet, shutted_down, shutdown_raylet_after_unregistration](
const ray::rpc::NodeDeathInfo &node_death_info) {
// Make the shutdown method idempotent since graceful shutdown can be triggered
// by many places.
if (*shutted_down) {
RAY_LOG(INFO) << "Raylet shutdown already triggered, ignoring this request.";
return;
}
RAY_LOG(INFO) << "Raylet graceful shutdown triggered, reason = "
<< NodeDeathInfo_Reason_Name(node_death_info.reason()) << ", "
<< "reason message = " << node_death_info.reason_message();
RAY_LOG(INFO) << "Shutting down...";
*shutted_down = true;

raylet->UnregisterSelf(node_death_info, shutdown_raylet_after_unregistration);
};

auto shutdown_raylet_gracefully = [&main_service, _shutdown_raylet_gracefully_sync](
const ray::rpc::NodeDeathInfo &node_death_info) {
main_service.post(
[_shutdown_raylet_gracefully_sync, node_death_info]() {
_shutdown_raylet_gracefully_sync(node_death_info);
},
"_shutdown_raylet_gracefully_sync");
};

RAY_CHECK_OK(gcs_client->Nodes().AsyncGetInternalConfig(
[&](::ray::Status status,
const boost::optional<std::string> &stored_raylet_config) {
Expand Down Expand Up @@ -382,7 +424,8 @@ int main(int argc, char *argv[]) {
object_manager_config,
gcs_client,
metrics_export_port,
is_head_node);
is_head_node,
shutdown_raylet_gracefully);

// Initialize event framework.
if (RayConfig::instance().event_log_reporter_enabled() && !log_dir.empty()) {
Expand All @@ -396,35 +439,20 @@ int main(int argc, char *argv[]) {
raylet->Start();
}));

auto shutted_down = std::make_shared<std::atomic<bool>>(false);

// Destroy the Raylet on a SIGTERM. The pointer to main_service is
// guaranteed to be valid since this function will run the event loop
// instead of returning immediately.
// We should stop the service and remove the local socket file.
auto handler = [&main_service, &raylet_socket_name, &raylet, &gcs_client, shutted_down](
const boost::system::error_code &error, int signal_number) {
// Make the shutdown handler idempotent since graceful shutdown can be triggered
// by many places.
if (*shutted_down) {
RAY_LOG(INFO) << "Raylet already received SIGTERM. It will ignore the request.";
return;
}
RAY_LOG(INFO) << "Raylet received SIGTERM, shutting down...";
*shutted_down = true;
raylet->Stop();
gcs_client->Disconnect();
ray::stats::Shutdown();
main_service.stop();
remove(raylet_socket_name.c_str());
auto signal_handler = [_shutdown_raylet_gracefully_sync](
const boost::system::error_code &error, int signal_number) {
ray::rpc::NodeDeathInfo node_death_info;
node_death_info.set_reason(ray::rpc::NodeDeathInfo::EXPECTED_TERMINATION);
node_death_info.set_reason_message("Received SIGTERM");
_shutdown_raylet_gracefully_sync(node_death_info);
};
boost::asio::signal_set signals(main_service);
#ifdef _WIN32
signals.add(SIGBREAK);
#else
signals.add(SIGTERM);
#endif
signals.async_wait(handler);
signals.async_wait(signal_handler);

main_service.run();
}
Expand Down
Loading
Loading