diff --git a/executorlib/backend/cache_parallel.py b/executorlib/backend/cache_parallel.py
index 1ebdeb4a..f094446d 100644
--- a/executorlib/backend/cache_parallel.py
+++ b/executorlib/backend/cache_parallel.py
@@ -39,13 +39,24 @@ def main() -> None:
     apply_dict = backend_load_file(file_name=file_name)
     apply_dict = MPI.COMM_WORLD.bcast(apply_dict, root=0)
-    output = apply_dict["fn"].__call__(*apply_dict["args"], **apply_dict["kwargs"])
-    result = MPI.COMM_WORLD.gather(output, root=0) if mpi_size_larger_one else output
-    if mpi_rank_zero:
-        backend_write_file(
-            file_name=file_name,
-            output=result,
-            runtime=time.time() - time_start,
+    try:
+        # Run the user function inside the try so that exceptions raised by the
+        # submitted callable are captured and written out as {"error": ...},
+        # mirroring the serial path in backend_execute_task_in_file().
+        output = apply_dict["fn"].__call__(*apply_dict["args"], **apply_dict["kwargs"])
+        result = (
+            MPI.COMM_WORLD.gather(output, root=0) if mpi_size_larger_one else output
         )
+    except Exception as error:
+        if mpi_rank_zero:
+            backend_write_file(
+                file_name=file_name,
+                output={"error": error},
+                runtime=time.time() - time_start,
+            )
+    else:
+        if mpi_rank_zero:
+            backend_write_file(
+                file_name=file_name,
+                output={"result": result},
+                runtime=time.time() - time_start,
+            )
 
     MPI.COMM_WORLD.Barrier()
 
diff --git a/executorlib/cache/backend.py b/executorlib/cache/backend.py
index 0e0bf779..cbb649e8 100644
--- a/executorlib/cache/backend.py
+++ b/executorlib/cache/backend.py
@@ -44,10 +44,16 @@ def backend_write_file(file_name: str, output: Any, runtime: float) -> None:
     """
     file_name_out = os.path.splitext(file_name)[0]
     os.rename(file_name, file_name_out + ".h5ready")
-    dump(
-        file_name=file_name_out + ".h5ready",
-        data_dict={"output": output, "runtime": runtime},
-    )
+    if "result" in output:
+        dump(
+            file_name=file_name_out + ".h5ready",
+            data_dict={"output": output["result"], "runtime": runtime},
+        )
+    else:
+        dump(
+            file_name=file_name_out + ".h5ready",
+            data_dict={"error": output["error"], "runtime": runtime},
+        )
     os.rename(file_name_out + ".h5ready", file_name_out + ".h5out")
 
 
@@ -63,7 +69,15 @@ def backend_execute_task_in_file(file_name: str) -> None:
     """
     apply_dict = backend_load_file(file_name=file_name)
time_start = time.time() - result = apply_dict["fn"].__call__(*apply_dict["args"], **apply_dict["kwargs"]) + try: + result = { + "result": apply_dict["fn"].__call__( + *apply_dict["args"], **apply_dict["kwargs"] + ) + } + except Exception as error: + result = {"error": error} + backend_write_file( file_name=file_name, output=result, diff --git a/executorlib/cache/shared.py b/executorlib/cache/shared.py index 7e15764c..36c3f693 100644 --- a/executorlib/cache/shared.py +++ b/executorlib/cache/shared.py @@ -30,9 +30,11 @@ def result(self) -> Any: str: The result of the future item. """ - exec_flag, result = get_output(file_name=self._file_name) - if exec_flag: + exec_flag, no_error_flag, result = get_output(file_name=self._file_name) + if exec_flag and no_error_flag: return result + elif exec_flag: + raise result else: return self.result() @@ -198,9 +200,11 @@ def _check_task_output( file_name = os.path.join(cache_directory, task_key, "cache.h5out") if not os.path.exists(file_name): return future_obj - exec_flag, result = get_output(file_name=file_name) - if exec_flag: + exec_flag, no_error_flag, result = get_output(file_name=file_name) + if exec_flag and no_error_flag: future_obj.set_result(result) + elif exec_flag: + future_obj.set_exception(result) return future_obj diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py index 86fa62dd..cc582347 100644 --- a/executorlib/interactive/shared.py +++ b/executorlib/interactive/shared.py @@ -171,7 +171,7 @@ def _execute_task_with_cache( else: _task_done(future_queue=future_queue) else: - _, result = get_output(file_name=file_name) + _, _, result = get_output(file_name=file_name) future = task_dict["future"] future.set_result(result) _task_done(future_queue=future_queue) diff --git a/executorlib/standalone/hdf.py b/executorlib/standalone/hdf.py index aa765555..8fb26f72 100644 --- a/executorlib/standalone/hdf.py +++ b/executorlib/standalone/hdf.py @@ -10,6 +10,7 @@ "args": "input_args", "kwargs": 
"input_kwargs", "output": "output", + "error": "error", "runtime": "runtime", "queue_id": "queue_id", } @@ -60,7 +61,7 @@ def load(file_name: str) -> dict: return data_dict -def get_output(file_name: str) -> tuple[bool, Any]: +def get_output(file_name: str) -> tuple[bool, bool, Any]: """ Check if output is available in the HDF5 file @@ -68,13 +69,15 @@ def get_output(file_name: str) -> tuple[bool, Any]: file_name (str): file name of the HDF5 file as absolute path Returns: - Tuple[bool, object]: boolean flag indicating if output is available and the output object itself + Tuple[bool, bool, object]: boolean flag indicating if output is available, boolean flag indicating that no error occurred, and the output object itself """ with h5py.File(file_name, "r") as hdf: if "output" in hdf: - return True, cloudpickle.loads(np.void(hdf["/output"])) + return True, True, cloudpickle.loads(np.void(hdf["/output"])) + elif "error" in hdf: + return True, False, cloudpickle.loads(np.void(hdf["/error"])) else: - return False, None + return False, False, None def get_runtime(file_name: str) -> float: diff --git a/tests/test_cache_backend_execute.py b/tests/test_cache_backend_execute.py index 46cbb718..0dce06c6 100644 --- a/tests/test_cache_backend_execute.py +++ b/tests/test_cache_backend_execute.py @@ -19,6 +19,10 @@ def my_funct(a, b): return a + b +def get_error(a): + raise ValueError(a) + + @unittest.skipIf( skip_h5io_test, "h5io is not installed, so the h5io tests are skipped." 
) @@ -107,6 +111,36 @@ def test_execute_function_kwargs(self): self.assertTrue(future_file_obj.done()) self.assertEqual(future_file_obj.result(), 3) + def test_execute_function_error(self): + cache_directory = os.path.abspath("cache") + os.makedirs(cache_directory, exist_ok=True) + task_key, data_dict = serialize_funct_h5( + fn=get_error, + fn_args=[], + fn_kwargs={"a": 1}, + ) + file_name = os.path.join(cache_directory, task_key, "cache.h5in") + os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True) + dump(file_name=file_name, data_dict=data_dict) + backend_execute_task_in_file(file_name=file_name) + future_obj = Future() + _check_task_output( + task_key=task_key, future_obj=future_obj, cache_directory=cache_directory + ) + self.assertTrue(future_obj.done()) + with self.assertRaises(ValueError): + future_obj.result() + self.assertTrue( + get_runtime(file_name=os.path.join(cache_directory, task_key, "cache.h5out")) + > 0.0 + ) + future_file_obj = FutureItem( + file_name=os.path.join(cache_directory, task_key, "cache.h5out") + ) + self.assertTrue(future_file_obj.done()) + with self.assertRaises(ValueError): + future_file_obj.result() + def tearDown(self): if os.path.exists("cache"): shutil.rmtree("cache") diff --git a/tests/test_cache_fileexecutor_serial.py b/tests/test_cache_fileexecutor_serial.py index 2a923965..c6ac1b1a 100644 --- a/tests/test_cache_fileexecutor_serial.py +++ b/tests/test_cache_fileexecutor_serial.py @@ -27,6 +27,10 @@ def list_files_in_working_directory(): return os.listdir(os.getcwd()) +def get_error(a): + raise ValueError(a) + + @unittest.skipIf( skip_h5py_test, "h5py is not installed, so the h5py tests are skipped." 
) @@ -68,6 +72,15 @@ def test_executor_working_directory(self): fs1 = exe.submit(list_files_in_working_directory) self.assertEqual(fs1.result(), os.listdir(cwd)) + def test_executor_error(self): + cwd = os.path.join(os.path.dirname(__file__), "executables") + with FileExecutor( + resource_dict={"cwd": cwd}, execute_function=execute_in_subprocess + ) as exe: + fs1 = exe.submit(get_error, a=1) + with self.assertRaises(ValueError): + fs1.result() + def test_executor_function(self): fs1 = Future() q = Queue() diff --git a/tests/test_singlenodeexecutor_cache.py b/tests/test_singlenodeexecutor_cache.py index cc5752da..71bb49b8 100644 --- a/tests/test_singlenodeexecutor_cache.py +++ b/tests/test_singlenodeexecutor_cache.py @@ -3,6 +3,7 @@ import unittest from executorlib import SingleNodeExecutor +from executorlib.standalone.serialize import cloudpickle_register try: from executorlib.standalone.hdf import get_cache_data @@ -12,6 +13,10 @@ skip_h5py_test = True +def get_error(a): + raise ValueError(a) + + @unittest.skipIf( skip_h5py_test, "h5py is not installed, so the h5io tests are skipped." 
) @@ -28,6 +33,16 @@ def test_cache_data(self): sum([sum(c["input_args"][0]) for c in cache_lst]), sum(result_lst) ) + def test_cache_error(self): + cache_directory = "./cache_error" + with SingleNodeExecutor(cache_directory=cache_directory) as exe: + cloudpickle_register(ind=1) + f = exe.submit(get_error, a=1) + with self.assertRaises(ValueError): + print(f.result()) + def tearDown(self): if os.path.exists("cache"): shutil.rmtree("cache") + if os.path.exists("cache_error"): + shutil.rmtree("cache_error") diff --git a/tests/test_standalone_hdf.py b/tests/test_standalone_hdf.py index 56cae425..95d1aa6e 100644 --- a/tests/test_standalone_hdf.py +++ b/tests/test_standalone_hdf.py @@ -39,8 +39,9 @@ def test_hdf_mixed(self): self.assertTrue("fn" in data_dict.keys()) self.assertEqual(data_dict["args"], [a]) self.assertEqual(data_dict["kwargs"], {"b": b}) - flag, output = get_output(file_name=file_name) + flag, no_error, output = get_output(file_name=file_name) self.assertTrue(get_runtime(file_name=file_name) == 0.0) + self.assertFalse(no_error) self.assertFalse(flag) self.assertIsNone(output) @@ -55,9 +56,10 @@ def test_hdf_args(self): self.assertTrue("fn" in data_dict.keys()) self.assertEqual(data_dict["args"], [a, b]) self.assertEqual(data_dict["kwargs"], {}) - flag, output = get_output(file_name=file_name) + flag, no_error, output = get_output(file_name=file_name) self.assertTrue(get_runtime(file_name=file_name) == 0.0) self.assertFalse(flag) + self.assertFalse(no_error) self.assertIsNone(output) def test_hdf_kwargs(self): @@ -80,9 +82,10 @@ def test_hdf_kwargs(self): self.assertEqual(data_dict["args"], ()) self.assertEqual(data_dict["kwargs"], {"a": a, "b": b}) self.assertEqual(get_queue_id(file_name=file_name), 123) - flag, output = get_output(file_name=file_name) + flag, no_error, output = get_output(file_name=file_name) self.assertTrue(get_runtime(file_name=file_name) == 0.0) self.assertFalse(flag) + self.assertFalse(no_error) self.assertIsNone(output) def 
test_hdf_queue_id(self): @@ -95,11 +98,27 @@ def test_hdf_queue_id(self): data_dict={"queue_id": queue_id}, ) self.assertEqual(get_queue_id(file_name=file_name), 123) - flag, output = get_output(file_name=file_name) + flag, no_error, output = get_output(file_name=file_name) self.assertTrue(get_runtime(file_name=file_name) == 0.0) self.assertFalse(flag) + self.assertFalse(no_error) self.assertIsNone(output) + def test_hdf_error(self): + cache_directory = os.path.abspath("cache") + os.makedirs(cache_directory, exist_ok=True) + file_name = os.path.join(cache_directory, "test_error.h5") + error = ValueError() + dump( + file_name=file_name, + data_dict={"error": error}, + ) + flag, no_error, output = get_output(file_name=file_name) + self.assertTrue(get_runtime(file_name=file_name) == 0.0) + self.assertTrue(flag) + self.assertFalse(no_error) + self.assertTrue(isinstance(output, error.__class__)) + def tearDown(self): if os.path.exists("cache"): shutil.rmtree("cache")