Skip to content

Commit

Permalink
fix exception handling
Browse files Browse the repository at this point in the history
Signed-off-by: Hao Chen <chenh1024@gmail.com>
  • Loading branch information
raulchen committed Aug 2, 2023
1 parent 6a96030 commit d04e69b
Showing 1 changed file with 4 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,13 @@ def get_waitable(self) -> StreamingObjectRefGenerator:
def on_waitable_ready(self):
try:
block_ref = next(self._streaming_gen)
except StopIteration:
self._task_done_callback()
return

try:
if self._streaming_gen._generator_task_exception is not None:
raise self._streaming_gen._generator_task_exception
meta = ray.get(next(self._streaming_gen))
except StopIteration:
ex = ray.get(block_ref)
self._task_done_callback()
raise ex
self._data_ready_callback(RefBundle([(block_ref, meta)], owns_blocks=True))
else:
self._data_ready_callback(RefBundle([(block_ref, meta)], owns_blocks=True))


class PhysicalOperator(Operator):
Expand Down

0 comments on commit d04e69b

Please sign in to comment.