From d04e69b1793ee6aab9276c89ddd6bda513637c31 Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Wed, 2 Aug 2023 15:17:41 -0700 Subject: [PATCH] fix exception handling Signed-off-by: Hao Chen --- .../execution/interfaces/physical_operator.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/python/ray/data/_internal/execution/interfaces/physical_operator.py b/python/ray/data/_internal/execution/interfaces/physical_operator.py index ad89c8083670a..43c83850425ef 100644 --- a/python/ray/data/_internal/execution/interfaces/physical_operator.py +++ b/python/ray/data/_internal/execution/interfaces/physical_operator.py @@ -57,17 +57,13 @@ def get_waitable(self) -> StreamingObjectRefGenerator: def on_waitable_ready(self): try: block_ref = next(self._streaming_gen) - except StopIteration: - self._task_done_callback() - return - - try: + if self._streaming_gen._generator_task_exception is not None: + raise self._streaming_gen._generator_task_exception meta = ray.get(next(self._streaming_gen)) except StopIteration: - ex = ray.get(block_ref) self._task_done_callback() - raise ex - self._data_ready_callback(RefBundle([(block_ref, meta)], owns_blocks=True)) + else: + self._data_ready_callback(RefBundle([(block_ref, meta)], owns_blocks=True)) class PhysicalOperator(Operator):