[GCS FT] Mark job as finished for dead node #40431

Merged: 11 commits, Oct 27, 2023
57 changes: 57 additions & 0 deletions python/ray/tests/test_gcs_fault_tolerance.py
@@ -18,6 +18,7 @@
wait_for_pid_to_exit,
run_string_as_driver,
)
from ray.job_submission import JobSubmissionClient

import psutil

@@ -975,6 +976,62 @@ def test_redis_logs(external_redis):
)


@pytest.mark.parametrize(
"ray_start_regular_with_external_redis",
[
generate_system_config_map(
gcs_failover_worker_reconnect_timeout=20,
gcs_rpc_server_reconnect_timeout_s=60,
)
],
indirect=True,
)
def test_job_finished_after_gcs_server_restart(ray_start_regular_with_external_redis):
gcs_address = ray._private.worker.global_worker.gcs_client.address
client = JobSubmissionClient(gcs_address)

# submit job
job_id = client.submit_job(
Contributor:
set the gcs_rpc_server_reconnect_timeout_s = ?
submit a long running job with 1 head node
cluster.remove_node(head)
Wait until driver pid is gone
Restart head node, cluster.add_node()
make sure driver is dead
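
A minimal sketch of that flow (the fixture name, entrypoint, and timing below are illustrative, not the final test):

def test_job_finished_after_head_node_restart(ray_start_cluster_head_with_external_redis):
    # NOTE: the fixture name is hypothetical; any fixture that exposes the
    # Cluster object and its head node would work.
    cluster = ray_start_cluster_head_with_external_redis
    head_node = cluster.head_node
    client = JobSubmissionClient(cluster.address)

    # Submit a long-running job so its driver is still alive when the node dies.
    client.submit_job(
        entrypoint="python -c 'import ray, time; ray.init(); time.sleep(600)'"
    )

    # Kill the entire head node, then restart it.
    cluster.remove_node(head_node)
    cluster.add_node()

    # After the restart, the GCS should mark the job (and its driver) as dead;
    # poll GetAllJobInfo until is_dead is set, as in the check further below.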

Contributor (author):
Changed to this flow, but it's still failing when checking whether the job is marked as FAILED after the raylet is killed, so I might need help with that.

Collaborator:
I think instead of head_node.kill_raylet, we can kill the entire head node with cluster.remove_node(head_node) and then restart the head node with cluster.add_node().

entrypoint="python -c 'import ray; ray.init(); print(ray.cluster_resources());'"
)
# restart the gcs server
ray._private.worker._global_node.kill_gcs_server()
Collaborator:
Killing the GCS won't mark the node as dead; is this what we want to test?

ray._private.worker._global_node.start_gcs_server()

gcs_address = ray._private.worker.global_worker.gcs_client.address

async def async_get_all_job_info():
gcs_aio_client = gcs_utils.GcsAioClient(
address=gcs_address, nums_reconnect_retry=20
)
return await gcs_aio_client.get_all_job_info()

def get_job_info(job_id):
import asyncio

asyncio.set_event_loop(asyncio.new_event_loop())
all_job_info = get_or_create_event_loop().run_until_complete(
async_get_all_job_info()
)
return list(
filter(
lambda job_info: "job_submission_id" in job_info.config.metadata
and job_info.config.metadata["job_submission_id"] == job_id,
list(all_job_info.values()),
)
)

# Verify that the job is finished, i.e. marked as dead (is_dead).
def _check_job_is_dead(job_id: str) -> bool:
job_infos = get_job_info(job_id)
if len(job_infos) == 0:
return False
job_info = job_infos[0]
return job_info.is_dead

wait_for_condition(_check_job_is_dead, job_id=job_id, timeout=20)


if __name__ == "__main__":

import pytest
30 changes: 29 additions & 1 deletion src/ray/gcs/gcs_server/gcs_job_manager.cc
@@ -218,11 +218,16 @@ void GcsJobManager::HandleGetAllJobInfo(rpc::GetAllJobInfoRequest request,
auto client = core_worker_clients_.GetOrConnect(data.second.driver_address());
std::unique_ptr<rpc::NumPendingTasksRequest> request(
new rpc::NumPendingTasksRequest());
RAY_LOG(DEBUG) << "SendNumPendingTask: "
<< WorkerID::FromBinary(data.second.driver_address().worker_id());
client->NumPendingTasks(
std::move(request),
[reply, i, num_processed_jobs, try_send_reply](
[data, reply, i, num_processed_jobs, try_send_reply](
Collaborator:
Instead of capturing the entire data, let's just capture worker_id
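
// A sketch of that suggestion (illustrative only, not the final diff): capture
// just the driver's worker id for logging instead of the whole `data` entry.
const auto worker_id =
    WorkerID::FromBinary(data.second.driver_address().worker_id());
client->NumPendingTasks(
    std::move(request),
    [worker_id, reply, i, num_processed_jobs, try_send_reply](
        const Status &status,
        const rpc::NumPendingTasksReply &num_pending_tasks_reply) {
      RAY_LOG(DEBUG) << "SendNumPendingTaskReply: " << worker_id;
      // ... rest of the callback unchanged ...
    });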

const Status &status,
const rpc::NumPendingTasksReply &num_pending_tasks_reply) {
RAY_LOG(DEBUG) << "SendNumPendingTaskReply: "
<< WorkerID::FromBinary(
data.second.driver_address().worker_id());
if (!status.ok()) {
RAY_LOG(WARNING) << "Failed to get is_running_tasks from core worker: "
<< status.ToString();
@@ -297,5 +302,28 @@ std::shared_ptr<rpc::JobConfig> GcsJobManager::GetJobConfig(const JobID &job_id)
return it->second;
}

void GcsJobManager::OnNodeDead(const NodeID &node_id) {
RAY_LOG(INFO) << "Node " << node_id
<< " failed, mark all jobs from this node as finished";

auto on_done = [this, node_id](const absl::flat_hash_map<JobID, JobTableData> &result) {
// If the job is not already dead and its driver was on the dead node, mark it as finished.
for (auto &data : result) {
if (!data.second.is_dead() &&
NodeID::FromBinary(data.second.driver_address().raylet_id()) == node_id) {
RAY_LOG(DEBUG) << "Marking job: " << data.first << " as finished";
MarkJobAsFinished(data.second, [data](Status status) {
if (!status.ok()) {
RAY_LOG(WARNING) << "Failed to mark job as finished";
Contributor:
Add status to the logs, e.g.:

<< "Failed to mark job as finished. Status: " << status

}
});
}
}
};

// Mark all jobs whose driver was on this node as finished.
RAY_CHECK_OK(gcs_table_storage_->JobTable().GetAll(on_done));
}

} // namespace gcs
} // namespace ray
6 changes: 6 additions & 0 deletions src/ray/gcs/gcs_server/gcs_job_manager.h
@@ -79,6 +79,12 @@ class GcsJobManager : public rpc::JobInfoHandler {

std::shared_ptr<rpc::JobConfig> GetJobConfig(const JobID &job_id) const;

/// Handle a node death. This marks all jobs associated with the
/// specified node id as finished.
///
/// \param node_id The specified node id.
void OnNodeDead(const NodeID &node_id);

private:
std::shared_ptr<GcsTableStorage> gcs_table_storage_;
std::shared_ptr<GcsPublisher> gcs_publisher_;
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/gcs_server.cc
@@ -740,6 +740,7 @@ void GcsServer::InstallEventListeners() {
gcs_resource_manager_->OnNodeDead(node_id);
gcs_placement_group_manager_->OnNodeDead(node_id);
gcs_actor_manager_->OnNodeDead(node_id, node_ip_address);
gcs_job_manager_->OnNodeDead(node_id);
raylet_client_pool_->Disconnect(node_id);
gcs_healthcheck_manager_->RemoveNode(node_id);
pubsub_handler_->RemoveSubscriberFrom(node_id.Binary());
84 changes: 84 additions & 0 deletions src/ray/gcs/gcs_server/test/gcs_job_manager_test.cc
@@ -553,6 +553,90 @@ TEST_F(GcsJobManagerTest, TestPreserveDriverInfo) {
ASSERT_EQ(data.driver_pid(), 8264);
}

TEST_F(GcsJobManagerTest, TestNodeFailure) {
gcs::GcsJobManager gcs_job_manager(gcs_table_storage_,
gcs_publisher_,
runtime_env_manager_,
*function_manager_,
*fake_kv_,
client_factory_);

auto job_id1 = JobID::FromInt(1);
auto job_id2 = JobID::FromInt(2);
gcs::GcsInitData gcs_init_data(gcs_table_storage_);
gcs_job_manager.Initialize(/*init_data=*/gcs_init_data);

rpc::AddJobReply empty_reply;
std::promise<bool> promise1;
std::promise<bool> promise2;

auto add_job_request1 = Mocker::GenAddJobRequest(job_id1, "namespace_1");
gcs_job_manager.HandleAddJob(
*add_job_request1,
&empty_reply,
[&promise1](Status, std::function<void()>, std::function<void()>) {
promise1.set_value(true);
});
promise1.get_future().get();

auto add_job_request2 = Mocker::GenAddJobRequest(job_id2, "namespace_2");
gcs_job_manager.HandleAddJob(
*add_job_request2,
&empty_reply,
[&promise2](Status, std::function<void()>, std::function<void()>) {
promise2.set_value(true);
});
promise2.get_future().get();

rpc::GetAllJobInfoRequest all_job_info_request;
rpc::GetAllJobInfoReply all_job_info_reply;
std::promise<bool> all_job_info_promise;

// Check that no job is marked dead yet.
gcs_job_manager.HandleGetAllJobInfo(
all_job_info_request,
&all_job_info_reply,
[&all_job_info_promise](Status, std::function<void()>, std::function<void()>) {
all_job_info_promise.set_value(true);
});
all_job_info_promise.get_future().get();
for (auto job_info : all_job_info_reply.job_info_list()) {
ASSERT_TRUE(!job_info.is_dead());
}

// Remove node and then check that the job is dead.
auto address = all_job_info_reply.job_info_list().Get(0).driver_address();
auto node_id = NodeID::FromBinary(address.raylet_id());
gcs_job_manager.OnNodeDead(node_id);

// Test get all jobs with limit larger than the number of jobs.
Collaborator:
What does this comment mean?

Contributor (author):
It's not supposed to be there; will remove.

auto condition = [&gcs_job_manager, node_id]() -> bool {
rpc::GetAllJobInfoRequest all_job_info_request2;
rpc::GetAllJobInfoReply all_job_info_reply2;
std::promise<bool> all_job_info_promise2;
gcs_job_manager.HandleGetAllJobInfo(
all_job_info_request2,
&all_job_info_reply2,
[&all_job_info_promise2](Status, std::function<void()>, std::function<void()>) {
all_job_info_promise2.set_value(true);
});
all_job_info_promise2.get_future().get();

auto job_info1 = all_job_info_reply2.job_info_list().Get(0);
auto job_info2 = all_job_info_reply2.job_info_list().Get(1);
Collaborator:
These two variables are not used?


bool job_condition = true;
// job1, whose driver was on the dead node, should be dead, while job2 should still be alive.
for (auto job_info : all_job_info_reply2.job_info_list()) {
auto job_node_id = NodeID::FromBinary(job_info.driver_address().raylet_id());
job_condition = job_condition && (job_info.is_dead() == (job_node_id == node_id));
}
return job_condition;
};

EXPECT_TRUE(WaitForCondition(condition, 2000));
}

int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();