
Commit 237f104

Merge pull request #149 from pyiron/openmp
Add OpenMP support
2 parents f0c1f7f + abe07ae commit 237f104

File tree: 6 files changed (+117 / -45 lines)
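
This merge threads a new threads_per_core argument through the pympipool executors, so that in addition to the MPI ranks requested via cores / cores_per_worker each rank can reserve extra cores for OpenMP threads. A minimal usage sketch, modeled on the test_flux_executor_threads test added in tests/test_flux.py below; the import path is an assumption and a running flux instance is required:

```
# Usage sketch based on the new test below; the import path is assumed, not taken from the diff.
import flux.job
from pympipool.interfaces.fluxbroker import PyFluxExecutor  # assumed import path


def calc(i):
    return i  # stand-in for a function that calls into an OpenMP-threaded library


with flux.job.FluxExecutor() as flux_exe:
    # one worker whose single MPI rank reserves 2 cores for OpenMP threads
    with PyFluxExecutor(max_workers=1, threads_per_core=2, executor=flux_exe) as exe:
        fs_1 = exe.submit(calc, 1)
        fs_2 = exe.submit(calc, 2)
        print(fs_1.result(), fs_2.result())  # -> 1 2, as asserted in the test
```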

pympipool/interfaces/fluxbroker.py (15 additions, 1 deletion)

@@ -28,11 +28,12 @@ class SingleTaskExecutor(ExecutorBase):

    Args:
        cores (int): defines the number of MPI ranks to use for each function call
+        threads_per_core (int): number of OpenMP threads to be used for each function call
        gpus_per_task (int): number of GPUs per MPI rank - defaults to 0
        init_function (None): optional function to preset arguments for functions which are submitted later
        cwd (str/None): current working directory where the parallel python task is executed

-    Simple example:
+    Examples:
        ```
        >>> import numpy as np
        >>> from pympipool import Executor
@@ -57,6 +58,7 @@ class SingleTaskExecutor(ExecutorBase):
    def __init__(
        self,
        cores,
+        threads_per_core=1,
        gpus_per_task=0,
        init_function=None,
        cwd=None,
@@ -68,6 +70,7 @@ def __init__(
            kwargs={
                "future_queue": self._future_queue,
                "cores": cores,
+                "threads_per_core": threads_per_core,
                "gpus_per_task": gpus_per_task,
                "cwd": cwd,
                "executor": executor,
@@ -86,6 +89,7 @@ def __init__(
        self,
        max_workers,
        cores_per_worker=1,
+        threads_per_core=1,
        gpus_per_worker=0,
        init_function=None,
        cwd=None,
@@ -99,6 +103,7 @@ def __init__(
                "future_queue": self._future_queue,
                "max_workers": max_workers,
                "cores_per_worker": cores_per_worker,
+                "threads_per_core": threads_per_core,
                "gpus_per_worker": gpus_per_worker,
                "init_function": init_function,
                "cwd": cwd,
@@ -112,6 +117,7 @@ def __init__(
def execute_parallel_tasks(
    future_queue,
    cores,
+    threads_per_core=1,
    gpus_per_task=0,
    cwd=None,
    executor=None,
@@ -122,6 +128,7 @@ def execute_parallel_tasks(
    Args:
        future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
        cores (int): defines the total number of MPI ranks to use
+        threads_per_core (int): number of OpenMP threads to be used for each function call
        gpus_per_task (int): number of GPUs per MPI rank - defaults to 0
        cwd (str/None): current working directory where the parallel python task is executed
        executor (flux.job.FluxExecutor/None): flux executor to submit tasks to - optional
@@ -141,6 +148,7 @@ def execute_parallel_tasks(
        command_lst=command_lst,
        cwd=cwd,
        cores=cores,
+        threads_per_core=threads_per_core,
        gpus_per_core=gpus_per_task,
        executor=executor,
    )
@@ -151,6 +159,7 @@ def interface_bootup(
    command_lst,
    cwd=None,
    cores=1,
+    threads_per_core=1,
    gpus_per_core=0,
    executor=None,
):
@@ -161,6 +170,7 @@ def interface_bootup(
    connections = FluxPythonInterface(
        cwd=cwd,
        cores=cores,
+        threads_per_core=threads_per_core,
        gpus_per_core=gpus_per_core,
        oversubscribe=False,
        executor=executor,
@@ -178,6 +188,7 @@ def executor_broker(
    future_queue,
    max_workers,
    cores_per_worker=1,
+    threads_per_core=1,
    gpus_per_worker=0,
    init_function=None,
    cwd=None,
@@ -187,6 +198,7 @@ def executor_broker(
    meta_future_lst = _get_executor_list(
        max_workers=max_workers,
        cores_per_worker=cores_per_worker,
+        threads_per_core=threads_per_core,
        gpus_per_worker=gpus_per_worker,
        init_function=init_function,
        cwd=cwd,
@@ -208,6 +220,7 @@ def executor_broker(
def _get_executor_list(
    max_workers,
    cores_per_worker=1,
+    threads_per_core=1,
    gpus_per_worker=0,
    init_function=None,
    cwd=None,
@@ -216,6 +229,7 @@ def _get_executor_list(
    return {
        get_future_done(): SingleTaskExecutor(
            cores=cores_per_worker,
+            threads_per_core=threads_per_core,
            gpus_per_task=int(gpus_per_worker / cores_per_worker),
            init_function=init_function,
            cwd=cwd,
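
How the new argument flows through this file: SingleTaskExecutor passes threads_per_core into execute_parallel_tasks, which forwards it via interface_bootup to FluxPythonInterface, where it becomes the cores_per_task of the flux jobspec (see pympipool/shared/connections.py below). A worker started with cores=2 and threads_per_core=4 therefore requests 2 tasks with 4 cores each, i.e. 8 cores in total. A minimal sketch, assuming the class is importable from this module and a flux broker is available:

```
# Sketch only: the import path and the running flux broker are assumptions.
import flux.job
from pympipool.interfaces.fluxbroker import SingleTaskExecutor  # assumed import path


def calc(i):
    return 2 * i  # placeholder for a hybrid MPI/OpenMP workload


with flux.job.FluxExecutor() as flux_exe:
    # 2 MPI ranks per function call, each rank backed by 4 cores for OpenMP threads
    with SingleTaskExecutor(cores=2, threads_per_core=4, executor=flux_exe) as exe:
        fs = exe.submit(calc, 3)
        print(fs.result())
```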

pympipool/interfaces/taskexecutor.py (17 additions, 18 deletions)

@@ -25,25 +25,24 @@ class Executor(ExecutorBase):
        queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems
        queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter

-    Simple example:
+    Examples:
        ```
-        import numpy as np
-        from pympipool import Executor
-
-        def calc(i, j, k):
-            from mpi4py import MPI
-            size = MPI.COMM_WORLD.Get_size()
-            rank = MPI.COMM_WORLD.Get_rank()
-            return np.array([i, j, k]), size, rank
-
-        def init_k():
-            return {"k": 3}
-
-        with Executor(cores=2, init_function=init_k) as p:
-            fs = p.submit(calc, 2, j=4)
-            print(fs.result())
-
-        >>> [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
+        >>> import numpy as np
+        >>> from pympipool import Executor
+        >>>
+        >>> def calc(i, j, k):
+        >>>     from mpi4py import MPI
+        >>>     size = MPI.COMM_WORLD.Get_size()
+        >>>     rank = MPI.COMM_WORLD.Get_rank()
+        >>>     return np.array([i, j, k]), size, rank
+        >>>
+        >>> def init_k():
+        >>>     return {"k": 3}
+        >>>
+        >>> with Executor(cores=2, init_function=init_k) as p:
+        >>>     fs = p.submit(calc, 2, j=4)
+        >>>     print(fs.result())
+        [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
        ```
    """

pympipool/legacy/interfaces/executor.py (12 additions, 12 deletions)

@@ -24,19 +24,19 @@ class PoolExecutor(ExecutorBase):
        queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems
        queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter

-    Simple example:
+    Examples:
        ```
-        from pympipool import PoolExecutor
-
-        def calc(i, j):
-            return i + j
-
-        with PoolExecutor(max_workers=2) as p:
-            fs1 = p.submit(calc, 1, 2)
-            fs2 = p.submit(calc, 3, 4)
-            fs3 = p.submit(calc, 5, 6)
-            fs4 = p.submit(calc, 7, 8)
-            print(fs1.result(), fs2.result(), fs3.result(), fs4.result()
+        >>> from pympipool import PoolExecutor
+        >>>
+        >>> def calc(i, j):
+        >>>     return i + j
+        >>>
+        >>> with PoolExecutor(max_workers=2) as p:
+        >>>     fs1 = p.submit(calc, 1, 2)
+        >>>     fs2 = p.submit(calc, 3, 4)
+        >>>     fs3 = p.submit(calc, 5, 6)
+        >>>     fs4 = p.submit(calc, 7, 8)
+        >>>     print(fs1.result(), fs2.result(), fs3.result(), fs4.result())
        ```
    """

pympipool/legacy/interfaces/pool.py (9 additions, 9 deletions)

@@ -45,16 +45,16 @@ class Pool(PoolBase):
        queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems
        queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter

-    Simple example:
+    Examples:
        ```
-        import numpy as np
-        from pympipool import Pool
-
-        def calc(i):
-            return np.array(i ** 2)
-
-        with Pool(cores=2) as p:
-            print(p.map(func=calc, iterable=[1, 2, 3, 4]))
+        >>> import numpy as np
+        >>> from pympipool import Pool
+        >>>
+        >>> def calc(i):
+        >>>     return np.array(i ** 2)
+        >>>
+        >>> with Pool(cores=2) as p:
+        >>>     print(p.map(func=calc, iterable=[1, 2, 3, 4]))
        ```
    """

pympipool/shared/connections.py (30 additions, 5 deletions)

@@ -4,9 +4,12 @@


class BaseInterface(ABC):
-    def __init__(self, cwd, cores=1, gpus_per_core=0, oversubscribe=False):
+    def __init__(
+        self, cwd, cores=1, threads_per_core=1, gpus_per_core=0, oversubscribe=False
+    ):
        self._cwd = cwd
        self._cores = cores
+        self._threads_per_core = threads_per_core
        self._gpus_per_core = gpus_per_core
        self._oversubscribe = oversubscribe

@@ -21,10 +24,18 @@ def poll(self):


class SubprocessInterface(BaseInterface):
-    def __init__(self, cwd=None, cores=1, gpus_per_core=0, oversubscribe=False):
+    def __init__(
+        self,
+        cwd=None,
+        cores=1,
+        threads_per_core=1,
+        gpus_per_core=0,
+        oversubscribe=False,
+    ):
        super().__init__(
            cwd=cwd,
            cores=cores,
+            threads_per_core=threads_per_core,
            gpus_per_core=gpus_per_core,
            oversubscribe=oversubscribe,
        )
@@ -72,6 +83,7 @@ def generate_command(self, command_lst):
        command_prepend_lst = generate_slurm_command(
            cores=self._cores,
            cwd=self._cwd,
+            threads_per_core=self._threads_per_core,
            gpus_per_core=self._gpus_per_core,
            oversubscribe=self._oversubscribe,
        )
@@ -142,6 +154,8 @@ def generate_command(self, command_lst):
            command_prepend_lst += [
                "--cwd=" + self._cwd,
            ]
+        if self._threads_per_core > 1:
+            command_prepend_lst += ["--cores-per-task=" + str(self._threads_per_core)]
        if self._gpus_per_core > 0:
            command_prepend_lst += ["--gpus-per-task=" + str(self._gpus_per_core)]
        return super().generate_command(
@@ -151,12 +165,19 @@ def generate_command(self, command_lst):


class FluxPythonInterface(BaseInterface):
    def __init__(
-        self, cwd=None, cores=1, gpus_per_core=0, oversubscribe=False, executor=None
+        self,
+        cwd=None,
+        cores=1,
+        threads_per_core=1,
+        gpus_per_core=0,
+        oversubscribe=False,
+        executor=None,
    ):
        super().__init__(
            cwd=cwd,
            cores=cores,
            gpus_per_core=gpus_per_core,
+            threads_per_core=threads_per_core,
            oversubscribe=oversubscribe,
        )
        self._executor = executor
@@ -174,7 +195,7 @@ def bootup(self, command_lst):
        jobspec = flux.job.JobspecV1.from_command(
            command=command_lst,
            num_tasks=self._cores,
-            cores_per_task=1,
+            cores_per_task=self._threads_per_core,
            gpus_per_task=self._gpus_per_core,
            num_nodes=None,
            exclusive=False,
@@ -196,8 +217,12 @@ def poll(self):
        return self._future is not None and not self._future.done()


-def generate_slurm_command(cores, cwd, gpus_per_core=0, oversubscribe=False):
+def generate_slurm_command(
+    cores, cwd, threads_per_core=1, gpus_per_core=0, oversubscribe=False
+):
    command_prepend_lst = ["srun", "-n", str(cores), "-D", cwd]
+    if threads_per_core > 1:
+        command_prepend_lst += ["--cpus-per-task=" + str(threads_per_core)]
    if gpus_per_core > 0:
        command_prepend_lst += ["--gpus-per-task=" + str(gpus_per_core)]
    if oversubscribe:
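
To make the SLURM branch concrete, the following standalone sketch mirrors the command assembly of generate_slurm_command above (the function name and argument values are illustrative, not imported from pympipool, and the --oversubscribe flag is an assumption since the hunk is truncated at that point):

```
# Standalone sketch mirroring generate_slurm_command above; names and values are illustrative.
def build_srun_prefix(cores, cwd, threads_per_core=1, gpus_per_core=0, oversubscribe=False):
    command_prepend_lst = ["srun", "-n", str(cores), "-D", cwd]
    if threads_per_core > 1:
        command_prepend_lst += ["--cpus-per-task=" + str(threads_per_core)]
    if gpus_per_core > 0:
        command_prepend_lst += ["--gpus-per-task=" + str(gpus_per_core)]
    if oversubscribe:
        command_prepend_lst += ["--oversubscribe"]  # flag assumed; the diff is cut off here
    return command_prepend_lst


print(build_srun_prefix(cores=4, cwd="/tmp", threads_per_core=2))
# ['srun', '-n', '4', '-D', '/tmp', '--cpus-per-task=2']
```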

tests/test_flux.py (34 additions, 0 deletions)

@@ -48,6 +48,15 @@ def test_flux_executor_serial(self):
        self.assertTrue(fs_1.done())
        self.assertTrue(fs_2.done())

+    def test_flux_executor_threads(self):
+        with PyFluxExecutor(max_workers=1, threads_per_core=2, executor=self.executor) as exe:
+            fs_1 = exe.submit(calc, 1)
+            fs_2 = exe.submit(calc, 2)
+            self.assertEqual(fs_1.result(), 1)
+            self.assertEqual(fs_2.result(), 2)
+            self.assertTrue(fs_1.done())
+            self.assertTrue(fs_2.done())
+
    def test_flux_executor_parallel(self):
        with PyFluxExecutor(max_workers=1, cores_per_worker=2, executor=self.executor) as exe:
            fs_1 = exe.submit(mpi_funct, 1)
@@ -73,6 +82,21 @@ def test_execute_task(self):
        self.assertEqual(f.result(), 2)
        q.join()

+    def test_execute_task_threads(self):
+        f = Future()
+        q = Queue()
+        q.put({"fn": calc, 'args': (), "kwargs": {"i": 2}, "future": f})
+        q.put({"shutdown": True, "wait": True})
+        cloudpickle_register(ind=1)
+        execute_parallel_tasks(
+            future_queue=q,
+            cores=1,
+            threads_per_core=1,
+            executor=self.executor
+        )
+        self.assertEqual(f.result(), 2)
+        q.join()
+
    def test_internal_memory(self):
        with SingleTaskExecutor(cores=1, init_function=set_global, executor=self.executor) as p:
            f = p.submit(get_global)
@@ -89,3 +113,13 @@ def test_executor_broker(self):
        self.assertTrue(f.done())
        self.assertEqual(f.result(), 1)
        q.join()
+
+    def test_executor_broker_threads(self):
+        q = Queue()
+        f = Future()
+        q.put({"fn": calc, "args": (1,), "kwargs": {}, "future": f})
+        q.put({"shutdown": True, "wait": True})
+        executor_broker(future_queue=q, max_workers=1, threads_per_core=2, executor=self.executor)
+        self.assertTrue(f.done())
+        self.assertEqual(f.result(), 1)
+        q.join()
