Merge pull request #306 from pyiron/dependencies
Add support for dependent tasks
jan-janssen committed Apr 22, 2024
2 parents 73b3178 + c6094e6 commit 86d7ea5
Showing 5 changed files with 242 additions and 15 deletions.
38 changes: 38 additions & 0 deletions docs/source/examples.md
@@ -145,6 +145,8 @@ with flux.job.FluxExecutor() as flux_exe:
block_allocation=False,
init_function=None, # only available with block_allocation=True
command_line_argument_lst=[], # additional command line arguments for SLURM
disable_dependencies=False, # option to disable input dependency resolution
refresh_rate=0.01, # refresh frequency in seconds for the dependency resolution
) as exe:
future_obj = exe.submit(
calc_function,
@@ -384,6 +386,42 @@ As a result the GPUs available on the two compute nodes are reported:
```
In this case each of the two compute nodes, `cn138` and `cn139`, is equipped with one `Tesla V100S-PCIE-32GB` GPU.

## Coupled Functions
When two functions with rather different computing resource requirements depend on each other, it is essential to
represent this dependence during the submission process. In `pympipool` this is achieved by submitting the individual
Python functions separately and passing the `concurrent.futures.Future` object of the first submitted function as an
input to the second function. Consequently, this functionality supports directed acyclic graphs, but it does not
enable cyclic graphs. As a simple example we add one to the result of the addition of one and two:
```python
import flux.job

from pympipool import Executor

def calc_function(parameter_a, parameter_b):
return parameter_a + parameter_b

with flux.job.FluxExecutor() as flux_exe:
with Executor(max_cores=2, executor=flux_exe) as exe:
future_1 = exe.submit(
calc_function,
parameter_a=1,
parameter_b=2,
)
future_2 = exe.submit(
calc_function,
parameter_a=1,
parameter_b=future_1,
)
print(future_2.result())
```
Here the first addition `1+2` is computed and its output `3` is returned as the result of `future_1.result()`. Even
before this first addition is completed, the second addition is already submitted, using the future object `future_1`
as an input and adding `1`. The result of both additions is `4`, as `1+2+1=4`.

This functionality can be disabled by setting the parameter `disable_dependencies=True` on the executor level, although
at the current stage the performance gain from disabling it appears to be minimal. In addition, dependency resolution
introduces the `refresh_rate=0.01` parameter, which defines how frequently, in seconds, the queue of submitted
functions is checked. Typically, there is no need to change these default parameters.
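
For comparison, a minimal sketch of the same example with dependency resolution turned off; in this mode future objects
are not resolved automatically, so `future_1` has to be evaluated manually in the calling process (the flux setup is
assumed to match the examples above):
```python
import flux.job

from pympipool import Executor

def calc_function(parameter_a, parameter_b):
    return parameter_a + parameter_b

with flux.job.FluxExecutor() as flux_exe:
    # disable_dependencies=True skips the dependency resolution layer
    with Executor(max_cores=2, executor=flux_exe, disable_dependencies=True) as exe:
        future_1 = exe.submit(
            calc_function,
            parameter_a=1,
            parameter_b=2,
        )
        # resolve the first future manually before submitting the dependent task
        future_2 = exe.submit(
            calc_function,
            parameter_a=1,
            parameter_b=future_1.result(),
        )
        print(future_2.result())
```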

## SLURM Job Scheduler
Using `pympipool` without the [flux framework](https://flux-framework.org) results in one `srun` call per worker in
`block_allocation=True` mode and one `srun` call per submitted function in `block_allocation=False` mode. As each `srun`
56 changes: 42 additions & 14 deletions pympipool/__init__.py
@@ -3,6 +3,7 @@
from pympipool.scheduler import create_executor
from pympipool.shell.executor import SubprocessExecutor
from pympipool.shell.interactive import ShellExecutor
from pympipool.shared.dependencies import ExecutorWithDependencies


__version__ = get_versions()["version"]
@@ -74,6 +75,8 @@ def __init__(
block_allocation: bool = True,
init_function: Optional[callable] = None,
command_line_argument_lst: list[str] = [],
disable_dependencies: bool = False,
refresh_rate: float = 0.01,
):
# Use __new__() instead of __init__(). This function is only implemented to enable auto-completion.
pass
@@ -92,6 +95,8 @@ def __new__(
block_allocation: bool = False,
init_function: Optional[callable] = None,
command_line_argument_lst: list[str] = [],
disable_dependencies: bool = False,
refresh_rate: float = 0.01,
):
"""
Instead of returning a pympipool.Executor object this function returns either a pympipool.mpi.PyMPIExecutor,
@@ -123,19 +128,42 @@ def __new__(
of the individual function.
init_function (None): optional function to preset arguments for functions which are submitted later
command_line_argument_lst (list): Additional command line arguments for the srun call (SLURM only)
disable_dependencies (boolean): Disable resolving future objects during the submission.
refresh_rate (float): Set the refresh rate in seconds, i.e. how frequently the input queue is checked.
"""
return create_executor(
max_cores=max_cores,
cores_per_worker=cores_per_worker,
threads_per_core=threads_per_core,
gpus_per_worker=gpus_per_worker,
oversubscribe=oversubscribe,
cwd=cwd,
executor=executor,
hostname_localhost=hostname_localhost,
backend=backend,
block_allocation=block_allocation,
init_function=init_function,
command_line_argument_lst=command_line_argument_lst,
)
if not disable_dependencies:
return ExecutorWithDependencies(
max_cores=max_cores,
cores_per_worker=cores_per_worker,
threads_per_core=threads_per_core,
gpus_per_worker=gpus_per_worker,
oversubscribe=oversubscribe,
cwd=cwd,
executor=executor,
hostname_localhost=hostname_localhost,
backend=backend,
block_allocation=block_allocation,
init_function=init_function,
command_line_argument_lst=command_line_argument_lst,
refresh_rate=refresh_rate,
)
else:
if refresh_rate != 0.01:
raise ValueError(
"The sleep_interval parameter is only used when disable_dependencies=False."
)
return create_executor(
max_cores=max_cores,
cores_per_worker=cores_per_worker,
threads_per_core=threads_per_core,
gpus_per_worker=gpus_per_worker,
oversubscribe=oversubscribe,
cwd=cwd,
executor=executor,
hostname_localhost=hostname_localhost,
backend=backend,
block_allocation=block_allocation,
init_function=init_function,
command_line_argument_lst=command_line_argument_lst,
)
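
Taken together, the dispatch above means that a plain `Executor(...)` call now returns an `ExecutorWithDependencies` by default and only falls back to `create_executor` when `disable_dependencies=True`. A minimal sketch of the two code paths, modelled on the test added in this commit and assuming an MPI backend is available on the local machine:
```python
from pympipool import Executor
from pympipool.shared.executorbase import cloudpickle_register

def add_function(parameter_1, parameter_2):
    return parameter_1 + parameter_2

# default: dependency resolution enabled, futures can be passed as inputs
with Executor(max_cores=1, backend="mpi", hostname_localhost=True) as exe:
    cloudpickle_register(ind=1)
    future_1 = exe.submit(add_function, 1, parameter_2=2)
    future_2 = exe.submit(add_function, 1, parameter_2=future_1)
    print(future_2.result())  # 4

# disable_dependencies=True: plain backend executor, futures resolved manually
with Executor(
    max_cores=1, backend="mpi", hostname_localhost=True, disable_dependencies=True
) as exe:
    cloudpickle_register(ind=1)
    future_1 = exe.submit(add_function, 1, parameter_2=2)
    future_2 = exe.submit(add_function, 1, parameter_2=future_1.result())
    print(future_2.result())  # 4
```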
21 changes: 21 additions & 0 deletions pympipool/shared/dependencies.py
@@ -0,0 +1,21 @@
from pympipool.scheduler import create_executor
from pympipool.shared.executorbase import ExecutorSteps, execute_tasks_with_dependencies
from pympipool.shared.thread import RaisingThread


class ExecutorWithDependencies(ExecutorSteps):
def __init__(self, *args, refresh_rate: float = 0.01, **kwargs):
super().__init__()
executor = create_executor(*args, **kwargs)
self._set_process(
RaisingThread(
target=execute_tasks_with_dependencies,
kwargs={
# Executor Arguments
"future_queue": self._future_queue,
"executor_queue": executor._future_queue,
"executor": executor,
"refresh_rate": refresh_rate,
},
)
)
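
`ExecutorWithDependencies` is normally created indirectly via `pympipool.Executor`, but it accepts the same keyword arguments as `create_executor` plus the additional `refresh_rate`. A sketch of direct use, under the assumption that the inherited `ExecutorSteps` base class provides the usual `submit()` and context-manager behaviour:
```python
from pympipool.shared.dependencies import ExecutorWithDependencies
from pympipool.shared.executorbase import cloudpickle_register

def add_function(parameter_1, parameter_2):
    return parameter_1 + parameter_2

# same keyword arguments as create_executor, plus refresh_rate
with ExecutorWithDependencies(
    max_cores=1, backend="mpi", hostname_localhost=True, refresh_rate=0.01
) as exe:
    cloudpickle_register(ind=1)
    future_1 = exe.submit(add_function, 1, parameter_2=2)
    # future_1 is resolved in the background thread before this task is executed
    future_2 = exe.submit(add_function, 1, parameter_2=future_1)
    print(future_2.result())  # 4
```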
123 changes: 122 additions & 1 deletion pympipool/shared/executorbase.py
@@ -1,4 +1,3 @@
from typing import Optional, List
from concurrent.futures import (
Executor as FutureExecutor,
Future,
@@ -7,6 +6,8 @@
import os
import queue
import sys
from time import sleep
from typing import Optional, List

import cloudpickle

@@ -371,6 +372,63 @@ def execute_separate_tasks(
future_queue.task_done()


def execute_tasks_with_dependencies(
future_queue: queue.Queue,
executor_queue: queue.Queue,
executor: ExecutorBase,
refresh_rate: float = 0.01,
):
"""
Resolve the dependencies of multiple tasks by analysing which tasks require concurrent.futures.Future objects from
other tasks.
Args:
future_queue (Queue): Queue for receiving new tasks.
executor_queue (Queue): Queue for the internal executor.
executor (ExecutorBase): Executor to execute the tasks with after the dependencies are resolved.
refresh_rate (float): Set the refresh rate in seconds, i.e. how frequently the input queue is checked.
"""
wait_lst = []
while True:
try:
task_dict = future_queue.get_nowait()
except queue.Empty:
task_dict = None
if ( # shutdown the executor
task_dict is not None
and "shutdown" in task_dict.keys()
and task_dict["shutdown"]
):
executor.shutdown(wait=task_dict["wait"])
future_queue.task_done()
future_queue.join()
break
elif ( # handle function submitted to the executor
task_dict is not None
and "fn" in task_dict.keys()
and "future" in task_dict.keys()
):
future_lst, ready_flag = _get_future_objects_from_input(task_dict=task_dict)
if len(future_lst) == 0 or ready_flag:
# No future objects are used in the input or all future objects are already done
task_dict["args"], task_dict["kwargs"] = _update_futures_in_input(
args=task_dict["args"], kwargs=task_dict["kwargs"]
)
executor_queue.put(task_dict)
else: # Otherwise add the function to the wait list
task_dict["future_lst"] = future_lst
wait_lst.append(task_dict)
future_queue.task_done()
elif len(wait_lst) > 0:
# Check functions in the wait list and execute them if all future objects are now ready
wait_lst = _submit_waiting_task(
wait_lst=wait_lst, executor_queue=executor_queue
)
else:
# If there is nothing else to do, sleep for a moment
sleep(refresh_rate)


def _get_backend_path(cores: int):
"""
Get command to call backend as a list of two strings
@@ -417,3 +475,66 @@ def _wait_for_free_slots(active_task_dict: dict, cores_requested: int, max_cores
while sum(active_task_dict.values()) + cores_requested > max_cores:
active_task_dict = {k: v for k, v in active_task_dict.items() if not k.done()}
return active_task_dict


def _submit_waiting_task(wait_lst: List[dict], executor_queue: queue.Queue):
"""
Submit the waiting tasks whose future inputs have been completed to the executor
Args:
wait_lst (list): List of waiting tasks
executor_queue (Queue): Queue of the internal executor
Returns:
list: list of tasks whose future inputs have not been completed
"""
wait_tmp_lst = []
for task_wait_dict in wait_lst:
if all([future.done() for future in task_wait_dict["future_lst"]]):
del task_wait_dict["future_lst"]
task_wait_dict["args"], task_wait_dict["kwargs"] = _update_futures_in_input(
args=task_wait_dict["args"], kwargs=task_wait_dict["kwargs"]
)
executor_queue.put(task_wait_dict)
else:
wait_tmp_lst.append(task_wait_dict)
return wait_tmp_lst


def _update_futures_in_input(args: tuple, kwargs: dict):
"""
Evaluate future objects in the arguments and keyword arguments by calling future.result()
Args:
args (tuple): function arguments
kwargs (dict): function keyword arguments
Returns:
tuple, dict: arguments and keyword arguments with each future object in them being evaluated
"""
args = [arg if not isinstance(arg, Future) else arg.result() for arg in args]
kwargs = {
key: value if not isinstance(value, Future) else value.result()
for key, value in kwargs.items()
}
return args, kwargs


def _get_future_objects_from_input(task_dict: dict):
"""
Check the input parameters for future objects and determine whether all of these future objects are already done
Args:
task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
{"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}}
Returns:
list, boolean: list of future objects and boolean flag if all future objects are already done
"""
future_lst = [arg for arg in task_dict["args"] if isinstance(arg, Future)] + [
    value for value in task_dict["kwargs"].values() if isinstance(value, Future)
]
boolean_flag = all(future.done() for future in future_lst)
return future_lst, boolean_flag
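
For illustration, a small sketch of how the two helper functions above classify and later resolve a task dictionary that contains a pending future; the task dictionary layout follows the docstring above, and the private helpers are imported directly only for demonstration:
```python
from concurrent.futures import Future

from pympipool.shared.executorbase import (
    _get_future_objects_from_input,
    _update_futures_in_input,
)

future_a = Future()
task_dict = {"fn": print, "args": (1, future_a), "kwargs": {}}

# the pending future is detected, so the task would be placed on the wait list
future_lst, ready_flag = _get_future_objects_from_input(task_dict=task_dict)
print(len(future_lst), ready_flag)  # 1 False

# once the future is done, its result is substituted into the arguments
future_a.set_result(2)
args, kwargs = _update_futures_in_input(
    args=task_dict["args"], kwargs=task_dict["kwargs"]
)
print(args, kwargs)  # [1, 2] {}
```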
19 changes: 19 additions & 0 deletions tests/test_dependencies_executor.py
@@ -0,0 +1,19 @@
import unittest
from time import sleep

from pympipool import Executor
from pympipool.shared.executorbase import cloudpickle_register


def add_function(parameter_1, parameter_2):
sleep(0.2)
return parameter_1 + parameter_2


class TestExecutorWithDependencies(unittest.TestCase):
def test_executor(self):
with Executor(max_cores=1, backend="mpi", hostname_localhost=True) as exe:
cloudpickle_register(ind=1)
future_1 = exe.submit(add_function, 1, parameter_2=2)
future_2 = exe.submit(add_function, 1, parameter_2=future_1)
self.assertEqual(future_2.result(), 4)
