Commit e4577f6: Rename executor_kwargs to resource_dict

jan-janssen committed Apr 17, 2024
1 parent 11c7326 commit e4577f6
Showing 5 changed files with 133 additions and 56 deletions.
114 changes: 80 additions & 34 deletions pympipool/__init__.py
@@ -2,14 +2,14 @@
import shutil
from typing import Optional
from ._version import get_versions
from pympipool.mpi.executor import PyMPIExecutor
from pympipool.mpi.executor import PyMPIExecutor, PyMPIStepExecutor
from pympipool.shared.interface import SLURM_COMMAND
from pympipool.shell.executor import SubprocessExecutor
from pympipool.shell.interactive import ShellExecutor
from pympipool.slurm.executor import PySlurmExecutor
from pympipool.slurm.executor import PySlurmExecutor, PySlurmStepExecutor

try: # The PyFluxExecutor requires flux-core to be installed.
from pympipool.flux.executor import PyFluxExecutor
from pympipool.flux.executor import PyFluxExecutor, PyFluxStepExecutor

flux_installed = "FLUX_URI" in os.environ
except ImportError:
@@ -32,12 +32,11 @@ class Executor:
an interactive Jupyter notebook.
Args:
max_workers (int): defines the number of workers which can execute functions in parallel
max_cores (int): defines the number of cores which can be used in parallel
cores_per_worker (int): number of MPI cores to be used for each function call
threads_per_core (int): number of OpenMP threads to be used for each function call
gpus_per_worker (int): number of GPUs per worker - defaults to 0
oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
@@ -46,6 +45,13 @@
points to the same address as localhost. Still, MacOS >= 12 seems to disable
this lookup for security reasons. So on MacOS it is required to set this
option to true
backend (str): Switch between the different backends "flux", "mpi" or "slurm". Alternatively, when "auto"
is selected (the default) the available backend is determined automatically.
block_allocation (boolean): To accelerate the submission of a series of python functions with the same resource
requirements, pympipool supports block allocation. In this case all resources have
to be defined on the executor, rather than during the submission of the individual
function.
init_function (None): optional function to preset arguments for functions which are submitted later
Examples:
```
@@ -70,32 +76,34 @@ class Executor:

def __init__(
self,
max_workers: int = 1,
max_cores: int = 1,
cores_per_worker: int = 1,
threads_per_core: int = 1,
gpus_per_worker: int = 0,
oversubscribe: bool = False,
init_function: Optional[callable] = None,
cwd: Optional[str] = None,
executor=None,
hostname_localhost: bool = False,
backend="auto",
block_allocation: bool = False,
init_function: Optional[callable] = None,
):
# Use __new__() instead of __init__(). This function is only implemented to enable auto-completion.
pass

def __new__(
cls,
max_workers: int = 1,
max_cores: int = 1,
cores_per_worker: int = 1,
threads_per_core: int = 1,
gpus_per_worker: int = 0,
oversubscribe: bool = False,
init_function: Optional[callable] = None,
cwd: Optional[str] = None,
executor=None,
hostname_localhost: bool = False,
backend: str = "auto",
block_allocation: bool = False,
init_function: Optional[callable] = None,
):
"""
Instead of returning a pympipool.Executor object, this function returns either a pympipool.mpi.PyMPIExecutor,
@@ -106,12 +114,11 @@ def __new__(
requires the SLURM workload manager to be installed on the system.
Args:
max_workers (int): defines the number of workers which can execute functions in parallel
max_cores (int): defines the number of cores which can be used in parallel
cores_per_worker (int): number of MPI cores to be used for each function call
threads_per_core (int): number of OpenMP threads to be used for each function call
gpus_per_worker (int): number of GPUs per worker - defaults to 0
oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
@@ -122,8 +129,15 @@ def __new__(
option to true
backend (str): Switch between the different backends "flux", "mpi" or "slurm". Alternatively, when "auto"
is selected (the default) the available backend is determined automatically.
block_allocation (boolean): To accelerate the submission of a series of python functions with the same
resource requirements, pympipool supports block allocation. In this case all
resources have to be defined on the executor, rather than during the submission
of the individual function.
init_function (None): optional function to preset arguments for functions which are submitted later
"""
if not block_allocation and init_function is not None:
raise ValueError("")
if backend not in ["auto", "mpi", "slurm", "flux"]:
raise ValueError(
'The currently implemented backends are ["flux", "mpi", "slurm"]. '
@@ -137,23 +151,47 @@ def __new__(
"Oversubscribing is not supported for the pympipool.flux.PyFLuxExecutor backend."
"Please use oversubscribe=False instead of oversubscribe=True."
)
return PyFluxExecutor(
max_workers=max_workers,
cores_per_worker=cores_per_worker,
threads_per_core=threads_per_core,
gpus_per_worker=gpus_per_worker,
init_function=init_function,
cwd=cwd,
hostname_localhost=hostname_localhost,
)
if block_allocation:
return PyFluxExecutor(
max_workers=int(max_cores / cores_per_worker),
cores_per_worker=cores_per_worker,
threads_per_core=threads_per_core,
gpus_per_worker=gpus_per_worker,
init_function=init_function,
cwd=cwd,
hostname_localhost=hostname_localhost,
)
else:
return PyFluxStepExecutor(
max_cores=max_cores,
cores_per_worker=cores_per_worker,
threads_per_core=threads_per_core,
gpus_per_worker=gpus_per_worker,
cwd=cwd,
hostname_localhost=hostname_localhost,
)
elif backend == "slurm" or (backend == "auto" and slurm_installed):
return PySlurmExecutor(
max_workers=max_workers,
cores_per_worker=cores_per_worker,
init_function=init_function,
cwd=cwd,
hostname_localhost=hostname_localhost,
)
if block_allocation:
return PySlurmExecutor(
max_workers=int(max_cores / cores_per_worker),
cores_per_worker=cores_per_worker,
threads_per_core=threads_per_core,
gpus_per_worker=gpus_per_worker,
oversubscribe=oversubscribe,
init_function=init_function,
cwd=cwd,
hostname_localhost=hostname_localhost,
)
else:
return PySlurmStepExecutor(
max_cores=max_cores,
cores_per_worker=cores_per_worker,
threads_per_core=threads_per_core,
gpus_per_worker=gpus_per_worker,
oversubscribe=oversubscribe,
cwd=cwd,
hostname_localhost=hostname_localhost,
)
else: # backend="mpi"
if threads_per_core != 1:
raise ValueError(
@@ -169,10 +207,18 @@ def __new__(
+ str(gpus_per_worker)
+ "."
)
return PyMPIExecutor(
max_workers=max_workers,
cores_per_worker=cores_per_worker,
init_function=init_function,
cwd=cwd,
hostname_localhost=hostname_localhost,
)
if block_allocation:
return PyMPIExecutor(
max_workers=int(max_cores / cores_per_worker),
cores_per_worker=cores_per_worker,
init_function=init_function,
cwd=cwd,
hostname_localhost=hostname_localhost,
)
else:
return PyMPIStepExecutor(
max_cores=max_cores,
cores_per_worker=cores_per_worker,
cwd=cwd,
hostname_localhost=hostname_localhost,
)
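
Taken together, the dispatch above gives the `pympipool.Executor` facade two modes. A minimal usage sketch of both (illustrative only; the `calc` function is hypothetical and the MPI backend with at least two free cores is assumed):

```
from pympipool import Executor

def calc(i):
    return 2 * i

# block allocation: resources are fixed on the executor, workers are reused
with Executor(max_cores=2, backend="mpi", block_allocation=True) as exe:
    print(exe.submit(calc, 3).result())  # 6

# per-task allocation: a separate process per submission, resources per call
with Executor(max_cores=2, backend="mpi", block_allocation=False) as exe:
    print(exe.submit(calc, 4, resource_dict={"cores": 1}).result())  # 8
```

The two modes trade off against each other: block allocation amortizes worker start-up across many submissions, while per-task allocation lets each call request its own resources via `resource_dict`.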
2 changes: 1 addition & 1 deletion pympipool/flux/executor.py
@@ -127,7 +127,7 @@ class PyFluxStepExecutor(ExecutorSteps):
>>> return np.array([i, j, k]), size, rank
>>>
>>> with PyFluxStepExecutor(max_cores=2) as p:
>>> fs = p.submit(calc, 2, j=4, k=3, executor_kwargs={"cores": 2})
>>> fs = p.submit(calc, 2, j=4, k=3, resource_dict={"cores": 2})
>>> print(fs.result())
[(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
2 changes: 1 addition & 1 deletion pympipool/mpi/executor.py
@@ -117,7 +117,7 @@ class PyMPIStepExecutor(ExecutorSteps):
>>> return np.array([i, j, k]), size, rank
>>>
>>> with PyMPIStepExecutor(max_cores=2) as p:
>>> fs = p.submit(calc, 2, j=4, k=3, executor_kwargs={"cores": 2})
>>> fs = p.submit(calc, 2, j=4, k=3, resource_dict={"cores": 2})
>>> print(fs.result())
[(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
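
For callers the rename is mechanical: only the keyword on `submit()` changes, the accepted keys stay the same. A before/after sketch under the same assumptions as the docstring above (mpi4py installed and two cores available):

```
from pympipool.mpi.executor import PyMPIStepExecutor

def calc(i):
    from mpi4py import MPI
    return i, MPI.COMM_WORLD.Get_size()

with PyMPIStepExecutor(max_cores=2) as exe:
    # before this commit: exe.submit(calc, 3, executor_kwargs={"cores": 2})
    fs = exe.submit(calc, 3, resource_dict={"cores": 2})
    print(fs.result())  # [(3, 2), (3, 2)] - one tuple per MPI rank
```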
69 changes: 50 additions & 19 deletions pympipool/shared/executorbase.py
@@ -41,21 +41,42 @@ def info(self):
def future_queue(self):
return self._future_queue

def submit(self, fn: callable, *args, **kwargs):
"""Submits a callable to be executed with the given arguments.
def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs):
"""
Submits a callable to be executed with the given arguments.
Schedules the callable to be executed as fn(*args, **kwargs) and returns
a Future instance representing the execution of the callable.
Args:
fn (callable): function to submit for execution
args: arguments for the submitted function
kwargs: keyword arguments for the submitted function
resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
function. Example resource dictionary: {
cores: 1,
threads_per_core: 1,
gpus_per_worker: 0,
oversubscribe: False,
cwd: None,
executor: None,
hostname_localhost: False,
}
Returns:
A Future representing the given call.
"""
if len(resource_dict) > 0:
raise ValueError(
"When block_allocation is enabled, the resource requirements have to be defined on the executor level."
)
f = Future()
self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f})
return f

def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
"""Clean-up the resources associated with the Executor.
"""
Clean-up the resources associated with the Executor.
It is safe to call this method several times. Otherwise, no other
methods can be called after this one.
@@ -90,10 +111,6 @@ def __del__(self):
except (AttributeError, RuntimeError):
pass

def _set_process(self, process: RaisingThread):
self._process = process
self._process.start()


class ExecutorBroker(ExecutorBase):
def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
@@ -129,18 +146,28 @@ def _set_process(self, process: List[RaisingThread]):


class ExecutorSteps(ExecutorBase):
def submit(
self,
fn: callable,
*args,
executor_kwargs: dict = {},
**kwargs,
):
"""Submits a callable to be executed with the given arguments.
def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs):
"""
Submits a callable to be executed with the given arguments.
Schedules the callable to be executed as fn(*args, **kwargs) and returns
a Future instance representing the execution of the callable.
Args:
fn (callable): function to submit for execution
args: arguments for the submitted function
kwargs: keyword arguments for the submitted function
resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
function. Example resource dictionary: {
cores: 1,
threads_per_core: 1,
gpus_per_worker: 0,
oversubscribe: False,
cwd: None,
executor: None,
hostname_localhost: False,
}
Returns:
A Future representing the given call.
"""
@@ -151,7 +178,7 @@ def submit(
"args": args,
"kwargs": kwargs,
"future": f,
"executor_kwargs": executor_kwargs,
"resource_dict": resource_dict,
}
)
return f
@@ -298,9 +325,12 @@ def execute_separate_tasks(
option to true
"""
active_task_dict = {}
process_lst = []
while True:
task_dict = future_queue.get()
if "shutdown" in task_dict.keys() and task_dict["shutdown"]:
if task_dict["wait"]:
_ = [process.join() for process in process_lst]
future_queue.task_done()
future_queue.join()
break
Expand All @@ -309,13 +339,13 @@ def execute_separate_tasks(
active_task_dict=active_task_dict,
max_cores=max_cores,
)
executor_dict = task_dict.pop("executor_kwargs")
resource_dict = task_dict.pop("resource_dict")
qtask = queue.Queue()
qtask.put(task_dict)
qtask.put({"shutdown": True, "wait": True})
active_task_dict[task_dict["future"]] = executor_dict["cores"]
active_task_dict[task_dict["future"]] = resource_dict["cores"]
task_kwargs = kwargs.copy()
task_kwargs.update(executor_dict)
task_kwargs.update(resource_dict)
task_kwargs.update(
{
"future_queue": qtask,
@@ -329,6 +359,7 @@
kwargs=task_kwargs,
)
process.start()
process_lst.append(process)
future_queue.task_done()


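
Two behavioural changes accompany the rename in `execute_separate_tasks`: every spawned task thread is now recorded in `process_lst`, and a shutdown request with `wait=True` joins all of them before the loop exits, so `shutdown(wait=True)` no longer returns while detached task threads are still running. The pattern in isolation (a simplified sketch, not the pympipool implementation itself):

```
import queue
from threading import Thread

def worker_loop(task_queue: queue.Queue):
    threads = []  # plays the role of process_lst above
    while True:
        task = task_queue.get()
        if task.get("shutdown"):
            if task.get("wait"):
                # block until every previously spawned task thread has finished
                for t in threads:
                    t.join()
            task_queue.task_done()
            break
        t = Thread(target=task["fn"], args=task["args"])
        t.start()
        threads.append(t)
        task_queue.task_done()

q = queue.Queue()
q.put({"fn": print, "args": ("task done",)})
q.put({"shutdown": True, "wait": True})
worker_loop(q)
```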
2 changes: 1 addition & 1 deletion pympipool/slurm/executor.py
@@ -124,7 +124,7 @@ class PySlurmStepExecutor(ExecutorSteps):
>>> return np.array([i, j, k]), size, rank
>>>
>>> with PySlurmStepExecutor(max_cores=2) as p:
>>> fs = p.submit(calc, 2, j=4, k=3, executor_kwargs={"cores": 2})
>>> fs = p.submit(calc, 2, j=4, k=3, resource_dict={"cores": 2})
>>> print(fs.result())
[(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
