In [1]:
%load_ext autoreload
%load_ext line_profiler
%autoreload 2

In [2]:
import numpy as np
from mpi4py import MPI
from mpi4py.MPI import COMM_WORLD, IN_PLACE

from qtm.mpi.utils import scatter_len, scatter_range

# Serial dry run: pretend to be rank 0 of a 2-process job so the shape
# bookkeeping can be inspected without launching MPI. In a real run these
# would come from COMM_WORLD.Get_size() and COMM_WORLD.Get_rank().
size = 2
rank = 0

# Global array whose element values equal their flat index, so every entry
# is identifiable when printed.
glob_shape = (3, 4)
glob_arr = np.arange(np.prod(glob_shape), dtype='f8').reshape(glob_shape)
dtype_np = glob_arr.dtype

# Local send block: axis 0 is cut down to this rank's share (as reported by
# scatter_len), every other axis keeps its global length.
send_shape = (scatter_len(glob_shape[0], size, rank), *glob_shape[1:])
print(glob_shape, send_shape)

# Gather this rank's rows of the global array into a contiguous buffer.
send_arr = np.empty(send_shape, dtype='f8')
r = scatter_range(glob_shape[0], size, rank)
np.take(glob_arr, r, axis=0, out=send_arr)
print(send_arr)

# Receive block: axes in reversed order relative to the global array, with
# the (new) leading axis cut down to this rank's share.
recv_shape = list(glob_shape[::-1])
recv_shape[0] = scatter_len(recv_shape[0], size, rank)
recv_arr = np.empty(recv_shape, dtype='f8')
print(recv_arr.shape)


# Build one MPI subarray datatype per peer rank, for the send buffer and for
# the receive buffer. Datatype ``i`` selects the slab along axis 1 that
# scatter_len assigns to rank ``i``; ``substarts`` tracks where that slab
# begins inside the local buffer.
subshape = list(send_shape)
substarts = [0] * len(send_shape)
dtype_mpi = MPI._typedict[dtype_np.char]
send_dtypes = []
for iproc in range(size):
    # Always split the FULL axis length. ``subshape[1]`` is overwritten on
    # every iteration, so using it as the total would make every rank after
    # the first split an already-shrunk length.
    sendlen = scatter_len(send_shape[1], size, iproc)
    subshape[1] = sendlen
    # Pass tuple snapshots of the mutable lists and commit the type: MPI
    # requires a derived datatype to be committed before it is used in
    # communication.
    send_dtypes.append(
        dtype_mpi.Create_subarray(
            send_shape, tuple(subshape), tuple(substarts)
        ).Commit()
    )
    substarts[1] += sendlen

subshape = list(recv_shape)
substarts = [0] * len(recv_shape)
recv_dtypes = []
for iproc in range(size):
    recvlen = scatter_len(recv_shape[1], size, iproc)
    # The receive slab width must be this loop's own ``recvlen`` (not the
    # stale ``sendlen`` left over from the send loop above).
    subshape[1] = recvlen
    recv_dtypes.append(
        dtype_mpi.Create_subarray(
            recv_shape, tuple(subshape), tuple(substarts)
        ).Commit()
    )
    substarts[1] += recvlen


# COMM_WORLD.Scatterv((sendbuf, sendcounts), recvbuf)
# print(rank, recvbuf)

# recvbuf[:] = -1

# COMM_WORLD.Scatterv([sendbuf if rank == 0 else recvbuf, sendcounts], 
#                     IN_PLACE if rank == 0 else recvbuf
#                    )
# print(rank, recvbuf, sendbuf)

(3, 4) (2, 4)
[[0. 1. 2. 3.]
 [4. 5. 6. 7.]]
(2, 3)


In [3]:
# Launch an ipyparallel cluster whose engines are started through the MPI
# launcher, then connect a client to it from this kernel.
import ipyparallel as ipp
print(ipp.version_info)

# Number of engines to start (one MPI rank per engine).
nproc = 4
# NOTE(review): shutdown_atexit=False means the cluster is NOT stopped when
# this kernel exits; it must be stopped explicitly (cluster.stop_cluster_sync()).
# The `ipcluster list` outputs in this notebook show an older cluster still
# registered with 0 engines -- presumably left behind this way; confirm and
# clean up.
cluster = ipp.Cluster(engines='mpi', n=nproc, shutdown_atexit=False)
print(cluster)
# Blocks until controller and engines are up. activate=True presumably
# registers the parallel magics (%px / %autopx) used by later cells -- see
# the ipyparallel docs.
client = cluster.start_and_connect_sync(activate=True)
# View over all engines.
view = client[:]
# Last expression: display the engine ids that registered.
client.ids

(8, 6, 1)
<Cluster(cluster_id='1690371257-gdg8', profile='default')>
100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 4/4 [00:05<00:00,  1.49s/engine]


