Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Streaming Generator] Fix a reference leak when pinning requests are received after refs are consumed. #35712

Conversation

rkooo567
Copy link
Contributor

@rkooo567 rkooo567 commented May 24, 2023

Why are these changes needed?

When we put a new object or an object is spilled, raylet sends a RPC to the owner. For example, it sends a request to the owner to pin the plasma object until the ref goes out of scope.

For none-generator tasks, we always guarantee to create return references, so we can handle these RPCs properly, However, when a generator task is used, we don't know the references ahead of time, which means it is not guaranteed to own the references when the RPC is received. In this case, we own a reference before it is reported from the executor. See the code below for more details.

reference_counter_->OwnDynamicStreamingTaskReturnRef(object_id, generator_id);

However, this code is prone to error and causes reference leaks. Here are some examples. Imagine "WRITE 0" means the generator task return is written to a stream index 0. PinObjectRPCRecieved means the raylet RPC is received. READ 0 means we read the index 0 from a stream.

Example 1

WRITE 0
READ 0
PinObjectRPCRecieved

-> This means the reference is already consumed, and the PinObjectRPCRecieved comes after that. In this case, we shouldn't add a reference to the object, otherwise, it will leak because we cannot read this ref anymore (cuz it is already consumed).

Example 2

PinObjectRPCRecieved
# And WRITE 0 failed

In this case, WRITE 0 is failed. So, the when the object is owned by PinObjectRPCRecieved, it will be never be cleaned up.

To handle all these cases, we introduce a new API TemporarilyInsertToStreamIfNeeded. This API will own the object only when

  • the corresponding ref was never consumed
  • The stream has not been deleted.
  • And add the object ref to the temporary refs until it is reported. If the report fails, all the references will be removed when the stream is deleted.

Related issue number

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

…are consumed.

