Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions executorlib/backend/cache_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,24 @@ def main() -> None:
apply_dict = backend_load_file(file_name=file_name)
apply_dict = MPI.COMM_WORLD.bcast(apply_dict, root=0)
output = apply_dict["fn"].__call__(*apply_dict["args"], **apply_dict["kwargs"])
result = MPI.COMM_WORLD.gather(output, root=0) if mpi_size_larger_one else output
if mpi_rank_zero:
backend_write_file(
file_name=file_name,
output=result,
runtime=time.time() - time_start,
try:
result = (
MPI.COMM_WORLD.gather(output, root=0) if mpi_size_larger_one else output
)
except Exception as error:
if mpi_rank_zero:
backend_write_file(
file_name=file_name,
output={"error": error},
runtime=time.time() - time_start,
)
else:
if mpi_rank_zero:
backend_write_file(
file_name=file_name,
output={"result": result},
runtime=time.time() - time_start,
)
MPI.COMM_WORLD.Barrier()


Expand Down
24 changes: 19 additions & 5 deletions executorlib/cache/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,16 @@ def backend_write_file(file_name: str, output: Any, runtime: float) -> None:
"""
file_name_out = os.path.splitext(file_name)[0]
os.rename(file_name, file_name_out + ".h5ready")
dump(
file_name=file_name_out + ".h5ready",
data_dict={"output": output, "runtime": runtime},
)
if "result" in output:
dump(
file_name=file_name_out + ".h5ready",
data_dict={"output": output["result"], "runtime": runtime},
)
else:
dump(
file_name=file_name_out + ".h5ready",
data_dict={"error": output["error"], "runtime": runtime},
)
os.rename(file_name_out + ".h5ready", file_name_out + ".h5out")


Expand All @@ -63,7 +69,15 @@ def backend_execute_task_in_file(file_name: str) -> None:
"""
apply_dict = backend_load_file(file_name=file_name)
time_start = time.time()
result = apply_dict["fn"].__call__(*apply_dict["args"], **apply_dict["kwargs"])
try:
result = {
"result": apply_dict["fn"].__call__(
*apply_dict["args"], **apply_dict["kwargs"]
)
}
except Exception as error:
result = {"error": error}

backend_write_file(
file_name=file_name,
output=result,
Expand Down
12 changes: 8 additions & 4 deletions executorlib/cache/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ def result(self) -> Any:
str: The result of the future item.

"""
exec_flag, result = get_output(file_name=self._file_name)
if exec_flag:
exec_flag, no_error_flag, result = get_output(file_name=self._file_name)
if exec_flag and no_error_flag:
return result
elif exec_flag:
raise result
else:
return self.result()

Expand Down Expand Up @@ -198,9 +200,11 @@ def _check_task_output(
file_name = os.path.join(cache_directory, task_key, "cache.h5out")
if not os.path.exists(file_name):
return future_obj
exec_flag, result = get_output(file_name=file_name)
if exec_flag:
exec_flag, no_error_flag, result = get_output(file_name=file_name)
if exec_flag and no_error_flag:
future_obj.set_result(result)
elif exec_flag:
future_obj.set_exception(result)
return future_obj


Expand Down
2 changes: 1 addition & 1 deletion executorlib/interactive/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def _execute_task_with_cache(
else:
_task_done(future_queue=future_queue)
else:
_, result = get_output(file_name=file_name)
_, _, result = get_output(file_name=file_name)
future = task_dict["future"]
future.set_result(result)
_task_done(future_queue=future_queue)
Expand Down
11 changes: 7 additions & 4 deletions executorlib/standalone/hdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"args": "input_args",
"kwargs": "input_kwargs",
"output": "output",
"error": "error",
"runtime": "runtime",
"queue_id": "queue_id",
}
Expand Down Expand Up @@ -60,21 +61,23 @@ def load(file_name: str) -> dict:
return data_dict


def get_output(file_name: str) -> tuple[bool, Any]:
def get_output(file_name: str) -> tuple[bool, bool, Any]:
"""
Check if output is available in the HDF5 file

Args:
file_name (str): file name of the HDF5 file as absolute path

Returns:
Tuple[bool, object]: boolean flag indicating if output is available and the output object itself
Tuple[bool, bool, object]: boolean flag indicating if output is available, boolean flag indicating the execution finished without error, and the output object itself (the result on success, or the raised exception on failure)
"""
with h5py.File(file_name, "r") as hdf:
if "output" in hdf:
return True, cloudpickle.loads(np.void(hdf["/output"]))
return True, True, cloudpickle.loads(np.void(hdf["/output"]))
elif "error" in hdf:
return True, False, cloudpickle.loads(np.void(hdf["/error"]))
else:
return False, None
return False, False, None


def get_runtime(file_name: str) -> float:
Expand Down
34 changes: 34 additions & 0 deletions tests/test_cache_backend_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ def my_funct(a, b):
return a + b


def get_error(a):
raise ValueError(a)


@unittest.skipIf(
skip_h5io_test, "h5io is not installed, so the h5io tests are skipped."
)
Expand Down Expand Up @@ -107,6 +111,36 @@ def test_execute_function_kwargs(self):
self.assertTrue(future_file_obj.done())
self.assertEqual(future_file_obj.result(), 3)

def test_execute_function_error(self):
cache_directory = os.path.abspath("cache")
os.makedirs(cache_directory, exist_ok=True)
task_key, data_dict = serialize_funct_h5(
fn=get_error,
fn_args=[],
fn_kwargs={"a": 1},
)
file_name = os.path.join(cache_directory, task_key, "cache.h5in")
os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
dump(file_name=file_name, data_dict=data_dict)
backend_execute_task_in_file(file_name=file_name)
future_obj = Future()
_check_task_output(
task_key=task_key, future_obj=future_obj, cache_directory=cache_directory
)
self.assertTrue(future_obj.done())
with self.assertRaises(ValueError):
future_obj.result()
self.assertTrue(
get_runtime(file_name=os.path.join(cache_directory, task_key, "cache.h5out"))
> 0.0
)
future_file_obj = FutureItem(
file_name=os.path.join(cache_directory, task_key, "cache.h5out")
)
self.assertTrue(future_file_obj.done())
with self.assertRaises(ValueError):
future_file_obj.result()

def tearDown(self):
if os.path.exists("cache"):
shutil.rmtree("cache")
13 changes: 13 additions & 0 deletions tests/test_cache_fileexecutor_serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ def list_files_in_working_directory():
return os.listdir(os.getcwd())


def get_error(a):
raise ValueError(a)


@unittest.skipIf(
skip_h5py_test, "h5py is not installed, so the h5py tests are skipped."
)
Expand Down Expand Up @@ -68,6 +72,15 @@ def test_executor_working_directory(self):
fs1 = exe.submit(list_files_in_working_directory)
self.assertEqual(fs1.result(), os.listdir(cwd))

def test_executor_error(self):
cwd = os.path.join(os.path.dirname(__file__), "executables")
with FileExecutor(
resource_dict={"cwd": cwd}, execute_function=execute_in_subprocess
) as exe:
fs1 = exe.submit(get_error, a=1)
with self.assertRaises(ValueError):
fs1.result()

def test_executor_function(self):
fs1 = Future()
q = Queue()
Expand Down
15 changes: 15 additions & 0 deletions tests/test_singlenodeexecutor_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import unittest

from executorlib import SingleNodeExecutor
from executorlib.standalone.serialize import cloudpickle_register

try:
from executorlib.standalone.hdf import get_cache_data
Expand All @@ -12,6 +13,10 @@
skip_h5py_test = True


def get_error(a):
raise ValueError(a)


@unittest.skipIf(
skip_h5py_test, "h5py is not installed, so the h5io tests are skipped."
)
Expand All @@ -28,6 +33,16 @@ def test_cache_data(self):
sum([sum(c["input_args"][0]) for c in cache_lst]), sum(result_lst)
)

def test_cache_error(self):
cache_directory = "./cache_error"
with SingleNodeExecutor(cache_directory=cache_directory) as exe:
cloudpickle_register(ind=1)
f = exe.submit(get_error, a=1)
with self.assertRaises(ValueError):
print(f.result())

def tearDown(self):
if os.path.exists("cache"):
shutil.rmtree("cache")
if os.path.exists("cache_error"):
shutil.rmtree("cache_error")
27 changes: 23 additions & 4 deletions tests/test_standalone_hdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ def test_hdf_mixed(self):
self.assertTrue("fn" in data_dict.keys())
self.assertEqual(data_dict["args"], [a])
self.assertEqual(data_dict["kwargs"], {"b": b})
flag, output = get_output(file_name=file_name)
flag, no_error, output = get_output(file_name=file_name)
self.assertTrue(get_runtime(file_name=file_name) == 0.0)
self.assertFalse(no_error)
self.assertFalse(flag)
self.assertIsNone(output)

Expand All @@ -55,9 +56,10 @@ def test_hdf_args(self):
self.assertTrue("fn" in data_dict.keys())
self.assertEqual(data_dict["args"], [a, b])
self.assertEqual(data_dict["kwargs"], {})
flag, output = get_output(file_name=file_name)
flag, no_error, output = get_output(file_name=file_name)
self.assertTrue(get_runtime(file_name=file_name) == 0.0)
self.assertFalse(flag)
self.assertFalse(no_error)
self.assertIsNone(output)

def test_hdf_kwargs(self):
Expand All @@ -80,9 +82,10 @@ def test_hdf_kwargs(self):
self.assertEqual(data_dict["args"], ())
self.assertEqual(data_dict["kwargs"], {"a": a, "b": b})
self.assertEqual(get_queue_id(file_name=file_name), 123)
flag, output = get_output(file_name=file_name)
flag, no_error, output = get_output(file_name=file_name)
self.assertTrue(get_runtime(file_name=file_name) == 0.0)
self.assertFalse(flag)
self.assertFalse(no_error)
self.assertIsNone(output)

def test_hdf_queue_id(self):
Expand All @@ -95,11 +98,27 @@ def test_hdf_queue_id(self):
data_dict={"queue_id": queue_id},
)
self.assertEqual(get_queue_id(file_name=file_name), 123)
flag, output = get_output(file_name=file_name)
flag, no_error, output = get_output(file_name=file_name)
self.assertTrue(get_runtime(file_name=file_name) == 0.0)
self.assertFalse(flag)
self.assertFalse(no_error)
self.assertIsNone(output)

def test_hdf_error(self):
cache_directory = os.path.abspath("cache")
os.makedirs(cache_directory, exist_ok=True)
file_name = os.path.join(cache_directory, "test_error.h5")
error = ValueError()
dump(
file_name=file_name,
data_dict={"error": error},
)
flag, no_error, output = get_output(file_name=file_name)
self.assertTrue(get_runtime(file_name=file_name) == 0.0)
self.assertTrue(flag)
self.assertFalse(no_error)
self.assertTrue(isinstance(output, error.__class__))

def tearDown(self):
if os.path.exists("cache"):
shutil.rmtree("cache")
Loading