Commit
[Core] Fix worker process leaks after job finishes (#44214)
Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
jjyao committed May 11, 2024
1 parent a3d5874 commit 795c323
Showing 23 changed files with 469 additions and 61 deletions.
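For context, the leak this commit fixes is reproducible with a fire-and-forget task; a minimal sketch distilled from the new test below (assumes a local `ray.init()`):

```python
import os
import time

import ray

ray.init()

@ray.remote
def child():
    # Before this fix, this worker process could outlive the job.
    time.sleep(1000000)

@ray.remote
def parent():
    child.remote()  # Fire-and-forget; the result is never awaited.
    return os.getpid()

ray.get(parent.remote())
ray.shutdown()  # The job finishes here; the child's worker should be killed.
```

The fix also threads a root detached actor ID through task specs so that workers rooted at a detached actor, which legitimately outlive the job, are never pooled together with driver-rooted work.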
2 changes: 1 addition & 1 deletion python/ray/_private/worker.py
@@ -2718,7 +2718,7 @@ def put(
@PublicAPI
@client_mode_hook
def wait(
    ray_waitables: Union["ObjectRef[R]", "ObjectRefGenerator[R]"],
    ray_waitables: List[Union["ObjectRef[R]", "ObjectRefGenerator[R]"]],
    *,
    num_returns: int = 1,
    timeout: Optional[float] = None,
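The signature change above tightens the annotation: `ray.wait` expects a list of waitables. A minimal usage sketch (assumes a local `ray.init()`):

```python
import ray

ray.init()

@ray.remote
def f(i):
    return i

refs = [f.remote(i) for i in range(4)]
# Block until at least two results are ready, or five seconds elapse.
ready, not_ready = ray.wait(refs, num_returns=2, timeout=5.0)
```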
5 changes: 5 additions & 0 deletions python/ray/_raylet.pyx
@@ -4855,6 +4855,11 @@ cdef class CoreWorker:
        return (CCoreWorkerProcess.GetCoreWorker().GetWorkerContext()
                .CurrentActorMaxConcurrency())

    def get_current_root_detached_actor_id(self) -> ActorID:
        # This is only used in tests.
        return ActorID(CCoreWorkerProcess.GetCoreWorker().GetWorkerContext()
                       .GetRootDetachedActorID().Binary())

    def get_queued_future(self, task_id: Optional[TaskID]) -> ConcurrentFuture:
        """Get an asyncio.Future that's queued in the event loop."""
        with self._task_id_to_future_lock:
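A quick sketch of the new test-only accessor, mirroring the test added below; for a task submitted by the driver rather than a detached actor, the root detached actor ID is nil:

```python
import ray

ray.init()

@ray.remote
def get_root():
    core_worker = ray._private.worker.global_worker.core_worker
    return core_worker.get_current_root_detached_actor_id().hex()

# Submitted by the driver, so the root detached actor ID is nil.
assert ray.get(get_root.remote()) == ray._raylet.ActorID.nil().hex()
```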
1 change: 1 addition & 0 deletions python/ray/includes/libcoreworker.pxd
@@ -85,6 +85,7 @@ cdef extern from "ray/core_worker/context.h" nogil:
        c_bool CurrentActorIsAsync()
        const c_string &GetCurrentSerializedRuntimeEnv()
        int CurrentActorMaxConcurrency()
        const CActorID &GetRootDetachedActorID()

cdef extern from "ray/core_worker/generator_waiter.h" nogil:
    cdef cppclass CGeneratorBackpressureWaiter "ray::core::GeneratorBackpressureWaiter":  # noqa
94 changes: 94 additions & 0 deletions python/ray/tests/test_job.py
@@ -17,6 +17,7 @@
    run_string_as_driver_nonblocking,
    wait_for_condition,
    format_web_url,
    wait_for_pid_to_exit,
)
from ray.job_config import JobConfig
from ray.job_submission import JobSubmissionClient
@@ -296,6 +297,99 @@ def verify():
# TODO(sang): Client entrypoint not supported yet.


def test_task_spec_root_detached_actor_id(shutdown_only):
    """Test that the root detached actor ID is set correctly
    in the task spec of a submitted task or actor.
    """

    ray.init()

    @ray.remote
    def get_task_root_detached_actor_id():
        core_worker = ray._private.worker.global_worker.core_worker
        return core_worker.get_current_root_detached_actor_id().hex()

    @ray.remote
    class Actor:
        def get_root_detached_actor_id(self):
            core_worker = ray._private.worker.global_worker.core_worker
            return core_worker.get_current_root_detached_actor_id().hex()

    @ray.remote(lifetime="detached")
    class DetachedActor:
        def check(self):
            core_worker = ray._private.worker.global_worker.core_worker
            assert (
                ray.get_runtime_context().get_actor_id()
                == core_worker.get_current_root_detached_actor_id().hex()
            )
            assert ray.get_runtime_context().get_actor_id() == ray.get(
                get_task_root_detached_actor_id.remote()
            )
            actor = Actor.remote()
            assert ray.get_runtime_context().get_actor_id() == ray.get(
                actor.get_root_detached_actor_id.remote()
            )

    assert (
        ray.get(get_task_root_detached_actor_id.remote())
        == ray._raylet.ActorID.nil().hex()
    )
    actor = Actor.remote()
    assert (
        ray.get(actor.get_root_detached_actor_id.remote())
        == ray._raylet.ActorID.nil().hex()
    )
    detached_actor = DetachedActor.remote()
    ray.get(detached_actor.check.remote())


def test_no_process_leak_after_job_finishes(ray_start_cluster):
    """Test that when a job finishes, all the worker
    processes belonging to it exit.
    """
    cluster = ray_start_cluster
    cluster.add_node(num_cpus=8)
    ray.init(address=cluster.address)

    @ray.remote(num_cpus=0)
    class PidActor:
        def __init__(self):
            self.pids = set()
            self.pids.add(os.getpid())

        def add_pid(self, pid):
            self.pids.add(pid)

        def get_pids(self):
            return self.pids

    @ray.remote
    def child(pid_actor):
        # The child worker process should be forcibly killed
        # when the job finishes.
        ray.get(pid_actor.add_pid.remote(os.getpid()))
        time.sleep(1000000)

    @ray.remote
    def parent(pid_actor):
        ray.get(pid_actor.add_pid.remote(os.getpid()))
        child.remote(pid_actor)

    pid_actor = PidActor.remote()
    ray.get(parent.remote(pid_actor))

    wait_for_condition(lambda: len(ray.get(pid_actor.get_pids.remote())) == 3)

    pids = ray.get(pid_actor.get_pids.remote())

    ray.shutdown()
    # The job finishes at this point.

    for pid in pids:
        wait_for_pid_to_exit(pid)


if __name__ == "__main__":

    # Make subprocess happy in bazel.
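The test leans on `wait_for_condition` and `wait_for_pid_to_exit` from `ray._private.test_utils`. Roughly, they poll until a predicate holds or a process disappears; a simplified sketch of their behavior (not Ray's actual implementation, and it assumes `psutil` is available, as it is in Ray's test environment):

```python
import time

import psutil  # Assumption: available in the test environment.

def wait_for_condition(condition, timeout=10, retry_interval_ms=100):
    # Poll `condition` until it returns True or `timeout` seconds pass.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if condition():
            return
        time.sleep(retry_interval_ms / 1000)
    raise RuntimeError("The condition wasn't true before the timeout.")

def wait_for_pid_to_exit(pid, timeout=20):
    # The worker process should disappear once the job's cleanup kills it.
    wait_for_condition(lambda: not psutil.pid_exists(pid), timeout)
```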
14 changes: 14 additions & 0 deletions python/ray/workflow/tests/test_workflow_queuing.py
@@ -1,6 +1,8 @@
import os
import pytest
import ray
from ray import workflow
from ray._private.test_utils import wait_for_condition
from ray.tests.conftest import * # noqa


@@ -146,6 +148,8 @@ def test_workflow_queuing_resume_all(shutdown_only, tmp_path):

    @ray.remote
    def long_running(x):
        file_path = str(tmp_path / f".long_running_{x}")
        open(file_path, "w")
        with filelock.FileLock(lock_path):
            return x
@@ -156,6 +160,16 @@ def long_running(x):
        workflow.run_async(wfs[i], workflow_id=f"workflow_{i}") for i in range(4)
    ]

    # Make sure workflow_0 and workflow_1 are running user code.
    # Otherwise, ray.shutdown() may be called while workflow-internal
    # code is blocked in a ray.get(); because the raylet is stopped
    # before the worker process (a bug we should fix), that ray.get()
    # can throw and transition the workflow to FAILED status.
    wait_for_condition(lambda: os.path.isfile(str(tmp_path / ".long_running_0")))
    wait_for_condition(lambda: os.path.isfile(str(tmp_path / ".long_running_1")))

    assert sorted(x[0] for x in workflow.list_all({workflow.RUNNING})) == [
        "workflow_0",
        "workflow_1",
1 change: 1 addition & 0 deletions src/mock/ray/raylet/worker.h
@@ -80,6 +80,7 @@ class MockWorkerInterface : public WorkerInterface {
  MOCK_METHOD(bool, IsRegistered, (), (override));
  MOCK_METHOD(rpc::CoreWorkerClientInterface *, rpc_client, (), (override));
  MOCK_METHOD(bool, SetJobId, (const JobID &job_id), (override));
  MOCK_METHOD(const ActorID &, GetRootDetachedActorId, (), (override));
};

} // namespace raylet
3 changes: 0 additions & 3 deletions src/ray/common/ray_config_def.h
@@ -862,9 +862,6 @@ RAY_CONFIG(int64_t,
RAY_CONFIG(bool, worker_core_dump_exclude_plasma_store, true)
RAY_CONFIG(bool, raylet_core_dump_exclude_plasma_store, true)

/// Whether to kill idle workers of a terminated job.
RAY_CONFIG(bool, kill_idle_workers_of_terminated_job, true)

// Instruct the Python default worker to preload the specified imports.
// This is specified as a comma-separated list.
// If left empty, no such attempt will be made.
17 changes: 14 additions & 3 deletions src/ray/common/task/task_spec.cc
@@ -159,6 +159,13 @@ TaskID TaskSpecification::ParentTaskId() const {
  return TaskID::FromBinary(message_->parent_task_id());
}

ActorID TaskSpecification::RootDetachedActorId() const {
  if (message_->root_detached_actor_id().empty() /* e.g., empty proto default */) {
    return ActorID::Nil();
  }
  return ActorID::FromBinary(message_->root_detached_actor_id());
}

TaskID TaskSpecification::SubmitterTaskId() const {
  if (message_->submitter_task_id().empty() /* e.g., empty proto default */) {
    return TaskID::Nil();
@@ -198,7 +205,8 @@ int TaskSpecification::GetRuntimeEnvHash() const {
  WorkerCacheKey env = {SerializedRuntimeEnv(),
                        GetRequiredResources().GetResourceMap(),
                        IsActorCreationTask(),
                        GetRequiredResources().Get(scheduling::ResourceID::GPU()) > 0};
                        GetRequiredResources().Get(scheduling::ResourceID::GPU()) > 0,
                        !(RootDetachedActorId().IsNil())};
  return env.IntHash();
}

@@ -594,13 +602,15 @@ WorkerCacheKey::WorkerCacheKey(
    const std::string serialized_runtime_env,
    const absl::flat_hash_map<std::string, double> &required_resources,
    bool is_actor,
    bool is_gpu)
    bool is_gpu,
    bool is_root_detached_actor)
    : serialized_runtime_env(serialized_runtime_env),
      required_resources(RayConfig::instance().worker_resource_limits_enabled()
                             ? required_resources
                             : absl::flat_hash_map<std::string, double>{}),
      is_actor(is_actor && RayConfig::instance().isolate_workers_across_task_types()),
      is_gpu(is_gpu && RayConfig::instance().isolate_workers_across_resource_types()),
      is_root_detached_actor(is_root_detached_actor),
      hash_(CalculateHash()) {}

std::size_t WorkerCacheKey::CalculateHash() const {
@@ -617,6 +627,7 @@ std::size_t WorkerCacheKey::CalculateHash() const {
  boost::hash_combine(hash, serialized_runtime_env);
  boost::hash_combine(hash, is_actor);
  boost::hash_combine(hash, is_gpu);
  boost::hash_combine(hash, is_root_detached_actor);

  std::vector<std::pair<std::string, double>> resource_vars(required_resources.begin(),
                                                            required_resources.end());
@@ -637,7 +648,7 @@ bool WorkerCacheKey::operator==(const WorkerCacheKey &k) const {

bool WorkerCacheKey::EnvIsEmpty() const {
  return IsRuntimeEnvEmpty(serialized_runtime_env) && required_resources.empty() &&
         !is_gpu;
         !is_gpu && !is_root_detached_actor;
}

std::size_t WorkerCacheKey::Hash() const { return hash_; }
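The effect of the new `is_root_detached_actor` bit is easiest to see in a simplified Python analogue of the cache key (hypothetical names; the real key also hashes the required resources, as shown above): keys that differ only in this bit never compare equal, so their workers are pooled, and reused, separately.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class CacheKey:
    serialized_runtime_env: str
    is_actor: bool
    is_gpu: bool
    is_root_detached_actor: bool  # The field this commit adds.

    def env_is_empty(self) -> bool:
        # A worker rooted at a detached actor is never a "plain" idle worker.
        return (not self.serialized_runtime_env
                and not self.is_gpu
                and not self.is_root_detached_actor)

driver_rooted = CacheKey("", False, False, False)
detached_rooted = CacheKey("", False, False, True)
assert driver_rooted != detached_rooted  # Separate worker pools.
assert not detached_rooted.env_is_empty()
```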
14 changes: 12 additions & 2 deletions src/ray/common/task/task_spec.h
@@ -287,6 +287,8 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {

  TaskID ParentTaskId() const;

  ActorID RootDetachedActorId() const;

  TaskID SubmitterTaskId() const;

  size_t ParentCounter() const;
@@ -527,12 +529,17 @@ class WorkerCacheKey {
  /// worker. \param required_resources The required resource.
  /// worker. \param is_actor Whether the worker will be an actor. This is set when
  /// task type isolation between workers is enabled.
  /// worker. \param iis_gpu Whether the worker will be using GPUs. This is set when
  /// worker. \param is_gpu Whether the worker will be using GPUs. This is set when
  /// resource type isolation between workers is enabled.
  /// worker. \param is_root_detached_actor Whether the worker will be running
  /// tasks or actors whose root ancestor is a detached actor. This is set
  /// to prevent worker reuse between tasks whose root is the driver process
  /// and tasks whose root is a detached actor.
  WorkerCacheKey(const std::string serialized_runtime_env,
                 const absl::flat_hash_map<std::string, double> &required_resources,
                 bool is_actor,
                 bool is_gpu);
                 bool is_gpu,
                 bool is_root_detached_actor);

  bool operator==(const WorkerCacheKey &k) const;

@@ -564,6 +571,9 @@ class WorkerCacheKey {
  const bool is_actor;
  /// Whether the worker is to use a GPU.
  const bool is_gpu;
  /// Whether the worker is to run tasks or actors
  /// whose root is a detached actor.
  const bool is_root_detached_actor;
  /// The hash of the worker's environment. This is set to 0
  /// for unspecified or empty environments.
  const std::size_t hash_ = 0;
12 changes: 10 additions & 2 deletions src/ray/common/task/task_util.h
@@ -172,12 +172,16 @@ class TaskSpecBuilder {
      int max_retries,
      bool retry_exceptions,
      const std::string &serialized_retry_exception_allowlist,
      const rpc::SchedulingStrategy &scheduling_strategy) {
      const rpc::SchedulingStrategy &scheduling_strategy,
      const ActorID root_detached_actor_id) {
    message_->set_max_retries(max_retries);
    message_->set_retry_exceptions(retry_exceptions);
    message_->set_serialized_retry_exception_allowlist(
        serialized_retry_exception_allowlist);
    message_->mutable_scheduling_strategy()->CopyFrom(scheduling_strategy);
    if (!root_detached_actor_id.IsNil()) {
      message_->set_root_detached_actor_id(root_detached_actor_id.Binary());
    }
    return *this;
  }

@@ -230,7 +234,8 @@ class TaskSpecBuilder {
      bool is_asyncio = false,
      const std::vector<ConcurrencyGroup> &concurrency_groups = {},
      const std::string &extension_data = "",
      bool execute_out_of_order = false) {
      bool execute_out_of_order = false,
      ActorID root_detached_actor_id = ActorID::Nil()) {
    message_->set_type(TaskType::ACTOR_CREATION_TASK);
    auto actor_creation_spec = message_->mutable_actor_creation_task_spec();
    actor_creation_spec->set_actor_id(actor_id.Binary());
@@ -258,6 +263,9 @@
    }
    actor_creation_spec->set_execute_out_of_order(execute_out_of_order);
    message_->mutable_scheduling_strategy()->CopyFrom(scheduling_strategy);
    if (!root_detached_actor_id.IsNil()) {
      message_->set_root_detached_actor_id(root_detached_actor_id.Binary());
    }
    return *this;
  }

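Taken together with the new test, the builder changes imply a simple propagation rule, paraphrased in Python (hypothetical helper, not Ray's API): a detached actor is its own root, and every other task or actor inherits its submitter's root, which stays nil for driver-rooted work.

```python
NIL = "nil"  # Stand-in for ActorID::Nil().

def root_detached_actor_id(submitter_root, creates_detached_actor, new_actor_id=None):
    if creates_detached_actor:
        return new_actor_id  # A detached actor is its own root.
    return submitter_root    # Everything else inherits the submitter's root.

# Matches test_task_spec_root_detached_actor_id above:
assert root_detached_actor_id(NIL, False) == NIL        # Driver-rooted task.
assert root_detached_actor_id(NIL, True, "a1") == "a1"  # Detached actor creation.
assert root_detached_actor_id("a1", False) == "a1"      # Task under that actor.
```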
