16 changes: 15 additions & 1 deletion pympipool/interfaces/fluxbroker.py
@@ -28,11 +28,12 @@ class SingleTaskExecutor(ExecutorBase):

Args:
cores (int): defines the number of MPI ranks to use for each function call
threads_per_core (int): number of OpenMP threads to be used for each function call
gpus_per_task (int): number of GPUs per MPI rank - defaults to 0
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed

Simple example:
Examples:
```
>>> import numpy as np
>>> from pympipool import Executor
@@ -57,6 +58,7 @@ class SingleTaskExecutor(ExecutorBase):
def __init__(
self,
cores,
threads_per_core=1,
gpus_per_task=0,
init_function=None,
cwd=None,
@@ -68,6 +70,7 @@ def __init__(
kwargs={
"future_queue": self._future_queue,
"cores": cores,
"threads_per_core": threads_per_core,
"gpus_per_task": gpus_per_task,
"cwd": cwd,
"executor": executor,
@@ -86,6 +89,7 @@ def __init__(
self,
max_workers,
cores_per_worker=1,
threads_per_core=1,
gpus_per_worker=0,
init_function=None,
cwd=None,
@@ -99,6 +103,7 @@ def __init__(
"future_queue": self._future_queue,
"max_workers": max_workers,
"cores_per_worker": cores_per_worker,
"threads_per_core": threads_per_core,
"gpus_per_worker": gpus_per_worker,
"init_function": init_function,
"cwd": cwd,
@@ -112,6 +117,7 @@
def execute_parallel_tasks(
future_queue,
cores,
threads_per_core=1,
gpus_per_task=0,
cwd=None,
executor=None,
@@ -122,6 +128,7 @@ def execute_parallel_tasks(
Args:
future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
cores (int): defines the total number of MPI ranks to use
threads_per_core (int): number of OpenMP threads to be used for each function call
gpus_per_task (int): number of GPUs per MPI rank - defaults to 0
cwd (str/None): current working directory where the parallel python task is executed
executor (flux.job.FluxExecutor/None): flux executor to submit tasks to - optional
@@ -141,6 +148,7 @@ def execute_parallel_tasks(
command_lst=command_lst,
cwd=cwd,
cores=cores,
threads_per_core=threads_per_core,
gpus_per_core=gpus_per_task,
executor=executor,
)
@@ -151,6 +159,7 @@ def interface_bootup(
command_lst,
cwd=None,
cores=1,
threads_per_core=1,
gpus_per_core=0,
executor=None,
):
@@ -161,6 +170,7 @@
connections = FluxPythonInterface(
cwd=cwd,
cores=cores,
threads_per_core=threads_per_core,
gpus_per_core=gpus_per_core,
oversubscribe=False,
executor=executor,
@@ -178,6 +188,7 @@ def executor_broker(
future_queue,
max_workers,
cores_per_worker=1,
threads_per_core=1,
gpus_per_worker=0,
init_function=None,
cwd=None,
@@ -187,6 +198,7 @@
meta_future_lst = _get_executor_list(
max_workers=max_workers,
cores_per_worker=cores_per_worker,
threads_per_core=threads_per_core,
gpus_per_worker=gpus_per_worker,
init_function=init_function,
cwd=cwd,
@@ -208,6 +220,7 @@ def executor_broker(
def _get_executor_list(
max_workers,
cores_per_worker=1,
threads_per_core=1,
gpus_per_worker=0,
init_function=None,
cwd=None,
@@ -216,6 +229,7 @@
return {
get_future_done(): SingleTaskExecutor(
cores=cores_per_worker,
threads_per_core=threads_per_core,
gpus_per_task=int(gpus_per_worker / cores_per_worker),
init_function=init_function,
cwd=cwd,
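The `threads_per_core` argument added above is handed from the broker through `executor_broker()` and `_get_executor_list()` into each `SingleTaskExecutor`, and from there into the Flux jobspec. A minimal usage sketch, assuming a running Flux instance and that `PyFluxExecutor` is importable from the `pympipool` package (the import path is an assumption; the tests below construct it with the same arguments):

```
# Sketch only: requires the flux-core Python bindings and a running Flux instance.
import flux.job
from pympipool import PyFluxExecutor  # import path assumed

def calc(i):
    return 2 * i

with flux.job.FluxExecutor() as flux_exe:
    # one worker backed by a single MPI rank with two OpenMP threads
    with PyFluxExecutor(max_workers=1, threads_per_core=2, executor=flux_exe) as exe:
        fs = exe.submit(calc, 2)
        print(fs.result())  # 4
```

Every worker created by the broker requests the same thread count, since `threads_per_core` is forwarded unchanged to each `SingleTaskExecutor`.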
35 changes: 17 additions & 18 deletions pympipool/interfaces/taskexecutor.py
@@ -25,25 +25,24 @@ class Executor(ExecutorBase):
queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems
queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter

Simple example:
Examples:
```
import numpy as np
from pympipool import Executor

def calc(i, j, k):
from mpi4py import MPI
size = MPI.COMM_WORLD.Get_size()
rank = MPI.COMM_WORLD.Get_rank()
return np.array([i, j, k]), size, rank

def init_k():
return {"k": 3}

with Executor(cores=2, init_function=init_k) as p:
fs = p.submit(calc, 2, j=4)
print(fs.result())

>>> [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
>>> import numpy as np
>>> from pympipool import Executor
>>>
>>> def calc(i, j, k):
>>> from mpi4py import MPI
>>> size = MPI.COMM_WORLD.Get_size()
>>> rank = MPI.COMM_WORLD.Get_rank()
>>> return np.array([i, j, k]), size, rank
>>>
>>> def init_k():
>>> return {"k": 3}
>>>
>>> with Executor(cores=2, init_function=init_k) as p:
>>> fs = p.submit(calc, 2, j=4)
>>> print(fs.result())
[(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
```
"""

24 changes: 12 additions & 12 deletions pympipool/legacy/interfaces/executor.py
@@ -24,19 +24,19 @@ class PoolExecutor(ExecutorBase):
queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems
queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter

Simple example:
Examples:
```
from pympipool import PoolExecutor

def calc(i, j):
return i + j

with PoolExecutor(max_workers=2) as p:
fs1 = p.submit(calc, 1, 2)
fs2 = p.submit(calc, 3, 4)
fs3 = p.submit(calc, 5, 6)
fs4 = p.submit(calc, 7, 8)
print(fs1.result(), fs2.result(), fs3.result(), fs4.result())
>>> from pympipool import PoolExecutor
>>>
>>> def calc(i, j):
>>> return i + j
>>>
>>> with PoolExecutor(max_workers=2) as p:
>>> fs1 = p.submit(calc, 1, 2)
>>> fs2 = p.submit(calc, 3, 4)
>>> fs3 = p.submit(calc, 5, 6)
>>> fs4 = p.submit(calc, 7, 8)
>>> print(fs1.result(), fs2.result(), fs3.result(), fs4.result())
```
"""

18 changes: 9 additions & 9 deletions pympipool/legacy/interfaces/pool.py
@@ -45,16 +45,16 @@ class Pool(PoolBase):
queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems
queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter

Simple example:
Examples:
```
import numpy as np
from pympipool import Pool

def calc(i):
return np.array(i ** 2)

with Pool(cores=2) as p:
print(p.map(func=calc, iterable=[1, 2, 3, 4]))
>>> import numpy as np
>>> from pympipool import Pool
>>>
>>> def calc(i):
>>> return np.array(i ** 2)
>>>
>>> with Pool(cores=2) as p:
>>> print(p.map(func=calc, iterable=[1, 2, 3, 4]))
```
"""

35 changes: 30 additions & 5 deletions pympipool/shared/connections.py
@@ -4,9 +4,12 @@


class BaseInterface(ABC):
def __init__(self, cwd, cores=1, gpus_per_core=0, oversubscribe=False):
def __init__(
self, cwd, cores=1, threads_per_core=1, gpus_per_core=0, oversubscribe=False
):
self._cwd = cwd
self._cores = cores
self._threads_per_core = threads_per_core
self._gpus_per_core = gpus_per_core
self._oversubscribe = oversubscribe

@@ -21,10 +24,18 @@ def poll(self):


class SubprocessInterface(BaseInterface):
def __init__(self, cwd=None, cores=1, gpus_per_core=0, oversubscribe=False):
def __init__(
self,
cwd=None,
cores=1,
threads_per_core=1,
gpus_per_core=0,
oversubscribe=False,
):
super().__init__(
cwd=cwd,
cores=cores,
threads_per_core=threads_per_core,
gpus_per_core=gpus_per_core,
oversubscribe=oversubscribe,
)
@@ -72,6 +83,7 @@ def generate_command(self, command_lst):
command_prepend_lst = generate_slurm_command(
cores=self._cores,
cwd=self._cwd,
threads_per_core=self._threads_per_core,
gpus_per_core=self._gpus_per_core,
oversubscribe=self._oversubscribe,
)
@@ -142,6 +154,8 @@ def generate_command(self, command_lst):
command_prepend_lst += [
"--cwd=" + self._cwd,
]
if self._threads_per_core > 1:
command_prepend_lst += ["--cores-per-task=" + str(self._threads_per_core)]
if self._gpus_per_core > 0:
command_prepend_lst += ["--gpus-per-task=" + str(self._gpus_per_core)]
return super().generate_command(
@@ -151,12 +165,19 @@ def generate_command(self, command_lst):

class FluxPythonInterface(BaseInterface):
def __init__(
self, cwd=None, cores=1, gpus_per_core=0, oversubscribe=False, executor=None
self,
cwd=None,
cores=1,
threads_per_core=1,
gpus_per_core=0,
oversubscribe=False,
executor=None,
):
super().__init__(
cwd=cwd,
cores=cores,
gpus_per_core=gpus_per_core,
threads_per_core=threads_per_core,
oversubscribe=oversubscribe,
)
self._executor = executor
@@ -174,7 +195,7 @@ def bootup(self, command_lst):
jobspec = flux.job.JobspecV1.from_command(
command=command_lst,
num_tasks=self._cores,
cores_per_task=1,
cores_per_task=self._threads_per_core,
gpus_per_task=self._gpus_per_core,
num_nodes=None,
exclusive=False,
@@ -196,8 +217,12 @@ def poll(self):
return self._future is not None and not self._future.done()


def generate_slurm_command(cores, cwd, gpus_per_core=0, oversubscribe=False):
def generate_slurm_command(
cores, cwd, threads_per_core=1, gpus_per_core=0, oversubscribe=False
):
command_prepend_lst = ["srun", "-n", str(cores), "-D", cwd]
if threads_per_core > 1:
command_prepend_lst += ["--cpus-per-task" + str(threads_per_core)]
if gpus_per_core > 0:
command_prepend_lst += ["--gpus-per-task=" + str(gpus_per_core)]
if oversubscribe:
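On the SLURM backend the same setting becomes a `--cpus-per-task` flag (note the corrected `=` above), while the Flux Python interface forwards it as `cores_per_task` in the jobspec. A quick sketch of the srun prefix that `generate_slurm_command` assembles, with illustrative argument values (module path inferred from pympipool/shared/connections.py):

```
# Sketch: print the srun prefix built by generate_slurm_command.
from pympipool.shared.connections import generate_slurm_command

cmd = generate_slurm_command(cores=4, cwd="/tmp", threads_per_core=2, gpus_per_core=1)
print(cmd)
# ['srun', '-n', '4', '-D', '/tmp', '--cpus-per-task=2', '--gpus-per-task=1']
```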
34 changes: 34 additions & 0 deletions tests/test_flux.py
@@ -48,6 +48,15 @@ def test_flux_executor_serial(self):
self.assertTrue(fs_1.done())
self.assertTrue(fs_2.done())

def test_flux_executor_threads(self):
with PyFluxExecutor(max_workers=1, threads_per_core=2, executor=self.executor) as exe:
fs_1 = exe.submit(calc, 1)
fs_2 = exe.submit(calc, 2)
self.assertEqual(fs_1.result(), 1)
self.assertEqual(fs_2.result(), 2)
self.assertTrue(fs_1.done())
self.assertTrue(fs_2.done())

def test_flux_executor_parallel(self):
with PyFluxExecutor(max_workers=1, cores_per_worker=2, executor=self.executor) as exe:
fs_1 = exe.submit(mpi_funct, 1)
@@ -73,6 +82,21 @@ def test_execute_task(self):
self.assertEqual(f.result(), 2)
q.join()

def test_execute_task_threads(self):
f = Future()
q = Queue()
q.put({"fn": calc, 'args': (), "kwargs": {"i": 2}, "future": f})
q.put({"shutdown": True, "wait": True})
cloudpickle_register(ind=1)
execute_parallel_tasks(
future_queue=q,
cores=1,
threads_per_core=1,
executor=self.executor
)
self.assertEqual(f.result(), 2)
q.join()

def test_internal_memory(self):
with SingleTaskExecutor(cores=1, init_function=set_global, executor=self.executor) as p:
f = p.submit(get_global)
@@ -89,3 +113,13 @@ def test_executor_broker(self):
self.assertTrue(f.done())
self.assertEqual(f.result(), 1)
q.join()

def test_executor_broker_threads(self):
q = Queue()
f = Future()
q.put({"fn": calc, "args": (1,), "kwargs": {}, "future": f})
q.put({"shutdown": True, "wait": True})
executor_broker(future_queue=q, max_workers=1, threads_per_core=2, executor=self.executor)
self.assertTrue(f.done())
self.assertEqual(f.result(), 1)
q.join()