Signed-off-by: SangBin Cho <rkooo567@gmail.com>
Signed-off-by: SangBin Cho <rkooo567@gmail.com>
@@ -413,7 +444,7 @@ void TaskManager::DelObjectRefStream(const ObjectID &generator_id) {
for (const auto &object_id : object_ids_unconsumed) {
std::vector<ObjectID> deleted;
reference_counter_->RemoveLocalReference(object_id, &deleted);
RAY_CHECK_EQ(deleted.size(), 1UL);
RAY_CHECK_GE(deleted.size(), 1UL);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unrelated, but this is other issue I found. When a return reference includes a nested reference, this could be bigger than 1.

…plasma-bug

Signed-off-by: SangBin Cho <rkooo567@gmail.com>
@rkooo567
Copy link
Contributor Author

Failures seem unrelated to this PR

@rkooo567 rkooo567 merged commit 4aba87b into ray-project:master May 25, 2023
rkooo567 added a commit to rkooo567/ray that referenced this pull request May 25, 2023
…received after refs are consumed. (ray-project#35712)

When we put a new object or an object is spilled, raylet sends a RPC to the owner. For example, it sends a request to the owner to pin the plasma object until the ref goes out of scope.

For none-generator tasks, we always guarantee to create return references, so we can handle these RPCs properly, However, when a generator task is used, we don't know the references ahead of time, which means it is not guaranteed to own the references when the RPC is received. In this case, we own a reference before it is reported from the executor. See the code below for more details.

ray/src/ray/core_worker/core_worker.cc

Line 3292 in 5acf41e

 reference_counter_->OwnDynamicStreamingTaskReturnRef(object_id, generator_id);
However, this code is prone to error and causes reference leaks. Here are some examples. Imagine "WRITE 0" means the generator task return is written to a stream index 0. PinObjectRPCRecieved means the raylet RPC is received. READ 0 means we read the index 0 from a stream.

Example 1
WRITE 0
READ 0
PinObjectRPCRecieved
-> This means the reference is already consumed, and the PinObjectRPCRecieved comes after that. In this case, we shouldn't add a reference to the object, otherwise, it will leak because we cannot read this ref anymore (cuz it is already consumed).

Example 2
PinObjectRPCRecieved
In this case, WRITE 0 is failed. So, the when the object is owned by PinObjectRPCRecieved, it will be never be cleaned up.

To handle all these cases, we introduce a new API TemporarilyInsertToStreamIfNeeded. This API will own the object only when

the corresponding ref was never consumed
The stream has not been deleted.
And add the object ref to the temporary refs until it is reported. If the report fails, all the references will be removed when the stream is deleted.

Signed-off-by: SangBin Cho <rkooo567@gmail.com>
ArturNiederfahrenhorst pushed a commit that referenced this pull request May 26, 2023
…received after refs are consumed. (#35712) (#35794)

When we put a new object or an object is spilled, raylet sends a RPC to the owner. For example, it sends a request to the owner to pin the plasma object until the ref goes out of scope.

For none-generator tasks, we always guarantee to create return references, so we can handle these RPCs properly, However, when a generator task is used, we don't know the references ahead of time, which means it is not guaranteed to own the references when the RPC is received. In this case, we own a reference before it is reported from the executor. See the code below for more details.

ray/src/ray/core_worker/core_worker.cc

Line 3292 in 5acf41e

 reference_counter_->OwnDynamicStreamingTaskReturnRef(object_id, generator_id);
However, this code is prone to error and causes reference leaks. Here are some examples. Imagine "WRITE 0" means the generator task return is written to a stream index 0. PinObjectRPCRecieved means the raylet RPC is received. READ 0 means we read the index 0 from a stream.

Example 1
WRITE 0
READ 0
PinObjectRPCRecieved
-> This means the reference is already consumed, and the PinObjectRPCRecieved comes after that. In this case, we shouldn't add a reference to the object, otherwise, it will leak because we cannot read this ref anymore (cuz it is already consumed).

Example 2
PinObjectRPCRecieved
In this case, WRITE 0 is failed. So, the when the object is owned by PinObjectRPCRecieved, it will be never be cleaned up.

To handle all these cases, we introduce a new API TemporarilyInsertToStreamIfNeeded. This API will own the object only when

the corresponding ref was never consumed
The stream has not been deleted.
And add the object ref to the temporary refs until it is reported. If the report fails, all the references will be removed when the stream is deleted.

Signed-off-by: SangBin Cho <rkooo567@gmail.com>
scv119 pushed a commit to scv119/ray that referenced this pull request Jun 16, 2023
…received after refs are consumed. (ray-project#35712)

When we put a new object or an object is spilled, raylet sends a RPC to the owner. For example, it sends a request to the owner to pin the plasma object until the ref goes out of scope.

For none-generator tasks, we always guarantee to create return references, so we can handle these RPCs properly, However, when a generator task is used, we don't know the references ahead of time, which means it is not guaranteed to own the references when the RPC is received. In this case, we own a reference before it is reported from the executor. See the code below for more details.

ray/src/ray/core_worker/core_worker.cc

Line 3292 in 5acf41e

 reference_counter_->OwnDynamicStreamingTaskReturnRef(object_id, generator_id); 
However, this code is prone to error and causes reference leaks. Here are some examples. Imagine "WRITE 0" means the generator task return is written to a stream index 0. PinObjectRPCRecieved means the raylet RPC is received. READ 0 means we read the index 0 from a stream.

Example 1
WRITE 0
READ 0
PinObjectRPCRecieved
-> This means the reference is already consumed, and the PinObjectRPCRecieved comes after that. In this case, we shouldn't add a reference to the object, otherwise, it will leak because we cannot read this ref anymore (cuz it is already consumed).

Example 2
PinObjectRPCRecieved
# And WRITE 0 failed
In this case, WRITE 0 is failed. So, the when the object is owned by PinObjectRPCRecieved, it will be never be cleaned up.

To handle all these cases, we introduce a new API TemporarilyInsertToStreamIfNeeded. This API will own the object only when

the corresponding ref was never consumed
The stream has not been deleted.
And add the object ref to the temporary refs until it is reported. If the report fails, all the references will be removed when the stream is deleted.
arvind-chandra pushed a commit to lmco/ray that referenced this pull request Aug 31, 2023
…received after refs are consumed. (ray-project#35712)

When we put a new object or an object is spilled, raylet sends a RPC to the owner. For example, it sends a request to the owner to pin the plasma object until the ref goes out of scope.

For none-generator tasks, we always guarantee to create return references, so we can handle these RPCs properly, However, when a generator task is used, we don't know the references ahead of time, which means it is not guaranteed to own the references when the RPC is received. In this case, we own a reference before it is reported from the executor. See the code below for more details.

ray/src/ray/core_worker/core_worker.cc

Line 3292 in 5acf41e

 reference_counter_->OwnDynamicStreamingTaskReturnRef(object_id, generator_id);
However, this code is prone to error and causes reference leaks. Here are some examples. Imagine "WRITE 0" means the generator task return is written to a stream index 0. PinObjectRPCRecieved means the raylet RPC is received. READ 0 means we read the index 0 from a stream.

Example 1
WRITE 0
READ 0
PinObjectRPCRecieved
-> This means the reference is already consumed, and the PinObjectRPCRecieved comes after that. In this case, we shouldn't add a reference to the object, otherwise, it will leak because we cannot read this ref anymore (cuz it is already consumed).

Example 2
PinObjectRPCRecieved
# And WRITE 0 failed
In this case, WRITE 0 is failed. So, the when the object is owned by PinObjectRPCRecieved, it will be never be cleaned up.

To handle all these cases, we introduce a new API TemporarilyInsertToStreamIfNeeded. This API will own the object only when

the corresponding ref was never consumed
The stream has not been deleted.
And add the object ref to the temporary refs until it is reported. If the report fails, all the references will be removed when the stream is deleted.

Signed-off-by: e428265 <arvind.chandramouli@lmco.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants