-
Notifications
You must be signed in to change notification settings - Fork 96
/
mpi.py
372 lines (293 loc) · 11.6 KB
/
mpi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
# This file is part of the pyMOR project (http://www.pymor.org).
# Copyright 2013-2017 pyMOR developers and contributors. All rights reserved.
# License: BSD 2-Clause License (http://opensource.org/licenses/BSD-2-Clause)
""" This module provides helper methods to use pyMOR in parallel with MPI.
Executing this module will run :func:`event_loop` on all MPI ranks
except for rank 0 where either a given script is executed::
mpirun -n 16 python -m pymor.tools.mpi /path/to/script
or an interactive session is started::
mpirun -n 16 python -m pymor.tools.mpi
When IPython is available, an IPython kernel is started which
the user can connect to by calling::
ipython console --existing file_name_printed_by_ipython.json
(Starting the IPython console directly will not work properly
with most MPI implementations.)
When IPython is not available, the builtin Python REPL is started.
When :func:`event_loop` is running on the MPI ranks, :func:`call`
can be used on rank 0 to execute the same Python function (given
as first argument) simultaneously on all MPI ranks (including
rank 0). Calling :func:`quit` will exit :func:`event_loop` on
all MPI ranks.
Additionally, this module provides several helper methods which are
intended to be used in conjunction with :func:`call`: :func:`mpi_info`
will print a summary of all active MPI ranks, :func:`run_code`
will execute the given code string on all MPI ranks,
:func:`import_module` imports the module with the given path.
A simple object management is implemented with the
:func:`manage_object`, :func:`get_object` and :func:`remove_object`
methods. It is the user's responsibility to ensure that calls to
:func:`manage_object` are executed in the same order on all MPI ranks
to ensure that the returned :class:`ObjectId` refers to the same
distributed object on all ranks. The functions :func:`function_call`,
:func:`function_call_manage`, :func:`method_call`,
:func:`method_call_manage` map instances of :class:`ObjectId`
transparently to distributed objects. :func:`function_call_manage` and
:func:`method_call_manage` will call :func:`manage_object` on the
return value and return the corresponding :class:`ObjectId`. The functions
:func:`method_call` and :func:`method_call_manage` are given an
:class:`ObjectId` and a string as first and second argument and execute
the method named by the second argument on the object referred to by the
first argument.
"""
import sys
from packaging.version import Version
from pymor.core.config import config
from pymor.core.defaults import defaults
from pymor.core.pickle import dumps, loads
# Module-level MPI state. Everything below runs once at import time.
if config.HAVE_MPI:
    import mpi4py
    from mpi4py import MPI
    # World communicator plus this process's rank and the number of ranks.
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()
    # Set to True by quit(); call() returns immediately once this is True.
    finished = False
    mpi4py_version = Version(mpi4py.__version__)
    # NOTE(review): this is an exact-equality check, so only mpi4py 2.0 takes
    # this path. The comment on the fallback implementations further down
    # speaks of "older mpi4py versions", which suggests `>=` may have been
    # intended -- confirm before changing, and keep it in sync with the
    # identical check that selects the event_loop/call/quit implementations.
    if mpi4py_version == Version('2.0'):
        import pymor.core.pickle
        # Route mpi4py's own (de)serialization through pymor.core.pickle.
        MPI.pickle.PROTOCOL = pymor.core.pickle.PROTOCOL
        MPI.pickle.loads = pymor.core.pickle.loads
        MPI.pickle.dumps = pymor.core.pickle.dumps
else:
    # No MPI available: behave like a single rank 0. `comm` and `MPI` are
    # not defined in this branch.
    mpi4py_version = Version('0.0')
    rank = 0
    size = 1
    # With finished=True, call() returns None without broadcasting or
    # executing the given method.
    finished = True
# True only on rank 0.
rank0 = rank == 0
# True when more than one rank is present.
parallel = (size > 1)
# Registry used by manage_object/get_object/remove_object: ObjectId -> object.
_managed_objects = {}
# Next id handed out by manage_object.
_object_counter = 0
################################################################################
@defaults('auto_launch')
def event_loop_settings(auto_launch=True):
    """Return the settings for pyMOR's MPI event loop as a dict.

    Parameters
    ----------
    auto_launch
        If `True`, automatically execute :func:`event_loop` on
        all MPI ranks (except 0) when pyMOR is imported.

    Returns
    -------
    A dict with the single key `'auto_launch'` holding the given value.
    """
    settings = {}
    settings['auto_launch'] = auto_launch
    return settings
if mpi4py_version == Version('2.0'):
def event_loop():
    """Launch an MPI-based event loop on a non-zero rank.

    Each iteration waits for a `(method, args, kwargs)` triple received
    through `comm.bcast` and executes `method(*args, **kwargs)`.
    Events are sent by calling :func:`call` on rank 0; calling
    :func:`quit` sends the `'QUIT'` marker which ends the loop.

    Anything raised while receiving or executing an event is printed
    together with the rank number, and the loop keeps running.
    """
    import traceback

    assert not rank0
    while True:
        try:
            request = comm.bcast(None)
            method, args, kwargs = request
            if method == 'QUIT':
                break
            method(*args, **kwargs)
        except BaseException:
            # Deliberately catch everything so one failing event does not
            # take this rank out of the loop.
            print("Caught exception on MPI rank {}:".format(rank))
            traceback.print_exc()
def call(method, *args, **kwargs):
    """Execute `method` on all MPI ranks.

    Assuming :func:`event_loop` is running on all MPI ranks
    (except rank 0), the request is broadcast to them and `method`
    is then also executed locally on rank 0 with positional
    arguments `args` and keyword arguments `kwargs`.

    Parameters
    ----------
    method
        The function to execute on all ranks (must be picklable).
    args
        The positional arguments for `method`.
    kwargs
        The keyword arguments for `method`.

    Returns
    -------
    The return value of `method` on rank 0, or `None` when the module
    is already marked as finished (nothing is executed in that case).
    """
    assert rank0
    if not finished:
        request = (method, args, kwargs)
        comm.bcast(request, root=0)
        return method(*args, **kwargs)
    return None
def quit():
    """Exit the event loop on all MPI ranks.

    Broadcasts the `'QUIT'` marker, which causes :func:`event_loop`
    to terminate on all MPI ranks, and then marks the module as finished.

    Calling this again after the loop has been shut down (or when the
    module was set up as finished from the start) is a no-op: nobody is
    listening for the broadcast any more.
    """
    global finished
    if finished:
        # Same guard as in call(): never broadcast once the loop is gone.
        return
    comm.bcast(('QUIT', None, None))
    finished = True
else:
# for older mpi4py versions we have to pickle manually to ensure
# that pymor.core.pickle is used which will correctly serialize
# lambdas, etc.
def event_loop():
    """Launch an MPI-based event loop on a non-zero rank.

    Each iteration waits for a pickled `(method, args, kwargs)` triple
    received through `comm.bcast`, unpickles it with `loads` and executes
    `method(*args, **kwargs)`. Events are sent by calling :func:`call`
    on rank 0; calling :func:`quit` sends the `'QUIT'` marker which ends
    the loop.

    Anything raised while receiving, unpickling or executing an event is
    printed together with the rank number, and the loop keeps running.
    """
    import traceback

    assert not rank0
    while True:
        try:
            payload = comm.bcast(None)
            method, args, kwargs = loads(payload)
            if method == 'QUIT':
                break
            method(*args, **kwargs)
        except BaseException:
            # Deliberately catch everything so one failing event does not
            # take this rank out of the loop.
            print("Caught exception on MPI rank {}:".format(rank))
            traceback.print_exc()
