From e88ad4f086d2216f61117ac765322153ea5ba84c Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Mon, 31 Jul 2023 16:47:56 -0600 Subject: [PATCH 1/6] Add OpenMP support --- pympipool/interfaces/fluxbroker.py | 12 ++++++++++++ pympipool/shared/connections.py | 18 +++++++++++++----- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/pympipool/interfaces/fluxbroker.py b/pympipool/interfaces/fluxbroker.py index 06d68ec7..b6a33aa1 100644 --- a/pympipool/interfaces/fluxbroker.py +++ b/pympipool/interfaces/fluxbroker.py @@ -63,6 +63,7 @@ def init_k(): def __init__( self, cores, + threads_per_core=1, gpus_per_task=0, init_function=None, cwd=None, @@ -74,6 +75,7 @@ def __init__( kwargs={ "future_queue": self._future_queue, "cores": cores, + "threads_per_core": threads_per_core, "gpus_per_task": gpus_per_task, "cwd": cwd, "executor": executor, @@ -92,6 +94,7 @@ def __init__( self, max_workers, cores_per_worker=1, + threads_per_core=1, gpus_per_worker=0, init_function=None, cwd=None, @@ -105,6 +108,7 @@ def __init__( "future_queue": self._future_queue, "max_workers": max_workers, "cores_per_worker": cores_per_worker, + "threads_per_core": threads_per_core, "gpus_per_worker": gpus_per_worker, "init_function": init_function, "cwd": cwd, @@ -118,6 +122,7 @@ def __init__( def execute_parallel_tasks( future_queue, cores, + threads_per_core=1, gpus_per_task=0, cwd=None, executor=None, @@ -140,6 +145,7 @@ def execute_parallel_tasks( command_lst=command_lst, cwd=cwd, cores=cores, + threads_per_core=threads_per_core, gpus_per_core=gpus_per_task, executor=executor, ) @@ -150,6 +156,7 @@ def interface_bootup( command_lst, cwd=None, cores=1, + threads_per_core=1, gpus_per_core=0, executor=None, ): @@ -160,6 +167,7 @@ def interface_bootup( connections = FluxPythonInterface( cwd=cwd, cores=cores, + threads_per_core=threads_per_core, gpus_per_core=gpus_per_core, oversubscribe=False, executor=executor, @@ -177,6 +185,7 @@ def executor_broker( future_queue, max_workers, cores_per_worker=1, + threads_per_core=1, gpus_per_worker=0, init_function=None, cwd=None, @@ -186,6 +195,7 @@ def executor_broker( meta_future_lst = _get_executor_list( max_workers=max_workers, cores_per_worker=cores_per_worker, + threads_per_core=threads_per_core, gpus_per_worker=gpus_per_worker, init_function=init_function, cwd=cwd, @@ -207,6 +217,7 @@ def executor_broker( def _get_executor_list( max_workers, cores_per_worker=1, + threads_per_core=1, gpus_per_worker=0, init_function=None, cwd=None, @@ -217,6 +228,7 @@ def _get_executor_list( future=_get_future_done(), executor=SingleTaskExecutor( cores=cores_per_worker, + threads_per_core=threads_per_core, gpus_per_task=int(gpus_per_worker / cores_per_worker), init_function=init_function, cwd=cwd, diff --git a/pympipool/shared/connections.py b/pympipool/shared/connections.py index ac0dd7dc..a74b4e85 100644 --- a/pympipool/shared/connections.py +++ b/pympipool/shared/connections.py @@ -4,9 +4,10 @@ class BaseInterface(ABC): - def __init__(self, cwd, cores=1, gpus_per_core=0, oversubscribe=False): + def __init__(self, cwd, cores=1, threads_per_core=1, gpus_per_core=0, oversubscribe=False): self._cwd = cwd self._cores = cores + self._threads_per_core = threads_per_core self._gpus_per_core = gpus_per_core self._oversubscribe = oversubscribe @@ -21,10 +22,11 @@ def poll(self): class SubprocessInterface(BaseInterface): - def __init__(self, cwd=None, cores=1, gpus_per_core=0, oversubscribe=False): + def __init__(self, cwd=None, cores=1, threads_per_core=1, gpus_per_core=0, oversubscribe=False): super().__init__( cwd=cwd, cores=cores, + threads_per_core=threads_per_core, gpus_per_core=gpus_per_core, oversubscribe=oversubscribe, ) @@ -72,6 +74,7 @@ def generate_command(self, command_lst): command_prepend_lst = generate_slurm_command( cores=self._cores, cwd=self._cwd, + threads_per_core=self._threads_per_core, gpus_per_core=self._gpus_per_core, oversubscribe=self._oversubscribe, ) @@ -142,6 +145,8 @@ def generate_command(self, command_lst): command_prepend_lst += [ "--cwd=" + self._cwd, ] + if self._threads_per_core > 1: + command_prepend_lst += ["--cores-per-task=" + str(self._threads_per_core)] if self._gpus_per_core > 0: command_prepend_lst += ["--gpus-per-task=" + str(self._gpus_per_core)] return super().generate_command( @@ -151,12 +156,13 @@ def generate_command(self, command_lst): class FluxPythonInterface(BaseInterface): def __init__( - self, cwd=None, cores=1, gpus_per_core=0, oversubscribe=False, executor=None + self, cwd=None, cores=1, threads_per_core=1, gpus_per_core=0, oversubscribe=False, executor=None ): super().__init__( cwd=cwd, cores=cores, gpus_per_core=gpus_per_core, + threads_per_core=threads_per_core, oversubscribe=oversubscribe, ) self._executor = executor @@ -174,7 +180,7 @@ def bootup(self, command_lst): jobspec = flux.job.JobspecV1.from_command( command=command_lst, num_tasks=self._cores, - cores_per_task=1, + cores_per_task=self._threads_per_core, gpus_per_task=self._gpus_per_core, num_nodes=None, exclusive=False, @@ -196,8 +202,10 @@ def poll(self): return self._future is not None and not self._future.done() -def generate_slurm_command(cores, cwd, gpus_per_core=0, oversubscribe=False): +def generate_slurm_command(cores, cwd, threads_per_core=1, gpus_per_core=0, oversubscribe=False): command_prepend_lst = ["srun", "-n", str(cores), "-D", cwd] + if threads_per_core > 1: + command_prepend_lst += ["--cpus-per-task" + str(threads_per_core)] if gpus_per_core > 0: command_prepend_lst += ["--gpus-per-task=" + str(gpus_per_core)] if oversubscribe: From 792f012587f2a57eefce181e8e93902399a1840c Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Mon, 31 Jul 2023 17:01:20 -0600 Subject: [PATCH 2/6] black formatting --- pympipool/shared/connections.py | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/pympipool/shared/connections.py b/pympipool/shared/connections.py index a74b4e85..de7be7dd 100644 --- a/pympipool/shared/connections.py +++ b/pympipool/shared/connections.py @@ -4,7 +4,9 @@ class BaseInterface(ABC): - def __init__(self, cwd, cores=1, threads_per_core=1, gpus_per_core=0, oversubscribe=False): + def __init__( + self, cwd, cores=1, threads_per_core=1, gpus_per_core=0, oversubscribe=False + ): self._cwd = cwd self._cores = cores self._threads_per_core = threads_per_core @@ -22,7 +24,14 @@ def poll(self): class SubprocessInterface(BaseInterface): - def __init__(self, cwd=None, cores=1, threads_per_core=1, gpus_per_core=0, oversubscribe=False): + def __init__( + self, + cwd=None, + cores=1, + threads_per_core=1, + gpus_per_core=0, + oversubscribe=False, + ): super().__init__( cwd=cwd, cores=cores, @@ -156,7 +165,13 @@ def generate_command(self, command_lst): class FluxPythonInterface(BaseInterface): def __init__( - self, cwd=None, cores=1, threads_per_core=1, gpus_per_core=0, oversubscribe=False, executor=None + self, + cwd=None, + cores=1, + threads_per_core=1, + gpus_per_core=0, + oversubscribe=False, + executor=None, ): super().__init__( cwd=cwd, @@ -202,7 +217,9 @@ def poll(self): return self._future is not None and not self._future.done() -def generate_slurm_command(cores, cwd, threads_per_core=1, gpus_per_core=0, oversubscribe=False): +def generate_slurm_command( + cores, cwd, threads_per_core=1, gpus_per_core=0, oversubscribe=False +): command_prepend_lst = ["srun", "-n", str(cores), "-D", cwd] if threads_per_core > 1: command_prepend_lst += ["--cpus-per-task" + str(threads_per_core)] From 0ef68b31b94ebb887fa505098fd2dc5028d18732 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 4 Aug 2023 11:05:19 -0600 Subject: [PATCH 3/6] update docstrings --- pympipool/interfaces/fluxbroker.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/pympipool/interfaces/fluxbroker.py b/pympipool/interfaces/fluxbroker.py index 5a778681..6815e2f4 100644 --- a/pympipool/interfaces/fluxbroker.py +++ b/pympipool/interfaces/fluxbroker.py @@ -28,14 +28,10 @@ class SingleTaskExecutor(ExecutorBase): Args: cores (int): defines the number of MPI ranks to use for each function call + threads_per_core (int): number of OpenMP threads to be used for each function call gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 - oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False - enable_flux_backend (bool): use the flux-framework as backend rather than just calling mpiexec - enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False init_function (None): optional function to preset arguments for functions which are submitted later cwd (str/None): current working directory where the parallel python task is executed - queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems - queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter Simple example: ``` @@ -132,6 +128,7 @@ def execute_parallel_tasks( Args: future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process cores (int): defines the total number of MPI ranks to use + threads_per_core (int): number of OpenMP threads to be used for each function call gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 cwd (str/None): current working directory where the parallel python task is executed executor (flux.job.FluxExecutor/None): flux executor to submit tasks to - optional From 005ffab13749a047bc312cf2d381b924fce150b3 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 4 Aug 2023 12:58:18 -0600 Subject: [PATCH 4/6] Some more docstring clean up --- pympipool/interfaces/fluxbroker.py | 36 ++++++++++++------------- pympipool/interfaces/taskexecutor.py | 35 ++++++++++++------------ pympipool/legacy/interfaces/executor.py | 24 ++++++++--------- pympipool/legacy/interfaces/pool.py | 18 ++++++------- 4 files changed, 56 insertions(+), 57 deletions(-) diff --git a/pympipool/interfaces/fluxbroker.py b/pympipool/interfaces/fluxbroker.py index 673fd129..a66e786b 100644 --- a/pympipool/interfaces/fluxbroker.py +++ b/pympipool/interfaces/fluxbroker.py @@ -33,25 +33,25 @@ class SingleTaskExecutor(ExecutorBase): init_function (None): optional function to preset arguments for functions which are submitted later cwd (str/None): current working directory where the parallel python task is executed - Simple example: + Examples: ``` - import numpy as np - from pympipool import Executor - - def calc(i, j, k): - from mpi4py import MPI - size = MPI.COMM_WORLD.Get_size() - rank = MPI.COMM_WORLD.Get_rank() - return np.array([i, j, k]), size, rank - - def init_k(): - return {"k": 3} - - with Executor(cores=2, init_function=init_k) as p: - fs = p.submit(calc, 2, j=4) - print(fs.result()) - - >>> [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] + >>> import numpy as np + >>> from pympipool import Executor + >>> + >>> def calc(i, j, k): + >>> from mpi4py import MPI + >>> size = MPI.COMM_WORLD.Get_size() + >>> rank = MPI.COMM_WORLD.Get_rank() + >>> return np.array([i, j, k]), size, rank + >>> + >>> def init_k(): + >>> return {"k": 3} + >>> + >>> with Executor(cores=2, init_function=init_k) as p: + >>> fs = p.submit(calc, 2, j=4) + >>> print(fs.result()) + + [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] ``` """ diff --git a/pympipool/interfaces/taskexecutor.py b/pympipool/interfaces/taskexecutor.py index 5fae9c43..79a0e78c 100644 --- a/pympipool/interfaces/taskexecutor.py +++ b/pympipool/interfaces/taskexecutor.py @@ -25,25 +25,24 @@ class Executor(ExecutorBase): queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter - Simple example: + Examples: ``` - import numpy as np - from pympipool import Executor - - def calc(i, j, k): - from mpi4py import MPI - size = MPI.COMM_WORLD.Get_size() - rank = MPI.COMM_WORLD.Get_rank() - return np.array([i, j, k]), size, rank - - def init_k(): - return {"k": 3} - - with Executor(cores=2, init_function=init_k) as p: - fs = p.submit(calc, 2, j=4) - print(fs.result()) - - >>> [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] + >>> import numpy as np + >>> from pympipool import Executor + >>> + >>> def calc(i, j, k): + >>> from mpi4py import MPI + >>> size = MPI.COMM_WORLD.Get_size() + >>> rank = MPI.COMM_WORLD.Get_rank() + >>> return np.array([i, j, k]), size, rank + >>> + >>> def init_k(): + >>> return {"k": 3} + >>> + >>> with Executor(cores=2, init_function=init_k) as p: + >>> fs = p.submit(calc, 2, j=4) + >>> print(fs.result()) + [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] ``` """ diff --git a/pympipool/legacy/interfaces/executor.py b/pympipool/legacy/interfaces/executor.py index 56b2905e..c813acf1 100644 --- a/pympipool/legacy/interfaces/executor.py +++ b/pympipool/legacy/interfaces/executor.py @@ -24,19 +24,19 @@ class PoolExecutor(ExecutorBase): queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter - Simple example: + Examples: ``` - from pympipool import PoolExecutor - - def calc(i, j): - return i + j - - with PoolExecutor(max_workers=2) as p: - fs1 = p.submit(calc, 1, 2) - fs2 = p.submit(calc, 3, 4) - fs3 = p.submit(calc, 5, 6) - fs4 = p.submit(calc, 7, 8) - print(fs1.result(), fs2.result(), fs3.result(), fs4.result() + >>> from pympipool import PoolExecutor + >>> + >>> def calc(i, j): + >>> return i + j + >>> + >>> with PoolExecutor(max_workers=2) as p: + >>> fs1 = p.submit(calc, 1, 2) + >>> fs2 = p.submit(calc, 3, 4) + >>> fs3 = p.submit(calc, 5, 6) + >>> fs4 = p.submit(calc, 7, 8) + >>> print(fs1.result(), fs2.result(), fs3.result(), fs4.result() ``` """ diff --git a/pympipool/legacy/interfaces/pool.py b/pympipool/legacy/interfaces/pool.py index fffdd60b..da9b37fc 100644 --- a/pympipool/legacy/interfaces/pool.py +++ b/pympipool/legacy/interfaces/pool.py @@ -45,16 +45,16 @@ class Pool(PoolBase): queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter - Simple example: + Examples: ``` - import numpy as np - from pympipool import Pool - - def calc(i): - return np.array(i ** 2) - - with Pool(cores=2) as p: - print(p.map(func=calc, iterable=[1, 2, 3, 4])) + >>> import numpy as np + >>> from pympipool import Pool + >>> + >>> def calc(i): + >>> return np.array(i ** 2) + >>> + >>> with Pool(cores=2) as p: + >>> print(p.map(func=calc, iterable=[1, 2, 3, 4])) ``` """ From 2c0b5fbebe38769678ab4f6274f72124c3ca4af3 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 4 Aug 2023 13:12:05 -0600 Subject: [PATCH 5/6] Add some threading tests --- tests/test_flux.py | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/tests/test_flux.py b/tests/test_flux.py index efa3be47..e21b05c5 100644 --- a/tests/test_flux.py +++ b/tests/test_flux.py @@ -48,6 +48,15 @@ def test_flux_executor_serial(self): self.assertTrue(fs_1.done()) self.assertTrue(fs_2.done()) + def test_flux_executor_threads(self): + with PyFluxExecutor(max_workers=1, threads_per_core=2, executor=self.executor) as exe: + fs_1 = exe.submit(calc, 1) + fs_2 = exe.submit(calc, 2) + self.assertEqual(fs_1.result(), 1) + self.assertEqual(fs_2.result(), 2) + self.assertTrue(fs_1.done()) + self.assertTrue(fs_2.done()) + def test_flux_executor_parallel(self): with PyFluxExecutor(max_workers=1, cores_per_worker=2, executor=self.executor) as exe: fs_1 = exe.submit(mpi_funct, 1) @@ -73,6 +82,21 @@ def test_execute_task(self): self.assertEqual(f.result(), 2) q.join() + def test_execute_task_threads(self): + f = Future() + q = Queue() + q.put({"fn": calc, 'args': (), "kwargs": {"i": 2}, "future": f}) + q.put({"shutdown": True, "wait": True}) + cloudpickle_register(ind=1) + execute_parallel_tasks( + future_queue=q, + cores=1, + threads_per_core=1, + executor=self.executor + ) + self.assertEqual(f.result(), 2) + q.join() + def test_internal_memory(self): with SingleTaskExecutor(cores=1, init_function=set_global, executor=self.executor) as p: f = p.submit(get_global) @@ -89,3 +113,13 @@ def test_executor_broker(self): self.assertTrue(f.done()) self.assertEqual(f.result(), 1) q.join() + + def test_executor_broker_threads(self): + q = Queue() + f = Future() + q.put({"fn": calc, "args": (1,), "kwargs": {}, "future": f}) + q.put({"shutdown": True, "wait": True}) + executor_broker(future_queue=q, max_workers=1, cores_per_worker=2, executor=self.executor) + self.assertTrue(f.done()) + self.assertEqual(f.result(), 1) + q.join() From abe07ae2ce095e5b932a6f7c6b98e1c6d35f8721 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 4 Aug 2023 13:20:00 -0600 Subject: [PATCH 6/6] fix test --- tests/test_flux.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_flux.py b/tests/test_flux.py index e21b05c5..3b54ffbf 100644 --- a/tests/test_flux.py +++ b/tests/test_flux.py @@ -119,7 +119,7 @@ def test_executor_broker_threads(self): f = Future() q.put({"fn": calc, "args": (1,), "kwargs": {}, "future": f}) q.put({"shutdown": True, "wait": True}) - executor_broker(future_queue=q, max_workers=1, cores_per_worker=2, executor=self.executor) + executor_broker(future_queue=q, max_workers=1, threads_per_core=2, executor=self.executor) self.assertTrue(f.done()) self.assertEqual(f.result(), 1) q.join()