Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][gcs] Fix task events profile events per task leak #42248

Merged
merged 2 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/ray/gcs/gcs_server/gcs_task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,20 @@ void GcsTaskManager::GcsTaskManagerStorage::UpdateExistingTaskAttempt(
// Update the task event.
existing_task.MergeFrom(task_events);

// Truncate the profile events if needed.
auto max_num_profile_events_per_task =
RayConfig::instance().task_events_max_num_profile_events_per_task();
if (existing_task.profile_events().events_size() > max_num_profile_events_per_task) {
auto to_drop =
existing_task.profile_events().events_size() - max_num_profile_events_per_task;
existing_task.mutable_profile_events()->mutable_events()->DeleteSubrange(0, to_drop);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if to_drop is negative, it will be noop?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will not enter the if clause in this case.


// Update the tracking per job
auto job_id = JobID::FromBinary(existing_task.job_id());
job_task_summary_[job_id].RecordProfileEventsDropped(to_drop);
stats_counter_.Increment(kTotalNumProfileTaskEventsDropped, to_drop);
}

// Move the task events around different gc priority list.
auto target_list_index = gc_policy_->GetTaskListPriority(existing_task);
auto cur_list_index = loc->GetCurrentListIndex();
Expand Down
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/gcs_task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ class GcsTaskManager : public rpc::TaskInfoHandler {
FRIEND_TEST(GcsTaskManagerTest, TestTaskDataLossWorker);
FRIEND_TEST(GcsTaskManagerTest, TestMultipleJobsDataLoss);
FRIEND_TEST(GcsTaskManagerDroppedTaskAttemptsLimit, TestDroppedTaskAttemptsLimit);
FRIEND_TEST(GcsTaskManagerProfileEventsLimitTest, TestProfileEventsNoLeak);
};

} // namespace gcs
Expand Down
41 changes: 41 additions & 0 deletions src/ray/gcs/gcs_server/test/gcs_task_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,18 @@ class GcsTaskManagerMemoryLimitedTest : public GcsTaskManagerTest {
}
};

class GcsTaskManagerProfileEventsLimitTest : public GcsTaskManagerTest {
public:
GcsTaskManagerProfileEventsLimitTest() : GcsTaskManagerTest() {
RayConfig::instance().initialize(
R"(
{
"task_events_max_num_profile_events_per_task": 10
}
)");
}
};

class GcsTaskManagerDroppedTaskAttemptsLimit : public GcsTaskManagerTest {
public:
GcsTaskManagerDroppedTaskAttemptsLimit() : GcsTaskManagerTest() {
Expand Down Expand Up @@ -1025,6 +1037,35 @@ TEST_F(GcsTaskManagerDroppedTaskAttemptsLimit, TestDroppedTaskAttemptsLimit) {
EXPECT_EQ(job_summary.NumTaskAttemptsDropped(), 10);
}

TEST_F(GcsTaskManagerProfileEventsLimitTest, TestProfileEventsNoLeak) {
auto task = GenTaskIDs(1)[0];

// Keep generating profile events and make sure the number of profile events
// is bounded.
for (int i = 0; i < 100; i++) {
auto events = GenTaskEvents({task},
/* attempt_number */ 0,
/* job_id */ 0,
GenProfileEvents("event", 1, 1));
auto events_data = Mocker::GenTaskEventsData(events);
SyncAddTaskEventData(events_data);
}

// Assert on the profile events in the buffer.
{
auto reply = SyncGetTaskEvents({});
EXPECT_EQ(reply.events_by_task_size(), 1);
EXPECT_EQ(reply.events_by_task().begin()->profile_events().events().size(),
RayConfig::instance().task_events_max_num_profile_events_per_task());

// assert on the profile events dropped counter.
EXPECT_EQ(reply.num_profile_task_events_dropped(),
100 - RayConfig::instance().task_events_max_num_profile_events_per_task());
EXPECT_EQ(task_manager->GetTotalNumProfileTaskEventsDropped(),
100 - RayConfig::instance().task_events_max_num_profile_events_per_task());
}
}

TEST_F(GcsTaskManagerMemoryLimitedTest, TestLimitGcPriorityBased) {
size_t num_limit = 10; // sync with class config
// For the default gc policy, we evict tasks based (first to last):
Expand Down
Loading