Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Periodically GC metadata for streaming generators #43772

Merged
merged 12 commits into from Mar 8, 2024

Conversation

stephanie-wang
Copy link
Contributor

@stephanie-wang stephanie-wang commented Mar 7, 2024

Why are these changes needed?

Less invasive fix than #43584. Changes:

  • When generator goes out of scope, add it to a list of streaming generator tasks that we scan periodically
  • For each generator task, check if the task and streaming metadata can be removed. It can be removed if the generator task and all generated return refs have gone out of scope.
  • Fixes an existing potential leak where task completes after the generator ref and returned refs have gone out of scope, by deleting the task metadata with the stream metadata.

#43584 is probably better long-term as it refactors stream metadata to be GCed through the normal ref counting path, and lineage can be GCed eagerly. However, this fix is safer.

Related issue number

Closes #39151.

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
@@ -238,7 +238,7 @@ std::vector<rpc::ObjectReference> TaskManager::AddPendingTask(
// The language frontend is responsible for calling DeleteObjectRefStream.
if (spec.IsStreamingGenerator()) {
const auto generator_id = spec.ReturnId(0);
RAY_LOG(DEBUG) << "Create an object ref stream of an id " << generator_id;
RAY_LOG(INFO) << "Create an object ref stream of an id " << generator_id;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this debug -> info intended? we should revert?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it has been reverted.

@@ -410,7 +423,9 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
ABSL_LOCKS_EXCLUDED(mu_);

/// Return True if there's no more object to read. False otherwise.
bool IsFinished(const ObjectID &generator_id) const ABSL_LOCKS_EXCLUDED(mu_);
bool StreamingGeneratorIsFinished(const ObjectID &generator_id,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add docstring for num_objects_generated?

src/ray/core_worker/reference_count.cc Show resolved Hide resolved
for (const auto &return_id_info : reply.streaming_generator_return_ids()) {
if (return_id_info.is_plasma_object()) {
// NOTE(swang): It is possible that the dynamically returned refs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this a separate issue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Existing issue that I added a fix for in this PR.

continue;
}

bool can_gc = reference_counter_->CheckGeneratorRefsOutOfScope(generator_id,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we remove all unconsumed objects here (and add a test)? This seems like a regression (where unconsumed objects are not deleted if lineage is alive).

E.g., when delete hapapens, all unconsumed objects are deleted (same behavior as now). And only the stream metadata is cleaned up after a delay.

Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
Copy link
Contributor

@rkooo567 rkooo567 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

assert_no_leak()

# Test that when the generator task stays in the in-scope lineage, we still
# clean up the unconsumed objects' values. The lineage (task and stream
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice tests!

src/ray/core_worker/core_worker.cc Outdated Show resolved Hide resolved
src/ray/core_worker/core_worker.cc Outdated Show resolved Hide resolved
@@ -641,6 +641,11 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
RayConfig::instance().metrics_report_interval_ms() / 2,
"CoreWorker.RecordMetrics");

periodical_runner_.RunFnPeriodically(
[this] { TryDeleteObjectRefStreams(); },
RayConfig::instance().local_gc_min_interval_s() * 1000,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like the wrong value? (it is already a second, but you multiply 1000).

Besides, do you think 10 seconds is good enough? Feel like it needs to be a little more frequent

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's to convert to ms?

Should be fine because we trigger it once immediately now.

@@ -57,6 +57,38 @@ absl::flat_hash_set<ObjectID> ObjectRefStream::GetItemsUnconsumed() const {
return result;
}

std::vector<ObjectID> ObjectRefStream::PopUnconsumedItems() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it same as GetItemsUnconsumed? If so can we remove other APi?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method pops the items, not just get.

Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
@stephanie-wang stephanie-wang merged commit 9440cb0 into ray-project:master Mar 8, 2024
9 checks passed
@stephanie-wang stephanie-wang deleted the fix-39151-2 branch March 8, 2024 21:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Core][Streaming Generator] Lineage reconstruction is not working if generator ref is GC'ed.
3 participants