[bug][core] streaming generator data loss #38232

Closed
raulchen opened this issue Aug 8, 2023 · 0 comments
Labels: bug, core, P0, Ray 2.7

raulchen commented Aug 8, 2023

What happened + What you expected to happen

Ray Data uses streaming generators with the following pattern. There is a small chance that a returned object gets lost due to a race condition, in which case the loop never terminates.

import ray
import time


@ray.remote(num_returns="streaming")
def f(n):
    for i in range(n):
        yield i


gens = [f.remote(50) for _ in range(100)]


start = time.time()
while gens:
    ready, _ = ray.wait(gens, num_returns=len(gens), timeout=0.1)
    for gen in ready:
        while True:
            try:
                block_ref = gen._next_sync(0)
                if block_ref.is_nil():
                    break
                ray.get(block_ref)
            except StopIteration:
                gens.remove(gen)
                break

This is what happens:

  1. When the driver receives a return object, it puts it into the local memory store.
  2. ray.wait(gens) temporarily creates an ObjectRef with the next ID and waits on it (code). This wait succeeds.
  3. When the temporary ObjectRef is GC'ed, the ref count goes to 0 and we delete the object from the memory store.
  4. Then when doing gen._next_sync(0), the object is no longer available.
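
To make the sequence above concrete, here is a toy model of it in plain Python. It does not use Ray at all: `ToyMemoryStore`, `ToyObjectRef`, and `wait_with_temporary_ref` are hypothetical stand-ins I made up for illustration, and the explicit `release()` call stands in for garbage collection of the temporary ObjectRef. It is only a sketch of the ref-counting behavior described in steps 1 to 4, not the actual core worker logic.

```python
class ToyMemoryStore:
    """Toy stand-in for the driver's local memory store (hypothetical)."""

    def __init__(self):
        self.objects = {}     # object ID -> value
        self.ref_counts = {}  # object ID -> number of live ToyObjectRefs

    def put(self, object_id, value):
        self.objects[object_id] = value

    def add_ref(self, object_id):
        self.ref_counts[object_id] = self.ref_counts.get(object_id, 0) + 1

    def remove_ref(self, object_id):
        self.ref_counts[object_id] -= 1
        if self.ref_counts[object_id] == 0:
            # Last reference is gone: evict the value from the store.
            del self.ref_counts[object_id]
            self.objects.pop(object_id, None)


class ToyObjectRef:
    """Toy reference that bumps the ref count while it is alive."""

    def __init__(self, store, object_id):
        self.store = store
        self.object_id = object_id
        store.add_ref(object_id)

    def release(self):
        # Explicit stand-in for the ref being garbage collected.
        self.store.remove_ref(self.object_id)


def wait_with_temporary_ref(store, next_id):
    """Mimics step 2 and step 3: wait on a temporary ref, then drop it."""
    temp = ToyObjectRef(store, next_id)
    ready = next_id in store.objects
    temp.release()  # ref count goes to 0, object is deleted
    return ready


store = ToyMemoryStore()
store.put("obj-0", "block 0")                             # step 1
wait_succeeded = wait_with_temporary_ref(store, "obj-0")  # steps 2 and 3
still_available = "obj-0" in store.objects                # step 4
print(wait_succeeded, still_available)  # prints: True False
```

In this model the wait reports the object as ready, but by the time the caller tries to read it the only reference has already been dropped, which matches the symptom in step 4.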

I've verified that this issue can be fixed by storing the next ObjectRef in the streaming generator, so that a reference to the next object stays alive between ray.wait and gen._next_sync.

diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx
index eaa52e1dab..23bb52572a 100644
--- a/python/ray/_raylet.pyx
+++ b/python/ray/_raylet.pyx
@@ -253,13 +253,12 @@ class StreamingObjectRefGenerator:
         # Ray's worker class. ray._private.worker.global_worker
         self.worker = worker
         self.worker.check_connected()
+        self._next_ref = self.worker.core_worker.peek_object_ref_stream(
+            self._generator_ref)
         assert hasattr(worker, "core_worker")
 
     def get_next_ref(self) -> ObjectRef:
-        self.worker.check_connected()
-        core_worker = self.worker.core_worker
-        return core_worker.peek_object_ref_stream(
-            self._generator_ref)
+        return self._next_ref
 
     def __iter__(self) -> "StreamingObjectRefGenerator":
         return self
@@ -312,16 +311,16 @@ class StreamingObjectRefGenerator:
         core_worker = self.worker.core_worker
 
         # Wait for the next ObjectRef to become ready.
-        expected_ref = core_worker.peek_object_ref_stream(
-            self._generator_ref)
         ready, unready = ray.wait(
-            [expected_ref], timeout=timeout_s, fetch_local=False)
+            [self._next_ref], timeout=timeout_s, fetch_local=False)
         if len(unready) > 0:
             return ObjectRef.nil()
 
         try:
             ref = core_worker.try_read_next_object_ref_stream(
                 self._generator_ref)
+            self._next_ref = core_worker.peek_object_ref_stream(
+                self._generator_ref)
             assert not ref.is_nil()
         except ObjectRefStreamEndOfStreamError:
             if self._generator_task_exception:
@@ -359,8 +358,7 @@ class StreamingObjectRefGenerator:
         self.worker.check_connected()
         core_worker = self.worker.core_worker
 
-        ref = core_worker.peek_object_ref_stream(
-            self._generator_ref)
+        ref = self._next_ref
         # TODO(swang): Avoid fetching the value.
         ready, unready = await asyncio.wait([self.suppress_exceptions(ref)],
                                             timeout=timeout_s)
@@ -371,6 +369,8 @@ class StreamingObjectRefGenerator:
             ref = core_worker.try_read_next_object_ref_stream(
                 self._generator_ref)
             assert not ref.is_nil()
+            self._next_ref = core_worker.peek_object_ref_stream(
+                self._generator_ref)
         except ObjectRefStreamEndOfStreamError:
             if self._generator_task_exception:
                 # Exception has been returned. raise StopIteration.
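
The idea behind the diff can be sketched with a toy model in plain Python. Nothing here is Ray API: `ToyStore` and `ToyGenerator` are hypothetical names, and the `_hold_next` helper only plays the role that `self._next_ref` plays in the patch, under the assumption that holding one long-lived reference is what keeps the object in the store.

```python
class ToyStore:
    """Toy ref-counted store (hypothetical, not Ray's memory store)."""

    def __init__(self):
        self.objects = {}     # object ID -> value
        self.ref_counts = {}  # object ID -> live reference count

    def add_ref(self, oid):
        self.ref_counts[oid] = self.ref_counts.get(oid, 0) + 1

    def remove_ref(self, oid):
        self.ref_counts[oid] -= 1
        if self.ref_counts[oid] == 0:
            # Last reference is gone: evict the value.
            del self.ref_counts[oid]
            self.objects.pop(oid, None)


class ToyGenerator:
    """Keeps a reference to the next expected object at all times."""

    def __init__(self, store, num_items):
        self.store = store
        self.num_items = num_items
        self.index = 0
        self._hold_next()  # analogous to setting self._next_ref in __init__

    def _hold_next(self):
        if self.index < self.num_items:
            self.store.add_ref(f"obj-{self.index}")

    def wait(self):
        # No temporary reference is created or dropped here.
        return f"obj-{self.index}" in self.store.objects

    def read_next(self):
        oid = f"obj-{self.index}"
        value = self.store.objects[oid]
        self.index += 1
        self._hold_next()           # hold the following object first
        self.store.remove_ref(oid)  # then release the one just read
        return value


store = ToyStore()
gen = ToyGenerator(store, num_items=2)
store.objects["obj-0"] = "block 0"  # results arrive after the refs are held
store.objects["obj-1"] = "block 1"

results = []
for _ in range(2):
    assert gen.wait()
    assert gen.wait()  # repeated waits no longer evict the object
    results.append(gen.read_next())
print(results)  # prints: ['block 0', 'block 1']
```

Because the generator itself owns the reference, waiting any number of times leaves the ref count unchanged, and the object is only released once it has been read.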

Versions / Dependencies

master

Reproduction script

I found this issue when running this benchmark with this branch.

The usage pattern is essentially the same as in the script above, but I wasn't able to reproduce the issue with that script.

Issue Severity

High: It blocks me from completing my task.

@raulchen added the bug, triage, and core labels on Aug 8, 2023
@rkooo567 added the P0 and Ray 2.7 labels and removed the triage label on Aug 9, 2023