[0, 1, 2, 3]

In [4]:
!ipcluster list

PROFILE          CLUSTER ID                       RUNNING ENGINES LAUNCHER
default          1690370528-elmy                  True          0 MPI
default          1690371257-gdg8                  True          4 MPI


In [5]:
%autopx --block --group-outputs=engines

%autopx enabled


In [6]:
import numpy as np
from mpi4py import MPI
from mpi4py.MPI import COMM_WORLD, IN_PLACE

from qtm.mpi.utils import scatter_len, scatter_range, gen_subarray_dtypes

# Topology of the running MPI job (this cell executes on every engine).
size = COMM_WORLD.Get_size()
rank = COMM_WORLD.Get_rank()

# Reference global array; element values equal their flat index so the
# redistributed blocks can be checked by eye.
glob_shape = (7, 9)
glob_arr = np.arange(np.prod(glob_shape), dtype='c16').reshape(glob_shape)
dtype_np = glob_arr.dtype

# Send layout: axis 0 is distributed across ranks, axis 1 is complete.
send_shape = (scatter_len(glob_shape[0], size, rank), *glob_shape[1:])
print('glob', glob_shape)
print('send', send_shape)

# Fill the local send buffer with this rank's rows of the global array.
send_arr = np.empty(send_shape, dtype='c16')
r = scatter_range(glob_shape[0], size, rank)
np.take(glob_arr, r, axis=0, out=send_arr)

# Receive layout: axis 0 is complete, axis 1 is distributed across ranks.
recv_shape = [
    glob_shape[0],
    scatter_len(glob_shape[1], size, rank),
    *glob_shape[2:],
]
recv_arr = np.empty(recv_shape, dtype='c16')
print('recv', recv_shape)

# One MPI datatype per peer rank for each buffer. gen_subarray_dtypes
# replaces the earlier hand-written loops (one Create_subarray(...).Commit()
# per rank, with the start offset advanced by scatter_len along the split
# axis) -- presumably it does the same; see qtm.mpi.utils to confirm.
# The send buffer is split along axis 1, the receive buffer along axis 0.
send_dtypes = gen_subarray_dtypes(send_shape, 1, send_arr.dtype, size)
recv_dtypes = gen_subarray_dtypes(recv_shape, 0, recv_arr.dtype, size)

# All-to-all exchange: row-distributed blocks in, column-distributed blocks out.
COMM_WORLD.Alltoallw((send_arr, send_dtypes), (recv_arr, recv_dtypes))
print(recv_arr.real)
print('-' * 20)
print('-'*20)

[stdout:0] glob (7, 9)
send (2, 9)
recv [7, 3]
[[ 0.  1.  2.]
 [ 9. 10. 11.]
 [18. 19. 20.]
 [27. 28. 29.]
 [36. 37. 38.]
 [45. 46. 47.]
 [54. 55. 56.]]
--------------------


[stdout:2] glob (7, 9)
send (2, 9)
recv [7, 2]
[[ 5.  6.]
 [14. 15.]
 [23. 24.]
 [32. 33.]
 [41. 42.]
 [50. 51.]
 [59. 60.]]
--------------------


[stdout:3] glob (7, 9)
send (1, 9)
recv [7, 2]
[[ 7.  8.]
 [16. 17.]
 [25. 26.]
 [34. 35.]
 [43. 44.]
 [52. 53.]
 [61. 62.]]
--------------------


[stdout:1] glob (7, 9)
send (2, 9)
recv [7, 2]
[[ 3.  4.]
 [12. 13.]
 [21. 22.]
 [30. 31.]
 [39. 40.]
 [48. 49.]
 [57. 58.]]
--------------------


In [7]:
# Round-trip check: overwrite the original send buffer with a -1 sentinel,
# then run the exchange in the opposite direction (buffers and datatype
# lists swapped). Any sentinel left in the printout marks an element the
# reverse exchange did not write.
send_arr.fill(-1)
COMM_WORLD.Alltoallw((recv_arr, recv_dtypes), (send_arr, send_dtypes))
print(send_arr.real)
print('-' * 20)

[stdout:3] [[54. 55. 56. 57. 58. 59. 60. 61. 62.]]
--------------------


[stdout:1] [[18. 19. 20. 21. 22. 23. 24. 25. 26.]
 [27. 28. 29. 30. 31. 32. 33. 34. 35.]]
--------------------


[stdout:0] [[ 0.  1.  2.  3.  4.  5.  6.  7.  8.]
 [ 9. 10. 11. 12. 13. 14. 15. 16. 17.]]
--------------------


[stdout:2] [[36. 37. 38. 39. 40. 41. 42. 43. 44.]
 [45. 46. 47. 48. 49. 50. 51. 52. 53.]]
