[core] Actor reconstruction when a creation arg is in plasma #51653
dayshah wants to merge 3 commits into ray-project:master
Conversation
Force-pushed from c48e6c1 to bb6f5c7
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 14 days if no further activity occurs. Thank you for your contributions. You can always ask for help on our discussion forum or Ray's public slack channel. If you'd like to keep this open, just leave any comment, and the stale label will be removed.
Force-pushed from bb6f5c7 to ba746ef
Signed-off-by: dayshah <dhyey2019@gmail.com>
@jjyao can you take an initial look at this solution to see if it makes sense
Pull Request Overview
This PR ensures that when an actor creation argument lives in plasma, its lineage is kept alive for potential restarts and then properly released once the actor dies.
- Configure `TaskManager` to increment lineage refs for retryable, non-detached actor creation tasks.
- Send a new `OwnedActorDead` RPC on actor death to decrement those lineage refs.
- Implement `ReferenceCounter::DecrementLineageRefCount`, hook it into `CoreWorker`, and add an end-to-end test.
Reviewed Changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| src/ray/rpc/worker/core_worker_server.h | Register new OwnedActorDead RPC handler |
| src/ray/rpc/worker/core_worker_client.h | Add OwnedActorDead stub and retryable client method |
| src/ray/protobuf/core_worker.proto | Define OwnedActorDeadRequest/Reply and add to CoreWorkerService |
| src/ray/gcs/gcs_server/gcs_server.cc | Provide GCS with a placeholder callback when constructing CoreWorkerClient |
| src/ray/gcs/gcs_server/gcs_actor_manager.cc | Send OwnedActorDead RPC on non-detached actor death |
| src/ray/core_worker/task_manager.cc | Skip lineage release for actor creation tasks retried by GCS |
| src/ray/core_worker/reference_count.h | Declare DecrementLineageRefCount |
| src/ray/core_worker/reference_count.cc | Implement DecrementLineageRefCount |
| src/ray/core_worker/core_worker.h | Track owned_actor_ids_ and declare handler |
| src/ray/core_worker/core_worker.cc | Handle OwnedActorDead, update CreateActor for retry logic |
| src/mock/ray/rpc/worker/core_worker_client.h | Mock OwnedActorDead client method |
| python/ray/tests/test_actor_lineage_reconstruction.py | New test for plasma-based actor reconstruction |
Comments suppressed due to low confidence (2)
src/ray/gcs/gcs_server/gcs_server.cc:483

- The empty-capture lambda passed as the CoreWorkerClient callback likely has the wrong signature. It should match the expected `ClientCallback<...>` signature (e.g. `(const Status &, const rpc::OwnedActorDeadReply &)`) to compile and behave correctly.

```cpp
address, client_call_manager_, []() {
```
src/ray/core_worker/task_manager.cc:917

- After erasing `it` with `submissible_tasks_.erase(it)`, the function should return early to avoid dereferencing an invalid iterator in subsequent code.

```cpp
} else if (it->second.num_retries_left != 0 && spec.IsActorCreationTask()) {
```
One solution here would be to ban spawning detached actors that have args in plasma (always inline all args into the task spec). We can't leave it in a completely broken state; at a minimum we should loudly warn the user if this happens.
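For illustration, a minimal sketch of the scenario being discussed: a detached actor whose creation arg is too large to inline and therefore lands in plasma. The class and the registered name `cfg_actor` are illustrative, mirroring the repro later in this thread.

```python
import numpy as np
import ray

ray.init()

@ray.remote(max_restarts=1)
class Actor:
    def __init__(self, config):
        self.config = config

# The ~100 MiB arg exceeds the inline threshold, so it is stored in plasma.
# Under the proposal above, this detached-actor creation would either be
# rejected or emit a loud warning, because a detached actor has no owner
# to keep the arg's lineage alive across restarts.
actor = Actor.options(name="cfg_actor", lifetime="detached").remote(
    np.zeros(100 * 1024 * 1024, dtype=np.uint8)
)
```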
edoakes left a comment
The implementation feels very brittle to me given that we are bolting on some special cases in multiple places. Specifically, passing hard-coded max_retries=1 is misusing the TaskManager interface.
Let's explore if there is a way we can make this behavior more explicit, either by:
- Adding a separate argument to `TaskManager`.
- Not relying on the `TaskManager` to increment the lineage ref count for us in this case, and instead doing it fully manually.
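As a rough illustration of the first option, here is a Python-flavored sketch (the real interface is the C++ `TaskManager::AddPendingTask`; the flag name `keep_lineage_until_explicit_release` and the surrounding types are hypothetical, not actual Ray identifiers):

```python
from dataclasses import dataclass

@dataclass
class PendingTask:
    task_spec: dict
    max_retries: int
    # Hypothetical explicit flag replacing the hard-coded max_retries=1
    # special case: hold lineage refs for plasma args until an explicit
    # release (e.g. the OwnedActorDead RPC) rather than dropping them
    # when the task first finishes successfully.
    keep_lineage_until_explicit_release: bool = False

def on_task_finished(task: PendingTask, lineage_refs: dict[str, int]) -> None:
    # Normally lineage refs for the task's args are released here; with the
    # explicit flag set, they stay pinned for a potential actor restart.
    if task.keep_lineage_until_explicit_release:
        return
    for object_id in list(lineage_refs):
        lineage_refs[object_id] -= 1
```

The design point is just that the lifetime of the lineage refs becomes an explicit parameter of the interface instead of a side effect of a sentinel retry count.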
```python
wait_for_condition(lambda: verify3())


def test_actor_reconstruction_relies_on_plasma_object(ray_start_cluster):
```
needs comments. at a minimum, a header docstring comment that describes the high-level goal of the test.
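One possible shape for the requested docstring (a sketch only; wording and details are the author's call):

```python
def test_actor_reconstruction_relies_on_plasma_object(ray_start_cluster):
    """Verify that an actor can restart when its creation arg lives in plasma.

    Creates an actor whose constructor arg is large enough to be stored in
    the object store (rather than inlined into the task spec), kills the node
    the actor is running on, and checks that the restarted actor still sees
    the original arg, i.e. the arg's lineage was kept alive for the restart.
    """
```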
```cpp
// For named actor, we still go through the sync way because for
// functions like list actors these actors need to be there, especially
// for local driver. But the current code all go through the gcs right now.
```
can you clean up the wording of this comment while you're touching it
this branch in behavior is really quite problematic. surely there's a better way to handle this consistency requirement
```cpp
  return Status::OK();
}

task_manager_->AddPendingTask(
```
The header comment for AddPendingTask needs to be updated:
(src/ray/core_worker/task_manager.h, line 210 at c54437c)
```cpp
rpc_address_,
task_spec,
CurrentCallSite(),
// Actor creation task retry happens through the gcs, so the task manager only
```
this should also describe why we have the `!task_spec.IsDetachedActor()` condition
```cpp
// spec here and also don't need to count this against
// total_lineage_footprint_bytes_. GCS will directly release lineage for the
```
hm why doesn't it count against total_lineage_footprint_bytes_?
```cpp
mutable utils::container::ThreadSafeSharedLruCache<std::string, rpc::RuntimeEnvInfo>
    runtime_env_json_serialization_cache_;

// Reconstructable actors owned by this worker.
```
Suggested change:

```diff
- // Reconstructable actors owned by this worker.
+ // Restartable actors owned by this worker.
```
also indicate the lifecycle of entries in the map
Ya agreed. Updated the initial description; the issue isn't the increment, the issue is that TaskManager's …
I talked to @jjyao about this, and basically there's still an issue here if the argument of the actor creation task is the output of a retryable task. The arg could get evicted even if …

The solution that simplifies everything is to have an actor manager on the owner worker for the actors it owns, and the gcs actor manager should only worry about detached actors. We can explicitly document the restart limitations for detached actors; non-detached actors get all the lineage reconstruction benefits that come with objects and can follow the same codepath.
If it's not blocking anything, I highly suggest we design and do it properly. Adding more RPCs between the core worker and GCS due to split brain will just make things more and more complicated.
We made the decision to do this by killing the split brain.
…sma (#53713)

Currently actor restarts won't work because lineage ref counting doesn't work for actors with restarts. See description here #51653 (comment).

Minimal repro:

```python
cluster = ray_start_cluster
cluster.add_node(num_cpus=0)  # head
ray.init(address=cluster.address)
worker1 = cluster.add_node(num_cpus=1)

@ray.remote(num_cpus=1, max_restarts=1)
class Actor:
    def __init__(self, config):
        self.config = config

    def ping(self):
        return self.config

# Arg is >100kb so will go in the object store
actor = Actor.remote(np.zeros(100 * 1024 * 1024, dtype=np.uint8))
ray.get(actor.ping.remote())

worker2 = cluster.add_node(num_cpus=1)
cluster.remove_node(worker1, allow_graceful=True)

# This line will break
ray.get(actor.ping.remote())
```

---------

Signed-off-by: dayshah <dhyey2019@gmail.com>
Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com>
Problem
The current issue is that on success of the initial actor creation, the `lineage_ref_count` for args in plasma is decremented, even if the actor could restart later. Therefore the objects won't stick around and could get deleted by the time the actor needs to restart.

For context, the actor creation task is initially submitted by the owner, but all the actor restart, restart scheduling, and actor death logic happens in the gcs actor manager. This necessitates a more complicated solution. An ideal solution would be to use the gcs actor manager only for detached actors and have normal actors managed by an actor manager on the owner worker.
Solution:

1. Keep the `lineage_ref_count` for args in plasma incremented when `task_spec.MaxActorRestarts() != 0 && !task_spec.IsDetachedActor()`. Because of this, the task manager will not decrement the `lineage_ref_count` when the actor creation task finishes for the first time.
2. Send a new `OwnedActorDead` rpc on actor death (no restarts left + not detached) to decrement the `lineage_ref_count` for all the objects that it was incremented for in step 1.

Idempotency of `HandleOwnedActorDead`: keeping a new set of owned actor ids in the core worker, adding to it when an actor creation task is first submitted, removing on the first execution of `HandleOwnedActorDead`, and never executing it if the actor id is not in the set. A sketch of this guard follows the note below.

Note: This feature will still not work for detached actors because there is inherently no owner.
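To make the idempotency guard concrete, here is a hypothetical Python-flavored sketch (the real implementation lives in the C++ core worker; the names `owned_actor_ids`, `lineage_ref_count`, and the function names are illustrative, not the actual identifiers):

```python
# Hypothetical sketch of the idempotency guard described above.
owned_actor_ids: set[str] = set()
lineage_ref_count: dict[str, int] = {}

def on_actor_creation_task_submitted(actor_id: str, plasma_arg_ids: list[str]) -> None:
    # Step 1: take a lineage ref on each plasma arg and remember the actor.
    owned_actor_ids.add(actor_id)
    for object_id in plasma_arg_ids:
        lineage_ref_count[object_id] = lineage_ref_count.get(object_id, 0) + 1

def handle_owned_actor_dead(actor_id: str, plasma_arg_ids: list[str]) -> None:
    # Step 2: the RPC may be retried, so only the first delivery takes effect.
    if actor_id not in owned_actor_ids:
        return
    owned_actor_ids.remove(actor_id)
    for object_id in plasma_arg_ids:
        # Release the lineage refs taken at submission time.
        lineage_ref_count[object_id] -= 1
```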