Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 71 additions & 27 deletions pympipool/shared/executorbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ def execute_separate_tasks(
option to true
"""
active_task_dict = {}
process_lst = []
process_lst, qtask_lst = [], []
while True:
task_dict = future_queue.get()
if "shutdown" in task_dict.keys() and task_dict["shutdown"]:
Expand All @@ -335,35 +335,17 @@ def execute_separate_tasks(
future_queue.join()
break
elif "fn" in task_dict.keys() and "future" in task_dict.keys():
resource_dict = task_dict.pop("resource_dict")
qtask = queue.Queue()
qtask.put(task_dict)
qtask.put({"shutdown": True, "wait": True})
if "cores" not in resource_dict.keys() or (
resource_dict["cores"] == 1 and kwargs["cores"] >= 1
):
resource_dict["cores"] = kwargs["cores"]
active_task_dict = _wait_for_free_slots(
process = _submit_function_to_separate_process(
task_dict=task_dict,
qtask=qtask,
active_task_dict=active_task_dict,
cores_requested=resource_dict["cores"],
interface_class=interface_class,
executor_kwargs=kwargs,
max_cores=max_cores,
hostname_localhost=hostname_localhost,
)
active_task_dict[task_dict["future"]] = resource_dict["cores"]
task_kwargs = kwargs.copy()
task_kwargs.update(resource_dict)
task_kwargs.update(
{
"future_queue": qtask,
"interface_class": interface_class,
"hostname_localhost": hostname_localhost,
"init_function": None,
}
)
process = RaisingThread(
target=execute_parallel_tasks,
kwargs=task_kwargs,
)
process.start()
qtask_lst.append(qtask)
process_lst.append(process)
future_queue.task_done()

Expand Down Expand Up @@ -406,11 +388,73 @@ def _wait_for_free_slots(active_task_dict: dict, cores_requested: int, max_cores
Args:
active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
cores_requested (int): Number of cores required for executing the next task
max_cores (int): Maximum number cores which can be used
max_cores (int): defines the number cores which can be used in parallel

Returns:
dict: Dictionary containing the future objects and the number of cores they require
"""
while sum(active_task_dict.values()) + cores_requested > max_cores:
active_task_dict = {k: v for k, v in active_task_dict.items() if not k.done()}
return active_task_dict


def _submit_function_to_separate_process(
task_dict: dict,
active_task_dict: dict,
qtask: queue.Queue,
interface_class: BaseInterface,
executor_kwargs: dict,
max_cores: int,
hostname_localhost: bool = False,
):
"""
Submit function to be executed in separate Python process

Args:
task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
{"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}}
active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
qtask (queue.Queue): Queue to communicate with the thread linked to the process executing the python function
interface_class (BaseInterface): Interface to start process on selected compute resources
executor_kwargs (dict): keyword parameters used to initialize the Executor
max_cores (int): defines the number cores which can be used in parallel
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
in principle any computer should be able to resolve that their own hostname
points to the same address as localhost. Still MacOS >= 12 seems to disable
this look up for security reasons. So on MacOS it is required to set this
option to true

Returns:
RaisingThread: thread for communicating with the python process which is executing the function
"""
resource_dict = task_dict.pop("resource_dict")
qtask.put(task_dict)
qtask.put({"shutdown": True, "wait": True})
if "cores" not in resource_dict.keys() or (
resource_dict["cores"] == 1 and executor_kwargs["cores"] >= 1
):
resource_dict["cores"] = executor_kwargs["cores"]
active_task_dict = _wait_for_free_slots(
active_task_dict=active_task_dict,
cores_requested=resource_dict["cores"],
max_cores=max_cores,
)
active_task_dict[task_dict["future"]] = resource_dict["cores"]
task_kwargs = executor_kwargs.copy()
task_kwargs.update(resource_dict)
task_kwargs.update(
{
"future_queue": qtask,
"interface_class": interface_class,
"hostname_localhost": hostname_localhost,
"init_function": None,
}
)
process = RaisingThread(
target=execute_parallel_tasks,
kwargs=task_kwargs,
)
process.start()
return process