# Pushing data to engines

In [48]:
import ipyparallel as ipp
# Create a cluster of 4 engines using the "mpi" engine launcher, start it and
# connect a client (`rc`) to it.
# NOTE(review): activate=True presumably registers the %%px magics used in the
# later cells — confirm against the ipyparallel documentation.
cluster = ipp.Cluster(n=4, engines="mpi")
rc = cluster.start_and_connect_sync(activate=True)

Starting 4 engines with <class 'ipyparallel.cluster.launcher.MPIEngineSetLauncher'>


  0%|          | 0/4 [00:00<?, ?engine/s]

Say we want to send a vector to the process with MPI rank 1. We start by defining the variable `rank` on every engine:

In [49]:
%%px
# Executed on the engines, not in the notebook kernel: each engine stores its
# own MPI rank, converted to a plain Python int, under the name `rank`.
from mpi4py import MPI
rank = int(MPI.COMM_WORLD.rank)

Next, we pull this rank from every engine in the view:

In [50]:
# View spanning every engine known to the client.
p_view = rc[:]
# Request the `rank` variable from each engine in the view; the call returns
# an asynchronous result that is resolved with .get() before printing.
mpi_ranks = p_view.pull("rank")
rank_per_engine = mpi_ranks.get()
print(rank_per_engine)

[0, 1, 2, 3]


We find the position of the process with rank 1 in this list:

In [51]:
import numpy as np
# `mpi_ranks.get()` is a plain list of integer ranks, ordered like the engines
# in `p_view`, so the position of rank 1 is an exact list lookup. No
# floating-point tolerance (np.isclose) or array indexing is needed, and a
# missing rank raises a descriptive ValueError instead of an opaque IndexError.
proc_pos = mpi_ranks.get().index(1)

Next, we send an array to this process only:

In [52]:
# Payload: the integers 0..9 stored as 32-bit ints.
data = np.arange(10, dtype=np.int32)
# Push the array into the namespace of the selected engine only, where it
# becomes available under the name `a`. The call's return value (an
# asynchronous result) is what the cell displays.
rc[proc_pos].push({"a": data})

<AsyncResult(_push): pending>

In [53]:
%%px
# Every engine tries to print `a`. Engines that never received it raise a
# NameError, which is caught and reported together with the engine's rank
# instead of aborting the cell.
try:
    print(rank, a)
except NameError as e:
    print(f"Error: rank {rank} raised {e}")

[stdout:0] Error: rank 0 raised name 'a' is not defined


[stdout:1] 1 [0 1 2 3 4 5 6 7 8 9]


[stdout:3] Error: rank 3 raised name 'a' is not defined


[stdout:2] Error: rank 2 raised name 'a' is not defined


In [55]:
# Pull `a` back from the engine it was pushed to. Note that `data` now names
# the asynchronous pull result rather than the array created earlier.
data = rc[proc_pos].pull("a")
# Resolve the result to the actual array before printing it.
pulled_a = data.get()
print(pulled_a)

[0 1 2 3 4 5 6 7 8 9]


# Scatter/gather
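Data can also be distributed across several engines at once. Here we `scatter` an array over a sub-view that contains only part of the cluster (the engines selected by `rc[1:3]`), so that each of them receives one piece: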

In [68]:
# Restrict the view to the engines selected by the slice 1:3.
sub_view = rc[1:3]
# Split a 5-element int8 array across the engines of the sub-view; each
# engine receives its piece under the name `e`. The scatter call's return
# value is the cell output.
scatter_payload = np.arange(5, dtype=np.int8)
sub_view.scatter("e", scatter_payload)

<AsyncResult(scatter): pending>

In [69]:
%%px
# Print the local piece of `e` on each engine. Engines that hold no `e`
# raise a NameError, which is silently ignored so that only engines with
# data produce output.
try:
    print(rank, e)
except NameError:
    pass

[stdout:1] 1 [0 1 2]


[stdout:2] 2 [3 4]


In [70]:
# Collect the pieces of `e` from the engines of the sub-view, then resolve
# the asynchronous result; the reassembled array is the cell output.
gathered_e = sub_view.gather("e")
gathered_e.get()

array([0, 1, 2, 3, 4], dtype=int8)

In [72]:
# Issue four gather requests for `e` in a row. The return values are
# discarded: nothing is waited on or displayed in this cell.
# NOTE(review): the purpose of repeating the call is not evident from this
# cell alone — confirm the intent, and whether the results should be kept.
for i in range(4):
    sub_view.gather("e")