--------------------


In [8]:
from typing import Sequence

def gen_vector_dtype(shape: Sequence[int], axis: int, dtype_np: np.dtype):
    """Build a committed MPI datatype describing one index along ``axis``.

    For a C-ordered array of the given ``shape``, the slice taken at a single
    index along ``axis`` consists of ``prod(shape[:axis])`` blocks, each of
    ``prod(shape[axis+1:])`` contiguous items, spaced ``prod(shape[axis:])``
    items apart. That pattern is expressed as an MPI vector type. Its extent
    is then resized to exactly one block, so that consecutive "elements" of
    the returned type (e.g. the counts/displacements of Scatterv) start at
    consecutive indices along ``axis``.

    Parameters
    ----------
    shape : Sequence[int]
        Shape of the (C-contiguous) array the datatype will be applied to.
    axis : int
        Axis that is being distributed.
    dtype_np : np.dtype
        NumPy dtype of the array; its ``char`` code selects the MPI base type.

    Returns
    -------
    mpi4py.MPI.Datatype
        A committed derived datatype. The caller owns it and should ``Free()``
        it when it is no longer needed.
    """
    from mpi4py.MPI import _typedict
    dtype_mpi = _typedict[dtype_np.char]

    nblocks = np.prod(shape[:axis], dtype='i8')
    stridelen = np.prod(shape[axis:], dtype='i8')
    blocklen = stridelen // shape[axis]
    print('vec', nblocks, stridelen, blocklen)

    # The plain vector type is only a stepping stone: it is never used in
    # communication, so it needs no Commit(), and it is freed as soon as the
    # resized type has been derived from it (freeing a datatype does not
    # affect types built from it).
    vectype = dtype_mpi.Create_vector(
        int(nblocks), int(blocklen), int(stridelen)
    )
    try:
        # Extent = one block (blocklen base items), so element k of the
        # resized type starts at index k along ``axis``. For the last axis
        # blocklen == 1 and this is a single base item.
        resized = vectype.Create_resized(0, int(blocklen) * dtype_mpi.size)
    finally:
        vectype.Free()
    return resized.Commit()

# Scatter the columns (axis 1) of a global array from rank 0 to all ranks
# using the strided datatype from gen_vector_dtype, so no packing/transposing
# of the data is needed on either side.
glob_shape, axis = (7, 9), 1

glob_arr = np.arange(np.prod(glob_shape), dtype='c16').reshape(glob_shape)
dtype_np = glob_arr.dtype

# Local block: the scattered axis shrinks to this rank's share.
loc_shape = list(glob_shape)
loc_shape[axis] = scatter_len(glob_shape[axis], size, rank)
# -1 sentinel makes any element the scatter failed to write stand out.
loc_arr = np.ones(loc_shape, dtype='c16') * -1

print(rank, glob_shape, loc_shape)
sendbuf = None
if rank == 0:
    # Only the root describes the send side: the datatype walks the global
    # array's layout, and the counts are "indices along `axis`" per rank.
    send_dtype = gen_vector_dtype(glob_shape, axis, dtype_np)
    sendcounts = scatter_len(glob_shape[axis], size)
    print('sendcounts', sendcounts)
    sendbuf = (glob_arr, sendcounts, send_dtype)

# Every rank receives through a datatype built for its own local layout.
recv_dtype = gen_vector_dtype(loc_shape, axis, dtype_np)
# State the receive count explicitly. If it is omitted, mpi4py derives it
# from the buffer's byte length divided by the (resized, one-item) extent,
# which is far more than the number of indices actually sent and does not
# match the send-side type signature.
recvbuf = (loc_arr, int(loc_shape[axis]), recv_dtype)

COMM_WORLD.Scatterv(sendbuf, recvbuf)
print(loc_arr.real)

[stdout:1] 1 (7, 9) [7, 2]
vec 7 2 1
[[ 3.  4.]
 [12. 13.]
 [21. 22.]
 [30. 31.]
 [39. 40.]
 [48. 49.]
 [57. 58.]]


[stdout:3] 3 (7, 9) [7, 2]
vec 7 2 1
[[ 7.  8.]
 [16. 17.]
 [25. 26.]
 [34. 35.]
 [43. 44.]
 [52. 53.]
 [61. 62.]]


[stdout:0] 0 (7, 9) [7, 3]
vec 7 9 1
sendcounts [3 2 2 2]
vec 7 3 1
[[ 0.  1.  2.]
 [ 9. 10. 11.]
 [18. 19. 20.]
 [27. 28. 29.]
 [36. 37. 38.]
 [45. 46. 47.]
 [54. 55. 56.]]


