[DO NOT MERGE][core] prototype plasma move semantics. #62896
Conversation
Signed-off-by: Kartica Modi <karticamodi@gmail.com>
Signed-off-by: Kartica Modi <karticamodi@gmail.com>
prefix to logs Signed-off-by: Kartica Modi <karticamodi@gmail.com>
Signed-off-by: Kartica Modi <karticamodi@gmail.com>
Signed-off-by: Kartica Modi <karticamodi@gmail.com>
Signed-off-by: Kartica Modi <karticamodi@gmail.com>
second ReleaseFreedObject early returning Signed-off-by: Kartica Modi <karticamodi@gmail.com>
Signed-off-by: Kartica Modi <karticamodi@gmail.com>
Signed-off-by: Kartica Modi <karticamodi@gmail.com>
There was a problem hiding this comment.
Code Review
This pull request introduces plasma move semantics to Ray, allowing local object copies to be freed after successful pushes to other nodes. The implementation includes push acknowledgment tracking in the ObjectManager and a modified deletion flow in the LocalObjectManager. Review feedback highlights a critical logic bug in the deletion flushing mechanism that could lead to memory leaks, concurrency concerns regarding push tracking keys, and performance risks due to excessive logging and significantly reduced reporting intervals.
| f"cmd={cmdline}" | ||
| ) | ||
|
|
||
| logger.info("\n".join(lines)) |
There was a problem hiding this comment.
Logging per-process memory statistics at the INFO level every 500ms (as configured in reporter_consts.py) will generate an excessive amount of log data, especially on nodes with many workers. This can lead to performance degradation and disk space issues. Please consider moving this to DEBUG level and removing the personal debug tag [karticam].
| logger.info("\n".join(lines)) | |
| logger.debug("\n".join(lines)) |
| # The reporter will report its statistics this often (milliseconds). | ||
| REPORTER_UPDATE_INTERVAL_MS = ray_constants.env_integer( | ||
| "REPORTER_UPDATE_INTERVAL_MS", 5000 | ||
| "REPORTER_UPDATE_INTERVAL_MS", 500 |
There was a problem hiding this comment.
Reducing the reporter update interval from 5000ms to 500ms increases the reporting frequency by 10x. This significantly increases the CPU and network overhead of the dashboard reporter on every node. Unless this is strictly required for the move semantics prototype, it should be reverted to the default value.
| "REPORTER_UPDATE_INTERVAL_MS", 500 | |
| "REPORTER_UPDATE_INTERVAL_MS", 5000 |
| /// in the associated process's log file. | ||
| /// NOTE: This requires event_stats=1. | ||
| RAY_CONFIG(int64_t, event_stats_print_interval_ms, 60000) | ||
| RAY_CONFIG(int64_t, event_stats_print_interval_ms, 2000) |
| if (free_objects_period_ms_ >= 0) { | ||
| objects_pending_deletion_.emplace(object_id); | ||
| } | ||
| if (objects_pending_deletion_.size() == free_objects_batch_size_ || | ||
| free_objects_period_ms_ == 0) { | ||
| FlushFreeObjects(); | ||
| free_objects_period_ms_ == 0 || local_only) { | ||
| FlushFreeObjects(local_only); |
There was a problem hiding this comment.
There is a logic bug here: if local_only is true, FlushFreeObjects(true) is called, which flushes all objects currently in objects_pending_deletion_ as local-only. If there were objects in that set waiting for a broadcasted deletion (from previous calls where local_only was false), they will now incorrectly skip the broadcast, leading to memory leaks on other nodes. Local-only deletions should be handled separately from the batched broadcast deletions.
if (local_only) {\n on_objects_freed_({object_id}, /*local_only=*/true);\n ProcessSpilledObjectsDeleteQueue(free_objects_batch_size_);\n } else {\n if (free_objects_period_ms_ >= 0) {\n objects_pending_deletion_.emplace(object_id);\n }\n if (objects_pending_deletion_.size() == free_objects_batch_size_ ||\n free_objects_period_ms_ == 0) {\n FlushFreeObjects(/*local_only=*/false);\n }\n }| RAY_LOG(INFO) << "[karticam] Object added " << object_id | ||
| << " size=" << (object_info.data_size + object_info.metadata_size) | ||
| << " used_memory=" << used_memory_ | ||
| << " num_local_objects=" << local_objects_.size(); |
There was a problem hiding this comment.
Logging every object addition at INFO level is too noisy for Ray's object manager, which can handle thousands of objects per second. This should be moved to DEBUG or VLOG and the personal tag [karticam] should be removed.
RAY_LOG(DEBUG) << "Object added " << object_id\n << " size=" << (object_info.data_size + object_info.metadata_size)\n << " used_memory=" << used_memory_\n << " num_local_objects=" << local_objects_.size();| auto push_key = std::make_pair(object_id, node_id); | ||
| push_ack_tracking_[push_key] = { | ||
| static_cast<int64_t>(num_chunks), 0, /*failed=*/false}; |
There was a problem hiding this comment.
Using std::pair<ObjectID, NodeID> as a key for push_ack_tracking_ is potentially unsafe if multiple pushes for the same object to the same node are initiated concurrently (e.g., due to retries or multiple pull requests). It is safer to use the unique push_id as the key. Note that you will also need to update the capture list of the lambda on line 495 to include push_id.
push_ack_tracking_[push_id] = {\n static_cast<int64_t>(num_chunks), 0, /*failed=*/false};| int64_t acked_chunks = 0; | ||
| bool failed = false; | ||
| }; | ||
| absl::flat_hash_map<std::pair<ObjectID, NodeID>, PushAckState> push_ack_tracking_; |
There was a problem hiding this comment.
As mentioned in the implementation file, consider using UniqueID (the push_id) as the key for push_ack_tracking_ to avoid collisions between concurrent pushes of the same object to the same node.
| absl::flat_hash_map<std::pair<ObjectID, NodeID>, PushAckState> push_ack_tracking_; | |
| absl::flat_hash_map<UniqueID, PushAckState> push_ack_tracking_; |
| // TODO(karticam): CHECK THESE EDGE CASES: | ||
| // consider driver handling producer and consumer. | ||
| // say push completes, and move semantic calls ReleaseFreedObject. now producer | ||
| // is executing ReleaseFreedObject. but while its doing so, the consumer function | ||
| // using the object finishes and the driver also makes ref go out of scope, which | ||
| // means that ref_count goes to 0 => which triggers pub-sub callback and | ||
| // releaseFreedObject is called again. This might RACE??? Since | ||
| // its highly possible that both calls to ReleaseFreedObject are posted on the same | ||
| // thread and are serial, this might be fine. CHECK IF THIS IS HAPPENING. Also, even | ||
| // if this is the case, the order of execution might be reversed. Say the consumer | ||
| // execution finishes before move semantics callback is triggered, then the ref count | ||
| // call of ReleaseFreedObject will be called before move semantic triggered | ||
| // ReleaseFreedObject. CHECK if that's fine!!! |
| free_objects_period_ms_ == 0) { | ||
| FlushFreeObjects(); | ||
| free_objects_period_ms_ == 0 || local_only) { | ||
| FlushFreeObjects(local_only); |
There was a problem hiding this comment.
FlushFreeObjects applies single local_only to mixed batch
High Severity
objects_pending_deletion_ accumulates objects from both move-semantics releases (local_only=true) and normal owner evictions (local_only=false). When FlushFreeObjects is called, it passes a single local_only flag to on_objects_freed_ for the entire batch. If a move-semantics release triggers an immediate flush while non-local objects are still queued, those objects get freed with local_only=true, preventing the broadcast of FreeObjectsRequest to remote nodes — causing a memory leak on those nodes.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit df1ea5b. Configure here.
| auto push_key = std::make_pair(object_id, node_id); | ||
| push_ack_tracking_[push_key] = { | ||
| static_cast<int64_t>(num_chunks), 0, /*failed=*/false}; | ||
| } |
There was a problem hiding this comment.
Duplicate push resets tracking causing premature completion
High Severity
push_ack_tracking_ is unconditionally overwritten (resetting acked_chunks to 0) before push_manager_->StartPush is called. When PushManager handles a duplicate push for the same (object_id, node_id), it calls ResendAllChunks, re-queuing all N chunks while old in-flight chunk callbacks are still pending. Those old callbacks increment acked_chunks on the freshly reset state, so on_push_complete_ can fire after only N total acks — mixing old and new callbacks — before the resent chunks are actually acknowledged. This frees the local copy while the remote node may have incomplete data.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit df1ea5b. Configure here.
Signed-off-by: Kartica Modi <karticamodi@gmail.com>
| << object_manager_.GetUsedMemoryPercentage() * 100 << "%)"; | ||
| local_object_manager_.ReleaseFreedObject(object_id, /*local_only=*/true); | ||
| }); | ||
| } |
There was a problem hiding this comment.
Move semantics fires on every push indiscriminately
High Severity
The on_push_complete_ callback calls ReleaseFreedObject(object_id, true) after any successful push, not just intentional "moves." When Node A pushes an object to Node B in response to a pull request, the local copy on Node A is freed — even if Node A's workers still hold references to that object. The only guard is checking local_objects_ (pinned objects), meaning any pinned object pushed to another node will be immediately unpinned and deleted from local plasma, potentially breaking local tasks that still need it.
Reviewed by Cursor Bugbot for commit 94eb751. Configure here.
| std::stringstream result; | ||
| result << "ObjectManager:"; | ||
| result << "\n- num local objects: " << local_objects_.size(); | ||
| result << "\n- [karticam] object store used (bytes): " << used_memory_; |
There was a problem hiding this comment.
HandleNodeRemoved leaks push_ack_tracking_ entries for removed nodes
Medium Severity
HandleNodeRemoved cleans up push_manager_ and remote_object_manager_clients_ but does not clean up push_ack_tracking_ entries keyed by the removed node_id. If the PushManager::HandleNodeRemoved cancels remaining chunk issuance, those in-progress entries will never reach acked_chunks == total_chunks and will remain in the map indefinitely. In a long-running cluster with node churn and large-object pushes, these orphaned entries accumulate as a memory leak and also prevent the on_push_complete_ callback from ever firing for those objects.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit 94eb751. Configure here.
Signed-off-by: Kartica Modi <karticamodi@gmail.com>
…o race hypothesis Signed-off-by: Kartica Modi <karticamodi@gmail.com>
Signed-off-by: Kartica Modi <karticamodi@gmail.com>
| << "Received pull request from node for object"; | ||
| // RAY_LOG(INFO) << "[karticam] HandlePull: local_node=" << self_node_id_ | ||
| // << " received pull request from remote_node=" << node_id | ||
| // << " for object_id=" << object_id; |
There was a problem hiding this comment.
Existing DEBUG log lines deleted, replaced with commented-out code
Medium Severity
Two production RAY_LOG(DEBUG) statements were deleted and replaced with commented-out personal debug logs. In HandlePull, the log "Received pull request from node for object" (with structured fields) was removed. In HandleActorCallArgWaitComplete, the log "Actor task args are ready for tag" was removed. These provided useful debug-level observability that is now lost.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit 27d8c43. Configure here.
| push_ack_tracking_.erase(it); | ||
| if (success && on_push_complete_) { | ||
| on_push_complete_(object_id); | ||
| } |
There was a problem hiding this comment.
Move semantics frees object after first destination push
High Severity
on_push_complete_ fires independently for each (object_id, node_id) pair, but immediately frees the local object copy. When an object is pushed to multiple nodes (e.g., two actors on different nodes pull the same object), the first completed push deletes the local copy from local_objects_, causing all subsequent Push calls for the same object to fail — Push checks local_objects_.count(object_id) and falls through to the unfulfilled-request path. In-flight pushes may survive via chunk_reader reference counting, but new pull requests from other nodes can no longer be serviced from this node.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit 27d8c43. Configure here.
| # ) | ||
|
|
||
| # logger.info("\n".join(lines)) | ||
| return |
There was a problem hiding this comment.
No-op method and extensive commented-out debugging code committed
Low Severity
_log_process_memory_stats is a 50-line method whose entire body is commented out — it just does return. It's called every reporter cycle from _async_compose_stats_payload. Additionally, dozens of [karticam] commented-out debug logging blocks are scattered across multiple C++ files. These are clearly personal debugging artifacts from the prototype development that were not cleaned up before committing.
Reviewed by Cursor Bugbot for commit 27d8c43. Configure here.
|
This pull request has been automatically marked as stale because it has not had You can always ask for help on our discussion forum or Ray's public slack channel. If you'd like to keep this open, just leave any comment, and the stale label will be removed. |
Signed-off-by: Kartica Modi <karticamodi@gmail.com>
| /// in the associated process's log file. | ||
| /// NOTE: This requires event_stats=1. | ||
| RAY_CONFIG(int64_t, event_stats_print_interval_ms, 60000) | ||
| RAY_CONFIG(int64_t, event_stats_print_interval_ms, 2000) |
There was a problem hiding this comment.
Event stats interval reduced 30x for debugging
Medium Severity
event_stats_print_interval_ms was changed from 60000 (1 minute) to 2000 (2 seconds), a 30x increase in frequency of periodic event loop stats printing. This is a debugging change that would cause excessive log output in production.
Reviewed by Cursor Bugbot for commit 0ee2162. Configure here.
| RAY_LOG(DEBUG) << "Actor task args are ready for tag: " << tag; | ||
| // RAY_LOG(INFO) << "[karticam] MarkActorTaskArgsReady (posted callback running): | ||
| // " | ||
| // << "tag=" << tag << " thread=" << std::this_thread::get_id(); |
There was a problem hiding this comment.
Existing debug log statements accidentally deleted
Medium Severity
Existing RAY_LOG(DEBUG) statements were removed and replaced with commented-out [karticam] RAY_LOG(INFO) variants. In HandleActorCallArgWaitComplete, the "Actor task args are ready for tag" debug log is gone. In HandlePull, the "Received pull request from node for object" debug log is gone. These reduce observability for debugging production issues.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit 0ee2162. Configure here.
| @@ -0,0 +1,202 @@ | |||
| # Plasma Move Semantics — State of the Branch | |||
There was a problem hiding this comment.
PR description is blank template boilerplate
Low Severity
To help reviewers, please ensure your PR includes:
- Title: A concise summary of the change
- Description:
- What problem does this solve?
- How does this PR solve it?
- Any relevant context for reviewers such as:
- Why is the problem important to solve?
- Why was this approach chosen over others?
See this list of PRs as examples for PRs that have gone above and beyond:
- [Core] Introduce local port service discovery #59613
- [Core] Improve Large-Scale Resource View Synchronization Through Sync Message Batching #57641
- Remove node observability information from hot path of core components #56474
- [core][rdt] Support out-of-order actors by extracting metadata when creating #59610
- [core] fix open leak for plasma store memory (shm/fallback) by workers #52622
Triggered by project rule: Bugbot Rules
Reviewed by Cursor Bugbot for commit 0ee2162. Configure here.
issue Signed-off-by: Kartica Modi <karticamodi@gmail.com>
| // Notify the receiver that a move-semantics push has completed and it is now | ||
| // the primary copy holder. Fire-and-forget; the sender (producer) does not | ||
| // need the reply to make progress. | ||
| rpc MoveCompleted(MoveCompletedRequest) returns (MoveCompletedReply); |
There was a problem hiding this comment.
Proto files modified require fault-tolerance review
Low Severity
.proto files.
Please review the RPC fault-tolerance & idempotency standards guide here:
https://github.com/ray-project/ray/tree/master/doc/source/ray-core/internals/rpc-fault-tolerance.rst
Additional Locations (1)
Triggered by project rule: Bugbot Rules
Reviewed by Cursor Bugbot for commit 5e1b292. Configure here.
Signed-off-by: Kartica Modi <karticamodi@gmail.com>
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
There are 12 total unresolved issues (including 11 from previous reviews).
Reviewed by Cursor Bugbot for commit 86dccc2. Configure here.
| # The reporter will report its statistics this often (milliseconds). | ||
| REPORTER_UPDATE_INTERVAL_MS = ray_constants.env_integer( | ||
| "REPORTER_UPDATE_INTERVAL_MS", 5000 | ||
| "REPORTER_UPDATE_INTERVAL_MS", 500 |
There was a problem hiding this comment.
Prototype config defaults committed: intervals and feature flag
High Severity
Several config defaults were changed for debugging/benchmarking purposes: REPORTER_UPDATE_INTERVAL_MS 5000→500 (10x more frequent reporting), event_stats_print_interval_ms 60000→2000 (30x more frequent stats printing), and enable_plasma_move_semantics defaults to true. These would significantly increase CPU/log overhead in production and enable an incomplete prototype feature by default.
Additional Locations (2)
Reviewed by Cursor Bugbot for commit 86dccc2. Configure here.


Description
Related issues
Additional information