From 65bf1da495a6cd92f04c29669a2014e625c1437a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Fri, 28 Mar 2025 13:21:22 +0100 Subject: [PATCH 01/11] cluster error handling --- executorlib/backend/cache_parallel.py | 23 ++++++++++++++++------- executorlib/cache/backend.py | 20 +++++++++++++++----- executorlib/cache/shared.py | 12 ++++++++---- executorlib/interactive/shared.py | 7 +++++-- executorlib/standalone/hdf.py | 11 +++++++---- tests/test_standalone_hdf.py | 12 ++++++++---- 6 files changed, 59 insertions(+), 26 deletions(-) diff --git a/executorlib/backend/cache_parallel.py b/executorlib/backend/cache_parallel.py index 1ebdeb4a..d9df58c4 100644 --- a/executorlib/backend/cache_parallel.py +++ b/executorlib/backend/cache_parallel.py @@ -39,13 +39,22 @@ def main() -> None: apply_dict = backend_load_file(file_name=file_name) apply_dict = MPI.COMM_WORLD.bcast(apply_dict, root=0) output = apply_dict["fn"].__call__(*apply_dict["args"], **apply_dict["kwargs"]) - result = MPI.COMM_WORLD.gather(output, root=0) if mpi_size_larger_one else output - if mpi_rank_zero: - backend_write_file( - file_name=file_name, - output=result, - runtime=time.time() - time_start, - ) + try: + result = MPI.COMM_WORLD.gather(output, root=0) if mpi_size_larger_one else output + except Exception as error: + if mpi_rank_zero: + backend_write_file( + file_name=file_name, + output={"error": error}, + runtime=time.time() - time_start, + ) + else: + if mpi_rank_zero: + backend_write_file( + file_name=file_name, + output={"result": result}, + runtime=time.time() - time_start, + ) MPI.COMM_WORLD.Barrier() diff --git a/executorlib/cache/backend.py b/executorlib/cache/backend.py index 0e0bf779..3f8368b3 100644 --- a/executorlib/cache/backend.py +++ b/executorlib/cache/backend.py @@ -44,10 +44,16 @@ def backend_write_file(file_name: str, output: Any, runtime: float) -> None: """ file_name_out = os.path.splitext(file_name)[0] os.rename(file_name, file_name_out + ".h5ready") - dump( - 
file_name=file_name_out + ".h5ready", - data_dict={"output": output, "runtime": runtime}, - ) + if "result" in output: + dump( + file_name=file_name_out + ".h5ready", + data_dict={"output": output["result"], "runtime": runtime}, + ) + else: + dump( + file_name=file_name_out + ".h5ready", + data_dict={"error": output["error"], "runtime": runtime}, + ) os.rename(file_name_out + ".h5ready", file_name_out + ".h5out") @@ -63,7 +69,11 @@ def backend_execute_task_in_file(file_name: str) -> None: """ apply_dict = backend_load_file(file_name=file_name) time_start = time.time() - result = apply_dict["fn"].__call__(*apply_dict["args"], **apply_dict["kwargs"]) + try: + result = {"result": apply_dict["fn"].__call__(*apply_dict["args"], **apply_dict["kwargs"])} + except Exception as error: + result = {"error": error} + backend_write_file( file_name=file_name, output=result, diff --git a/executorlib/cache/shared.py b/executorlib/cache/shared.py index 7e15764c..36c3f693 100644 --- a/executorlib/cache/shared.py +++ b/executorlib/cache/shared.py @@ -30,9 +30,11 @@ def result(self) -> Any: str: The result of the future item. 
""" - exec_flag, result = get_output(file_name=self._file_name) - if exec_flag: + exec_flag, no_error_flag, result = get_output(file_name=self._file_name) + if exec_flag and no_error_flag: return result + elif exec_flag: + raise result else: return self.result() @@ -198,9 +200,11 @@ def _check_task_output( file_name = os.path.join(cache_directory, task_key, "cache.h5out") if not os.path.exists(file_name): return future_obj - exec_flag, result = get_output(file_name=file_name) - if exec_flag: + exec_flag, no_error_flag, result = get_output(file_name=file_name) + if exec_flag and no_error_flag: future_obj.set_result(result) + elif exec_flag: + future_obj.set_exception(result) return future_obj diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py index 86fa62dd..bf9153ca 100644 --- a/executorlib/interactive/shared.py +++ b/executorlib/interactive/shared.py @@ -171,9 +171,12 @@ def _execute_task_with_cache( else: _task_done(future_queue=future_queue) else: - _, result = get_output(file_name=file_name) + _, no_error_flag, result = get_output(file_name=file_name) future = task_dict["future"] - future.set_result(result) + if no_error_flag: + future.set_result(result) + else: + future.set_exception(result) _task_done(future_queue=future_queue) diff --git a/executorlib/standalone/hdf.py b/executorlib/standalone/hdf.py index aa765555..0e67332d 100644 --- a/executorlib/standalone/hdf.py +++ b/executorlib/standalone/hdf.py @@ -10,6 +10,7 @@ "args": "input_args", "kwargs": "input_kwargs", "output": "output", + "error": "error", "runtime": "runtime", "queue_id": "queue_id", } @@ -60,7 +61,7 @@ def load(file_name: str) -> dict: return data_dict -def get_output(file_name: str) -> tuple[bool, Any]: +def get_output(file_name: str) -> tuple[bool, bool, Any]: """ Check if output is available in the HDF5 file @@ -68,13 +69,15 @@ def get_output(file_name: str) -> tuple[bool, Any]: file_name (str): file name of the HDF5 file as absolute path Returns: - 
Tuple[bool, object]: boolean flag indicating if output is available and the output object itself + Tuple[bool, bool, object]: boolean flag indicating if output is available, boolean flag indicating that no error was raised, and the output object (or the raised exception) itself """ with h5py.File(file_name, "r") as hdf: if "output" in hdf: - return True, cloudpickle.loads(np.void(hdf["/output"])) + return True, True, cloudpickle.loads(np.void(hdf["/output"])) + elif "error" in hdf: + return True, False, cloudpickle.loads(np.void(hdf["/output"])) else: - return False, None + return False, False, None def get_runtime(file_name: str) -> float: diff --git a/tests/test_standalone_hdf.py b/tests/test_standalone_hdf.py index 56cae425..d3d5d1f7 100644 --- a/tests/test_standalone_hdf.py +++ b/tests/test_standalone_hdf.py @@ -39,8 +39,9 @@ def test_hdf_mixed(self): self.assertTrue("fn" in data_dict.keys()) self.assertEqual(data_dict["args"], [a]) self.assertEqual(data_dict["kwargs"], {"b": b}) - flag, output = get_output(file_name=file_name) + flag, no_error, output = get_output(file_name=file_name) self.assertTrue(get_runtime(file_name=file_name) == 0.0) + self.assertFalse(no_error) self.assertFalse(flag) self.assertIsNone(output) @@ -55,9 +56,10 @@ def test_hdf_args(self): self.assertTrue("fn" in data_dict.keys()) self.assertEqual(data_dict["args"], [a, b]) self.assertEqual(data_dict["kwargs"], {}) - flag, output = get_output(file_name=file_name) + flag, no_error, output = get_output(file_name=file_name) self.assertTrue(get_runtime(file_name=file_name) == 0.0) self.assertFalse(flag) + self.assertFalse(no_error) self.assertIsNone(output) def test_hdf_kwargs(self): @@ -80,9 +82,10 @@ def test_hdf_kwargs(self): self.assertEqual(data_dict["args"], ()) self.assertEqual(data_dict["kwargs"], {"a": a, "b": b}) self.assertEqual(get_queue_id(file_name=file_name), 123) - flag, output = get_output(file_name=file_name) + flag, no_error, output = get_output(file_name=file_name) self.assertTrue(get_runtime(file_name=file_name) == 0.0) self.assertFalse(flag) +
self.assertFalse(no_error) self.assertIsNone(output) def test_hdf_queue_id(self): @@ -95,9 +98,10 @@ def test_hdf_queue_id(self): data_dict={"queue_id": queue_id}, ) self.assertEqual(get_queue_id(file_name=file_name), 123) - flag, output = get_output(file_name=file_name) + flag, no_error, output = get_output(file_name=file_name) self.assertTrue(get_runtime(file_name=file_name) == 0.0) self.assertFalse(flag) + self.assertFalse(no_error) self.assertIsNone(output) def tearDown(self): From c36d437a278cd69f70efc5ee42588b723e13d6d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Fri, 28 Mar 2025 13:24:04 +0100 Subject: [PATCH 02/11] fix error read --- executorlib/standalone/hdf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/standalone/hdf.py b/executorlib/standalone/hdf.py index 0e67332d..8fb26f72 100644 --- a/executorlib/standalone/hdf.py +++ b/executorlib/standalone/hdf.py @@ -75,7 +75,7 @@ def get_output(file_name: str) -> tuple[bool, bool, Any]: if "output" in hdf: return True, True, cloudpickle.loads(np.void(hdf["/output"])) elif "error" in hdf: - return True, False, cloudpickle.loads(np.void(hdf["/output"])) + return True, False, cloudpickle.loads(np.void(hdf["/error"])) else: return False, False, None From 61058650ec2aa9cee2af0deee397c2d7004b7f05 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 28 Mar 2025 12:25:14 +0000 Subject: [PATCH 03/11] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/backend/cache_parallel.py | 4 +++- executorlib/cache/backend.py | 6 +++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/executorlib/backend/cache_parallel.py b/executorlib/backend/cache_parallel.py index d9df58c4..f094446d 100644 --- a/executorlib/backend/cache_parallel.py +++ b/executorlib/backend/cache_parallel.py @@ -40,7 +40,9 @@ def main() -> None: apply_dict = 
MPI.COMM_WORLD.bcast(apply_dict, root=0) output = apply_dict["fn"].__call__(*apply_dict["args"], **apply_dict["kwargs"]) try: - result = MPI.COMM_WORLD.gather(output, root=0) if mpi_size_larger_one else output + result = ( + MPI.COMM_WORLD.gather(output, root=0) if mpi_size_larger_one else output + ) except Exception as error: if mpi_rank_zero: backend_write_file( diff --git a/executorlib/cache/backend.py b/executorlib/cache/backend.py index 3f8368b3..cbb649e8 100644 --- a/executorlib/cache/backend.py +++ b/executorlib/cache/backend.py @@ -70,7 +70,11 @@ def backend_execute_task_in_file(file_name: str) -> None: apply_dict = backend_load_file(file_name=file_name) time_start = time.time() try: - result = {"result": apply_dict["fn"].__call__(*apply_dict["args"], **apply_dict["kwargs"])} + result = { + "result": apply_dict["fn"].__call__( + *apply_dict["args"], **apply_dict["kwargs"] + ) + } except Exception as error: result = {"error": error} From 6dd9817cfc38cf92354985dac62e5edbfb66fa0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Fri, 28 Mar 2025 13:36:48 +0100 Subject: [PATCH 04/11] Add additional tests --- tests/test_cache_fileexecutor_serial.py | 13 +++++++++++++ tests/test_standalone_hdf.py | 15 +++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/tests/test_cache_fileexecutor_serial.py b/tests/test_cache_fileexecutor_serial.py index 2a923965..c6ac1b1a 100644 --- a/tests/test_cache_fileexecutor_serial.py +++ b/tests/test_cache_fileexecutor_serial.py @@ -27,6 +27,10 @@ def list_files_in_working_directory(): return os.listdir(os.getcwd()) +def get_error(a): + raise ValueError(a) + + @unittest.skipIf( skip_h5py_test, "h5py is not installed, so the h5py tests are skipped." 
) @@ -68,6 +72,15 @@ def test_executor_working_directory(self): fs1 = exe.submit(list_files_in_working_directory) self.assertEqual(fs1.result(), os.listdir(cwd)) + def test_executor_error(self): + cwd = os.path.join(os.path.dirname(__file__), "executables") + with FileExecutor( + resource_dict={"cwd": cwd}, execute_function=execute_in_subprocess + ) as exe: + fs1 = exe.submit(get_error, a=1) + with self.assertRaises(ValueError): + fs1.result() + def test_executor_function(self): fs1 = Future() q = Queue() diff --git a/tests/test_standalone_hdf.py b/tests/test_standalone_hdf.py index d3d5d1f7..95d1aa6e 100644 --- a/tests/test_standalone_hdf.py +++ b/tests/test_standalone_hdf.py @@ -104,6 +104,21 @@ def test_hdf_queue_id(self): self.assertFalse(no_error) self.assertIsNone(output) + def test_hdf_error(self): + cache_directory = os.path.abspath("cache") + os.makedirs(cache_directory, exist_ok=True) + file_name = os.path.join(cache_directory, "test_error.h5") + error = ValueError() + dump( + file_name=file_name, + data_dict={"error": error}, + ) + flag, no_error, output = get_output(file_name=file_name) + self.assertTrue(get_runtime(file_name=file_name) == 0.0) + self.assertTrue(flag) + self.assertFalse(no_error) + self.assertTrue(isinstance(output, error.__class__)) + def tearDown(self): if os.path.exists("cache"): shutil.rmtree("cache") From 73dca53b76e034f39600bed5a750b01046ba0ff8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Fri, 28 Mar 2025 13:46:46 +0100 Subject: [PATCH 05/11] extend error handling --- tests/test_cache_fileexecutor_serial.py | 2 +- tests/test_singlenodeexecutor_cache.py | 13 +++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/tests/test_cache_fileexecutor_serial.py b/tests/test_cache_fileexecutor_serial.py index c6ac1b1a..6992d9c2 100644 --- a/tests/test_cache_fileexecutor_serial.py +++ b/tests/test_cache_fileexecutor_serial.py @@ -73,7 +73,7 @@ def test_executor_working_directory(self): 
self.assertEqual(fs1.result(), os.listdir(cwd)) def test_executor_error(self): - cwd = os.path.join(os.path.dirname(__file__), "executables") + cwd = os.path.join(os.path.dirname(__file__), "error") with FileExecutor( resource_dict={"cwd": cwd}, execute_function=execute_in_subprocess ) as exe: diff --git a/tests/test_singlenodeexecutor_cache.py b/tests/test_singlenodeexecutor_cache.py index cc5752da..790b69a6 100644 --- a/tests/test_singlenodeexecutor_cache.py +++ b/tests/test_singlenodeexecutor_cache.py @@ -12,6 +12,10 @@ skip_h5py_test = True +def get_error(a): + raise ValueError(a) + + @unittest.skipIf( skip_h5py_test, "h5py is not installed, so the h5io tests are skipped." ) @@ -28,6 +32,15 @@ def test_cache_data(self): sum([sum(c["input_args"][0]) for c in cache_lst]), sum(result_lst) ) + def test_cache_error(self): + cache_directory = "./cache_error" + with SingleNodeExecutor(cache_directory=cache_directory) as exe: + f = exe.submit(get_error, a=1) + with self.assertRaises(ValueError): + print(f.result()) + def tearDown(self): if os.path.exists("cache"): shutil.rmtree("cache") + if os.path.exists("cache_error"): + shutil.rmtree("cache_error") From f312b419b3eefba539ace405c8de6deeac5c6e07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Fri, 28 Mar 2025 13:53:03 +0100 Subject: [PATCH 06/11] folder error --- tests/test_cache_fileexecutor_serial.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_cache_fileexecutor_serial.py b/tests/test_cache_fileexecutor_serial.py index 6992d9c2..c6ac1b1a 100644 --- a/tests/test_cache_fileexecutor_serial.py +++ b/tests/test_cache_fileexecutor_serial.py @@ -73,7 +73,7 @@ def test_executor_working_directory(self): self.assertEqual(fs1.result(), os.listdir(cwd)) def test_executor_error(self): - cwd = os.path.join(os.path.dirname(__file__), "error") + cwd = os.path.join(os.path.dirname(__file__), "executables") with FileExecutor( resource_dict={"cwd": cwd}, 
execute_function=execute_in_subprocess ) as exe: From cea1a4ed28c57ae47beb2e6542088c9cff72a3d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Fri, 28 Mar 2025 14:03:29 +0100 Subject: [PATCH 07/11] fix testing with flux --- tests/test_singlenodeexecutor_cache.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_singlenodeexecutor_cache.py b/tests/test_singlenodeexecutor_cache.py index 790b69a6..71bb49b8 100644 --- a/tests/test_singlenodeexecutor_cache.py +++ b/tests/test_singlenodeexecutor_cache.py @@ -3,6 +3,7 @@ import unittest from executorlib import SingleNodeExecutor +from executorlib.standalone.serialize import cloudpickle_register try: from executorlib.standalone.hdf import get_cache_data @@ -35,6 +36,7 @@ def test_cache_data(self): def test_cache_error(self): cache_directory = "./cache_error" with SingleNodeExecutor(cache_directory=cache_directory) as exe: + cloudpickle_register(ind=1) f = exe.submit(get_error, a=1) with self.assertRaises(ValueError): print(f.result()) From 9d03eddd568f0256ffa881b4aca8b80b547f6fde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Fri, 28 Mar 2025 14:10:14 +0100 Subject: [PATCH 08/11] another test --- tests/test_cache_backend_execute.py | 34 +++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/tests/test_cache_backend_execute.py b/tests/test_cache_backend_execute.py index 46cbb718..0dce06c6 100644 --- a/tests/test_cache_backend_execute.py +++ b/tests/test_cache_backend_execute.py @@ -19,6 +19,10 @@ def my_funct(a, b): return a + b +def get_error(a): + raise ValueError(a) + + @unittest.skipIf( skip_h5io_test, "h5io is not installed, so the h5io tests are skipped." 
) @@ -107,6 +111,36 @@ def test_execute_function_kwargs(self): self.assertTrue(future_file_obj.done()) self.assertEqual(future_file_obj.result(), 3) + def test_execute_function_error(self): + cache_directory = os.path.abspath("cache") + os.makedirs(cache_directory, exist_ok=True) + task_key, data_dict = serialize_funct_h5( + fn=get_error, + fn_args=[], + fn_kwargs={"a": 1}, + ) + file_name = os.path.join(cache_directory, task_key, "cache.h5in") + os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True) + dump(file_name=file_name, data_dict=data_dict) + backend_execute_task_in_file(file_name=file_name) + future_obj = Future() + _check_task_output( + task_key=task_key, future_obj=future_obj, cache_directory=cache_directory + ) + self.assertTrue(future_obj.done()) + with self.assertRaises(ValueError): + future_obj.result() + self.assertTrue( + get_runtime(file_name=os.path.join(cache_directory, task_key, "cache.h5out")) + > 0.0 + ) + future_file_obj = FutureItem( + file_name=os.path.join(cache_directory, task_key, "cache.h5out") + ) + self.assertTrue(future_file_obj.done()) + with self.assertRaises(ValueError): + future_file_obj.result() + def tearDown(self): if os.path.exists("cache"): shutil.rmtree("cache") From bc4c07d5604c881d3d9cfd3b03d579f0ef918f56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Fri, 28 Mar 2025 14:25:46 +0100 Subject: [PATCH 09/11] hit cache again --- tests/test_singlenodeexecutor_cache.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/test_singlenodeexecutor_cache.py b/tests/test_singlenodeexecutor_cache.py index 71bb49b8..6fae449b 100644 --- a/tests/test_singlenodeexecutor_cache.py +++ b/tests/test_singlenodeexecutor_cache.py @@ -40,6 +40,11 @@ def test_cache_error(self): f = exe.submit(get_error, a=1) with self.assertRaises(ValueError): print(f.result()) + with SingleNodeExecutor(cache_directory=cache_directory) as exe: + cloudpickle_register(ind=1) + f = exe.submit(get_error, a=1) + with 
self.assertRaises(ValueError): + print(f.result()) def tearDown(self): if os.path.exists("cache"): From 733f1577b6e8923fdc31d0e24ab1ea29fdb63a71 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Fri, 28 Mar 2025 14:40:07 +0100 Subject: [PATCH 10/11] in interactive mode only successful calculations are stored --- executorlib/interactive/shared.py | 5 +---- tests/test_singlenodeexecutor_cache.py | 5 ----- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py index bf9153ca..44268e4d 100644 --- a/executorlib/interactive/shared.py +++ b/executorlib/interactive/shared.py @@ -173,10 +173,7 @@ def _execute_task_with_cache( else: _, no_error_flag, result = get_output(file_name=file_name) future = task_dict["future"] - if no_error_flag: - future.set_result(result) - else: - future.set_exception(result) + future.set_result(result) _task_done(future_queue=future_queue) diff --git a/tests/test_singlenodeexecutor_cache.py b/tests/test_singlenodeexecutor_cache.py index 6fae449b..71bb49b8 100644 --- a/tests/test_singlenodeexecutor_cache.py +++ b/tests/test_singlenodeexecutor_cache.py @@ -40,11 +40,6 @@ def test_cache_error(self): f = exe.submit(get_error, a=1) with self.assertRaises(ValueError): print(f.result()) - with SingleNodeExecutor(cache_directory=cache_directory) as exe: - cloudpickle_register(ind=1) - f = exe.submit(get_error, a=1) - with self.assertRaises(ValueError): - print(f.result()) def tearDown(self): if os.path.exists("cache"): From bf1074712579603aaa4f65405253dbb8be539485 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Fri, 28 Mar 2025 14:40:51 +0100 Subject: [PATCH 11/11] clean up --- executorlib/interactive/shared.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py index 44268e4d..cc582347 100644 --- a/executorlib/interactive/shared.py +++
b/executorlib/interactive/shared.py @@ -171,7 +171,7 @@ def _execute_task_with_cache( else: _task_done(future_queue=future_queue) else: - _, no_error_flag, result = get_output(file_name=file_name) + _, _, result = get_output(file_name=file_name) future = task_dict["future"] future.set_result(result) _task_done(future_queue=future_queue)