[stdout:2] 2 (7, 9) [7, 2]
vec 7 2 1
[[ 5.  6.]
 [14. 15.]
 [23. 24.]
 [32. 33.]
 [41. 42.]
 [50. 51.]
 [59. 60.]]


In [13]:
import numpy as np
from mpi4py.MPI import Op

# Reference data: four 2x2 slabs, indexed below by MPI rank (so this cell
# needs rank < 4).
a = np.arange(16, dtype='f8').reshape((4, 2, 2))

# Serial answer: elementwise product over the leading axis.
out = np.prod(a, axis=0)
print(out.shape)

# This rank's slab, copied so that the in-place reduction performed later
# writes into its own buffer rather than into a view of `a`.
a_loc = np.array(a[rank])

def sum_op(inp, out, dtype):
    """Reduction callback for a user-defined MPI Op: elementwise product.

    Despite its name, this multiplies: ``out <- inp * out``, written back
    into ``out`` in place. Both arguments are raw buffers and are always
    reinterpreted as float64; the ``dtype`` argument is ignored. Returns
    nothing -- the result lives in ``out``.
    """
    src = np.frombuffer(inp, dtype='f8')
    acc = np.frombuffer(out, dtype='f8')
    # `acc` is a view on the caller's buffer, so this updates it in place.
    np.multiply(src, acc, out=acc)
    
# Wrap the Python callback as an MPI reduction operation. The second
# argument declares it commutative (elementwise multiplication is).
op = Op.Create(sum_op, True)
try:
    # In-place all-reduce: every rank ends up with the elementwise product
    # of all ranks' slabs in its own a_loc.
    COMM_WORLD.Allreduce(MPI.IN_PLACE, a_loc, op)
finally:
    # User-defined ops hold an MPI handle; release it once the (blocking)
    # reduction is done so re-running the cell does not accumulate handles.
    op.Free()
# Compare against the serial reference computed with NumPy.
print(np.allclose(a_loc, out))
#print(out)
#print(a_loc)

[stdout:0] (2, 2)
True


[stdout:3] (2, 2)
True


[stdout:1] (2, 2)
True


[stdout:2] (2, 2)
True


In [14]:
%autopx --block --group-outputs=engines

%autopx disabled


In [15]:
# Explicit teardown of the cluster started earlier in this notebook. It was
# created with shutdown_atexit=False, so it would otherwise outlive this
# kernel session.
cluster.stop_cluster_sync()

In [16]:
!ipcluster list

PROFILE          CLUSTER ID                       RUNNING ENGINES LAUNCHER
default          1690370528-elmy                  True          0 MPI


In [20]:
import numpy as np

# Quick check of how a non-commutative ufunc reduces: along axis 0 the
# result is a[0] - a[1] - a[2] - a[3] (left to right), elementwise.
a = np.arange(16).reshape((4, 2, 2))
out = np.subtract.reduce(a)  # axis defaults to 0
print(out)

[[-24 -26]
 [-28 -30]]


In [3]:
from mpi4py import MPI

# List every type code known to mpi4py's (private) code -> Datatype table,
# together with the Datatype object it maps to.
for key, mpi_type in MPI._typedict.items():
    print(key, mpi_type)

? <mpi4py.MPI.Datatype object at 0x7fc4a873eac0>
c <mpi4py.MPI.Datatype object at 0x7fc4a873e7f0>
S <mpi4py.MPI.Datatype object at 0x7fc4a873e7f0>
S1 <mpi4py.MPI.Datatype object at 0x7fc4a873e7f0>
s <mpi4py.MPI.Datatype object at 0x7fc4a873e7f0>
1s <mpi4py.MPI.Datatype object at 0x7fc4a873e7f0>
b <mpi4py.MPI.Datatype object at 0x7fc4a873e850>
h <mpi4py.MPI.Datatype object at 0x7fc4a873e880>
i <mpi4py.MPI.Datatype object at 0x7fc4a873e8b0>
l <mpi4py.MPI.Datatype object at 0x7fc4a873e8e0>
q <mpi4py.MPI.Datatype object at 0x7fc4a873e910>
B <mpi4py.MPI.Datatype object at 0x7fc4a873e940>
H <mpi4py.MPI.Datatype object at 0x7fc4a873e970>
I <mpi4py.MPI.Datatype object at 0x7fc4a873e9a0>
L <mpi4py.MPI.Datatype object at 0x7fc4a873e9d0>
Q <mpi4py.MPI.Datatype object at 0x7fc4a873ea00>
f <mpi4py.MPI.Datatype object at 0x7fc4a873ea30>
d <mpi4py.MPI.Datatype object at 0x7fc4a873ea60>
g <mpi4py.MPI.Datatype object at 0x7fc4a873ea90>
Zf <mpi4py.MPI.Datatype object at 0x7fc4a873eca0>
Zd <mpi4py.MPI.Da