def call(method, *args, **kwargs):
    """Execute `method` on all MPI ranks.

    Assuming :func:`event_loop` is running on all MPI ranks
    (except rank 0), the request is pickled with `dumps`, broadcast
    to them, and `method` is then also executed locally on rank 0 with
    positional arguments `args` and keyword arguments `kwargs`.

    Parameters
    ----------
    method
        The function to execute on all ranks (must be picklable).
    args
        The positional arguments for `method`.
    kwargs
        The keyword arguments for `method`.

    Returns
    -------
    The return value of `method` on rank 0, or `None` when the module
    is already marked as finished (nothing is executed in that case).
    """
    assert rank0
    if not finished:
        payload = dumps((method, args, kwargs))
        comm.bcast(payload, root=0)
        return method(*args, **kwargs)
    return None
def quit():
    """Exit the event loop on all MPI ranks.

    Broadcasts the pickled `'QUIT'` marker, which causes
    :func:`event_loop` to terminate on all MPI ranks, and then marks
    the module as finished.

    Calling this again after the loop has been shut down (or when the
    module was set up as finished from the start) is a no-op: nobody is
    listening for the broadcast any more.
    """
    global finished
    if finished:
        # Same guard as in call(): never broadcast once the loop is gone.
        return
    comm.bcast(dumps(('QUIT', None, None)))
    finished = True
################################################################################
def mpi_info():
    """Print some information on the MPI setup.

    Intended to be used in conjunction with :func:`call`.

    Every rank contributes its rank number and processor name to a
    gather on rank 0; rank 0 then prints one `rank: processor` line
    per entry. Other ranks print nothing.
    """
    gathered = comm.gather((rank, MPI.Get_processor_name()), root=0)
    if not rank0:
        return
    lines = ['{}: {}'.format(rank_no, host) for rank_no, host in gathered]
    print('\n'.join(lines))
def run_code(code):
    """Execute the code string `code`.

    Intended to be used in conjunction with :func:`call`.

    Parameters
    ----------
    code
        The Python source to run; it is handed to `exec` unchanged.
    """
    # No globals/locals are passed, so `exec` uses the namespace of this call.
    # NOTE(review): this runs arbitrary code -- only pass trusted strings.
    exec(code)
def import_module(path):
    """Import the module named by `path`.

    Intended to be used in conjunction with :func:`call`.

    Parameters
    ----------
    path
        Absolute dotted name of the module to import, e.g. `'xml.dom'`.

    Returns
    -------
    `None`. The module is imported for its side effects and ends up in
    `sys.modules`.
    """
    # importlib.import_module is the documented replacement for calling
    # __import__ directly; imported locally to keep this block self-contained.
    import importlib

    importlib.import_module(path)
def function_call(f, *args, **kwargs):
    """Execute the function `f` with given arguments.

    Intended to be used in conjunction with :func:`call`.

    Positional and keyword arguments whose type is exactly
    :class:`ObjectId` are replaced by the object they refer to
    (looked up with :func:`get_object`); all other arguments are
    passed through unchanged.

    Returns
    -------
    The return value of `f`.
    """
    def _resolve(value):
        # Exact type check: only plain ObjectId handles are looked up.
        if type(value) is ObjectId:
            return get_object(value)
        return value

    positional = [_resolve(arg) for arg in args]
    keywords = {key: _resolve(val) for key, val in kwargs.items()}
    return f(*positional, **keywords)
def function_call_manage(f, *args, **kwargs):
    """Execute the function `f` and manage the return value.

    Intended to be used in conjunction with :func:`call`.

    The call itself is delegated to :func:`function_call`, so arguments
    of type :class:`ObjectId` are transparently mapped to the object
    they refer to. The result is then registered with
    :func:`manage_object`.

    Returns
    -------
    The :class:`ObjectId` under which the return value of `f` is managed.
    """
    result = function_call(f, *args, **kwargs)
    return manage_object(result)
