wave2D example: use a single view instead of repeated 'rc[:]'

commit 540ed0251a3ddf347fa77f987dfd8c4bbfbe2e73 (1 parent: ee9089a), committed by @minrk
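
For context, the change is simply to construct one DirectView up front and reuse it, rather than building a fresh view every time `rc[:]` is evaluated. A minimal sketch of the before/after pattern, assuming the IPython parallel Client API these examples target (the import path and cluster setup are assumptions, not part of this commit):

# Sketch only: the import path varied between IPython releases.
from IPython.parallel import Client

rc = Client()                       # connect to the running cluster

# Before: each statement built a throwaway DirectView.
rc[:]['t_hist'] = []                # rc[:] constructs a new view every time
rc[:].execute('import numpy')

# After: build a single DirectView once and reuse it.
view = rc[:]
view['t_hist'] = []
view.execute('import numpy')
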
docs/examples/newparallel/wave2D/communicator.py (2 lines changed)
@@ -9,7 +9,7 @@
class EngineCommunicator(object):
"""An object that connects Engines to each other.
- north and west sockets listen, while south and east sockets connect.
+ north and east sockets listen, while south and west sockets connect.
This class is useful in cases where there is a set of nodes that
must communicate only with their nearest neighbors.
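
The corrected docstring states which sides of a node listen and which connect. A hypothetical illustration of that convention using pyzmq PAIR sockets follows; it is not the actual EngineCommunicator implementation, and the neighbor addresses are placeholders:

# Hypothetical sketch of the "north and east listen, south and west connect"
# convention; placeholder addresses, not the real EngineCommunicator code.
import zmq

ctx = zmq.Context()

# Listening sides: bind to a random port and advertise it to the neighbor.
north = ctx.socket(zmq.PAIR)
east = ctx.socket(zmq.PAIR)
north_port = north.bind_to_random_port('tcp://*')
east_port = east.bind_to_random_port('tcp://*')

# Connecting sides: dial the port advertised by the neighboring node.
south = ctx.socket(zmq.PAIR)
west = ctx.socket(zmq.PAIR)
south.connect('tcp://10.0.0.2:5555')    # neighbor's north port (placeholder)
west.connect('tcp://10.0.0.3:5556')     # neighbor's east port (placeholder)
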
docs/examples/newparallel/wave2D/parallelwave-mpi.py (33 lines changed)
@@ -108,6 +108,9 @@ def wave_saver(u, x, y, t):
assert partition[0]*partition[1] == num_procs, "can't map partition %s to %i engines"%(partition, num_procs)
+ view = rc[:]
+ print "Running %s system on %s processes until %f"%(grid, partition, tstop)
+
# functions defining initial/boundary/source conditions
def I(x,y):
from numpy import exp
@@ -120,15 +123,15 @@ def bc(x,y,t):
return 0.0
# initial imports, setup rank
- rc[:].execute('\n'.join([
+ view.execute('\n'.join([
"from mpi4py import MPI",
"import numpy",
"mpi = MPI.COMM_WORLD",
"my_id = MPI.COMM_WORLD.Get_rank()"]), block=True)
# initialize t_hist/u_hist for saving the state at each step (optional)
- rc[:]['t_hist'] = []
- rc[:]['u_hist'] = []
+ view['t_hist'] = []
+ view['u_hist'] = []
# set vector/scalar implementation details
impl = {}
@@ -137,17 +140,17 @@ def bc(x,y,t):
impl['bc'] = 'vectorized'
# execute some files so that the classes we need will be defined on the engines:
- rc[:].run('RectPartitioner.py')
- rc[:].run('wavesolver.py')
-
+ view.run('RectPartitioner.py')
+ view.run('wavesolver.py')
+
# setup remote partitioner
# note that Reference means that the argument passed to setup_partitioner will be the
# object named 'my_id' in the engine's namespace
- rc[:].apply_sync_bound(setup_partitioner, Reference('my_id'), num_procs, grid, partition)
+ view.apply_sync_bound(setup_partitioner, Reference('my_id'), num_procs, grid, partition)
# wait for initial communication to complete
- rc[:].execute('mpi.barrier()')
+ view.execute('mpi.barrier()')
# setup remote solvers
- rc[:].apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
+ view.apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
# lambda for calling solver.solve:
_solve = lambda *args, **kwargs: solver.solve(*args, **kwargs)
@@ -156,7 +159,7 @@ def bc(x,y,t):
impl['inner'] = 'scalar'
# run first with element-wise Python operations for each cell
t0 = time.time()
- ar = rc[:].apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
+ ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
if final_test:
# this sum is performed element-wise as results finish
s = sum(ar)
@@ -169,12 +172,12 @@ def bc(x,y,t):
impl['inner'] = 'vectorized'
# setup new solvers
- rc[:].apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
- rc[:].execute('mpi.barrier()')
+ view.apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
+ view.execute('mpi.barrier()')
# run again with numpy vectorized inner-implementation
t0 = time.time()
- ar = rc[:].apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test)#, user_action=wave_saver)
+ ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test)#, user_action=wave_saver)
if final_test:
# this sum is performed element-wise as results finish
s = sum(ar)
@@ -189,9 +192,9 @@ def bc(x,y,t):
# If the partition scheme is Nx1, then u can be reconstructed via 'gather':
if ns.save and partition[-1] == 1:
import pylab
- rc[:].execute('u_last=u_hist[-1]')
+ view.execute('u_last=u_hist[-1]')
# map mpi IDs to IPython IDs, which may not match
- ranks = rc[:]['my_id']
+ ranks = view['my_id']
targets = range(len(ranks))
for idx in range(len(ranks)):
targets[idx] = ranks.index(idx)
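
The last hunk above reorders the gather targets because IPython engine IDs need not coincide with MPI ranks. The reindexing step can be seen in isolation with made-up values:

# Illustration of the rank-to-engine reordering; the values are invented.
ranks = [2, 0, 3, 1]        # view['my_id']: the MPI rank reported by each engine
targets = [ranks.index(rank) for rank in range(len(ranks))]
# targets == [1, 3, 0, 2]: the engine holding MPI rank 0 is gathered first
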
docs/examples/newparallel/wave2D/parallelwave.py (38 lines changed)
@@ -105,9 +105,15 @@ def wave_saver(u, x, y, t):
if partition is None:
partition = [num_procs,1]
+ else:
+ num_procs = min(num_procs, partition[0]*partition[1])
assert partition[0]*partition[1] == num_procs, "can't map partition %s to %i engines"%(partition, num_procs)
+ # construct the View:
+ view = rc[:num_procs]
+ print "Running %s system on %s processes until %f"%(grid, partition, tstop)
+
# functions defining initial/boundary/source conditions
def I(x,y):
from numpy import exp
@@ -120,8 +126,8 @@ def bc(x,y,t):
return 0.0
# initialize t_hist/u_hist for saving the state at each step (optional)
- rc[:]['t_hist'] = []
- rc[:]['u_hist'] = []
+ view['t_hist'] = []
+ view['u_hist'] = []
# set vector/scalar implementation details
impl = {}
@@ -130,19 +136,19 @@ def bc(x,y,t):
impl['bc'] = 'vectorized'
# execute some files so that the classes we need will be defined on the engines:
- rc[:].execute('import numpy')
- rc[:].run('communicator.py')
- rc[:].run('RectPartitioner.py')
- rc[:].run('wavesolver.py')
+ view.execute('import numpy')
+ view.run('communicator.py')
+ view.run('RectPartitioner.py')
+ view.run('wavesolver.py')
# scatter engine IDs
- rc[:].scatter('my_id', rc.ids, flatten=True)
+ view.scatter('my_id', range(num_procs), flatten=True)
# create the engine connectors
- rc[:].execute('com = EngineCommunicator()')
+ view.execute('com = EngineCommunicator()')
# gather the connection information into a single dict
- ar = rc[:].apply_async(lambda : com.info)
+ ar = view.apply_async(lambda : com.info)
peers = ar.get_dict()
# print peers
# this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators
@@ -150,7 +156,7 @@ def bc(x,y,t):
# setup remote partitioner
# note that Reference means that the argument passed to setup_partitioner will be the
# object named 'com' in the engine's namespace
- rc[:].apply_sync_bound(setup_partitioner, Reference('com'), peers, Reference('my_id'), num_procs, grid, partition)
+ view.apply_sync_bound(setup_partitioner, Reference('com'), peers, Reference('my_id'), num_procs, grid, partition)
time.sleep(1)
# convenience lambda to call solver.solve:
_solve = lambda *args, **kwargs: solver.solve(*args, **kwargs)
@@ -158,11 +164,11 @@ def bc(x,y,t):
if ns.scalar:
impl['inner'] = 'scalar'
# setup remote solvers
- rc[:].apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly, partitioner=Reference('partitioner'), dt=0,implementation=impl)
+ view.apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly, partitioner=Reference('partitioner'), dt=0,implementation=impl)
# run first with element-wise Python operations for each cell
t0 = time.time()
- ar = rc[:].apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
+ ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
if final_test:
# this sum is performed element-wise as results finish
s = sum(ar)
@@ -176,11 +182,11 @@ def bc(x,y,t):
# run again with faster numpy-vectorized inner implementation:
impl['inner'] = 'vectorized'
# setup remote solvers
- rc[:].apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
+ view.apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
t0 = time.time()
- ar = rc[:].apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test)#, user_action=wave_saver)
+ ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test)#, user_action=wave_saver)
if final_test:
# this sum is performed element-wise as results finish
s = sum(ar)
@@ -195,7 +201,7 @@ def bc(x,y,t):
# If the partition scheme is Nx1, then u can be reconstructed via 'gather':
if ns.save and partition[-1] == 1:
import pylab
- rc[:].execute('u_last=u_hist[-1]')
- u_last = rc[:].gather('u_last', block=True)
+ view.execute('u_last=u_hist[-1]')
+ u_last = view.gather('u_last', block=True)
pylab.pcolor(u_last)
pylab.show()
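
In this non-MPI script the commit also slices the Client so the view covers only the engines the partition actually needs, and scatters plain integer IDs rather than `rc.ids`. A short sketch of that sliced-view plus scatter/gather pattern, again assuming the same Client API (import path, partition size, and the solve step are assumptions):

# Sketch only: not the example's full setup.
from IPython.parallel import Client

rc = Client()
partition = [4, 1]
num_procs = min(len(rc.ids), partition[0] * partition[1])

view = rc[:num_procs]                                   # view over just the engines needed
view.scatter('my_id', range(num_procs), flatten=True)   # one integer ID per engine

# ... solve on the engines; each one ends up holding its strip as 'u_last' ...
u_last = view.gather('u_last', block=True)              # strips concatenated in engine order
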
