Try submit function again
jan-janssen committed Apr 22, 2024
1 parent 44061b3 commit 5316530
Showing 1 changed file with 69 additions and 26 deletions.
95 changes: 69 additions & 26 deletions pympipool/shared/executorbase.py
@@ -329,7 +329,7 @@ def execute_separate_tasks(
                                       option to true
     """
     active_task_dict = {}
-    process_lst = []
+    process_lst, qtask_lst = [], []
     while True:
         task_dict = future_queue.get()
         if "shutdown" in task_dict.keys() and task_dict["shutdown"]:
@@ -339,35 +339,17 @@
             future_queue.join()
             break
         elif "fn" in task_dict.keys() and "future" in task_dict.keys():
-            resource_dict = task_dict.pop("resource_dict")
             qtask = queue.Queue()
-            qtask.put(task_dict)
-            qtask.put({"shutdown": True, "wait": True})
-            if "cores" not in resource_dict.keys() or (
-                resource_dict["cores"] == 1 and kwargs["cores"] >= 1
-            ):
-                resource_dict["cores"] = kwargs["cores"]
-            active_task_dict = _wait_for_free_slots(
+            process, active_task_dict = _submit_function_to_separate_process(
+                task_dict=task_dict,
+                qtask=qtask,
                 active_task_dict=active_task_dict,
-                cores_requested=resource_dict["cores"],
+                interface_class=interface_class,
+                executor_kwargs=kwargs,
                 max_cores=max_cores,
+                hostname_localhost=hostname_localhost,
             )
-            active_task_dict[task_dict["future"]] = resource_dict["cores"]
-            task_kwargs = kwargs.copy()
-            task_kwargs.update(resource_dict)
-            task_kwargs.update(
-                {
-                    "future_queue": qtask,
-                    "interface_class": interface_class,
-                    "hostname_localhost": hostname_localhost,
-                    "init_function": None,
-                }
-            )
-            process = RaisingThread(
-                target=execute_parallel_tasks,
-                kwargs=task_kwargs,
-            )
-            process.start()
+            qtask_lst.append(qtask)
             process_lst.append(process)
             future_queue.task_done()

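Note from the editor: the submission logic removed above now lives in the new helper _submit_function_to_separate_process, added in the hunk below. The pattern is that every task submitted to execute_separate_tasks gets its own queue.Queue, pre-loaded with the task dictionary followed by a shutdown sentinel, so the worker thread handles exactly one task and then shuts down. A minimal sketch of that per-task queue pattern, independent of pympipool (the worker function here is purely illustrative and only stands in for execute_parallel_tasks):

    import queue
    from concurrent.futures import Future
    from threading import Thread


    def worker(future_queue: queue.Queue):
        # Illustrative stand-in for execute_parallel_tasks: process tasks until the shutdown sentinel arrives.
        while True:
            task = future_queue.get()
            if task.get("shutdown", False):
                future_queue.task_done()
                break
            task["future"].set_result(task["fn"](*task["args"], **task["kwargs"]))
            future_queue.task_done()


    f = Future()
    qtask = queue.Queue()
    qtask.put({"fn": sum, "args": ([1, 2, 3],), "kwargs": {}, "future": f})
    qtask.put({"shutdown": True, "wait": True})
    Thread(target=worker, args=(qtask,)).start()
    print(f.result())  # 6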
@@ -538,3 +520,64 @@ def _get_future_objects_from_input(task_dict: dict):
         future_lst
     )
     return future_lst, boolean_flag


+def _submit_function_to_separate_process(
+    task_dict: dict,
+    active_task_dict: dict,
+    qtask: queue.Queue,
+    interface_class: BaseInterface,
+    executor_kwargs: dict,
+    max_cores: int,
+    hostname_localhost: bool = False,
+):
+    """
+    Submit a function to be executed in a separate Python process.
+    Args:
+        task_dict (dict): task submitted to the executor as a dictionary. This dictionary has the following keys:
+                          {"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}}
+        active_task_dict (dict): dictionary mapping the future objects to the number of cores they require
+        qtask (queue.Queue): queue to communicate with the thread linked to the process executing the Python function
+        interface_class (BaseInterface): interface to start the process on the selected compute resources
+        executor_kwargs (dict): keyword parameters used to initialize the Executor
+        max_cores (int): defines the number of cores which can be used in parallel
+        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
+                                      context of an HPC cluster this is essential to be able to communicate to an
+                                      Executor running on a different compute node within the same allocation. In
+                                      principle any computer should be able to resolve that its own hostname points
+                                      to the same address as localhost. Still, MacOS >= 12 seems to disable this
+                                      lookup for security reasons, so on MacOS it is required to set this option to
+                                      true
+    Returns:
+        RaisingThread, dict: thread for communicating with the Python process which is executing the function and
+                             dictionary mapping the future objects to the number of cores they require
+    """
+    resource_dict = task_dict.pop("resource_dict")
+    qtask.put(task_dict)
+    qtask.put({"shutdown": True, "wait": True})
+    if "cores" not in resource_dict.keys() or (
+        resource_dict["cores"] == 1 and executor_kwargs["cores"] >= 1
+    ):
+        resource_dict["cores"] = executor_kwargs["cores"]
+    active_task_dict = _wait_for_free_slots(
+        active_task_dict=active_task_dict,
+        cores_requested=resource_dict["cores"],
+        max_cores=max_cores,
+    )
+    active_task_dict[task_dict["future"]] = resource_dict["cores"]
+    task_kwargs = executor_kwargs.copy()
+    task_kwargs.update(resource_dict)
+    task_kwargs.update(
+        {
+            "future_queue": qtask,
+            "interface_class": interface_class,
+            "hostname_localhost": hostname_localhost,
+            "init_function": None,
+        }
+    )
+    process = RaisingThread(
+        target=execute_parallel_tasks,
+        kwargs=task_kwargs,
+    )
+    process.start()
+    return process, active_task_dict
