Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Periodically GC metadata for streaming generators #43772

Merged
merged 12 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ class ObjectRefGenerator:
# NOTE: This can be called multiple times
# because python doesn't guarantee __del__ is called
# only once.
self.worker.core_worker.delete_object_ref_stream(self._generator_ref)
self.worker.core_worker.async_delete_object_ref_stream(self._generator_ref)

def __getstate__(self):
raise TypeError(
Expand Down Expand Up @@ -4930,12 +4930,12 @@ cdef class CoreWorker:
make_optional[ObjectIDIndexType](
<int>1 + <int>return_size + <int>generator_index))

def delete_object_ref_stream(self, ObjectRef generator_id):
def async_delete_object_ref_stream(self, ObjectRef generator_id):
cdef:
CObjectID c_generator_id = generator_id.native()

with nogil:
CCoreWorkerProcess.GetCoreWorker().DelObjectRefStream(c_generator_id)
CCoreWorkerProcess.GetCoreWorker().AsyncDelObjectRefStream(c_generator_id)

def try_read_next_object_ref_stream(self, ObjectRef generator_id):
cdef:
Expand All @@ -4960,7 +4960,7 @@ cdef class CoreWorker:
c_bool finished

with nogil:
finished = CCoreWorkerProcess.GetCoreWorker().IsFinished(
finished = CCoreWorkerProcess.GetCoreWorker().StreamingGeneratorIsFinished(
c_generator_id)
return finished

Expand Down
4 changes: 2 additions & 2 deletions python/ray/includes/libcoreworker.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,11 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
const CObjectID& return_id,
shared_ptr[CRayObject] *return_object,
const CObjectID& generator_id)
void DelObjectRefStream(const CObjectID &generator_id)
void AsyncDelObjectRefStream(const CObjectID &generator_id)
CRayStatus TryReadObjectRefStream(
const CObjectID &generator_id,
CObjectReference *object_ref_out)
c_bool IsFinished(const CObjectID &generator_id) const
c_bool StreamingGeneratorIsFinished(const CObjectID &generator_id) const
pair[CObjectReference, c_bool] PeekObjectRefStream(
const CObjectID &generator_id)
CObjectID AllocateDynamicReturnId(
Expand Down
27 changes: 21 additions & 6 deletions python/ray/tests/test_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,24 @@
)
from ray._private.client_mode_hook import enable_client_mode
from ray.tests.conftest import call_ray_start_context
from ray._private.test_utils import (
wait_for_condition,
)


def assert_no_leak():
gc.collect()
core_worker = ray._private.worker.global_worker.core_worker
ref_counts = core_worker.get_all_reference_counts()
for rc in ref_counts.values():
assert rc["local"] == 0
assert rc["submitted"] == 0
def check():
gc.collect()
core_worker = ray._private.worker.global_worker.core_worker
ref_counts = core_worker.get_all_reference_counts()
for k, rc in ref_counts.items():
if rc["local"] != 0:
return False
if rc["submitted"] != 0:
return False
return True

wait_for_condition(check)


