Enable distributed ref counting by default (#7628)
* Enable distributed ref counting by default

* Turn on eager eviction

* Shorten tests and drain ReferenceCounter

* Don't force kill actor handles that have gone out of scope, lint

* Fix locks

* Cleanup Plasma Async Callback (#7452)

* [rllib][tune] Fix some NaNs (#7611)

* Change /tmp to platform-specific temporary directory (#7529)

* [Serve] UI Improvements (#7569)

* Bug fix in test_dynres.py (#7615)

Co-authored-by: senlin.zsl <senlin.zsl@antfin.com>

* Java calls Python actor methods via actor.call (#7614)

* Bug fix in the usage of absl::flat_hash_map::erase and absl::flat_hash_set::erase (#7633)

Co-authored-by: senlin.zsl <senlin.zsl@antfin.com>

* [Java] Make both `RayActor` and `RayPyActor` inherit from `BaseActor` (#7462)

* [Java] Fix the issue that the cached value in `RayObject` is serialized (#7613)

* Add failure tests to test_reference_counting (#7400)

* Fix typo in asyncio documentation (#7602)

* Fix segfault

* debug

* Force kill actor

* Fix test
stephanie-wang committed Mar 19, 2020
1 parent fca9dc7 commit b499100
Showing 8 changed files with 70 additions and 24 deletions.
2 changes: 2 additions & 0 deletions python/ray/tests/conftest.py
@@ -22,6 +22,8 @@ def get_default_fixure_internal_config():
     internal_config = json.dumps({
         "initial_reconstruction_timeout_milliseconds": 200,
         "num_heartbeats_timeout": 10,
+        "object_store_full_max_retries": 3,
+        "object_store_full_initial_delay_ms": 100,
     })
     return internal_config
 
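The two new keys govern how a worker retries a plasma put when the object store is full: object_store_full_max_retries caps the number of attempts, and object_store_full_initial_delay_ms sets the first back-off delay. A minimal C++ sketch of that retry shape follows; try_put is a hypothetical stand-in for the real store call, and the doubling of the delay is an assumption, not something this diff shows.

#include <chrono>
#include <functional>
#include <thread>

// Retry a put with a capped, growing back-off. Returns true once the
// object fits, false if the store is still full after all retries.
bool PutWithRetries(int max_retries, int initial_delay_ms,
                    const std::function<bool()> &try_put) {
  int delay_ms = initial_delay_ms;
  for (int attempt = 0; attempt <= max_retries; attempt++) {
    if (try_put()) {
      return true;
    }
    if (attempt == max_retries) {
      break;  // Out of retries; report failure to the caller.
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
    delay_ms *= 2;  // Assumed back-off policy (the "initial" delay grows).
  }
  return false;
}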
1 change: 1 addition & 0 deletions python/ray/tests/test_global_gc.py
@@ -112,6 +112,7 @@ def check_refs_gced():
 
     # Remote workers.
     actors = [GarbageHolder.remote() for _ in range(2)]
+    assert all(ray.get([a.has_garbage.remote() for a in actors]))
 
     # GC should be triggered for all workers, including the local driver,
     # when a remote task tries to put a return value that doesn't fit in
11 changes: 1 addition & 10 deletions python/ray/tests/test_reference_counting.py
@@ -18,16 +18,7 @@
 
 @pytest.fixture
 def one_worker_100MiB(request):
-    config = json.dumps({
-        "distributed_ref_counting_enabled": 1,
-        "task_retry_delay_ms": 0,
-        "object_store_full_max_retries": 3,
-        "object_store_full_initial_delay_ms": 100,
-    })
-    yield ray.init(
-        num_cpus=1,
-        object_store_memory=100 * 1024 * 1024,
-        _internal_config=config)
+    yield ray.init(num_cpus=1, object_store_memory=100 * 1024 * 1024)
     ray.shutdown()
 
 
4 changes: 2 additions & 2 deletions src/ray/common/ray_config_def.h
@@ -72,7 +72,7 @@ RAY_CONFIG(bool, object_pinning_enabled, true)
 /// cluster and all objects that contain it are also out of scope. If this flag
 /// is off and object_pinning_enabled is turned on, then an object will not be
 /// LRU evicted until it is out of scope on the CREATOR of the ObjectID.
-RAY_CONFIG(bool, distributed_ref_counting_enabled, false)
+RAY_CONFIG(bool, distributed_ref_counting_enabled, true)
 
 /// Whether to record the creation sites of object references. This adds more
 /// information to `ray memstat`, but introduces a little extra overhead when
@@ -91,7 +91,7 @@ RAY_CONFIG(bool, record_ref_creation_sites, true)
 /// NOTE(swang): The timer is checked by the raylet during every heartbeat, so
 /// this should be set to a value larger than
 /// raylet_heartbeat_timeout_milliseconds.
-RAY_CONFIG(int64_t, free_objects_period_milliseconds, -1)
+RAY_CONFIG(int64_t, free_objects_period_milliseconds, 1000)
 
 /// If object_pinning_enabled is on, then objects that have been unpinned are
 /// added to a local cache. When the cache is flushed, all objects in the cache
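Each RAY_CONFIG(type, name, default) entry above declares a typed setting with a compiled-in default that can be overridden at startup (the test fixtures in this commit do so through _internal_config). The sketch below shows one way such a macro can expand into a singleton with generated accessors; it illustrates the pattern only and is not Ray's actual macro definition.

#include <cstdint>

// Hypothetical expansion: each entry becomes a private field plus a
// public accessor, so call sites read RayConfig::instance().name().
#define RAY_CONFIG(type, name, default_value) \
 public:                                      \
  type name() const { return name##_; }       \
                                              \
 private:                                     \
  type name##_ = default_value;

class RayConfig {
 public:
  static RayConfig &instance() {
    static RayConfig config;
    return config;
  }

  RAY_CONFIG(bool, distributed_ref_counting_enabled, true)
  RAY_CONFIG(int64_t, free_objects_period_milliseconds, 1000)
};

// Usage, e.g.:
//   if (RayConfig::instance().distributed_ref_counting_enabled()) { ... }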
3 changes: 3 additions & 0 deletions src/ray/core_worker/core_worker.cc
@@ -288,6 +288,9 @@ void CoreWorker::Disconnect() {
 }
 
 void CoreWorker::Exit(bool intentional) {
+  RAY_LOG(INFO)
+      << "Exit signal " << (intentional ? "(intentional)" : "")
+      << " received, this process will exit after all outstanding tasks have finished";
   exiting_ = true;
   // Release the resources early in case draining takes a long time.
   RAY_CHECK_OK(local_raylet_client_->NotifyDirectCallTaskBlocked());
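Taken together with the TaskManager and ReferenceCounter changes below, exit becomes a two-stage drain: first outstanding tasks finish, then the remaining object references go out of scope, and only then does the worker actually shut down (the new log line says as much). A hypothetical, self-contained sketch of that chaining; the real wiring lives in CoreWorker and uses callbacks between the two components.

#include <functional>
#include <iostream>

// Toy drainable component: run the hook now if idle, otherwise defer
// it until the last outstanding item finishes.
struct Drainable {
  int outstanding = 0;
  std::function<void()> hook;

  void DrainAndShutdown(std::function<void()> shutdown) {
    if (outstanding == 0) {
      shutdown();
    } else {
      hook = std::move(shutdown);
    }
  }

  void OnOneFinished() {
    if (--outstanding == 0 && hook) {
      hook();
    }
  }
};

int main() {
  Drainable tasks{1}, refs{1};
  auto exit_worker = [] { std::cout << "worker exits\n"; };
  // Mirrors Exit -> TaskManager::DrainAndShutdown ->
  // ReferenceCounter::DrainAndShutdown -> actual shutdown.
  tasks.DrainAndShutdown([&refs, exit_worker] { refs.DrainAndShutdown(exit_worker); });
  tasks.OnOneFinished();  // The last in-flight task completes...
  refs.OnOneFinished();   // ...then the last reference goes out of scope.
}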
21 changes: 21 additions & 0 deletions src/ray/core_worker/reference_count.cc
@@ -29,6 +29,26 @@ namespace {} // namespace
 
 namespace ray {
 
+void ReferenceCounter::DrainAndShutdown(std::function<void()> shutdown) {
+  absl::MutexLock lock(&mutex_);
+  if (object_id_refs_.empty()) {
+    shutdown();
+  } else {
+    RAY_LOG(WARNING)
+        << "This worker is still managing " << object_id_refs_.size()
+        << " objects, waiting for them to go out of scope before shutting down.";
+  }
+  shutdown_hook_ = shutdown;
+}
+
+void ReferenceCounter::ShutdownIfNeeded() {
+  if (shutdown_hook_ && object_id_refs_.empty()) {
+    RAY_LOG(WARNING)
+        << "All object references have gone out of scope, shutting down worker.";
+    shutdown_hook_();
+  }
+}
+
 ReferenceCounter::ReferenceTable ReferenceCounter::ReferenceTableFromProto(
     const ReferenceTableProto &proto) {
   ReferenceTable refs;
@@ -342,6 +362,7 @@ void ReferenceCounter::DeleteReferenceInternal(ReferenceTable::iterator it,
   if (should_delete_reference) {
     RAY_LOG(DEBUG) << "Deleting Reference to object " << id;
     object_id_refs_.erase(it);
+    ShutdownIfNeeded();
   }
 }
 
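DrainAndShutdown and ShutdownIfNeeded form a simple handshake: if the reference table is already empty the hook runs immediately; otherwise it is stashed in shutdown_hook_ and fired by DeleteReferenceInternal the moment the last reference is erased. A self-contained sketch of the same pattern, with hypothetical names and std::mutex standing in for absl::Mutex:

#include <functional>
#include <iostream>
#include <mutex>
#include <unordered_set>

class RefTable {
 public:
  void Add(int id) {
    std::lock_guard<std::mutex> lock(mu_);
    ids_.insert(id);
  }

  void Remove(int id) {
    std::lock_guard<std::mutex> lock(mu_);
    ids_.erase(id);
    // Mirrors ShutdownIfNeeded(): runs on the same path that deletes
    // references, with the lock already held.
    if (shutdown_hook_ && ids_.empty()) {
      shutdown_hook_();
    }
  }

  void DrainAndShutdown(std::function<void()> shutdown) {
    std::lock_guard<std::mutex> lock(mu_);
    if (ids_.empty()) {
      shutdown();  // Nothing to drain; shut down right away.
    } else {
      shutdown_hook_ = std::move(shutdown);  // Defer until the table drains.
    }
  }

 private:
  std::mutex mu_;
  std::unordered_set<int> ids_;
  std::function<void()> shutdown_hook_;
};

int main() {
  RefTable table;
  table.Add(1);
  table.DrainAndShutdown([] { std::cout << "shutting down\n"; });
  table.Remove(1);  // Last reference gone; the deferred hook fires here.
}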
13 changes: 13 additions & 0 deletions src/ray/core_worker/reference_count.h
@@ -46,6 +46,11 @@ class ReferenceCounter {
 
   ~ReferenceCounter() {}
 
+  /// Wait for all object references to go out of scope, and then shutdown.
+  ///
+  /// \param shutdown The shutdown callback to call.
+  void DrainAndShutdown(std::function<void()> shutdown);
+
   /// Increase the reference count for the ObjectID by one. If there is no
   /// entry for the ObjectID, one will be created. The object ID will not have
   /// any owner information, since we don't know how it was created.
@@ -365,6 +370,10 @@
 
   using ReferenceTable = absl::flat_hash_map<ObjectID, Reference>;
 
+  /// Shutdown if all references have gone out of scope and shutdown
+  /// is scheduled.
+  void ShutdownIfNeeded() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
+
   /// Deserialize a ReferenceTable.
   static ReferenceTable ReferenceTableFromProto(const ReferenceTableProto &proto);
 
@@ -487,6 +496,10 @@
 
   /// Holds all reference counts and dependency information for tracked ObjectIDs.
   ReferenceTable object_id_refs_ GUARDED_BY(mutex_);
+
+  /// Optional shutdown hook to call when all references have gone
+  /// out of scope.
+  std::function<void()> shutdown_hook_ GUARDED_BY(mutex_) = nullptr;
 };
 
 }  // namespace ray
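GUARDED_BY and EXCLUSIVE_LOCKS_REQUIRED are Clang thread-safety annotations: the first ties a member to the mutex that must be held to touch it, and the second marks a helper, like the private ShutdownIfNeeded above, that may only be called with the lock already held, so -Wthread-safety can flag violations at compile time. A minimal sketch of the pattern, assuming an Abseil version that exposes the unprefixed macro names the way this file uses them:

#include <cstdint>

#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"

class Counter {
 public:
  void Increment() {
    absl::MutexLock lock(&mu_);
    IncrementInternal();  // OK: mu_ is held at this point.
  }

 private:
  // Callable only with mu_ held, like ShutdownIfNeeded().
  void IncrementInternal() EXCLUSIVE_LOCKS_REQUIRED(mu_) { count_++; }

  absl::Mutex mu_;
  // All reads and writes must happen under mu_, like object_id_refs_.
  int64_t count_ GUARDED_BY(mu_) = 0;
};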
39 changes: 27 additions & 12 deletions src/ray/core_worker/task_manager.cc
@@ -74,15 +74,22 @@ void TaskManager::AddPendingTask(const TaskID &caller_id,
 }
 
 void TaskManager::DrainAndShutdown(std::function<void()> shutdown) {
-  absl::MutexLock lock(&mu_);
-  if (pending_tasks_.empty()) {
-    shutdown();
-  } else {
-    RAY_LOG(WARNING)
-        << "This worker is still managing " << pending_tasks_.size()
-        << " in flight tasks, waiting for them to finish before shutting down.";
+  bool has_pending_tasks = false;
+  {
+    absl::MutexLock lock(&mu_);
+    if (!pending_tasks_.empty()) {
+      has_pending_tasks = true;
+      RAY_LOG(WARNING)
+          << "This worker is still managing " << pending_tasks_.size()
+          << " in flight tasks, waiting for them to finish before shutting down.";
+      shutdown_hook_ = shutdown;
+    }
+  }
+
+  // Do not hold the lock when calling into the reference counter.
+  if (!has_pending_tasks) {
+    reference_counter_->DrainAndShutdown(shutdown);
   }
-  shutdown_hook_ = shutdown;
 }
 
 bool TaskManager::IsTaskPending(const TaskID &task_id) const {
@@ -201,10 +208,18 @@ void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_
 }
 
 void TaskManager::ShutdownIfNeeded() {
-  absl::MutexLock lock(&mu_);
-  if (shutdown_hook_ && pending_tasks_.empty()) {
-    RAY_LOG(WARNING) << "All in flight tasks finished, shutting down worker.";
-    shutdown_hook_();
+  std::function<void()> shutdown_hook = nullptr;
+  {
+    absl::MutexLock lock(&mu_);
+    if (shutdown_hook_ && pending_tasks_.empty()) {
+      RAY_LOG(WARNING) << "All in flight tasks finished, worker will shut down after "
+                          "draining references.";
+      std::swap(shutdown_hook_, shutdown_hook);
+    }
+  }
+  // Do not hold the lock when calling into the reference counter.
+  if (shutdown_hook != nullptr) {
+    reference_counter_->DrainAndShutdown(shutdown_hook);
   }
 }
 
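Both rewritten functions follow the discipline named in the new comments: decide and take ownership of the callback while holding mu_, release the lock, and only then call into ReferenceCounter::DrainAndShutdown, which acquires its own mutex. Because neither lock is ever held while taking the other, the two components cannot deadlock on each other. A stripped-down sketch of that move-the-hook-out-of-the-critical-section pattern, with hypothetical names and std::mutex in place of absl::Mutex:

#include <functional>
#include <mutex>
#include <utility>

class Worker {
 public:
  void ShutdownIfNeeded() {
    std::function<void()> hook;
    {
      std::lock_guard<std::mutex> lock(mu_);
      if (shutdown_hook_ && pending_ == 0) {
        std::swap(shutdown_hook_, hook);  // Take ownership under the lock.
      }
    }  // Lock released here.
    // Invoke outside the critical section so any locks the callee
    // takes never nest inside mu_.
    if (hook) {
      hook();
    }
  }

 private:
  std::mutex mu_;
  int pending_ = 0;
  std::function<void()> shutdown_hook_;
};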
