From 1a462d5f7731c098e0e5e2f06f442984d713fa2d Mon Sep 17 00:00:00 2001
From: Jan Janssen
Date: Sun, 31 Aug 2025 08:39:32 +0200
Subject: [PATCH 1/6] Implement execute_single_task()

---
 .../interactive/blockallocation.py            |   6 +-
 .../task_scheduler/interactive/onetoone.py    |   4 +-
 .../task_scheduler/interactive/shared.py      | 117 ++++++++++++++++--
 tests/test_fluxpythonspawner.py               |   6 +-
 tests/test_mpiexecspawner.py                  |  16 +--
 .../test_singlenodeexecutor_shell_executor.py |   8 +-
 ...st_singlenodeexecutor_shell_interactive.py |   4 +-
 7 files changed, 131 insertions(+), 30 deletions(-)

diff --git a/executorlib/task_scheduler/interactive/blockallocation.py b/executorlib/task_scheduler/interactive/blockallocation.py
index 96cec2c1..ff4c7375 100644
--- a/executorlib/task_scheduler/interactive/blockallocation.py
+++ b/executorlib/task_scheduler/interactive/blockallocation.py
@@ -10,7 +10,7 @@
 from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner
 from executorlib.standalone.queue import cancel_items_in_queue
 from executorlib.task_scheduler.base import TaskSchedulerBase
-from executorlib.task_scheduler.interactive.shared import execute_tasks
+from executorlib.task_scheduler.interactive.shared import execute_multiple_tasks
 
 
 class BlockAllocationTaskScheduler(TaskSchedulerBase):