@pytest.mark.skipif(
Expand Down Expand Up @@ -358,6 +367,7 @@ def test_dynamic_generator_reconstruction(ray_start_cluster, num_returns_type):
"task_retry_delay_ms": 100,
"object_timeout_milliseconds": 200,
"fetch_warn_timeout_milliseconds": 1000,
"local_gc_min_interval_s": 1,
}
cluster = ray_start_cluster
# Head node with no resources.
Expand Down Expand Up @@ -420,6 +430,7 @@ def test_dynamic_generator_reconstruction_nondeterministic(
"task_retry_delay_ms": 100,
"object_timeout_milliseconds": 200,
"fetch_warn_timeout_milliseconds": 1000,
"local_gc_min_interval_s": 1,
}
cluster = ray_start_cluster
# Head node with no resources.
Expand Down Expand Up @@ -509,6 +520,7 @@ def test_dynamic_generator_reconstruction_fails(ray_start_cluster, num_returns_t
"task_retry_delay_ms": 100,
"object_timeout_milliseconds": 200,
"fetch_warn_timeout_milliseconds": 1000,
"local_gc_min_interval_s": 1,
}
cluster = ray_start_cluster
cluster.add_node(
Expand Down Expand Up @@ -579,6 +591,7 @@ def test_dynamic_empty_generator_reconstruction_nondeterministic(
"task_retry_delay_ms": 100,
"object_timeout_milliseconds": 200,
"fetch_warn_timeout_milliseconds": 1000,
"local_gc_min_interval_s": 1,
}
cluster = ray_start_cluster
# Head node with no resources.
Expand Down Expand Up @@ -627,6 +640,8 @@ def check(empty_generator):
# We should never reconstruct an empty generator.
assert ray.get(exec_counter.get_count.remote()) == 1

print("gen", gen._generator_ref)
print("refs", refs)
del gen, refs, exec_counter
assert_no_leak()

Expand Down
4 changes: 2 additions & 2 deletions python/ray/tests/test_streaming_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def check_connected(self):
def mocked_worker():
mocked_core_worker = Mock()
mocked_core_worker.try_read_next_object_ref_stream.return_value = None
mocked_core_worker.delete_object_ref_stream.return_value = None
mocked_core_worker.async_delete_object_ref_stream.return_value = None
mocked_core_worker.create_object_ref_stream.return_value = None
mocked_core_worker.peek_object_ref_stream.return_value = [], []
worker = MockedWorker(mocked_core_worker)
Expand Down Expand Up @@ -94,7 +94,7 @@ def test_streaming_object_ref_generator_basic_unit(mocked_worker):
dumps(generator)

del generator
c.delete_object_ref_stream.assert_called()
c.async_delete_object_ref_stream.assert_called()


def test_streaming_object_ref_generator_task_failed_unit(mocked_worker):
Expand Down
74 changes: 74 additions & 0 deletions python/ray/tests/test_streaming_generator_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,80 @@ def verify_generator(actor, fail):
assert len(list_actors()) == 12


# Checks that values derived from a streaming generator's output can still be
# fetched after the generator object is deleted and the node holding the
# cluster's only CPU/GPU resources is removed and replaced.
# Parametrized over actor tasks vs. normal tasks, and over whether the
# RAY_testing_asio_delay_us env var is set for ReportGeneratorItemReturns.
@pytest.mark.parametrize("delay", [True, False])
@pytest.mark.parametrize("actor_task", [True, False])
def test_reconstruction_generator_out_of_scope(
monkeypatch, ray_start_cluster, delay, actor_task
):
with monkeypatch.context() as m:
if delay:
# Testing hook applied to the ReportGeneratorItemReturns gRPC handler.
# NOTE(review): the "10000:1000000" value is presumably a min:max delay
# range in microseconds -- confirm against the config's documentation.
m.setenv(
"RAY_testing_asio_delay_us",
"CoreWorkerService.grpc_server."
"ReportGeneratorItemReturns=10000:1000000",
)
cluster = ray_start_cluster
# Head node with no resources.
cluster.add_node(
num_cpus=0,
_system_config=RECONSTRUCTION_CONFIG,
enable_object_reconstruction=True,
)
ray.init(address=cluster.address)
# Node to place the initial object.
node_to_kill = cluster.add_node(
num_cpus=1, num_gpus=1, object_store_memory=10**8
)
cluster.wait_for_nodes()

# Actor variant of the generator; it requires a GPU, which only the
# non-head node provides.
@ray.remote(num_cpus=0, num_gpus=1, max_restarts=-1, max_task_retries=2)
class Actor:
def dynamic_generator(self, num_returns):
for i in range(num_returns):
print("YIELD", i)
yield np.ones(1_000_000, dtype=np.int8) * i

# Normal-task variant of the generator.
@ray.remote(num_returns="streaming", max_retries=2)
def dynamic_generator(num_returns):
for i in range(num_returns):
print("YIELD", i)
yield np.ones(1_000_000, dtype=np.int8) * i

# Identity task; its return ref is what the test keeps after dropping the
# generator.
@ray.remote
def dependent_task(x):
return x

# Returns the first element of the array so the assertion compares a scalar.
@ray.remote
def fetch(x):
return x[0]

# Test recovery of all dynamic objects through re-execution.
if actor_task:
actor = Actor.remote()
gen = actor.dynamic_generator.options(num_returns="streaming").remote(1)
else:
# NOTE(review): dynamic_generator is declared with num_returns="streaming";
# confirm that wrapping .remote() in ray.get() is intended here.
gen = ray.get(dynamic_generator.remote(1))
refs = []

print("generator", gen)
for i in range(1):
ref = next(gen)
print("generator return", ref)
ref = dependent_task.remote(ref)
print("dependent ref", ref)
refs.append(ref)
# Drop the generator while refs derived from its output are still held.
del gen

# The i-th generated array is filled with the value i.
for i, ref in enumerate(refs):
assert ray.get(fetch.remote(ref)) == i

# Remove the resource-bearing node non-gracefully and add a replacement, then
# verify the same values can still be fetched.
cluster.remove_node(node_to_kill, allow_graceful=False)
node_to_kill = cluster.add_node(num_cpus=1, num_gpus=1, object_store_memory=10**8)

for i, ref in enumerate(refs):
assert ray.get(fetch.remote(ref)) == i


if __name__ == "__main__":
import os

Expand Down
38 changes: 34 additions & 4 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,11 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
RayConfig::instance().metrics_report_interval_ms() / 2,
"CoreWorker.RecordMetrics");

