From 5a6043f37be40a4678feece0cc7be9d990f3b79e Mon Sep 17 00:00:00 2001
From: Jan Janssen
Date: Mon, 22 Apr 2024 09:26:19 -0500
Subject: [PATCH 1/3] Introduce the _submit_function_to_separate_process() function

---
 pympipool/shared/executorbase.py | 95 ++++++++++++++++++++++----------
 1 file changed, 67 insertions(+), 28 deletions(-)

diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py
index d85fb996..02fb999b 100644
--- a/pympipool/shared/executorbase.py
+++ b/pympipool/shared/executorbase.py
@@ -335,35 +335,13 @@ def execute_separate_tasks(
             future_queue.join()
             break
         elif "fn" in task_dict.keys() and "future" in task_dict.keys():
-            resource_dict = task_dict.pop("resource_dict")
-            qtask = queue.Queue()
-            qtask.put(task_dict)
-            qtask.put({"shutdown": True, "wait": True})
-            if "cores" not in resource_dict.keys() or (
-                resource_dict["cores"] == 1 and kwargs["cores"] >= 1
-            ):
-                resource_dict["cores"] = kwargs["cores"]
-            active_task_dict = _wait_for_free_slots(
-                active_task_dict=active_task_dict,
-                cores_requested=resource_dict["cores"],
+            process = _submit_function_to_separate_process(
+                task_dict=task_dict,
+                interface_class=interface_class,
+                executor_kwargs=kwargs,
                 max_cores=max_cores,
+                hostname_localhost=hostname_localhost,
             )
-            active_task_dict[task_dict["future"]] = resource_dict["cores"]
-            task_kwargs = kwargs.copy()
-            task_kwargs.update(resource_dict)
-            task_kwargs.update(
-                {
-                    "future_queue": qtask,
-                    "interface_class": interface_class,
-                    "hostname_localhost": hostname_localhost,
-                    "init_function": None,
-                }
-            )
-            process = RaisingThread(
-                target=execute_parallel_tasks,
-                kwargs=task_kwargs,
-            )
-            process.start()
             process_lst.append(process)
             future_queue.task_done()
 
@@ -406,7 +384,7 @@ def _wait_for_free_slots(active_task_dict: dict, cores_requested: int, max_cores
     Args:
         active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
         cores_requested (int): Number of cores required for executing the next task
-        max_cores (int): Maximum number cores which can be used
+        max_cores (int): defines the number of cores which can be used in parallel
 
     Returns:
         dict: Dictionary containing the future objects and the number of cores they require
@@ -414,3 +392,64 @@ def _wait_for_free_slots(active_task_dict: dict, cores_requested: int, max_cores
     while sum(active_task_dict.values()) + cores_requested > max_cores:
         active_task_dict = {k: v for k, v in active_task_dict.items() if not k.done()}
     return active_task_dict
+
+
+def _submit_function_to_separate_process(
+    task_dict: dict,
+    active_task_dict: dict,
+    interface_class: BaseInterface,
+    executor_kwargs: dict,
+    max_cores: int,
+    hostname_localhost: bool = False,
+):
+    """
+    Submit a function to be executed in a separate Python process.
+
+    Args:
+        task_dict (dict): task submitted to the executor as a dictionary. This dictionary has the following keys
+                          {"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}}
+        active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
+        interface_class (BaseInterface): Interface to start the process on the selected compute resources
+        executor_kwargs (dict): keyword parameters used to initialize the Executor
+        max_cores (int): defines the number of cores which can be used in parallel
+        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
+                                      context of an HPC cluster this is essential to be able to communicate to an
+                                      Executor running on a different compute node within the same allocation. In
+                                      principle any computer should be able to resolve that its own hostname
+                                      points to the same address as localhost. However, macOS >= 12 appears to
+                                      disable this lookup for security reasons, so on macOS this option has to
+                                      be set to true.
+
+    Returns:
+        RaisingThread: thread for communicating with the Python process which is executing the function
+    """
+    resource_dict = task_dict.pop("resource_dict")
+    qtask = queue.Queue()
+    qtask.put(task_dict)
+    qtask.put({"shutdown": True, "wait": True})
+    if "cores" not in resource_dict.keys() or (
+        resource_dict["cores"] == 1 and executor_kwargs["cores"] >= 1
+    ):
+        resource_dict["cores"] = executor_kwargs["cores"]
+    active_task_dict = _wait_for_free_slots(
+        active_task_dict=active_task_dict,
+        cores_requested=resource_dict["cores"],
+        max_cores=max_cores,
+    )
+    active_task_dict[task_dict["future"]] = resource_dict["cores"]
+    task_kwargs = executor_kwargs.copy()
+    task_kwargs.update(resource_dict)
+    task_kwargs.update(
+        {
+            "future_queue": qtask,
+            "interface_class": interface_class,
+            "hostname_localhost": hostname_localhost,
+            "init_function": None,
+        }
+    )
+    process = RaisingThread(
+        target=execute_parallel_tasks,
+        kwargs=task_kwargs,
+    )
+    process.start()
+    return process

From 50cd4e4c362f8d4989d583f53467d2a317100202 Mon Sep 17 00:00:00 2001
From: Jan Janssen
Date: Mon, 22 Apr 2024 09:46:53 -0500
Subject: [PATCH 2/3] Bug fix: pass active_task_dict to _submit_function_to_separate_process()

---
 pympipool/shared/executorbase.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py
index 02fb999b..103f3922 100644
--- a/pympipool/shared/executorbase.py
+++ b/pympipool/shared/executorbase.py
@@ -337,6 +337,7 @@ def execute_separate_tasks(
         elif "fn" in task_dict.keys() and "future" in task_dict.keys():
             process = _submit_function_to_separate_process(
                 task_dict=task_dict,
+                active_task_dict=active_task_dict,
                 interface_class=interface_class,
                 executor_kwargs=kwargs,
                 max_cores=max_cores,

From 79e7607edc9a52e09b6aa202e299343cb3fdc77d Mon Sep 17 00:00:00 2001
From: Jan Janssen
Date: Mon, 22 Apr 2024 10:32:16 -0500
Subject: [PATCH 3/3] Keep list of queues in memory

---
 pympipool/shared/executorbase.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py
index 103f3922..c1bfb84f 100644
--- a/pympipool/shared/executorbase.py
+++ b/pympipool/shared/executorbase.py
@@ -325,7 +325,7 @@ def execute_separate_tasks(
         option to true
     """
     active_task_dict = {}
-    process_lst = []
+    process_lst, qtask_lst = [], []
     while True:
         task_dict = future_queue.get()
         if "shutdown" in task_dict.keys() and task_dict["shutdown"]:
@@ -335,14 +335,17 @@ def execute_separate_tasks(
             future_queue.join()
             break
         elif "fn" in task_dict.keys() and "future" in task_dict.keys():
+            qtask = queue.Queue()
             process = _submit_function_to_separate_process(
                 task_dict=task_dict,
+                qtask=qtask,
                 active_task_dict=active_task_dict,
                 interface_class=interface_class,
                 executor_kwargs=kwargs,
                 max_cores=max_cores,
                 hostname_localhost=hostname_localhost,
             )
+            qtask_lst.append(qtask)
             process_lst.append(process)
             future_queue.task_done()
 
@@ -398,6 +401,7 @@ def _wait_for_free_slots(active_task_dict: dict, cores_requested: int, max_cores
 def _submit_function_to_separate_process(
     task_dict: dict,
     active_task_dict: dict,
+    qtask: queue.Queue,
     interface_class: BaseInterface,
     executor_kwargs: dict,
     max_cores: int,
@@ -410,6 +414,7 @@ def _submit_function_to_separate_process(
         task_dict (dict): task submitted to the executor as a dictionary. This dictionary has the following keys
                           {"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}}
         active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
+        qtask (queue.Queue): Queue to communicate with the thread linked to the process executing the Python function
         interface_class (BaseInterface): Interface to start the process on the selected compute resources
         executor_kwargs (dict): keyword parameters used to initialize the Executor
         max_cores (int): defines the number of cores which can be used in parallel
@@ -425,7 +430,6 @@ def _submit_function_to_separate_process(
         RaisingThread: thread for communicating with the Python process which is executing the function
     """
     resource_dict = task_dict.pop("resource_dict")
-    qtask = queue.Queue()
    qtask.put(task_dict)
     qtask.put({"shutdown": True, "wait": True})
     if "cores" not in resource_dict.keys() or (
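
Reviewer note: the pattern this series converges on (one queue.Queue per submitted task, handed to a dedicated communication thread, with both the queue and the thread kept referenced by the dispatching loop, matching the "Keep list of queues in memory" intent of patch 3) can be sketched in isolation. The following is a minimal illustration, not pympipool's implementation: it substitutes threading.Thread for RaisingThread and an inline _worker() for execute_parallel_tasks(), drops the BaseInterface/zmq layer entirely, and all helper names in it are hypothetical.

import queue
import threading
from concurrent.futures import Future


def _worker(future_queue: queue.Queue):
    # Stand-in for execute_parallel_tasks(): consume task dictionaries from
    # the per-task queue until the shutdown message arrives.
    while True:
        task_dict = future_queue.get()
        if task_dict.get("shutdown", False):
            future_queue.task_done()
            break
        future = task_dict["future"]
        try:
            future.set_result(
                task_dict["fn"](*task_dict["args"], **task_dict["kwargs"])
            )
        except Exception as exc:
            future.set_exception(exc)
        future_queue.task_done()


def submit_sketch(fn, args, kwargs, process_lst, qtask_lst):
    # One queue and one communication thread per task, as in
    # _submit_function_to_separate_process(); both are appended to lists
    # owned by the caller, mirroring process_lst/qtask_lst in
    # execute_separate_tasks() after patch 3.
    future = Future()
    qtask = queue.Queue()
    qtask.put({"fn": fn, "args": args, "kwargs": kwargs, "future": future})
    qtask.put({"shutdown": True, "wait": True})
    process = threading.Thread(target=_worker, args=(qtask,))
    process.start()
    qtask_lst.append(qtask)  # keep the queue referenced by the dispatcher
    process_lst.append(process)
    return future


if __name__ == "__main__":
    process_lst, qtask_lst = [], []
    fut = submit_sketch(sum, ([1, 2, 3],), {}, process_lst, qtask_lst)
    print(fut.result())  # prints 6
    for process in process_lst:
        process.join()

The design point of patch 3 shows up in submit_sketch(): the dispatching loop holds its own reference to every per-task queue in qtask_lst instead of leaving each queue reachable only through the arguments of its worker thread.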