Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][2/2] Kill worker on root detached actor died. #45638

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 111 additions & 12 deletions python/ray/tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,20 @@ def check(self):
ray.get(detached_actor.check.remote())


@ray.remote(num_cpus=0)
class PidActor:
    """Actor that accumulates process ids, starting with its own."""

    def __init__(self):
        # Seed the collection with the pid of the process running this actor.
        self.pids = {os.getpid()}

    def add_pid(self, pid):
        """Record one more pid in the collection."""
        self.pids.add(pid)

    def get_pids(self):
        """Print the collected pids and return them."""
        print(f"pids: {self.pids}")
        return self.pids


def test_no_process_leak_after_job_finishes(ray_start_cluster):
"""Test to make sure when a job finishes,
all the worker processes belonging to it exit.
Expand All @@ -352,18 +366,6 @@ def test_no_process_leak_after_job_finishes(ray_start_cluster):
cluster.add_node(num_cpus=8)
ray.init(address=cluster.address)

@ray.remote(num_cpus=0)
class PidActor:
def __init__(self):
self.pids = set()
self.pids.add(os.getpid())

def add_pid(self, pid):
self.pids.add(pid)

def get_pids(self):
return self.pids

@ray.remote
def child(pid_actor):
# child worker process should be forcibly killed
Expand All @@ -390,6 +392,103 @@ def parent(pid_actor):
wait_for_pid_to_exit(pid)


@pytest.mark.parametrize("grandchild", ["task", "actor"])
def test_no_process_leak_after_detached_actor_finishes(ray_start_cluster, grandchild):
"""Test to make sure when a detached actor finishes,
all the worker processes descended from it exit.

Detached actor Spawner spawns stuff, and then gets killed. This is transitive and
applies to all descendants. So Spawner spawns actor/task which spawns actor/task.

When a worker dies, all children workers are recursively killed already. However
when there is a worker A (actor) -> worker B (task) -> worker C (actor/task), and
task B finishes, the worker B is released as idle. Then when A finishes, worker B
is no longer killed and C leaks. To test this, we spawn a short lived task (B) who
spawns a long lived actor/task (C).

In the code:

DetachedActor -> short_lived_task -> long_lived_task/LongLivedActor
"""
cluster = ray_start_cluster
cluster.add_node(num_cpus=8)
ray.init(address=cluster.address)

# Shared collector: every process spawned below reports its pid to this actor.
pid_actor = PidActor.remote()

@ray.remote(num_cpus=0)
def long_lived_task():
"""
Registers pid, and then sleeps.
"""
print(f"init {os.getpid()}")
ray.get(pid_actor.add_pid.remote(os.getpid()))
# Sleep in a loop so the task never returns on its own.
while True:
time.sleep(10000)

@ray.remote(num_cpus=0)
class LongLivedActor:
"""
Registers pid, and then sleeps.
"""

def __init__(self):
print(f"init {os.getpid()}")
ray.get(pid_actor.add_pid.remote(os.getpid()))

@ray.remote(num_cpus=0)
def short_lived_task(detached_actor, grandchild):
"""
Spawns a long lived actor/task and then exits.
"""
print(f"init {os.getpid()}")
ray.get(pid_actor.add_pid.remote(os.getpid()))
if grandchild == "task":
long_lived_task.remote()
else:
a = LongLivedActor.remote()
# Pass the handle to the detached actor, which stores it in a list.
detached_actor.add_handle.remote(a)

# presumably gives the grandchild time to start before this task returns
# -- TODO confirm
time.sleep(1)
print(f"short_lived_task {os.getpid()} exiting")

@ray.remote(num_cpus=0, lifetime="detached")
class DetachedActor:
def __init__(self):
ray.get(pid_actor.add_pid.remote(os.getpid()))
self.actor_handles = []

def spawn(self, self_handle, grandchild):
print(f"spawning short_lived_task to spawn {grandchild=}")
# Blocks until short_lived_task has returned.
ray.get(short_lived_task.remote(self_handle, grandchild))

def add_handle(self, handle):
# to avoid actor being GC'd
self.actor_handles.append(handle)

detached_actor = DetachedActor.options(
name="my_detached_actor", namespace="ns"
).remote()
# The actor is given its own handle so that short_lived_task can call
# add_handle on it.
ray.get(detached_actor.spawn.remote(detached_actor, grandchild))
# pids: pid_actor, detached_actor, child, grandchild
wait_for_condition(lambda: len(ray.get(pid_actor.get_pids.remote())) == 4)

# Snapshot the pids before shutting down, while pid_actor can still be queried.
pids = ray.get(pid_actor.get_pids.remote())

ray.shutdown()
# Job 1 finishes. pid_actor and short_lived_task should die, detached_actor and
# grandchild should live on.

# Job 2. Kill the actor
ray.init(address=cluster.address)
# Look the detached actor up by the name and namespace it was created with.
detached_actor_in_job2 = ray.get_actor("my_detached_actor", namespace="ns")
ray.kill(detached_actor_in_job2)

# All should die.
for pid in pids:
wait_for_pid_to_exit(pid)


if __name__ == "__main__":

# Make subprocess happy in bazel.
Expand Down
16 changes: 16 additions & 0 deletions src/ray/gcs/gcs_client/accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -404,12 +404,28 @@ void ActorInfoAccessor::AsyncResubscribe() {
}
}));
}
if (subscribe_all_operation_) {
RAY_CHECK_OK(subscribe_all_operation_(nullptr));
}
}

/// Reports whether the given actor is unsubscribed, as answered by the client's
/// GCS subscriber.
bool ActorInfoAccessor::IsActorUnsubscribed(const ActorID &actor_id) {
  auto &&gcs_subscriber = client_impl_->GetGcsSubscriber();
  return gcs_subscriber.IsActorUnsubscribed(actor_id);
}

/// Subscribe to updates of all actors.
///
/// The subscribe operation is stored in `subscribe_all_operation_` so that
/// AsyncResubscribe can issue it again later (with a null done callback).
///
/// \param subscribe Callback invoked for each actor update. Must not be empty.
/// \param done Callback passed through to the subscriber for subscription completion.
/// \return The status returned by the subscribe operation.
Status ActorInfoAccessor::AsyncSubscribeAll(
    const SubscribeCallback<ActorID, rpc::ActorTableData> &subscribe,
    const StatusCallback &done) {
  RAY_LOG(DEBUG) << "Subscribing update operations of all actors.";
  RAY_CHECK(subscribe) << "subscribe callback must not be empty.";

  // subscribe_all_operation_ is guarded by mutex_.
  absl::MutexLock lock(&mutex_);
  // `subscribe` is captured by value so the stored operation does not depend on
  // the caller's argument outliving this call.
  subscribe_all_operation_ = [this, subscribe](const StatusCallback &done) {
    return client_impl_->GetGcsSubscriber().SubscribeAllActors(subscribe, done);
  };
  return subscribe_all_operation_(done);
}

NodeInfoAccessor::NodeInfoAccessor(GcsClient *client_impl) : client_impl_(client_impl) {}

Status NodeInfoAccessor::RegisterSelf(const GcsNodeInfo &local_node_info,
Expand Down
11 changes: 11 additions & 0 deletions src/ray/gcs/gcs_client/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,15 @@ class ActorInfoAccessor {
/// \return Whether the specified actor is unsubscribed.
virtual bool IsActorUnsubscribed(const ActorID &actor_id);

/// Subscribe to all update operations of all actors.
///
/// The subscribe operation is remembered and issued again, with a null done
/// callback, when AsyncResubscribe runs.
///
/// \param subscribe Callback that will be called each time when an actor is updated.
/// Must not be empty.
/// \param done Callback that will be called when subscription is complete.
/// \return Status
virtual Status AsyncSubscribeAll(
const SubscribeCallback<ActorID, rpc::ActorTableData> &subscribe,
const StatusCallback &done);

private:
// Mutex to protect the resubscribe_operations_, fetch_data_operations_ and
// subscribe_all_operation_ fields.
absl::Mutex mutex_;
Expand All @@ -205,6 +214,8 @@ class ActorInfoAccessor {
absl::flat_hash_map<ActorID, FetchDataOperation> fetch_data_operations_
ABSL_GUARDED_BY(mutex_);

// Operation that subscribes to updates of all actors. Set by AsyncSubscribeAll and
// invoked again by AsyncResubscribe when it is non-empty.
SubscribeOperation subscribe_all_operation_ ABSL_GUARDED_BY(mutex_);

GcsClient *client_impl_;
};

Expand Down
4 changes: 3 additions & 1 deletion src/ray/gcs/gcs_server/gcs_actor_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -554,14 +554,16 @@ class GcsActorManager : public rpc::ActorInfoHandler {
actor_delta->set_state(actor.state());
actor_delta->mutable_death_cause()->CopyFrom(actor.death_cause());
actor_delta->mutable_address()->CopyFrom(actor.address());
// Actor's is_detached is used by raylet to kill descendants.
actor_delta->set_is_detached(actor.is_detached());
actor_delta->set_num_restarts(actor.num_restarts());
actor_delta->set_timestamp(actor.timestamp());
actor_delta->set_pid(actor.pid());
actor_delta->set_start_time(actor.start_time());
actor_delta->set_end_time(actor.end_time());
actor_delta->set_repr_name(actor.repr_name());
actor_delta->set_preempted(actor.preempted());
// Acotr's namespace and name are used for removing cached name when it's dead.
// Actor's namespace and name are used for removing cached name when it's dead.
if (!actor.ray_namespace().empty()) {
actor_delta->set_ray_namespace(actor.ray_namespace());
}
Expand Down
27 changes: 27 additions & 0 deletions src/ray/gcs/pubsub/gcs_pub_sub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,33 @@ bool GcsSubscriber::IsActorUnsubscribed(const ActorID &id) {
rpc::ChannelType::GCS_ACTOR_CHANNEL, gcs_address_, id.Binary());
}

/// Subscribes to the GCS actor channel and forwards every received actor message
/// to `subscribe`. `done`, when non-null, receives the subscription status.
/// Always returns Status::OK().
Status GcsSubscriber::SubscribeAllActors(
    const SubscribeCallback<ActorID, rpc::ActorTableData> &subscribe,
    const StatusCallback &done) {
  // Invoked for each published item: hand the actor id and payload to the caller.
  auto on_actor_message = [subscribe](const rpc::PubMessage &msg) {
    RAY_CHECK(msg.channel_type() == rpc::ChannelType::GCS_ACTOR_CHANNEL);
    subscribe(ActorID::FromBinary(msg.key_id()), msg.actor_message());
  };

  // Invoked when the subscription fails: only log the failure.
  auto on_subscription_failure = [](const std::string &, const Status &status) {
    RAY_LOG(WARNING) << "Subscription to Actor channel failed: " << status.ToString();
  };

  // Invoked once the subscription request completes; `done` is optional.
  auto on_subscribe_done = [done](Status status) {
    if (done == nullptr) {
      return;
    }
    done(status);
  };

  // The result is deliberately dropped: an already-existing subscription is
  // expected here, because resubscription is intentional.
  RAY_UNUSED(subscriber_->SubscribeChannel(std::make_unique<rpc::SubMessage>(),
                                           rpc::ChannelType::GCS_ACTOR_CHANNEL,
                                           gcs_address_,
                                           std::move(on_subscribe_done),
                                           std::move(on_actor_message),
                                           std::move(on_subscription_failure)));
  return Status::OK();
}

Status GcsSubscriber::SubscribeAllNodeInfo(
const ItemCallback<rpc::GcsNodeInfo> &subscribe, const StatusCallback &done) {
// GCS subscriber.
Expand Down
4 changes: 4 additions & 0 deletions src/ray/gcs/pubsub/gcs_pub_sub.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ class GcsSubscriber {

bool IsActorUnsubscribed(const ActorID &id);

/// Subscribe to GCS_ACTOR_CHANNEL. `subscribe` is invoked with the actor id and the
/// actor data of each received message; `done`, if non-null, is invoked with the
/// subscription status. The implementation always returns Status::OK().
Status SubscribeAllActors(
const SubscribeCallback<ActorID, rpc::ActorTableData> &subscribe,
const StatusCallback &done);

Status SubscribeAllJobs(const SubscribeCallback<JobID, rpc::JobTableData> &subscribe,
const StatusCallback &done);

Expand Down
52 changes: 42 additions & 10 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,12 @@ ray::Status NodeManager::RegisterGcs() {
RAY_RETURN_NOT_OK(
gcs_client_->Jobs().AsyncSubscribeAll(job_subscribe_handler, nullptr));

RAY_RETURN_NOT_OK(gcs_client_->Actors().AsyncSubscribeAll(
[this](const ActorID &actor_id, const rpc::ActorTableData &actor_data) {
HandleActorUpdate(actor_id, actor_data);
},
nullptr));

periodical_runner_.RunFnPeriodically(
[this] {
DumpDebugState();
Expand Down Expand Up @@ -618,20 +624,13 @@ void NodeManager::HandleJobStarted(const JobID &job_id, const JobTableData &job_
cluster_task_manager_->ScheduleAndDispatchTasks();
}

void NodeManager::HandleJobFinished(const JobID &job_id, const JobTableData &job_data) {
RAY_LOG(DEBUG) << "HandleJobFinished " << job_id;
RAY_CHECK(job_data.is_dead());
void NodeManager::KillLeasedWorkersByPredicate(
std::function<bool(const WorkerInterface &)> predicate) {
// Force kill every leased worker process for which the predicate returns true
// so that no worker process is leaked.
for (const auto &pair : leased_workers_) {
auto &worker = pair.second;
RAY_CHECK(!worker->GetAssignedJobId().IsNil());
if (worker->GetRootDetachedActorId().IsNil() &&
(worker->GetAssignedJobId() == job_id)) {
// Don't kill worker processes belonging to the detached actor
// since those are expected to outlive the job.
RAY_LOG(INFO) << "The leased worker " << worker->WorkerId()
<< " is killed because the job " << job_id << " finished.";
if (predicate(*worker)) {
rpc::ExitRequest request;
request.set_force_exit(true);
worker->rpc_client()->Exit(
Expand All @@ -646,9 +645,42 @@ void NodeManager::HandleJobFinished(const JobID &job_id, const JobTableData &job
});
}
}
}
void NodeManager::HandleJobFinished(const JobID &job_id, const JobTableData &job_data) {
RAY_LOG(DEBUG) << "HandleJobFinished " << job_id;
RAY_CHECK(job_data.is_dead());
KillLeasedWorkersByPredicate([job_id](const WorkerInterface &worker) {
RAY_CHECK(!worker.GetAssignedJobId().IsNil());
bool should_kill =
worker.GetRootDetachedActorId().IsNil() && (worker.GetAssignedJobId() == job_id);
if (should_kill) {
RAY_LOG(INFO) << "The leased worker " << worker.WorkerId()
<< " is killed because the job " << job_id << " finished.";
}
return should_kill;
});
worker_pool_.HandleJobFinished(job_id);
}

/// Reacts to an actor update. Only a detached actor whose state is DEAD triggers
/// work: its transitive children are killed. Running workers live in
/// leased_workers_, idle ones in worker_pool_, so both are handled.
void NodeManager::HandleActorUpdate(const ActorID &actor_id,
                                    const rpc::ActorTableData &actor_data) {
  const bool detached_actor_died =
      actor_data.is_detached() && actor_data.state() == rpc::ActorTableData::DEAD;
  if (!detached_actor_died) {
    return;
  }

  // Selects leased workers whose root detached actor is the one that died.
  const auto rooted_at_dead_actor = [actor_id](const WorkerInterface &worker) {
    if (!(worker.GetRootDetachedActorId() == actor_id)) {
      return false;
    }
    RAY_LOG(INFO) << "The leased worker " << worker.WorkerId()
                  << " is killed because the root detached actor " << actor_id
                  << " is dead.";
    return true;
  };

  KillLeasedWorkersByPredicate(rooted_at_dead_actor);
  worker_pool_.OnDetachedActorDied(actor_id);
}

void NodeManager::DoLocalGC(bool triggered_by_global_gc) {
auto all_workers = worker_pool_.GetAllRegisteredWorkers();
for (const auto &driver : worker_pool_.GetAllRegisteredDrivers()) {
Expand Down
7 changes: 7 additions & 0 deletions src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,13 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
/// \return Void.
void HandleJobFinished(const JobID &job_id, const JobTableData &job_data);

/// Handle an actor update. When the actor is detached and its state is DEAD, kill
/// the leased workers whose root detached actor is this actor and notify the worker
/// pool; any other update is ignored.
///
/// \param actor_id ID of the updated actor.
/// \param actor_data Data associated with the update.
void HandleActorUpdate(const ActorID &actor_id, const rpc::ActorTableData &actor_data);

/// Kills leased workers if `predicate(worker) == true`. Leased workers are scanned
/// exactly once. Each selected worker is sent an Exit request with force_exit set.
///
/// \param predicate Returns true for the workers that should be killed.
void KillLeasedWorkersByPredicate(
std::function<bool(const WorkerInterface &)> predicate);

/// Process client message of NotifyDirectCallTaskBlocked
///
/// \param message_data A pointer to the message data.
Expand Down
Loading
Loading