periodical_runner_.RunFnPeriodically(
[this] { TryDeleteObjectRefStreams(); },
RayConfig::instance().local_gc_min_interval_s() * 1000,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like the wrong value: the setting is already in seconds, but it is multiplied by 1000 here.

Besides, do you think 10 seconds is frequent enough? I feel like it needs to run a little more often.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The multiplication is there to convert seconds to milliseconds.

Should be fine because we trigger it once immediately now.

"CoreWorker.GCStreamingGeneratorMetadata");

#ifndef _WIN32
// Doing this last during CoreWorker initialization, so initialization logic like
// registering with Raylet can finish with higher priority.
Expand Down Expand Up @@ -2963,8 +2968,33 @@ Status CoreWorker::SealReturnObject(const ObjectID &return_id,
return status;
}

void CoreWorker::DelObjectRefStream(const ObjectID &generator_id) {
task_manager_->DelObjectRefStream(generator_id);
// Record `generator_id` as pending deletion. Nothing is deleted here; the
// stream is removed later by TryDeleteObjectRefStreams once it qualifies.
// NOTE(review): deleted_generator_ids_ is modified here without any visible
// locking -- confirm this cannot race with TryDeleteObjectRefStreams, which
// iterates over and erases from the same set.
void CoreWorker::AsyncDelObjectRefStream(const ObjectID &generator_id) {
RAY_LOG(DEBUG) << "AsyncDelObjectRefStream " << generator_id;
deleted_generator_ids_.insert(generator_id);
}

void CoreWorker::TryDeleteObjectRefStreams() {
std::vector<ObjectID> out_of_scope_generator_ids;
for (auto it = deleted_generator_ids_.begin(); it != deleted_generator_ids_.end();
it++) {
const auto &generator_id = *it;
RAY_LOG(DEBUG) << "Try DelObjectRefStream " << generator_id;
int64_t num_objects_generated = 0;
if (!task_manager_->StreamingGeneratorIsFinished(generator_id,
&num_objects_generated)) {
continue;
stephanie-wang marked this conversation as resolved.
Show resolved Hide resolved
}

bool can_gc = reference_counter_->CheckGeneratorRefsOutOfScope(generator_id,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove all unconsumed objects here (and add a test)? This seems like a regression: unconsumed objects are not deleted if the lineage is still alive.

E.g., when the delete happens, all unconsumed objects are deleted (same behavior as now), and only the stream metadata is cleaned up after a delay.

num_objects_generated);
if (can_gc && task_manager_->DelObjectRefStream(generator_id)) {
out_of_scope_generator_ids.push_back(generator_id);
}
}

for (const auto &generator_id : out_of_scope_generator_ids) {
deleted_generator_ids_.erase(generator_id);
}
}

Status CoreWorker::TryReadObjectRefStream(const ObjectID &generator_id,
Expand All @@ -2977,8 +3007,8 @@ Status CoreWorker::TryReadObjectRefStream(const ObjectID &generator_id,
return status;
}

bool CoreWorker::IsFinished(const ObjectID &generator_id) const {
return task_manager_->IsFinished(generator_id);
// Pass-through: forwards to the task manager's StreamingGeneratorIsFinished
// for `generator_id` and returns its result unchanged.
bool CoreWorker::StreamingGeneratorIsFinished(const ObjectID &generator_id) const {
return task_manager_->StreamingGeneratorIsFinished(generator_id);
}

std::pair<rpc::ObjectReference, bool> CoreWorker::PeekObjectRefStream(
Expand Down
20 changes: 12 additions & 8 deletions src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
rpc::ObjectReference *object_ref_out);

/// Return True if there's no more object to read. False otherwise.
bool IsFinished(const ObjectID &generator_id) const;
bool StreamingGeneratorIsFinished(const ObjectID &generator_id) const;

/// Read the next index of a ObjectRefStream of generator_id without
/// consuming an index.
Expand All @@ -400,16 +400,18 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// It should not be nil.
std::pair<rpc::ObjectReference, bool> PeekObjectRefStream(const ObjectID &generator_id);

/// Delete the ObjectRefStream that was
/// created upon the initial task
/// submission.
///
/// It is a pass-through method. See TaskManager::DelObjectRefStream
/// for details.
/// Asynchronously delete the ObjectRefStream that was created upon the
/// initial task submission. This method only records the generator as
/// pending deletion. A periodic task (see TryDeleteObjectRefStreams) then
/// checks whether the generator has finished and whether the generator ref
/// and all dynamic return refs have been removed from the ref counter. If
/// so, we remove the stream and task metadata, because we know that the
/// streaming task can never be re-executed.
///
/// \param[in] generator_id The object ref id of the streaming
/// generator task.
void DelObjectRefStream(const ObjectID &generator_id);
void AsyncDelObjectRefStream(const ObjectID &generator_id);

/// Attempt to delete every stream previously passed to
/// AsyncDelObjectRefStream. A stream is deleted, and dropped from the pending
/// set, only once its generator has finished, its generator ref and generated
/// return refs are out of scope in the reference counter, and the task
/// manager's DelObjectRefStream reports success. Streams that do not yet
/// qualify stay pending for a later attempt.
void TryDeleteObjectRefStreams();

const PlacementGroupID &GetCurrentPlacementGroupId() const {
return worker_context_.GetCurrentPlacementGroupId();
Expand Down Expand Up @@ -1898,6 +1900,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {

/// Worker's PID
uint32_t pid_;

/// IDs of streaming generators whose deletion was requested through
/// AsyncDelObjectRefStream but whose stream has not been removed yet.
/// Entries are erased by TryDeleteObjectRefStreams.
/// NOTE(review): no mutex or guard annotation is visible for this set --
/// confirm that all readers and writers run on the same thread.
absl::flat_hash_set<ObjectID> deleted_generator_ids_;
};

// Lease request rate-limiter based on cluster node size.
Expand Down
20 changes: 20 additions & 0 deletions src/ray/core_worker/reference_count.cc
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,26 @@ void ReferenceCounter::OwnDynamicStreamingTaskReturnRef(const ObjectID &object_i
absl::optional<NodeID>()));
}

bool ReferenceCounter::CheckGeneratorRefsOutOfScope(const ObjectID &generator_id,
int64_t num_objects_generated) {
absl::MutexLock lock(&mutex_);
if (object_id_refs_.count(generator_id)) {
return false;
}

auto task_id = generator_id.TaskId();
for (int64_t i = 0; i < num_objects_generated; i++) {
// Add 2 because task returns start from index 1 and the
// first return object is the generator ID.
const auto return_id = ObjectID::FromIndex(task_id, i + 2);
stephanie-wang marked this conversation as resolved.
Show resolved Hide resolved
if (object_id_refs_.count(return_id)) {
return false;
}
}

return true;
}

bool ReferenceCounter::AddOwnedObjectInternal(
const ObjectID &object_id,
const std::vector<ObjectID> &inner_ids,
Expand Down
4 changes: 4 additions & 0 deletions src/ray/core_worker/reference_count.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,10 @@ class ReferenceCounter : public ReferenceCounterInterface,
const ObjectID &generator_id)
ABSL_LOCKS_EXCLUDED(mutex_);

/// Check whether a streaming generator's refs have all gone out of scope.
///
/// \param[in] generator_id The object ref id of the streaming generator task.
/// \param[in] num_objects_generated The number of dynamic return objects to
/// check, starting from return index 2 of the generator's task.
/// \return False if the generator ref or any of the checked return refs is
/// still tracked by this reference counter; true otherwise.
bool CheckGeneratorRefsOutOfScope(const ObjectID &generator_id,
int64_t num_objects_generated)
ABSL_LOCKS_EXCLUDED(mutex_);

/// Update the size of the object.
///
/// \param[in] object_id The ID of the object.
Expand Down