def method_call(obj_id, name_, *args, **kwargs):
    """Execute a method of a managed object with given arguments.

    Intended to be used in conjunction with :func:`call`.

    Arguments whose type is exactly :class:`ObjectId` are replaced by
    the object they refer to; all other arguments are passed through
    unchanged.

    Parameters
    ----------
    obj_id
        The :class:`ObjectId` of the object on which to call
        the method.
    `name_`
        Name of the method to call.
    args
        Positional arguments for the method.
    kwargs
        Keyword arguments for the method.

    Returns
    -------
    The return value of the called method.
    """
    def _resolve(value):
        # Exact type check: only plain ObjectId handles are looked up.
        if type(value) is ObjectId:
            return get_object(value)
        return value

    target = get_object(obj_id)
    # Look the method up before touching the arguments, as a missing
    # attribute should surface first.
    bound_method = getattr(target, name_)
    positional = [_resolve(arg) for arg in args]
    keywords = {key: _resolve(val) for key, val in kwargs.items()}
    return bound_method(*positional, **keywords)
def method_call_manage(obj_id, name_, *args, **kwargs):
    """Execute a method with given arguments and manage the return value.

    Intended to be used in conjunction with :func:`call`.

    The call itself is delegated to :func:`method_call`, so arguments
    of type :class:`ObjectId` are transparently mapped to the object
    they refer to. The result is then registered with
    :func:`manage_object`.

    Parameters
    ----------
    obj_id
        The :class:`ObjectId` of the object on which to call
        the method.
    `name_`
        Name of the method to call.
    args
        Positional arguments for the method.
    kwargs
        Keyword arguments for the method.

    Returns
    -------
    The :class:`ObjectId` under which the method's return value is managed.
    """
    result = method_call(obj_id, name_, *args, **kwargs)
    return manage_object(result)
################################################################################
class ObjectId(int):
    """A handle to an MPI distributed object.

    A plain `int` subclass with no added behavior; the distinct type is
    what lets the call helpers in this module tell handles apart from
    ordinary integers.
    """
def manage_object(obj):
    """Keep track of `obj` and return an :class:`ObjectId` handle.

    The handle is built from the module-wide object counter, which is
    advanced by one on every call, and `obj` is stored in the registry
    under that handle.
    """
    global _object_counter
    handle = ObjectId(_object_counter)
    _object_counter += 1
    _managed_objects[handle] = obj
    return handle
def get_object(obj_id):
    """Return the object referred to by `obj_id`.

    Raises
    ------
    KeyError
        If no object is registered under `obj_id`.
    """
    managed = _managed_objects[obj_id]
    return managed
def remove_object(obj_id):
    """Remove the object referred to by `obj_id` from the registry.

    Raises
    ------
    KeyError
        If no object is registered under `obj_id`.
    """
    _managed_objects.pop(obj_id)
################################################################################
# Entry point for `python -m pymor.tools.mpi [script [args...]]`.
# Rank 0 runs the given script (or an interactive session); all other
# ranks serve requests in event_loop().
if __name__ == '__main__':
    assert config.HAVE_MPI
    if rank0:
        if len(sys.argv) >= 2:
            filename = sys.argv[1]
            # Drop the script path so the script sees its own arguments
            # in sys.argv[1:].
            sys.argv = sys.argv[:1] + sys.argv[2:]
            # Read the script inside a context manager so the file handle
            # is closed before the script starts running.
            with open(filename, 'rt') as _script_file:
                _script_source = _script_file.read()
            exec(compile(_script_source, filename, 'exec'))
            import pymor.tools.mpi  # this is different from __main__
            pymor.tools.mpi.quit()  # change global state in the right module
        else:
            try:
                import IPython
                IPython.start_kernel()  # only start a kernel since mpirun messes up the terminal
            except ImportError:
                # IPython is not available: fall back to the builtin REPL.
                import code
                code.interact()
    else:
        event_loop()