diff --git a/pympipool/interfaces/fluxbroker.py b/pympipool/interfaces/fluxbroker.py
index 7ac4790c..9a4dc2fa 100644
--- a/pympipool/interfaces/fluxbroker.py
+++ b/pympipool/interfaces/fluxbroker.py
@@ -28,11 +28,12 @@ class SingleTaskExecutor(ExecutorBase):
     Args:
         cores (int): defines the number of MPI ranks to use for each function call
+        threads_per_core (int): number of OpenMP threads to be used for each function call
         gpus_per_task (int): number of GPUs per MPI rank - defaults to 0
         init_function (None): optional function to preset arguments for functions which are submitted later
         cwd (str/None): current working directory where the parallel python task is executed

-    Simple example:
+    Examples:
         ```
         >>> import numpy as np
         >>> from pympipool import Executor
@@ -57,6 +58,7 @@ class SingleTaskExecutor(ExecutorBase):
     def __init__(
         self,
         cores,
+        threads_per_core=1,
         gpus_per_task=0,
         init_function=None,
         cwd=None,
@@ -68,6 +70,7 @@ def __init__(
             kwargs={
                 "future_queue": self._future_queue,
                 "cores": cores,
+                "threads_per_core": threads_per_core,
                 "gpus_per_task": gpus_per_task,
                 "cwd": cwd,
                 "executor": executor,
@@ -86,6 +89,7 @@ def __init__(
         self,
         max_workers,
         cores_per_worker=1,
+        threads_per_core=1,
         gpus_per_worker=0,
         init_function=None,
         cwd=None,
@@ -99,6 +103,7 @@ def __init__(
                 "future_queue": self._future_queue,
                 "max_workers": max_workers,
                 "cores_per_worker": cores_per_worker,
+                "threads_per_core": threads_per_core,
                 "gpus_per_worker": gpus_per_worker,
                 "init_function": init_function,
                 "cwd": cwd,
@@ -112,6 +117,7 @@ def __init__(
 def execute_parallel_tasks(
     future_queue,
     cores,
+    threads_per_core=1,
     gpus_per_task=0,
     cwd=None,
     executor=None,
@@ -122,6 +128,7 @@ def execute_parallel_tasks(
     Args:
        future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
        cores (int): defines the total number of MPI ranks to use
+       threads_per_core (int): number of OpenMP threads to be used for each function call
       gpus_per_task (int): number of GPUs per MPI rank - defaults to 0
       cwd (str/None): current working directory where the parallel python task is executed
       executor (flux.job.FluxExecutor/None): flux executor to submit tasks to - optional
@@ -141,6 +148,7 @@ def execute_parallel_tasks(
         command_lst=command_lst,
         cwd=cwd,
         cores=cores,
+        threads_per_core=threads_per_core,
         gpus_per_core=gpus_per_task,
         executor=executor,
     )
@@ -151,6 +159,7 @@ def interface_bootup(
     command_lst,
     cwd=None,
     cores=1,
+    threads_per_core=1,
     gpus_per_core=0,
     executor=None,
 ):
@@ -161,6 +170,7 @@ def interface_bootup(
     connections = FluxPythonInterface(
         cwd=cwd,
         cores=cores,
+        threads_per_core=threads_per_core,
         gpus_per_core=gpus_per_core,
         oversubscribe=False,
         executor=executor,
     )
@@ -178,6 +188,7 @@ def executor_broker(
     future_queue,
     max_workers,
     cores_per_worker=1,
+    threads_per_core=1,
     gpus_per_worker=0,
     init_function=None,
     cwd=None,
@@ -187,6 +198,7 @@ def executor_broker(
     meta_future_lst = _get_executor_list(
         max_workers=max_workers,
         cores_per_worker=cores_per_worker,
+        threads_per_core=threads_per_core,
         gpus_per_worker=gpus_per_worker,
         init_function=init_function,
         cwd=cwd,
@@ -208,6 +220,7 @@ def executor_broker(
 def _get_executor_list(
     max_workers,
     cores_per_worker=1,
+    threads_per_core=1,
     gpus_per_worker=0,
     init_function=None,
     cwd=None,
@@ -216,6 +229,7 @@ def _get_executor_list(
     return {
         get_future_done(): SingleTaskExecutor(
             cores=cores_per_worker,
+            threads_per_core=threads_per_core,
            gpus_per_task=int(gpus_per_worker / cores_per_worker),
            init_function=init_function,
            cwd=cwd,
diff --git a/pympipool/interfaces/taskexecutor.py b/pympipool/interfaces/taskexecutor.py
index 4315e309..8039a4a2 100644
--- a/pympipool/interfaces/taskexecutor.py
+++ b/pympipool/interfaces/taskexecutor.py
@@ -25,25 +25,24 @@ class Executor(ExecutorBase):
         queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems
         queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter

-    Simple example:
+    Examples:
         ```
-        import numpy as np
-        from pympipool import Executor
-
-        def calc(i, j, k):
-            from mpi4py import MPI
-            size = MPI.COMM_WORLD.Get_size()
-            rank = MPI.COMM_WORLD.Get_rank()
-            return np.array([i, j, k]), size, rank
-
-        def init_k():
-            return {"k": 3}
-
-        with Executor(cores=2, init_function=init_k) as p:
-            fs = p.submit(calc, 2, j=4)
-            print(fs.result())
-
-        >>> [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
+        >>> import numpy as np
+        >>> from pympipool import Executor
+        >>>
+        >>> def calc(i, j, k):
+        >>>     from mpi4py import MPI
+        >>>     size = MPI.COMM_WORLD.Get_size()
+        >>>     rank = MPI.COMM_WORLD.Get_rank()
+        >>>     return np.array([i, j, k]), size, rank
+        >>>
+        >>> def init_k():
+        >>>     return {"k": 3}
+        >>>
+        >>> with Executor(cores=2, init_function=init_k) as p:
+        >>>     fs = p.submit(calc, 2, j=4)
+        >>>     print(fs.result())
+        [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
         ```
     """
diff --git a/pympipool/legacy/interfaces/executor.py b/pympipool/legacy/interfaces/executor.py
index 62c62fb6..87dfeec9 100644
--- a/pympipool/legacy/interfaces/executor.py
+++ b/pympipool/legacy/interfaces/executor.py
@@ -24,19 +24,19 @@ class PoolExecutor(ExecutorBase):
         queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems
         queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter

-    Simple example:
+    Examples:
         ```
-        from pympipool import PoolExecutor
-
-        def calc(i, j):
-            return i + j
-
-        with PoolExecutor(max_workers=2) as p:
-            fs1 = p.submit(calc, 1, 2)
-            fs2 = p.submit(calc, 3, 4)
-            fs3 = p.submit(calc, 5, 6)
-            fs4 = p.submit(calc, 7, 8)
-            print(fs1.result(), fs2.result(), fs3.result(), fs4.result()
+        >>> from pympipool import PoolExecutor
+        >>>
+        >>> def calc(i, j):
+        >>>     return i + j
+        >>>
+        >>> with PoolExecutor(max_workers=2) as p:
+        >>>     fs1 = p.submit(calc, 1, 2)
+        >>>     fs2 = p.submit(calc, 3, 4)
+        >>>     fs3 = p.submit(calc, 5, 6)
+        >>>     fs4 = p.submit(calc, 7, 8)
+        >>>     print(fs1.result(), fs2.result(), fs3.result(), fs4.result())
         ```
     """
diff --git a/pympipool/legacy/interfaces/pool.py b/pympipool/legacy/interfaces/pool.py
index fffdd60b..da9b37fc 100644
--- a/pympipool/legacy/interfaces/pool.py
+++ b/pympipool/legacy/interfaces/pool.py
@@ -45,16 +45,16 @@ class Pool(PoolBase):
         queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems
         queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter

-    Simple example:
+    Examples:
         ```
-        import numpy as np
-        from pympipool import Pool
-
-        def calc(i):
-            return np.array(i ** 2)
-
-        with Pool(cores=2) as p:
-            print(p.map(func=calc, iterable=[1, 2, 3, 4]))
+        >>> import numpy as np
+        >>> from pympipool import Pool
+        >>>
+        >>> def calc(i):
+        >>>     return np.array(i ** 2)
+        >>>
+        >>> with Pool(cores=2) as p:
+        >>>     print(p.map(func=calc, iterable=[1, 2, 3, 4]))
         ```
     """
diff --git a/pympipool/shared/connections.py b/pympipool/shared/connections.py
index ac0dd7dc..de7be7dd 100644
--- a/pympipool/shared/connections.py
+++ b/pympipool/shared/connections.py
@@ -4,9 +4,12 @@ class BaseInterface(ABC):
-    def __init__(self, cwd, cores=1, gpus_per_core=0, oversubscribe=False):
+    def __init__(
+        self, cwd, cores=1, threads_per_core=1, gpus_per_core=0, oversubscribe=False
+    ):
         self._cwd = cwd
         self._cores = cores
+        self._threads_per_core = threads_per_core
         self._gpus_per_core = gpus_per_core
         self._oversubscribe = oversubscribe

@@ -21,10 +24,18 @@ def poll(self):
 class SubprocessInterface(BaseInterface):
-    def __init__(self, cwd=None, cores=1, gpus_per_core=0, oversubscribe=False):
+    def __init__(
+        self,
+        cwd=None,
+        cores=1,
+        threads_per_core=1,
+        gpus_per_core=0,
+        oversubscribe=False,
+    ):
         super().__init__(
             cwd=cwd,
             cores=cores,
+            threads_per_core=threads_per_core,
             gpus_per_core=gpus_per_core,
             oversubscribe=oversubscribe,
         )

@@ -72,6 +83,7 @@ def generate_command(self, command_lst):
         command_prepend_lst = generate_slurm_command(
             cores=self._cores,
             cwd=self._cwd,
+            threads_per_core=self._threads_per_core,
             gpus_per_core=self._gpus_per_core,
             oversubscribe=self._oversubscribe,
         )

@@ -142,6 +154,8 @@ def generate_command(self, command_lst):
             command_prepend_lst += [
                 "--cwd=" + self._cwd,
             ]
+        if self._threads_per_core > 1:
+            command_prepend_lst += ["--cores-per-task=" + str(self._threads_per_core)]
         if self._gpus_per_core > 0:
             command_prepend_lst += ["--gpus-per-task=" + str(self._gpus_per_core)]
         return super().generate_command(

@@ -151,12 +165,19 @@ class FluxPythonInterface(BaseInterface):
     def __init__(
-        self, cwd=None, cores=1, gpus_per_core=0, oversubscribe=False, executor=None
+        self,
+        cwd=None,
+        cores=1,
+        threads_per_core=1,
+        gpus_per_core=0,
+        oversubscribe=False,
+        executor=None,
     ):
         super().__init__(
             cwd=cwd,
             cores=cores,
             gpus_per_core=gpus_per_core,
+            threads_per_core=threads_per_core,
             oversubscribe=oversubscribe,
         )
         self._executor = executor

@@ -174,7 +195,7 @@ def bootup(self, command_lst):
         jobspec = flux.job.JobspecV1.from_command(
             command=command_lst,
             num_tasks=self._cores,
-            cores_per_task=1,
+            cores_per_task=self._threads_per_core,
             gpus_per_task=self._gpus_per_core,
             num_nodes=None,
             exclusive=False,

@@ -196,8 +217,12 @@ def poll(self):
         return self._future is not None and not self._future.done()


-def generate_slurm_command(cores, cwd, gpus_per_core=0, oversubscribe=False):
+def generate_slurm_command(
+    cores, cwd, threads_per_core=1, gpus_per_core=0, oversubscribe=False
+):
     command_prepend_lst = ["srun", "-n", str(cores), "-D", cwd]
+    if threads_per_core > 1:
+        command_prepend_lst += ["--cpus-per-task=" + str(threads_per_core)]
     if gpus_per_core > 0:
         command_prepend_lst += ["--gpus-per-task=" + str(gpus_per_core)]
     if oversubscribe:
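For reference, a minimal sketch of the srun prefix that `generate_slurm_command` assembles once `threads_per_core` is forwarded, assuming the `--cpus-per-task=` spelling above; the working directory value is purely illustrative:

```
>>> from pympipool.shared.connections import generate_slurm_command
>>> generate_slurm_command(cores=2, cwd="/tmp", threads_per_core=2)
['srun', '-n', '2', '-D', '/tmp', '--cpus-per-task=2']
```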
diff --git a/tests/test_flux.py b/tests/test_flux.py
index efa3be47..3b54ffbf 100644
--- a/tests/test_flux.py
+++ b/tests/test_flux.py
@@ -48,6 +48,15 @@ def test_flux_executor_serial(self):
         self.assertTrue(fs_1.done())
         self.assertTrue(fs_2.done())

+    def test_flux_executor_threads(self):
+        with PyFluxExecutor(max_workers=1, threads_per_core=2, executor=self.executor) as exe:
+            fs_1 = exe.submit(calc, 1)
+            fs_2 = exe.submit(calc, 2)
+            self.assertEqual(fs_1.result(), 1)
+            self.assertEqual(fs_2.result(), 2)
+            self.assertTrue(fs_1.done())
+            self.assertTrue(fs_2.done())
+
     def test_flux_executor_parallel(self):
         with PyFluxExecutor(max_workers=1, cores_per_worker=2, executor=self.executor) as exe:
             fs_1 = exe.submit(mpi_funct, 1)
@@ -73,6 +82,21 @@ def test_execute_task(self):
         self.assertEqual(f.result(), 2)
         q.join()

+    def test_execute_task_threads(self):
+        f = Future()
+        q = Queue()
+        q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f})
+        q.put({"shutdown": True, "wait": True})
+        cloudpickle_register(ind=1)
+        execute_parallel_tasks(
+            future_queue=q,
+            cores=1,
+            threads_per_core=1,
+            executor=self.executor
+        )
+        self.assertEqual(f.result(), 2)
+        q.join()
+
     def test_internal_memory(self):
         with SingleTaskExecutor(cores=1, init_function=set_global, executor=self.executor) as p:
             f = p.submit(get_global)
@@ -89,3 +113,13 @@ def test_executor_broker(self):
         self.assertTrue(f.done())
         self.assertEqual(f.result(), 1)
         q.join()
+
+    def test_executor_broker_threads(self):
+        q = Queue()
+        f = Future()
+        q.put({"fn": calc, "args": (1,), "kwargs": {}, "future": f})
+        q.put({"shutdown": True, "wait": True})
+        executor_broker(future_queue=q, max_workers=1, threads_per_core=2, executor=self.executor)
+        self.assertTrue(f.done())
+        self.assertEqual(f.result(), 1)
+        q.join()
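A minimal usage sketch of the new `threads_per_core` option, assuming `PyFluxExecutor` is importable from `pympipool.interfaces.fluxbroker` as exercised in tests/test_flux.py and a Flux allocation with enough free cores is available; `calc` is a placeholder function:

```
>>> from pympipool.interfaces.fluxbroker import PyFluxExecutor
>>>
>>> def calc(i):
>>>     # threads_per_core only reserves extra cores per task with the scheduler;
>>>     # the function itself has to start its own OpenMP/threading workload
>>>     return i
>>>
>>> with PyFluxExecutor(max_workers=1, threads_per_core=2) as exe:
>>>     fs = exe.submit(calc, 1)
>>>     print(fs.result())
1
```

As in the Flux jobspec and srun changes above, the parameter maps to `cores_per_task` / `--cpus-per-task`, so it reserves resources rather than configuring the threading runtime of the submitted function.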