From 5ca1c135f24d1148d11511c8ef81cfafa4b08488 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Thu, 13 Feb 2025 21:42:53 +0100 Subject: [PATCH 1/8] Debug error handling --- executorlib/interactive/shared.py | 24 +++- tests/test_dependencies_executor.py | 163 +++++++++++++++++++++++++++- 2 files changed, 179 insertions(+), 8 deletions(-) diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py index 4ce64cce..5c0dbc1e 100644 --- a/executorlib/interactive/shared.py +++ b/executorlib/interactive/shared.py @@ -3,6 +3,7 @@ import queue import sys import time +from asyncio.exceptions import CancelledError from concurrent.futures import Future from time import sleep from typing import Any, Callable, Optional, Union @@ -307,7 +308,7 @@ def execute_separate_tasks( task_dict = future_queue.get() if "shutdown" in task_dict and task_dict["shutdown"]: if task_dict["wait"]: - _ = [process.join() for process in process_lst] + _ = [process.join() for process in process_lst if process.is_alive()] future_queue.task_done() future_queue.join() break @@ -361,7 +362,10 @@ def execute_tasks_with_dependencies( task_dict is not None and "fn" in task_dict and "future" in task_dict ): future_lst, ready_flag = _get_future_objects_from_input(task_dict=task_dict) - if len(future_lst) == 0 or ready_flag: + exception_lst = _get_exception_lst(future_lst=future_lst) + if len(exception_lst) > 0: + task_dict["future"].set_exception(exception_lst[0]) + elif len(future_lst) == 0 or ready_flag: # No future objects are used in the input or all future objects are already done task_dict["args"], task_dict["kwargs"] = _update_futures_in_input( args=task_dict["args"], kwargs=task_dict["kwargs"] @@ -455,7 +459,10 @@ def _submit_waiting_task(wait_lst: list[dict], executor_queue: queue.Queue) -> l """ wait_tmp_lst = [] for task_wait_dict in wait_lst: - if all(future.done() for future in task_wait_dict["future_lst"]): + exception_lst = _get_exception_lst(future_lst=task_wait_dict["future_lst"]) + if len(exception_lst) > 0: + task_wait_dict["future"].set_exception(exception_lst[0]) + elif all(future.done() for future in task_wait_dict["future_lst"]): del task_wait_dict["future_lst"] task_wait_dict["args"], task_wait_dict["kwargs"] = _update_futures_in_input( args=task_wait_dict["args"], kwargs=task_wait_dict["kwargs"] @@ -663,3 +670,14 @@ def _execute_task_with_cache( future = task_dict["future"] future.set_result(result) future_queue.task_done() + + +def _get_exception_lst(future_lst: list) -> list: + def get_exception(future_obj: Future) -> bool: + try: + excp = future_obj.exception(timeout=10 ^ -10) + return excp is not None and not isinstance(excp, CancelledError) + except TimeoutError: + return False + + return [f.exception() for f in future_lst if get_exception(future_obj=f)] diff --git a/tests/test_dependencies_executor.py b/tests/test_dependencies_executor.py index 0aa1d835..1a62a319 100644 --- a/tests/test_dependencies_executor.py +++ b/tests/test_dependencies_executor.py @@ -42,7 +42,7 @@ def return_input_dict(input_dict): return input_dict -def raise_error(): +def raise_error(parameter): raise RuntimeError @@ -106,6 +106,111 @@ def test_dependency_steps(self): self.assertTrue(fs2.done()) q.put({"shutdown": True, "wait": True}) + def test_dependency_steps_error(self): + cloudpickle_register(ind=1) + fs1 = Future() + fs2 = Future() + q = Queue() + q.put( + { + "fn": raise_error, + "args": (), + "kwargs": {"parameter": 0}, + "future": fs1, + "resource_dict": {"cores": 1}, + } + ) + q.put( + { + "fn": add_function, + "args": (), + "kwargs": {"parameter_1": 1, "parameter_2": fs1}, + "future": fs2, + "resource_dict": {"cores": 1}, + } + ) + executor = create_single_node_executor( + max_workers=1, + max_cores=2, + resource_dict={ + "cores": 1, + "threads_per_core": 1, + "gpus_per_core": 0, + "cwd": None, + "openmpi_oversubscribe": False, + "slurm_cmd_args": [], + }, + ) + process = RaisingThread( + target=execute_tasks_with_dependencies, + kwargs={ + "future_queue": q, + "executor_queue": executor._future_queue, + "executor": executor, + "refresh_rate": 0.01, + }, + ) + process.start() + self.assertFalse(fs1.done()) + self.assertFalse(fs2.done()) + self.assertTrue(fs1.exception() is not None) + self.assertTrue(fs2.exception() is not None) + with self.assertRaises(RuntimeError): + fs2.result() + q.put({"shutdown": True, "wait": True}) + + def test_dependency_steps_error_before(self): + cloudpickle_register(ind=1) + fs1 = Future() + fs1.set_exception(RuntimeError) + fs2 = Future() + q = Queue() + q.put( + { + "fn": add_function, + "args": (), + "kwargs": {"parameter_1": 1, "parameter_2": 2}, + "future": fs1, + "resource_dict": {"cores": 1}, + } + ) + q.put( + { + "fn": add_function, + "args": (), + "kwargs": {"parameter_1": 1, "parameter_2": fs1}, + "future": fs2, + "resource_dict": {"cores": 1}, + } + ) + executor = create_single_node_executor( + max_workers=1, + max_cores=2, + resource_dict={ + "cores": 1, + "threads_per_core": 1, + "gpus_per_core": 0, + "cwd": None, + "openmpi_oversubscribe": False, + "slurm_cmd_args": [], + }, + ) + process = RaisingThread( + target=execute_tasks_with_dependencies, + kwargs={ + "future_queue": q, + "executor_queue": executor._future_queue, + "executor": executor, + "refresh_rate": 0.01, + }, + ) + process.start() + self.assertTrue(fs1.exception() is not None) + self.assertTrue(fs2.exception() is not None) + with self.assertRaises(RuntimeError): + fs2.result() + q.put({"shutdown": True, "wait": True}) + def test_many_to_one(self): length = 5 parameter = 1 @@ -148,22 +253,70 @@ def test_block_allocation_false_one_worker(self): with self.assertRaises(RuntimeError): with SingleNodeExecutor(max_cores=1, block_allocation=False) as exe: cloudpickle_register(ind=1) - _ = exe.submit(raise_error) + _ = exe.submit(raise_error, parameter=0) def test_block_allocation_true_one_worker(self): with self.assertRaises(RuntimeError): with SingleNodeExecutor(max_cores=1, block_allocation=True) as exe: cloudpickle_register(ind=1) - _ = exe.submit(raise_error) + _ = exe.submit(raise_error, parameter=0) def test_block_allocation_false_two_workers(self): with self.assertRaises(RuntimeError): with SingleNodeExecutor(max_cores=2, block_allocation=False) as exe: cloudpickle_register(ind=1) - _ = exe.submit(raise_error) + _ = exe.submit(raise_error, parameter=0) def test_block_allocation_true_two_workers(self): with self.assertRaises(RuntimeError): with SingleNodeExecutor(max_cores=2, block_allocation=True) as exe: cloudpickle_register(ind=1) - _ = exe.submit(raise_error) + _ = exe.submit(raise_error, parameter=0) + + def test_block_allocation_false_one_worker_loop(self): + with self.assertRaises(RuntimeError): + with SingleNodeExecutor(max_cores=1, block_allocation=False) as exe: + cloudpickle_register(ind=1) + lst = [] + for i in range(1, 4): + lst = exe.submit( + raise_error, + parameter=lst, + ) + lst.result() + + def test_block_allocation_true_one_worker_loop(self): + with self.assertRaises(RuntimeError): + with SingleNodeExecutor(max_cores=1, block_allocation=True) as exe: + cloudpickle_register(ind=1) + lst = [] + for i in range(1, 4): + lst = exe.submit( + raise_error, + parameter=lst, + ) + lst.result() + + def test_block_allocation_false_two_workers_loop(self): + with self.assertRaises(RuntimeError): + with SingleNodeExecutor(max_cores=2, block_allocation=False) as exe: + cloudpickle_register(ind=1) + lst = [] + for i in range(1, 4): + lst = exe.submit( + raise_error, + parameter=lst, + ) + lst.result() + + def test_block_allocation_true_two_workers_loop(self): + with self.assertRaises(RuntimeError): + with SingleNodeExecutor(max_cores=2, block_allocation=True) as exe: + cloudpickle_register(ind=1) + lst = [] + for i in range(1, 4): + lst = exe.submit( + raise_error, + parameter=lst, + ) + lst.result() From 8efecc6ff62dd1c67755d101dc8909c70c1e21e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Thu, 13 Feb 2025 22:11:34 +0100 Subject: [PATCH 2/8] skip tests with lower python versions --- tests/test_dependencies_executor.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/test_dependencies_executor.py b/tests/test_dependencies_executor.py index 1a62a319..fd83e50a 100644 --- a/tests/test_dependencies_executor.py +++ b/tests/test_dependencies_executor.py @@ -1,5 +1,6 @@ from concurrent.futures import Future import unittest +import sys from time import sleep from queue import Queue @@ -106,6 +107,7 @@ def test_dependency_steps(self): self.assertTrue(fs2.done()) q.put({"shutdown": True, "wait": True}) + @unittest.skipIf(not (sys.version_info[0] >= 3 and sys.version_info[1] >= 10)) def test_dependency_steps_error(self): cloudpickle_register(ind=1) fs1 = Future() @@ -273,6 +275,7 @@ def test_block_allocation_true_two_workers(self): cloudpickle_register(ind=1) _ = exe.submit(raise_error, parameter=0) + @unittest.skipIf(not (sys.version_info[0] >= 3 and sys.version_info[1] >= 10)) def test_block_allocation_false_one_worker_loop(self): with self.assertRaises(RuntimeError): with SingleNodeExecutor(max_cores=1, block_allocation=False) as exe: @@ -285,6 +288,7 @@ def test_block_allocation_false_one_worker_loop(self): ) lst.result() + @unittest.skipIf(not (sys.version_info[0] >= 3 and sys.version_info[1] >= 10)) def test_block_allocation_true_one_worker_loop(self): with self.assertRaises(RuntimeError): with SingleNodeExecutor(max_cores=1, block_allocation=True) as exe: @@ -297,6 +301,7 @@ def test_block_allocation_true_one_worker_loop(self): ) lst.result() + @unittest.skipIf(not (sys.version_info[0] >= 3 and sys.version_info[1] >= 10)) def test_block_allocation_false_two_workers_loop(self): with self.assertRaises(RuntimeError): with SingleNodeExecutor(max_cores=2, block_allocation=False) as exe: @@ -309,6 +314,7 @@ def test_block_allocation_false_two_workers_loop(self): ) lst.result() + @unittest.skipIf(not (sys.version_info[0] >= 3 and sys.version_info[1] >= 10)) def test_block_allocation_true_two_workers_loop(self): with self.assertRaises(RuntimeError): with SingleNodeExecutor(max_cores=2, block_allocation=True) as exe: From d4946f96ccd6531d4246f3c1eeb20563140409a6 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 13 Feb 2025 22:20:31 +0100 Subject: [PATCH 3/8] Update shared.py --- executorlib/interactive/shared.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py index 5c0dbc1e..fd0d05e9 100644 --- a/executorlib/interactive/shared.py +++ b/executorlib/interactive/shared.py @@ -675,7 +675,7 @@ def _execute_task_with_cache( def _get_exception_lst(future_lst: list) -> list: def get_exception(future_obj: Future) -> bool: try: - excp = future_obj.exception(timeout=10 ^ -10) + excp = future_obj.exception(timeout=10**-10) return excp is not None and not isinstance(excp, CancelledError) except TimeoutError: return False From 8b0898122543b7c0144e0898d349be5d2a6af3aa Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 13 Feb 2025 22:34:25 +0100 Subject: [PATCH 4/8] Update shared.py --- executorlib/interactive/shared.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py index fd0d05e9..cb2abcf7 100644 --- a/executorlib/interactive/shared.py +++ b/executorlib/interactive/shared.py @@ -308,7 +308,7 @@ def execute_separate_tasks( task_dict = future_queue.get() if "shutdown" in task_dict and task_dict["shutdown"]: if task_dict["wait"]: - _ = [process.join() for process in process_lst if process.is_alive()] + _ = [process.join() for process in process_lst] future_queue.task_done() future_queue.join() break From 39547c5d183c69e64b50b595ae15fb67ed334e9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Thu, 13 Feb 2025 22:41:27 +0100 Subject: [PATCH 5/8] fix tests --- tests/test_dependencies_executor.py | 31 +++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/tests/test_dependencies_executor.py b/tests/test_dependencies_executor.py index fd83e50a..06a932fc 100644 --- a/tests/test_dependencies_executor.py +++ b/tests/test_dependencies_executor.py @@ -107,7 +107,10 @@ def test_dependency_steps(self): self.assertTrue(fs2.done()) q.put({"shutdown": True, "wait": True}) - @unittest.skipIf(not (sys.version_info[0] >= 3 and sys.version_info[1] >= 10)) + @unittest.skipIf( + condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 10), + reason="requires Python 3.10 or higher", + ) def test_dependency_steps_error(self): cloudpickle_register(ind=1) fs1 = Future() @@ -161,10 +164,14 @@ def test_dependency_steps_error(self): fs2.result() q.put({"shutdown": True, "wait": True}) + @unittest.skipIf( + condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 10), + reason="requires Python 3.10 or higher", + ) def test_dependency_steps_error_before(self): cloudpickle_register(ind=1) fs1 = Future() - fs1.set_exception(RuntimeError) + fs1.set_exception(RuntimeError()) fs2 = Future() q = Queue() q.put( @@ -275,7 +282,10 @@ def test_block_allocation_true_two_workers(self): cloudpickle_register(ind=1) _ = exe.submit(raise_error, parameter=0) - @unittest.skipIf(not (sys.version_info[0] >= 3 and sys.version_info[1] >= 10)) + @unittest.skipIf( + condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 10), + reason="requires Python 3.10 or higher", + ) def test_block_allocation_false_one_worker_loop(self): with self.assertRaises(RuntimeError): with SingleNodeExecutor(max_cores=1, block_allocation=False) as exe: @@ -288,7 +298,10 @@ def test_block_allocation_false_one_worker_loop(self): ) lst.result() - @unittest.skipIf(not (sys.version_info[0] >= 3 and sys.version_info[1] >= 10)) + @unittest.skipIf( + condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 10), + reason="requires Python 3.10 or higher", + ) def test_block_allocation_true_one_worker_loop(self): with self.assertRaises(RuntimeError): with SingleNodeExecutor(max_cores=1, block_allocation=True) as exe: @@ -301,7 +314,10 @@ def test_block_allocation_true_one_worker_loop(self): ) lst.result() - @unittest.skipIf(not (sys.version_info[0] >= 3 and sys.version_info[1] >= 10)) + @unittest.skipIf( + condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 10), + reason="requires Python 3.10 or higher", + ) def test_block_allocation_false_two_workers_loop(self): with self.assertRaises(RuntimeError): with SingleNodeExecutor(max_cores=2, block_allocation=False) as exe: @@ -314,7 +330,10 @@ def test_block_allocation_false_two_workers_loop(self): ) lst.result() - @unittest.skipIf(not (sys.version_info[0] >= 3 and sys.version_info[1] >= 10)) + @unittest.skipIf( + condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 10), + reason="requires Python 3.10 or higher", + ) def test_block_allocation_true_two_workers_loop(self): with self.assertRaises(RuntimeError): with SingleNodeExecutor(max_cores=2, block_allocation=True) as exe: From 1b992a1a71cf8f76f9512949b7f6febeb8f13405 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 13 Feb 2025 23:02:02 +0100 Subject: [PATCH 6/8] Update shared.py --- executorlib/interactive/shared.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py index cb2abcf7..a5703054 100644 --- a/executorlib/interactive/shared.py +++ b/executorlib/interactive/shared.py @@ -680,4 +680,7 @@ def get_exception(future_obj: Future) -> bool: except TimeoutError: return False - return [f.exception() for f in future_lst if get_exception(future_obj=f)] + if sys.version_info[0] >= 3 and sys.version_info[1] >= 10: + return [f.exception() for f in future_lst if get_exception(future_obj=f)] + else: + return [] From 1dba8be1c38130d0c4949085e7db671d29e505b2 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 13 Feb 2025 23:08:25 +0100 Subject: [PATCH 7/8] Update shared.py --- executorlib/interactive/shared.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py index a5703054..4439145e 100644 --- a/executorlib/interactive/shared.py +++ b/executorlib/interactive/shared.py @@ -680,7 +680,7 @@ def get_exception(future_obj: Future) -> bool: except TimeoutError: return False - if sys.version_info[0] >= 3 and sys.version_info[1] >= 10: + if sys.version_info[0] >= 3 and sys.version_info[1] >= 11: return [f.exception() for f in future_lst if get_exception(future_obj=f)] else: return [] From d3dcbbc0d7fb3ed6097dea3acad191e332b77118 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 13 Feb 2025 23:09:49 +0100 Subject: [PATCH 8/8] Update test_dependencies_executor.py --- tests/test_dependencies_executor.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/tests/test_dependencies_executor.py b/tests/test_dependencies_executor.py index 06a932fc..69f5ddff 100644 --- a/tests/test_dependencies_executor.py +++ b/tests/test_dependencies_executor.py @@ -108,8 +108,8 @@ def test_dependency_steps(self): q.put({"shutdown": True, "wait": True}) @unittest.skipIf( - condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 10), - reason="requires Python 3.10 or higher", + condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11), + reason="requires Python 3.11 or higher", ) def test_dependency_steps_error(self): cloudpickle_register(ind=1) @@ -165,8 +165,8 @@ def test_dependency_steps_error(self): q.put({"shutdown": True, "wait": True}) @unittest.skipIf( - condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 10), - reason="requires Python 3.10 or higher", + condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11), + reason="requires Python 3.11 or higher", ) def test_dependency_steps_error_before(self): cloudpickle_register(ind=1) @@ -283,8 +283,8 @@ def test_block_allocation_true_two_workers(self): _ = exe.submit(raise_error, parameter=0) @unittest.skipIf( - condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 10), - reason="requires Python 3.10 or higher", + condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11), + reason="requires Python 3.11 or higher", ) def test_block_allocation_false_one_worker_loop(self): with self.assertRaises(RuntimeError): @@ -299,8 +299,8 @@ def test_block_allocation_false_one_worker_loop(self): lst.result() @unittest.skipIf( - condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 10), - reason="requires Python 3.10 or higher", + condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11), + reason="requires Python 3.11 or higher", ) def test_block_allocation_true_one_worker_loop(self): with self.assertRaises(RuntimeError): @@ -315,8 +315,8 @@ def test_block_allocation_true_one_worker_loop(self): lst.result() @unittest.skipIf( - condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 10), - reason="requires Python 3.10 or higher", + condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11), + reason="requires Python 3.11 or higher", ) def test_block_allocation_false_two_workers_loop(self): with self.assertRaises(RuntimeError): @@ -331,8 +331,8 @@ def test_block_allocation_false_two_workers_loop(self): lst.result() @unittest.skipIf( - condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 10), - reason="requires Python 3.10 or higher", + condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11), + reason="requires Python 3.11 or higher", ) def test_block_allocation_true_two_workers_loop(self): with self.assertRaises(RuntimeError):