[Core] Fix worker process leaks after job finishes #44214

Merged · 21 commits · May 11, 2024
2 changes: 1 addition & 1 deletion python/ray/_private/worker.py
@@ -2718,7 +2718,7 @@ def put(
@PublicAPI
@client_mode_hook
def wait(
ray_waitables: Union["ObjectRef[R]", "ObjectRefGenerator[R]"],
ray_waitables: List[Union["ObjectRef[R]", "ObjectRefGenerator[R]"]],
*,
num_returns: int = 1,
timeout: Optional[float] = None,
5 changes: 5 additions & 0 deletions python/ray/_raylet.pyx
@@ -4832,6 +4832,11 @@ cdef class CoreWorker:
return (CCoreWorkerProcess.GetCoreWorker().GetWorkerContext()
.CurrentActorMaxConcurrency())

def get_current_root_detached_actor_id(self) -> ActorID:
# This is only used in tests.
return ActorID(CCoreWorkerProcess.GetCoreWorker().GetWorkerContext()
.GetRootDetachedActorID().Binary())

def get_queued_future(self, task_id: Optional[TaskID]) -> ConcurrentFuture:
"""Get a asyncio.Future that's queued in the event loop."""
with self._task_id_to_future_lock:
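For reference, the new accessor is reached from Python through the worker's CoreWorker handle, exactly as the tests added below do. A minimal usage sketch (test-only API; names taken from this PR):

import ray

ray.init()

@ray.remote
def report_root_detached_actor_id():
    # Internal, test-only accessor added above: returns the root detached
    # actor id of the current task (ActorID.nil() when the root is the driver).
    core_worker = ray._private.worker.global_worker.core_worker
    return core_worker.get_current_root_detached_actor_id().hex()

# Submitted directly from the driver, so the root detached actor id is nil.
assert ray.get(report_root_detached_actor_id.remote()) == ray._raylet.ActorID.nil().hex()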
1 change: 1 addition & 0 deletions python/ray/includes/libcoreworker.pxd
@@ -85,6 +85,7 @@ cdef extern from "ray/core_worker/context.h" nogil:
c_bool CurrentActorIsAsync()
const c_string &GetCurrentSerializedRuntimeEnv()
int CurrentActorMaxConcurrency()
const CActorID &GetRootDetachedActorID()

cdef extern from "ray/core_worker/generator_waiter.h" nogil:
cdef cppclass CGeneratorBackpressureWaiter "ray::core::GeneratorBackpressureWaiter": # noqa
94 changes: 94 additions & 0 deletions python/ray/tests/test_job.py
@@ -17,6 +17,7 @@
run_string_as_driver_nonblocking,
wait_for_condition,
format_web_url,
wait_for_pid_to_exit,
)
from ray.job_config import JobConfig
from ray.job_submission import JobSubmissionClient
@@ -296,6 +297,99 @@ def verify():
# TODO(sang): Client entrypoint not supported yet.


def test_task_spec_root_detached_actor_id(shutdown_only):
"""Test to make sure root detached actor id is set correctly
for task spec of submitted task or actor.
"""

ray.init()

@ray.remote
def get_task_root_detached_actor_id():
core_worker = ray._private.worker.global_worker.core_worker
return core_worker.get_current_root_detached_actor_id().hex()

@ray.remote
class Actor:
def get_root_detached_actor_id(self):
core_worker = ray._private.worker.global_worker.core_worker
return core_worker.get_current_root_detached_actor_id().hex()

@ray.remote(lifetime="detached")
class DetachedActor:
def check(self):
core_worker = ray._private.worker.global_worker.core_worker
assert (
ray.get_runtime_context().get_actor_id()
== core_worker.get_current_root_detached_actor_id().hex()
)
assert ray.get_runtime_context().get_actor_id() == ray.get(
get_task_root_detached_actor_id.remote()
)
actor = Actor.remote()
assert ray.get_runtime_context().get_actor_id() == ray.get(
actor.get_root_detached_actor_id.remote()
)

assert (
ray.get(get_task_root_detached_actor_id.remote())
== ray._raylet.ActorID.nil().hex()
)
actor = Actor.remote()
assert (
ray.get(actor.get_root_detached_actor_id.remote())
== ray._raylet.ActorID.nil().hex()
)
detached_actor = DetachedActor.remote()
ray.get(detached_actor.check.remote())


def test_no_process_leak_after_job_finishes(ray_start_cluster):
"""Test to make sure when a job finishes,
all the worker processes belonging to it exit.
"""
cluster = ray_start_cluster
cluster.add_node(num_cpus=8)
ray.init(address=cluster.address)

@ray.remote(num_cpus=0)
class PidActor:
def __init__(self):
self.pids = set()
self.pids.add(os.getpid())

def add_pid(self, pid):
self.pids.add(pid)

def get_pids(self):
return self.pids

@ray.remote
def child(pid_actor):
# child worker process should be forcibly killed
# when the job finishes.
ray.get(pid_actor.add_pid.remote(os.getpid()))
time.sleep(1000000)

@ray.remote
def parent(pid_actor):
ray.get(pid_actor.add_pid.remote(os.getpid()))
child.remote(pid_actor)

pid_actor = PidActor.remote()
ray.get(parent.remote(pid_actor))

wait_for_condition(lambda: len(ray.get(pid_actor.get_pids.remote())) == 3)

pids = ray.get(pid_actor.get_pids.remote())

ray.shutdown()
# Job finishes at this point

for pid in pids:
wait_for_pid_to_exit(pid)

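For reference, wait_for_pid_to_exit (imported from ray._private.test_utils above) just polls until the given process disappears. A rough standalone equivalent, assuming psutil is available (hypothetical helper name, not part of this PR):

import time

import psutil

def wait_for_pid_to_exit_sketch(pid: int, timeout_s: float = 30.0) -> None:
    # Poll until no process with this pid exists, or fail after timeout_s.
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        if not psutil.pid_exists(pid):
            return
        time.sleep(0.1)
    raise TimeoutError(f"process {pid} is still alive after {timeout_s}s")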

if __name__ == "__main__":

# Make subprocess happy in bazel.
14 changes: 14 additions & 0 deletions python/ray/workflow/tests/test_workflow_queuing.py
@@ -1,6 +1,8 @@
import os
import pytest
import ray
from ray import workflow
from ray._private.test_utils import wait_for_condition
from ray.tests.conftest import * # noqa


@@ -146,6 +148,8 @@ def test_workflow_queuing_resume_all(shutdown_only, tmp_path):

@ray.remote
def long_running(x):
file_path = str(tmp_path / f".long_running_{x}")
open(file_path, "w")
with filelock.FileLock(lock_path):
return x

@@ -156,6 +160,16 @@ def long_running(x):
workflow.run_async(wfs[i], workflow_id=f"workflow_{i}") for i in range(4)
]

# Make sure workflow_0 and workflow_1 are running user code.
# Otherwise, ray.shutdown() may be called while the workflow code is
# still inside ray.get(); because the raylet is stopped before the
# worker process (this is a bug we should fix), that ray.get() can
# throw an exception and transition the workflow to FAILED status.
wait_for_condition(lambda: os.path.isfile(str(tmp_path / ".long_running_0")))
wait_for_condition(lambda: os.path.isfile(str(tmp_path / ".long_running_1")))

assert sorted(x[0] for x in workflow.list_all({workflow.RUNNING})) == [
"workflow_0",
"workflow_1",
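The synchronization idiom added above (each task touches a marker file as its first action, and the test blocks on wait_for_condition until the markers appear) ensures the workflows are already executing user code before the rest of the test runs. A self-contained sketch of the same pattern, with a simplified stand-in for the test_utils helper:

import os
import tempfile
import time

def wait_for_condition(predicate, timeout=10, retry_interval=0.1):
    # Simplified stand-in for ray._private.test_utils.wait_for_condition.
    deadline = time.time() + timeout
    while time.time() < deadline:
        if predicate():
            return
        time.sleep(retry_interval)
    raise RuntimeError("Condition was not met before the timeout expired.")

marker = os.path.join(tempfile.mkdtemp(), ".long_running_0")

def long_running_task():
    # First thing the task does: drop a marker so the caller can tell it has
    # actually entered user code rather than still being scheduled.
    open(marker, "w").close()
    # ... long-running work would follow here ...

long_running_task()
wait_for_condition(lambda: os.path.isfile(marker))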
1 change: 1 addition & 0 deletions src/mock/ray/raylet/worker.h
@@ -80,6 +80,7 @@ class MockWorkerInterface : public WorkerInterface {
MOCK_METHOD(bool, IsRegistered, (), (override));
MOCK_METHOD(rpc::CoreWorkerClientInterface *, rpc_client, (), (override));
MOCK_METHOD(bool, SetJobId, (const JobID &job_id), (override));
MOCK_METHOD(const ActorID &, GetRootDetachedActorId, (), (override));
};

} // namespace raylet
3 changes: 0 additions & 3 deletions src/ray/common/ray_config_def.h
@@ -862,9 +862,6 @@
RAY_CONFIG(bool, worker_core_dump_exclude_plasma_store, true)
RAY_CONFIG(bool, raylet_core_dump_exclude_plasma_store, true)

/// Whether to kill idle workers of a terminated job.
RAY_CONFIG(bool, kill_idle_workers_of_terminated_job, true)

// Instruct the Python default worker to preload the specified imports.
// This is specified as a comma-separated list.
// If left empty, no such attempt will be made.
17 changes: 14 additions & 3 deletions src/ray/common/task/task_spec.cc
@@ -159,6 +159,13 @@ TaskID TaskSpecification::ParentTaskId() const {
return TaskID::FromBinary(message_->parent_task_id());
}

ActorID TaskSpecification::RootDetachedActorId() const {
if (message_->root_detached_actor_id().empty() /* e.g., empty proto default */) {
return ActorID::Nil();
}
return ActorID::FromBinary(message_->root_detached_actor_id());
}

TaskID TaskSpecification::SubmitterTaskId() const {
if (message_->submitter_task_id().empty() /* e.g., empty proto default */) {
return TaskID::Nil();
@@ -198,7 +205,8 @@ int TaskSpecification::GetRuntimeEnvHash() const {
WorkerCacheKey env = {SerializedRuntimeEnv(),
GetRequiredResources().GetResourceMap(),
IsActorCreationTask(),
GetRequiredResources().Get(scheduling::ResourceID::GPU()) > 0};
GetRequiredResources().Get(scheduling::ResourceID::GPU()) > 0,
!(RootDetachedActorId().IsNil())};
return env.IntHash();
}

@@ -594,13 +602,15 @@ WorkerCacheKey::WorkerCacheKey(
const std::string serialized_runtime_env,
const absl::flat_hash_map<std::string, double> &required_resources,
bool is_actor,
bool is_gpu)
bool is_gpu,
bool is_root_detached_actor)
: serialized_runtime_env(serialized_runtime_env),
required_resources(RayConfig::instance().worker_resource_limits_enabled()
? required_resources
: absl::flat_hash_map<std::string, double>{}),
is_actor(is_actor && RayConfig::instance().isolate_workers_across_task_types()),
is_gpu(is_gpu && RayConfig::instance().isolate_workers_across_resource_types()),
is_root_detached_actor(is_root_detached_actor),
hash_(CalculateHash()) {}

std::size_t WorkerCacheKey::CalculateHash() const {
@@ -617,6 +627,7 @@ std::size_t WorkerCacheKey::CalculateHash() const {
boost::hash_combine(hash, serialized_runtime_env);
boost::hash_combine(hash, is_actor);
boost::hash_combine(hash, is_gpu);
boost::hash_combine(hash, is_root_detached_actor);

std::vector<std::pair<std::string, double>> resource_vars(required_resources.begin(),
required_resources.end());
@@ -637,7 +648,7 @@ bool WorkerCacheKey::operator==(const WorkerCacheKey &k) const {

bool WorkerCacheKey::EnvIsEmpty() const {
return IsRuntimeEnvEmpty(serialized_runtime_env) && required_resources.empty() &&
!is_gpu;
!is_gpu && !is_root_detached_actor;
}

std::size_t WorkerCacheKey::Hash() const { return hash_; }
14 changes: 12 additions & 2 deletions src/ray/common/task/task_spec.h
@@ -287,6 +287,8 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {

TaskID ParentTaskId() const;

ActorID RootDetachedActorId() const;

TaskID SubmitterTaskId() const;

size_t ParentCounter() const;
@@ -527,12 +529,17 @@ class WorkerCacheKey {
/// worker. \param required_resources The required resource.
/// worker. \param is_actor Whether the worker will be an actor. This is set when
/// task type isolation between workers is enabled.
/// worker. \param iis_gpu Whether the worker will be using GPUs. This is set when
/// worker. \param is_gpu Whether the worker will be using GPUs. This is set when
/// resource type isolation between workers is enabled.
/// worker. \param is_root_detached_actor Whether the worker will be running
/// tasks or actors whose root ancestor is a detached actor. This is set
/// to prevent worker reuse between tasks whose root is the driver process
/// and tasks whose root is a detached actor.
WorkerCacheKey(const std::string serialized_runtime_env,
const absl::flat_hash_map<std::string, double> &required_resources,
bool is_actor,
bool is_gpu);
bool is_gpu,
bool is_root_detached_actor);

bool operator==(const WorkerCacheKey &k) const;

@@ -564,6 +571,9 @@ class WorkerCacheKey {
const bool is_actor;
/// Whether the worker is to use a GPU.
const bool is_gpu;
/// Whether the worker is to run tasks or actors
/// whose root is a detached actor.
const bool is_root_detached_actor;
/// The hash of the worker's environment. This is set to 0
/// for unspecified or empty environments.
const std::size_t hash_ = 0;
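Conceptually, the new is_root_detached_actor flag is just one more component of the worker cache key, so idle workers whose root ancestor is a detached actor are never reused for driver-rooted tasks (and vice versa). A rough Python analogue of the keying idea, purely illustrative of the C++ WorkerCacheKey above:

from dataclasses import dataclass

@dataclass(frozen=True)
class WorkerCacheKeySketch:
    # Illustrative analogue of the C++ WorkerCacheKey fields.
    serialized_runtime_env: str
    required_resources: tuple  # sorted (name, amount) pairs
    is_actor: bool
    is_gpu: bool
    is_root_detached_actor: bool  # new in this PR

# Identical except for the new flag, so the keys (and hence worker pools) differ.
driver_rooted = WorkerCacheKeySketch("", (), False, False, False)
detached_rooted = WorkerCacheKeySketch("", (), False, False, True)
assert driver_rooted != detached_rooted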
12 changes: 10 additions & 2 deletions src/ray/common/task/task_util.h
@@ -172,12 +172,16 @@ class TaskSpecBuilder {
int max_retries,
bool retry_exceptions,
const std::string &serialized_retry_exception_allowlist,
const rpc::SchedulingStrategy &scheduling_strategy) {
const rpc::SchedulingStrategy &scheduling_strategy,
const ActorID root_detached_actor_id) {
message_->set_max_retries(max_retries);
message_->set_retry_exceptions(retry_exceptions);
message_->set_serialized_retry_exception_allowlist(
serialized_retry_exception_allowlist);
message_->mutable_scheduling_strategy()->CopyFrom(scheduling_strategy);
if (!root_detached_actor_id.IsNil()) {
message_->set_root_detached_actor_id(root_detached_actor_id.Binary());
}
return *this;
}

@@ -230,7 +234,8 @@ class TaskSpecBuilder {
bool is_asyncio = false,
const std::vector<ConcurrencyGroup> &concurrency_groups = {},
const std::string &extension_data = "",
bool execute_out_of_order = false) {
bool execute_out_of_order = false,
ActorID root_detached_actor_id = ActorID::Nil()) {
message_->set_type(TaskType::ACTOR_CREATION_TASK);
auto actor_creation_spec = message_->mutable_actor_creation_task_spec();
actor_creation_spec->set_actor_id(actor_id.Binary());
@@ -258,6 +263,9 @@ class TaskSpecBuilder {
}
actor_creation_spec->set_execute_out_of_order(execute_out_of_order);
message_->mutable_scheduling_strategy()->CopyFrom(scheduling_strategy);
if (!root_detached_actor_id.IsNil()) {
message_->set_root_detached_actor_id(root_detached_actor_id.Binary());
}
return *this;
}

Expand Down
Loading
Loading