54 changes: 28 additions & 26 deletions pympipool/flux/executor.py
@@ -14,6 +14,11 @@

class PyFluxExecutor(ExecutorBase):
"""
The pympipool.flux.PyFluxExecutor leverages the flux framework to distribute python tasks within a queuing system
allocation. In analogy to the pympipool.slurm.PySlurmExecutor it provides the option to specify the number of
threads per worker as well as the number of GPUs per worker in addition to specifying the number of cores per
worker.

Args:
max_workers (int): defines the number of workers which can execute functions in parallel
cores_per_worker (int): number of MPI cores to be used for each function call
@@ -23,6 +28,27 @@ class PyFluxExecutor(ExecutorBase):
cwd (str/None): current working directory where the parallel python task is executed
sleep_interval (float): synchronization interval - default 0.1
executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux

Examples:
```
>>> import numpy as np
>>> from pympipool.flux import PyFluxExecutor
>>>
>>> def calc(i, j, k):
>>>     from mpi4py import MPI
>>>     size = MPI.COMM_WORLD.Get_size()
>>>     rank = MPI.COMM_WORLD.Get_rank()
>>>     return np.array([i, j, k]), size, rank
>>>
>>> def init_k():
>>>     return {"k": 3}
>>>
>>> with PyFluxExecutor(max_workers=1, cores_per_worker=2, init_function=init_k) as p:
>>>     fs = p.submit(calc, 2, j=4)
>>>     print(fs.result())
[(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
```
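
The class description above also mentions per-worker thread and GPU options which the example does not cover.
A minimal sketch, assuming the arguments are named `threads_per_core` and `gpus_per_worker` as in the Args block
and accept plain integers; the requested resource numbers are purely illustrative:
```
>>> from pympipool.flux import PyFluxExecutor
>>>
>>> def square(x):
>>>     return x ** 2
>>>
>>> # request one worker with two OpenMP threads and one GPU reserved per worker
>>> with PyFluxExecutor(max_workers=1, cores_per_worker=1, threads_per_core=2, gpus_per_worker=1) as exe:
>>>     print(exe.submit(square, 3).result())
9
```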
"""

def __init__(
@@ -59,39 +85,15 @@ def __init__(

class PyFluxSingleTaskExecutor(ExecutorBase):
"""
The pympipool.Executor behaves like the concurrent.futures.Executor but it uses mpi4py to execute parallel tasks.
In contrast to the mpi4py.futures.MPIPoolExecutor the pympipool.Executor can be executed in a serial python process
and does not require the python script to be executed with MPI. Still internally the pympipool.Executor uses the
mpi4py.futures.MPIPoolExecutor, consequently it is primarily an abstraction of its functionality to improve the
usability in particular when used in combination with Jupyter notebooks.
The pympipool.flux.PyFluxSingleTaskExecutor is the internal worker for the pympipool.flux.PyFluxExecutor.

Args:
cores (int): defines the number of MPI ranks to use for each function call
threads_per_core (int): number of OpenMP threads to be used for each function call
gpus_per_task (int): number of GPUs per MPI rank - defaults to 0
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed

Examples:
```
>>> import numpy as np
>>> from pympipool.flux.executor import PyFluxSingleTaskExecutor
>>>
>>> def calc(i, j, k):
>>>     from mpi4py import MPI
>>>     size = MPI.COMM_WORLD.Get_size()
>>>     rank = MPI.COMM_WORLD.Get_rank()
>>>     return np.array([i, j, k]), size, rank
>>>
>>> def init_k():
>>>     return {"k": 3}
>>>
>>> with PyFluxSingleTaskExecutor(cores=2, init_function=init_k) as p:
>>>     fs = p.submit(calc, 2, j=4)
>>>     print(fs.result())
[(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
```
executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
"""

def __init__(
51 changes: 27 additions & 24 deletions pympipool/mpi/executor.py
@@ -10,13 +10,39 @@

class PyMPIExecutor(ExecutorBase):
"""
The pympipool.mpi.PyMPIExecutor leverages the message passing interface MPI to distribute python tasks within an
MPI allocation. In contrast to the mpi4py.futures.MPIPoolExecutor the pympipool.mpi.PyMPIExecutor can be executed
in a serial python process and does not require the python script to be executed with MPI. Consequently, it is
primarily an abstraction of its functionality to improve the usability in particular when used in combination with
Jupyter notebooks.

Args:
max_workers (int): defines the number of workers which can execute functions in parallel
cores_per_worker (int): number of MPI cores to be used for each function call
oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
sleep_interval (float): synchronization interval - default 0.1

Examples:
```
>>> import numpy as np
>>> from pympipool.mpi import PyMPIExecutor
>>>
>>> def calc(i, j, k):
>>>     from mpi4py import MPI
>>>     size = MPI.COMM_WORLD.Get_size()
>>>     rank = MPI.COMM_WORLD.Get_rank()
>>>     return np.array([i, j, k]), size, rank
>>>
>>> def init_k():
>>>     return {"k": 3}
>>>
>>> with PyMPIExecutor(max_workers=1, cores_per_worker=2, init_function=init_k) as p:
>>>     fs = p.submit(calc, 2, j=4)
>>>     print(fs.result())
[(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
```
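
To make the distinction between `max_workers` (parallel workers) and `cores_per_worker` (MPI ranks per call)
explicit, a minimal sketch restricted to arguments from the Args block above; the serial `add` function is only
an illustration:
```
>>> from pympipool.mpi import PyMPIExecutor
>>>
>>> def add(i, j):
>>>     return i + j
>>>
>>> # two workers process the four serial function calls in parallel
>>> with PyMPIExecutor(max_workers=2, cores_per_worker=1) as exe:
>>>     futures = [exe.submit(add, i, i) for i in range(4)]
>>>     print([f.result() for f in futures])
[0, 2, 4, 6]
```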
"""

def __init__(
@@ -49,37 +75,14 @@ def __init__(

class PyMPISingleTaskExecutor(ExecutorBase):
"""
The pympipool.Executor behaves like the concurrent.futures.Executor but it uses mpi4py to execute parallel tasks.
In contrast to the mpi4py.futures.MPIPoolExecutor the pympipool.Executor can be executed in a serial python process
and does not require the python script to be executed with MPI. Still internally the pympipool.Executor uses the
mpi4py.futures.MPIPoolExecutor, consequently it is primarily an abstraction of its functionality to improve the
usability in particular when used in combination with Jupyter notebooks.
The pympipool.mpi.PyMPISingleTaskExecutor is the internal worker for the pympipool.mpi.PyMPIExecutor.

Args:
cores (int): defines the number of MPI ranks to use for each function call
oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed

Examples:
```
>>> import numpy as np
>>> from pympipool.mpi.executor import PyMPISingleTaskExecutor
>>>
>>> def calc(i, j, k):
>>>     from mpi4py import MPI
>>>     size = MPI.COMM_WORLD.Get_size()
>>>     rank = MPI.COMM_WORLD.Get_rank()
>>>     return np.array([i, j, k]), size, rank
>>>
>>> def init_k():
>>>     return {"k": 3}
>>>
>>> with PyMPISingleTaskExecutor(cores=2, init_function=init_k) as p:
>>>     fs = p.submit(calc, 2, j=4)
>>>     print(fs.result())
[(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
```
"""

def __init__(
51 changes: 27 additions & 24 deletions pympipool/slurm/executor.py
@@ -18,6 +18,11 @@

class PySlurmExecutor(ExecutorBase):
"""
The pympipool.slurm.PySlurmExecutor leverages the srun command to distribute python tasks within a SLURM queuing
system allocation. In analogy to the pympipool.flux.PyFluxExecutor it provides the option to specify the number of
threads per worker as well as the number of GPUs per worker in addition to specifying the number of cores per
worker.

Args:
max_workers (int): defines the number of workers which can execute functions in parallel
cores_per_worker (int): number of MPI cores to be used for each function call
@@ -27,6 +32,27 @@ class PySlurmExecutor(ExecutorBase):
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
sleep_interval (float): synchronization interval - default 0.1

Examples:
```
>>> import numpy as np
>>> from pympipool.slurm import PySlurmExecutor
>>>
>>> def calc(i, j, k):
>>>     from mpi4py import MPI
>>>     size = MPI.COMM_WORLD.Get_size()
>>>     rank = MPI.COMM_WORLD.Get_rank()
>>>     return np.array([i, j, k]), size, rank
>>>
>>> def init_k():
>>>     return {"k": 3}
>>>
>>> with PySlurmExecutor(max_workers=1, cores_per_worker=2, init_function=init_k) as p:
>>>     fs = p.submit(calc, 2, j=4)
>>>     print(fs.result())
[(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
```
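
Analogous to the flux variant, the per-worker thread and GPU options mentioned in the class description can be
requested; a minimal sketch, assuming the arguments are named `threads_per_core` and `gpus_per_worker` as in the
pympipool.flux.PyFluxExecutor, with purely illustrative resource numbers:
```
>>> from pympipool.slurm import PySlurmExecutor
>>>
>>> def square(x):
>>>     return x ** 2
>>>
>>> # one worker submitted via srun with two threads and one GPU requested per worker
>>> with PySlurmExecutor(max_workers=1, cores_per_worker=1, threads_per_core=2, gpus_per_worker=1) as exe:
>>>     print(exe.submit(square, 4).result())
16
```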
"""

def __init__(
@@ -63,11 +89,7 @@ def __init__(

class PySlurmSingleTaskExecutor(ExecutorBase):
"""
The pympipool.Executor behaves like the concurrent.futures.Executor but it uses mpi4py to execute parallel tasks.
In contrast to the mpi4py.futures.MPIPoolExecutor the pympipool.Executor can be executed in a serial python process
and does not require the python script to be executed with MPI. Still internally the pympipool.Executor uses the
mpi4py.futures.MPIPoolExecutor, consequently it is primarily an abstraction of its functionality to improve the
usability in particular when used in combination with Jupyter notebooks.
The pympipool.slurm.PySlurmSingleTaskExecutor is the internal worker for the pympipool.slurm.PySlurmExecutor.

Args:
cores (int): defines the number of MPI ranks to use for each function call
@@ -77,25 +99,6 @@ class PySlurmSingleTaskExecutor(ExecutorBase):
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed

Examples:
```
>>> import numpy as np
>>> from pympipool.mpi.executor import PyMPISingleTaskExecutor
>>>
>>> def calc(i, j, k):
>>>     from mpi4py import MPI
>>>     size = MPI.COMM_WORLD.Get_size()
>>>     rank = MPI.COMM_WORLD.Get_rank()
>>>     return np.array([i, j, k]), size, rank
>>>
>>> def init_k():
>>>     return {"k": 3}
>>>
>>> with PyMPISingleTaskExecutor(cores=2, init_function=init_k) as p:
>>>     fs = p.submit(calc, 2, j=4)
>>>     print(fs.result())
[(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
```
"""

def __init__(