@@ -64,7 +64,7 @@ def __init__(
         self._set_process(
             process=[
                 Thread(
-                    target=execute_tasks,
+                    target=execute_multiple_tasks,
                     kwargs=executor_kwargs | {"worker_id": worker_id},
                 )
                 for worker_id in range(self._max_workers)
@@ -90,7 +90,7 @@ def max_workers(self, max_workers: int):
         elif self._max_workers < max_workers:
             new_process_lst = [
                 Thread(
-                    target=execute_tasks,
+                    target=execute_multiple_tasks,
                     kwargs=self._process_kwargs,
                 )
                 for _ in range(max_workers - self._max_workers)

diff --git a/executorlib/task_scheduler/interactive/onetoone.py b/executorlib/task_scheduler/interactive/onetoone.py
index d28f014b..19ea2ca5 100644
--- a/executorlib/task_scheduler/interactive/onetoone.py
+++ b/executorlib/task_scheduler/interactive/onetoone.py
@@ -4,7 +4,7 @@
 
 from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner
 from executorlib.task_scheduler.base import TaskSchedulerBase
-from executorlib.task_scheduler.interactive.shared import execute_tasks
+from executorlib.task_scheduler.interactive.shared import execute_multiple_tasks
 
 
 class OneProcessTaskScheduler(TaskSchedulerBase):
@@ -215,7 +215,7 @@ def _wrap_execute_task_in_separate_process(
         }
     )
     process = Thread(
-        target=execute_tasks,
+        target=execute_multiple_tasks,
         kwargs=task_kwargs,
     )
     process.start()

diff --git a/executorlib/task_scheduler/interactive/shared.py b/executorlib/task_scheduler/interactive/shared.py
index 02162308..5e073369 100644
--- a/executorlib/task_scheduler/interactive/shared.py
+++ b/executorlib/task_scheduler/interactive/shared.py
@@ -13,7 +13,7 @@
 from executorlib.standalone.serialize import serialize_funct
 
 
-def execute_tasks(
+def execute_multiple_tasks(
     future_queue: queue.Queue,
     cores: int = 1,
     spawner: type[BaseSpawner] = MpiExecSpawner,
@@ -90,8 +90,82 @@ def execute_tasks(
     )
 
 
+def execute_single_task(
+    task_dict: dict,
+    cores: int = 1,
+    spawner: type[BaseSpawner] = MpiExecSpawner,
+    hostname_localhost: Optional[bool] = None,
+    init_function: Optional[Callable] = None,
+    cache_directory: Optional[str] = None,
+    cache_key: Optional[str] = None,
+    log_obj_size: bool = False,
+    error_log_file: Optional[str] = None,
+    worker_id: Optional[int] = None,
+    **kwargs,
+) -> None:
+    """
+    Execute a single task in parallel
+    using the message passing interface (MPI).
+
+    Args:
+        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
+                          {"fn": Callable, "args": (), "kwargs": {}, "future": Future}
+        cores (int): defines the total number of MPI ranks to use
+        spawner (BaseSpawner): Spawner to start process on selected compute resources
+        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
+                                      context of an HPC cluster this is essential to be able to communicate to an
+                                      Executor running on a different compute node within the same allocation. And
+                                      in principle any computer should be able to resolve that their own hostname
+                                      points to the same address as localhost. Still MacOS >= 12 seems to disable
+                                      this lookup for security reasons. So on MacOS it is required to set this
+                                      option to true
+        init_function (Callable): optional function to preset arguments for functions which are submitted later
+        cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache".
+        cache_key (str, optional): By default the cache_key is generated based on the function hash, this can be
+                                   overwritten by setting the cache_key.
+        log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
+        error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions
+                              submitted to the Executor.
+        worker_id (int): Communicate to the worker which ID was assigned to it for future reference and resource
+                         distribution.
+    """
+    interface = interface_bootup(
+        command_lst=get_interactive_execute_command(
+            cores=cores,
+        ),
+        connections=spawner(cores=cores, **kwargs),
+        hostname_localhost=hostname_localhost,
+        log_obj_size=log_obj_size,
+        worker_id=worker_id,
+    )
+    if init_function is not None:
+        interface.send_dict(
+            input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}}
+        )
+    if error_log_file is not None:
+        task_dict["error_log_file"] = error_log_file
+    if cache_directory is None:
+        _execute_task_without_cache(
+            interface=interface,
+            task_dict=task_dict,
+            task_done_callable=_task_done,
+            task_done_callable_kwargs={"future_queue": future_queue},
+        )
+    else:
+        _execute_task_with_cache(
+            interface=interface,
+            task_dict=task_dict,
+            cache_directory=cache_directory,
+            cache_key=cache_key,
+            task_done_callable=_task_done,
+            task_done_callable_kwargs={"future_queue": future_queue},
+        )
+
+
 def _execute_task_without_cache(
-    interface: SocketInterface, task_dict: dict, future_queue: queue.Queue
+    interface: SocketInterface,
+    task_dict: dict,
+    task_done_callable: Optional[Callable] = None,
+    task_done_callable_kwargs: Optional[dict] = None,
 ):
     """
     Execute the task in the task_dict by communicating it via the interface.
@@ -108,18 +182,25 @@ def _execute_task_without_cache( f.set_result(interface.send_and_receive_dict(input_dict=task_dict)) except Exception as thread_exception: interface.shutdown(wait=True) - _task_done(future_queue=future_queue) + _evaluate_call_back( + task_done_callable=task_done_callable, + task_done_callable_kwargs=task_done_callable_kwargs, + ) f.set_exception(exception=thread_exception) else: - _task_done(future_queue=future_queue) + _evaluate_call_back( + task_done_callable=task_done_callable, + task_done_callable_kwargs=task_done_callable_kwargs, + ) def _execute_task_with_cache( interface: SocketInterface, task_dict: dict, - future_queue: queue.Queue, cache_directory: str, cache_key: Optional[str] = None, + task_done_callable: Optional[Callable] = None, + task_done_callable_kwargs: Optional[dict] = None, ): """ Execute the task in the task_dict by communicating it via the interface using the cache in the cache directory. @@ -155,18 +236,38 @@ def _execute_task_with_cache( f.set_result(result) except Exception as thread_exception: interface.shutdown(wait=True) - _task_done(future_queue=future_queue) + _evaluate_call_back( + task_done_callable=task_done_callable, + task_done_callable_kwargs=task_done_callable_kwargs, + ) f.set_exception(exception=thread_exception) raise thread_exception else: - _task_done(future_queue=future_queue) + _evaluate_call_back( + task_done_callable=task_done_callable, + task_done_callable_kwargs=task_done_callable_kwargs, + ) else: _, _, result = get_output(file_name=file_name) future = task_dict["future"] future.set_result(result) - _task_done(future_queue=future_queue) + _evaluate_call_back( + task_done_callable=task_done_callable, + task_done_callable_kwargs=task_done_callable_kwargs, + ) def _task_done(future_queue: queue.Queue): with contextlib.suppress(ValueError): future_queue.task_done() + + +def _evaluate_call_back( + task_done_callable: Optional[Callable] = None, + task_done_callable_kwargs: Optional[dict] = None, +): + if task_done_callable is not None: + if task_done_callable_kwargs is not None: + task_done_callable(**task_done_callable_kwargs) + else: + task_done_callable() \ No newline at end of file diff --git a/tests/test_fluxpythonspawner.py b/tests/test_fluxpythonspawner.py index 01f1d160..6235f0a9 100644 --- a/tests/test_fluxpythonspawner.py +++ b/tests/test_fluxpythonspawner.py @@ -5,7 +5,7 @@ import numpy as np -from executorlib.task_scheduler.interactive.shared import execute_tasks +from executorlib.task_scheduler.interactive.shared import execute_multiple_tasks from executorlib.task_scheduler.interactive.blockallocation import BlockAllocationTaskScheduler from executorlib.standalone.serialize import cloudpickle_register @@ -112,7 +112,7 @@ def test_execute_task(self): q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - execute_tasks( + execute_multiple_tasks( future_queue=q, cores=1, flux_executor=self.flux_executor, @@ -127,7 +127,7 @@ def test_execute_task_threads(self): q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - execute_tasks( + execute_multiple_tasks( future_queue=q, cores=1, threads_per_core=1, diff --git a/tests/test_mpiexecspawner.py b/tests/test_mpiexecspawner.py index 1811cbae..df22679b 100644 --- a/tests/test_mpiexecspawner.py +++ b/tests/test_mpiexecspawner.py @@ -10,7 +10,7 @@ from executorlib.task_scheduler.base import TaskSchedulerBase from 
executorlib.standalone.interactive.spawner import MpiExecSpawner -from executorlib.task_scheduler.interactive.shared import execute_tasks +from executorlib.task_scheduler.interactive.shared import execute_multiple_tasks from executorlib.task_scheduler.interactive.blockallocation import BlockAllocationTaskScheduler from executorlib.task_scheduler.interactive.onetoone import OneProcessTaskScheduler from executorlib.standalone.interactive.backend import call_funct @@ -261,7 +261,7 @@ def test_execute_task(self): q.put({"fn": get_global, "args": (), "kwargs": {}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - execute_tasks( + execute_multiple_tasks( future_queue=q, cores=1, openmpi_oversubscribe=False, @@ -444,7 +444,7 @@ def test_execute_task_failed_no_argument(self): q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) with self.assertRaises(TypeError): - execute_tasks( + execute_multiple_tasks( future_queue=q, cores=1, openmpi_oversubscribe=False, @@ -460,7 +460,7 @@ def test_execute_task_failed_wrong_argument(self): q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) with self.assertRaises(TypeError): - execute_tasks( + execute_multiple_tasks( future_queue=q, cores=1, openmpi_oversubscribe=False, @@ -475,7 +475,7 @@ def test_execute_task(self): q.put({"fn": calc_array, "args": (), "kwargs": {"i": 2}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - execute_tasks( + execute_multiple_tasks( future_queue=q, cores=1, openmpi_oversubscribe=False, @@ -493,7 +493,7 @@ def test_execute_task_parallel(self): q.put({"fn": calc_array, "args": (), "kwargs": {"i": 2}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - execute_tasks( + execute_multiple_tasks( future_queue=q, cores=2, openmpi_oversubscribe=False, @@ -516,7 +516,7 @@ def test_execute_task_cache(self): q.put({"fn": calc, "args": (), "kwargs": {"i": 1}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - execute_tasks( + execute_multiple_tasks( future_queue=q, cores=1, openmpi_oversubscribe=False, @@ -535,7 +535,7 @@ def test_execute_task_cache_failed_no_argument(self): q.put({"fn": calc_array, "args": (), "kwargs": {}, "future": f}) cloudpickle_register(ind=1) with self.assertRaises(TypeError): - execute_tasks( + execute_multiple_tasks( future_queue=q, cores=1, openmpi_oversubscribe=False, diff --git a/tests/test_singlenodeexecutor_shell_executor.py b/tests/test_singlenodeexecutor_shell_executor.py index df97ecd2..9dae5e99 100644 --- a/tests/test_singlenodeexecutor_shell_executor.py +++ b/tests/test_singlenodeexecutor_shell_executor.py @@ -5,7 +5,7 @@ from executorlib import SingleNodeExecutor from executorlib.standalone.serialize import cloudpickle_register -from executorlib.task_scheduler.interactive.shared import execute_tasks +from executorlib.task_scheduler.interactive.shared import execute_multiple_tasks from executorlib.standalone.interactive.spawner import MpiExecSpawner @@ -32,7 +32,7 @@ def test_execute_single_task(self): test_queue.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) self.assertFalse(f.done()) - execute_tasks( + execute_multiple_tasks( future_queue=test_queue, cores=1, openmpi_oversubscribe=False, @@ -58,7 +58,7 @@ def test_wrong_error(self): ) cloudpickle_register(ind=1) with self.assertRaises(TypeError): - execute_tasks( + execute_multiple_tasks( future_queue=test_queue, cores=1, openmpi_oversubscribe=False, @@ -85,7 +85,7 @@ def 
test_broken_executable(self): ) cloudpickle_register(ind=1) with self.assertRaises(FileNotFoundError): - execute_tasks( + execute_multiple_tasks( future_queue=test_queue, cores=1, openmpi_oversubscribe=False, diff --git a/tests/test_singlenodeexecutor_shell_interactive.py b/tests/test_singlenodeexecutor_shell_interactive.py index 0adc54bf..ed1f4f68 100644 --- a/tests/test_singlenodeexecutor_shell_interactive.py +++ b/tests/test_singlenodeexecutor_shell_interactive.py @@ -6,7 +6,7 @@ from executorlib import SingleNodeExecutor from executorlib.standalone.serialize import cloudpickle_register -from executorlib.task_scheduler.interactive.shared import execute_tasks +from executorlib.task_scheduler.interactive.shared import execute_multiple_tasks from executorlib.standalone.interactive.spawner import MpiExecSpawner @@ -88,7 +88,7 @@ def test_execute_single_task(self): cloudpickle_register(ind=1) self.assertFalse(future_lines.done()) self.assertFalse(future_pattern.done()) - execute_tasks( + execute_multiple_tasks( future_queue=test_queue, cores=1, openmpi_oversubscribe=False, From 9b84cce3dcfb38e69883956e39f6f3f97fea9c24 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 31 Aug 2025 11:32:50 +0200 Subject: [PATCH 2/6] no more queue --- executorlib/task_scheduler/interactive/onetoone.py | 13 +++---------- executorlib/task_scheduler/interactive/shared.py | 5 +---- 2 files changed, 4 insertions(+), 14 deletions(-) diff --git a/executorlib/task_scheduler/interactive/onetoone.py b/executorlib/task_scheduler/interactive/onetoone.py index 19ea2ca5..e075b9de 100644 --- a/executorlib/task_scheduler/interactive/onetoone.py +++ b/executorlib/task_scheduler/interactive/onetoone.py @@ -4,7 +4,7 @@ from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner from executorlib.task_scheduler.base import TaskSchedulerBase -from executorlib.task_scheduler.interactive.shared import execute_multiple_tasks +from executorlib.task_scheduler.interactive.shared import execute_single_task class OneProcessTaskScheduler(TaskSchedulerBase): @@ -94,7 +94,6 @@ def _execute_task_in_separate_process( """ active_task_dict: dict = {} process_lst: list = [] - qtask_lst: list = [] if "cores" not in kwargs: kwargs["cores"] = 1 while True: @@ -106,10 +105,8 @@ def _execute_task_in_separate_process( future_queue.join() break elif "fn" in task_dict and "future" in task_dict: - qtask: queue.Queue = queue.Queue() process, active_task_dict = _wrap_execute_task_in_separate_process( task_dict=task_dict, - qtask=qtask, active_task_dict=active_task_dict, spawner=spawner, executor_kwargs=kwargs, @@ -117,7 +114,6 @@ def _execute_task_in_separate_process( max_workers=max_workers, hostname_localhost=hostname_localhost, ) - qtask_lst.append(qtask) process_lst.append(process) future_queue.task_done() @@ -158,7 +154,6 @@ def _wait_for_free_slots( def _wrap_execute_task_in_separate_process( task_dict: dict, active_task_dict: dict, - qtask: queue.Queue, spawner: type[BaseSpawner], executor_kwargs: dict, max_cores: Optional[int] = None, @@ -190,8 +185,6 @@ def _wrap_execute_task_in_separate_process( dictionary containing the future objects and the number of cores they require """ resource_dict = task_dict.pop("resource_dict").copy() - qtask.put(task_dict) - qtask.put({"shutdown": True, "wait": True}) if "cores" not in resource_dict or ( resource_dict["cores"] == 1 and executor_kwargs["cores"] >= 1 ): @@ -208,14 +201,14 @@ def _wrap_execute_task_in_separate_process( task_kwargs.update(resource_dict) task_kwargs.update( { - 
"future_queue": qtask, + "task_dict": task_dict, "spawner": spawner, "hostname_localhost": hostname_localhost, "init_function": None, } ) process = Thread( - target=execute_multiple_tasks, + target=execute_single_task, kwargs=task_kwargs, ) process.start() diff --git a/executorlib/task_scheduler/interactive/shared.py b/executorlib/task_scheduler/interactive/shared.py index 5ffe3bd5..9f194907 100644 --- a/executorlib/task_scheduler/interactive/shared.py +++ b/executorlib/task_scheduler/interactive/shared.py @@ -141,10 +141,7 @@ def execute_single_task( if error_log_file is not None: task_dict["error_log_file"] = error_log_file if cache_directory is None: - _execute_task_without_cache( - interface=interface, - task_dict=task_dict, - ) + _execute_task_without_cache(interface=interface, task_dict=task_dict) else: _execute_task_with_cache( interface=interface, From 2d175aab7fb4f70d32b211a0f186473ff5d0b5fa Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 31 Aug 2025 09:33:08 +0000 Subject: [PATCH 3/6] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/task_scheduler/interactive/shared.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/task_scheduler/interactive/shared.py b/executorlib/task_scheduler/interactive/shared.py index 9f194907..83a6c2ae 100644 --- a/executorlib/task_scheduler/interactive/shared.py +++ b/executorlib/task_scheduler/interactive/shared.py @@ -230,4 +230,4 @@ def _evaluate_call_back( if task_done_callable_kwargs is not None: task_done_callable(**task_done_callable_kwargs) else: - task_done_callable() \ No newline at end of file + task_done_callable() From f7722ca6b64694c2c3fc9d1438d2cdf50ec9fdea Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 31 Aug 2025 11:35:13 +0200 Subject: [PATCH 4/6] fixes --- executorlib/task_scheduler/interactive/shared.py | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/executorlib/task_scheduler/interactive/shared.py b/executorlib/task_scheduler/interactive/shared.py index 9f194907..b48253a6 100644 --- a/executorlib/task_scheduler/interactive/shared.py +++ b/executorlib/task_scheduler/interactive/shared.py @@ -85,6 +85,7 @@ def execute_multiple_tasks( cache_directory=cache_directory, cache_key=cache_key, ) + _task_done(future_queue=future_queue) def execute_single_task( @@ -174,8 +175,6 @@ def _execute_task_with_cache( task_dict: dict, cache_directory: str, cache_key: Optional[str] = None, - task_done_callable: Optional[Callable] = None, - task_done_callable_kwargs: Optional[dict] = None, ): """ Execute the task in the task_dict by communicating it via the interface using the cache in the cache directory. 
@@ -220,14 +219,3 @@ def _execute_task_with_cache( def _task_done(future_queue: queue.Queue): with contextlib.suppress(ValueError): future_queue.task_done() - - -def _evaluate_call_back( - task_done_callable: Optional[Callable] = None, - task_done_callable_kwargs: Optional[dict] = None, -): - if task_done_callable is not None: - if task_done_callable_kwargs is not None: - task_done_callable(**task_done_callable_kwargs) - else: - task_done_callable() \ No newline at end of file From b67fb127d9e79afb354ee048e8d0a4328f750533 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 31 Aug 2025 11:42:21 +0200 Subject: [PATCH 5/6] fix tests --- tests/test_mpiexecspawner.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_mpiexecspawner.py b/tests/test_mpiexecspawner.py index 1d69c681..9ebf02de 100644 --- a/tests/test_mpiexecspawner.py +++ b/tests/test_mpiexecspawner.py @@ -443,7 +443,7 @@ def test_execute_task_failed_no_argument(self): q.put({"fn": calc_array, "args": (), "kwargs": {}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - execute_tasks( + execute_multiple_tasks( future_queue=q, cores=1, openmpi_oversubscribe=False, @@ -459,7 +459,7 @@ def test_execute_task_failed_wrong_argument(self): q.put({"fn": calc_array, "args": (), "kwargs": {"j": 4}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - execute_tasks( + execute_multiple_tasks( future_queue=q, cores=1, openmpi_oversubscribe=False, @@ -535,7 +535,7 @@ def test_execute_task_cache_failed_no_argument(self): q.put({"fn": calc_array, "args": (), "kwargs": {}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - execute_tasks( + execute_multiple_tasks( future_queue=q, cores=1, openmpi_oversubscribe=False, From b05b2b5b7364d9c4709c05b7a5891c8ab8e81430 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 31 Aug 2025 11:54:44 +0200 Subject: [PATCH 6/6] refactor --- .../task_scheduler/interactive/onetoone.py | 1 - .../task_scheduler/interactive/shared.py | 105 +++++++++++------- 2 files changed, 62 insertions(+), 44 deletions(-) diff --git a/executorlib/task_scheduler/interactive/onetoone.py b/executorlib/task_scheduler/interactive/onetoone.py index e075b9de..e97fa4bc 100644 --- a/executorlib/task_scheduler/interactive/onetoone.py +++ b/executorlib/task_scheduler/interactive/onetoone.py @@ -204,7 +204,6 @@ def _wrap_execute_task_in_separate_process( "task_dict": task_dict, "spawner": spawner, "hostname_localhost": hostname_localhost, - "init_function": None, } ) process = Thread( diff --git a/executorlib/task_scheduler/interactive/shared.py b/executorlib/task_scheduler/interactive/shared.py index b48253a6..0b46324d 100644 --- a/executorlib/task_scheduler/interactive/shared.py +++ b/executorlib/task_scheduler/interactive/shared.py @@ -74,17 +74,13 @@ def execute_multiple_tasks( future_queue.join() break elif "fn" in task_dict and "future" in task_dict: - if error_log_file is not None: - task_dict["error_log_file"] = error_log_file - if cache_directory is None: - _execute_task_without_cache(interface=interface, task_dict=task_dict) - else: - _execute_task_with_cache( - interface=interface, - task_dict=task_dict, - cache_directory=cache_directory, - cache_key=cache_key, - ) + _execute_task_dict( + task_dict=task_dict, + interface=interface, + cache_directory=cache_directory, + cache_key=cache_key, + error_log_file=error_log_file, + ) _task_done(future_queue=future_queue) @@ -93,7 +89,6 @@ def execute_single_task( 
     cores: int = 1,
     spawner: type[BaseSpawner] = MpiExecSpawner,
     hostname_localhost: Optional[bool] = None,
-    init_function: Optional[Callable] = None,
     cache_directory: Optional[str] = None,
     cache_key: Optional[str] = None,
     log_obj_size: bool = False,
     error_log_file: Optional[str] = None,
     worker_id: Optional[int] = None,
     **kwargs,
 ) -> None:
     """
     Execute a single task in parallel using the message passing interface (MPI).
 
     Args:
         task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                           {"fn": Callable, "args": (), "kwargs": {}, "future": Future}
         cores (int): defines the total number of MPI ranks to use
         spawner (BaseSpawner): Spawner to start process on selected compute resources
         hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                       context of an HPC cluster this is essential to be able to communicate to an
                                       Executor running on a different compute node within the same allocation. And
                                       in principle any computer should be able to resolve that their own hostname
                                       points to the same address as localhost. Still MacOS >= 12 seems to disable
                                       this lookup for security reasons. So on MacOS it is required to set this
                                       option to true
         cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache".
         cache_key (str, optional): By default the cache_key is generated based on the function hash, this can be
                                    overwritten by setting the cache_key.
         log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
         error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions
                               submitted to the Executor.
         worker_id (int): Communicate to the worker which ID was assigned to it for future reference and resource
                          distribution.
""" - interface = interface_bootup( - command_lst=get_interactive_execute_command( - cores=cores, + _execute_task_dict( + task_dict=task_dict, + interface=interface_bootup( + command_lst=get_interactive_execute_command( + cores=cores, + ), + connections=spawner(cores=cores, **kwargs), + hostname_localhost=hostname_localhost, + log_obj_size=log_obj_size, + worker_id=worker_id, ), - connections=spawner(cores=cores, **kwargs), - hostname_localhost=hostname_localhost, - log_obj_size=log_obj_size, - worker_id=worker_id, + cache_directory=cache_directory, + cache_key=cache_key, + error_log_file=error_log_file, ) - if init_function is not None: - interface.send_dict( - input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}} - ) + + +def _execute_task_dict( + task_dict: dict, + interface: SocketInterface, + cache_directory: Optional[str] = None, + cache_key: Optional[str] = None, + error_log_file: Optional[str] = None, +): + """ + Execute the task in the task_dict by communicating it via the interface. + + Args: + task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys + {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}} + interface (SocketInterface): socket interface for zmq communication + cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache". + cache_key (str, optional): By default the cache_key is generated based on the function hash, this can be + overwritten by setting the cache_key. + error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions + submitted to the Executor. + """ if error_log_file is not None: task_dict["error_log_file"] = error_log_file if cache_directory is None: