diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py
index d85fb996..c1bfb84f 100644
--- a/pympipool/shared/executorbase.py
+++ b/pympipool/shared/executorbase.py
@@ -325,7 +325,7 @@ def execute_separate_tasks(
                                       option to true
     """
     active_task_dict = {}
-    process_lst = []
+    process_lst, qtask_lst = [], []
     while True:
         task_dict = future_queue.get()
         if "shutdown" in task_dict.keys() and task_dict["shutdown"]:
@@ -335,35 +335,17 @@ def execute_separate_tasks(
             future_queue.join()
             break
         elif "fn" in task_dict.keys() and "future" in task_dict.keys():
-            resource_dict = task_dict.pop("resource_dict")
             qtask = queue.Queue()
-            qtask.put(task_dict)
-            qtask.put({"shutdown": True, "wait": True})
-            if "cores" not in resource_dict.keys() or (
-                resource_dict["cores"] == 1 and kwargs["cores"] >= 1
-            ):
-                resource_dict["cores"] = kwargs["cores"]
-            active_task_dict = _wait_for_free_slots(
+            process = _submit_function_to_separate_process(
+                task_dict=task_dict,
+                qtask=qtask,
                 active_task_dict=active_task_dict,
-                cores_requested=resource_dict["cores"],
+                interface_class=interface_class,
+                executor_kwargs=kwargs,
                 max_cores=max_cores,
+                hostname_localhost=hostname_localhost,
             )
-            active_task_dict[task_dict["future"]] = resource_dict["cores"]
-            task_kwargs = kwargs.copy()
-            task_kwargs.update(resource_dict)
-            task_kwargs.update(
-                {
-                    "future_queue": qtask,
-                    "interface_class": interface_class,
-                    "hostname_localhost": hostname_localhost,
-                    "init_function": None,
-                }
-            )
-            process = RaisingThread(
-                target=execute_parallel_tasks,
-                kwargs=task_kwargs,
-            )
-            process.start()
+            qtask_lst.append(qtask)
             process_lst.append(process)
             future_queue.task_done()
 
@@ -406,7 +388,7 @@ def _wait_for_free_slots(active_task_dict: dict, cores_requested: int, max_cores
     Args:
         active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
        cores_requested (int): Number of cores required for executing the next task
-        max_cores (int): Maximum number cores which can be used
+        max_cores (int): defines the number of cores which can be used in parallel
 
     Returns:
         dict: Dictionary containing the future objects and the number of cores they require
@@ -414,3 +396,65 @@ def _wait_for_free_slots(active_task_dict: dict, cores_requested: int, max_cores
     while sum(active_task_dict.values()) + cores_requested > max_cores:
         active_task_dict = {k: v for k, v in active_task_dict.items() if not k.done()}
     return active_task_dict
+
+
+def _submit_function_to_separate_process(
+    task_dict: dict,
+    active_task_dict: dict,
+    qtask: queue.Queue,
+    interface_class: BaseInterface,
+    executor_kwargs: dict,
+    max_cores: int,
+    hostname_localhost: bool = False,
+):
+    """
+    Submit function to be executed in separate Python process
+
+    Args:
+        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
+                          {"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}}
+        active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
+        qtask (queue.Queue): Queue to communicate with the thread linked to the process executing the python function
+        interface_class (BaseInterface): Interface to start process on selected compute resources
+        executor_kwargs (dict): keyword parameters used to initialize the Executor
+        max_cores (int): defines the number of cores which can be used in parallel
+        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
+                                      context of an HPC cluster this is essential to be able to communicate to an
+                                      Executor running on a different compute node within the same allocation. And
+                                      in principle any computer should be able to resolve that their own hostname
+                                      points to the same address as localhost. Still MacOS >= 12 seems to disable
+                                      this look up for security reasons. So on MacOS it is required to set this
+                                      option to true
+
+    Returns:
+        RaisingThread: thread for communicating with the python process which is executing the function
+    """
+    resource_dict = task_dict.pop("resource_dict")
+    qtask.put(task_dict)
+    qtask.put({"shutdown": True, "wait": True})
+    if "cores" not in resource_dict.keys() or (
+        resource_dict["cores"] == 1 and executor_kwargs["cores"] >= 1
+    ):
+        resource_dict["cores"] = executor_kwargs["cores"]
+    active_task_dict = _wait_for_free_slots(
+        active_task_dict=active_task_dict,
+        cores_requested=resource_dict["cores"],
+        max_cores=max_cores,
+    )
+    active_task_dict[task_dict["future"]] = resource_dict["cores"]
+    task_kwargs = executor_kwargs.copy()
+    task_kwargs.update(resource_dict)
+    task_kwargs.update(
+        {
+            "future_queue": qtask,
+            "interface_class": interface_class,
+            "hostname_localhost": hostname_localhost,
+            "init_function": None,
+        }
+    )
+    process = RaisingThread(
+        target=execute_parallel_tasks,
+        kwargs=task_kwargs,
+    )
+    process.start()
+    return process
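For reviewers, a minimal standalone sketch of the slot bookkeeping that the refactored helper relies on: _wait_for_free_slots() drops finished futures from the accounting dictionary until the requested cores fit under max_cores. The function body is copied from the hunk above; the Future objects and core counts below are made up purely for illustration.

    from concurrent.futures import Future

    def _wait_for_free_slots(active_task_dict, cores_requested, max_cores):
        # Busy-wait until enough cores are free: completed futures are removed
        # from the accounting dictionary, releasing the cores they occupied.
        while sum(active_task_dict.values()) + cores_requested > max_cores:
            active_task_dict = {k: v for k, v in active_task_dict.items() if not k.done()}
        return active_task_dict

    f1, f2 = Future(), Future()  # stand-ins for the futures handed back to the user
    active = {f1: 2, f2: 2}      # 4 of 4 cores are currently booked
    f1.set_result(None)          # pretend the first task finished
    active = _wait_for_free_slots(active, cores_requested=2, max_cores=4)
    print(active)                # only f2 remains booked, so two cores are free for the next task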