## MPI Index maps and parallel matrix-vector product in dolfinx/python

In this notebook we implement matrix-vector multiplication with MPI. 

1. For both matrices and vectors, we construct index maps and use that to distribute values. Basically, we create an IndexMap to describe the distribution scheme we want, and then we use the IndexMap to create objects (such as vectors and matrices) that have this distribution. 

2. After we define IndexMap and distributed objects we need, we show some basic arithmetic with those objects, such as adding vectors and matrix-vector product.

First, we create a cluster and activate mpi engine. For now, we start with n=3 processors.

In [47]:
import logging
import ipyparallel as ipp

# create a cluster
cluster = ipp.Cluster(engines="mpi", n=3, log_level=logging.WARNING)
rc = cluster.start_and_connect_sync(activate=True)


  0%|                                                            | 0/3 [00:00<?, ?engine/s][A
100%|████████████████████████████████████████████████████| 3/3 [00:05<00:00,  1.89s/engine][A


In [48]:
%%px
# Create MPI communicator, find out rank, size
from mpi4py import MPI

comm = MPI.COMM_WORLD
world_rank = comm.Get_rank()
world_size = comm.Get_size()
print(f"I am rank {world_rank} / {world_size}")

[stdout:0] I am rank 0 / 3


[stdout:2] I am rank 2 / 3


[stdout:1] I am rank 1 / 3


We want to create an overlapping, ie ghosted, index map for our vectors and matrices. (see [dolfinx implementation](https://github.com/FEniCS/dolfinx/blob/160ed13eb476df99944072aec70bd46a6fcb9bcf/cpp/dolfinx/common/IndexMap.cpp))

An index map defines how we distribute degrees of freedoms (dofs) of a vector/array over processors. An overlapping map takes into account that some dofs are shared between processors, and these dofs are called *ghosts*. We can use ghosts to communicate/exchange data over different processors. Every ghost has its *owner*, ie rank/processor that owns that dof.

To create an index map, what we need is:
- MPI communicator
- local size of the index map (number of owned entries)
- global indices of ghost entries
- owner rank of each ghost entry on global communicator

In the index map, we construct:
- **source ranks** - ranks that own indices ghosted by the caller
- **destination ranks** - ranks that ghost indices owned by the caller
- **forward communicator** - 
- **reverse communicator** - 


In [49]:
def is_sorted(lst):
    return all(a <= b for a, b in zip(lst, lst[1:]))

In [50]:
%%px
class IndexMap():
    def __init__(self, comm, local_size, ghosts, owners):
        # fixme: perhaps hide these and use special methods to access
        # assume ghosts and owners are of type list
        assert len(ghosts) == len(owners)
        
        self.comm = comm
        self.forward_comm = None
        self.reverse_comm = None
        self.ghosts = ghosts
        self.owners = owners
        self.local_size = local_size
        
        # Get global size
        self.global_size = 0
        self.compute_size_global()
        
        # Get local range of the index map
        self.local_range = None
        self.compute_local_range()
        
        # Get lists of sources and destinations
        self.sources = None
        self.destinations = None
        self.compute_src_dest()
        
    def compute_src_dest(self):
        dest_ranks = np.unique(self.owners)
        print(f"Rank={self.comm.rank}, destinations={dest_ranks}")
        
        # create lists of source and destination ranks and the forward communicator that pushes data from sources to destinations
        self.forward_comm = self.comm.Create_dist_graph([self.comm.rank], [len(dest_ranks)], dest_ranks.tolist(), reorder=False)
        src, dest, _ = self.forward_comm.Get_dist_neighbors()
        
        # create the reverse communicator that pushes data back from destinations to sources
        self.reverse_comm = comm.Create_dist_graph_adjacent(dest, src, reorder=False)
        src2, dest2, _ = self.reverse_comm.Get_dist_neighbors()
        
        # check the lists these communicators have are the same
        print(f"{src=}, {src2=}")
        print(f"{dest=}, {dest2=}")
        # assume src and dest are lists
        # assert is_sorted(src)
        # assert is_sorted(dest)
        
        self.sources = src
        self.destinations = dest
        
    def local_to_global(self, local_ind):
        # todo
        return NotImplemented
    
    def compute_size_global(self):
        # sum up local sizes to get global size (cummulative?)
        self.global_size = self.comm.allreduce([self.local_size], op=MPI.SUM)
        
    def compute_local_range(self):
        # compute offset (returns list?)
        offset = self.comm.exscan([self.local_size], op=MPI.SUM)
        # if we're at rank 0, then offset is None and local range is (0, local_size); 
        # otherwise it's offset by some number
        if offset is None:
            self.local_range = (0, self.local_size)
        else: 
            print(f"{offset=}")
            self.local_range = (offset[0], offset[0] + self.local_size)


In [51]:
%%px
import numpy as np

# set up some Index Map over range(200)
if comm.rank == 0:
    IM = IndexMap(comm, 50, range(50, 200), list(np.ones(150, dtype=np.int32)))
elif comm.rank == 1:
    IM = IndexMap(comm, 150, range(50), list(np.zeros(50, dtype=np.int32)))
elif comm.rank == 2:
    IM = IndexMap(comm, 33, range(5), list(np.zeros(5, dtype=np.int32)))
else:
    raise RuntimeError("Too many processors!")

[stdout:0] Rank=0, destinations=[1]
src=[1, 2], src2=[1]
dest=[1], dest2=[1, 2]


[stdout:2] offset=[50, 150]
Rank=2, destinations=[0]
src=[], src2=[0]
dest=[0], dest2=[]


[stdout:1] offset=[50]
Rank=1, destinations=[0]
src=[0], src2=[0]
dest=[0], dest2=[0]


In [52]:
%%px
# Let's check if everything makes sense
print(IM.comm.rank)
print(IM.local_size)
print(IM.local_range)
print(IM.sources, IM.destinations)

[stdout:0] 0
50
(0, 50)
[1, 2] [1]


[stdout:1] 1
150
(50, 200)
[0] [0]


[stdout:2] 2
33
(50, 83)
[] [0]




Now we want to use this index map for dolfinx objects (vectors, matrices)



In [53]:
%%px
import dolfin as df
# from petsc4py import PETSc

def create_indexmap(V):
    indexmap = V.dofmap().index_map()
    owners = V.dofmap().off_process_owner()
    ghosts = indexmap.local_to_global_unowned()
    
    return IndexMap(comm, indexmap.size(df.IndexMap.MapSize.OWNED), ghosts, owners)

mesh = df.UnitSquareMesh(2,2)
V = df.FunctionSpace(mesh, 'CG', 1)

# create an index map from whatever dolfin made into a dofmap (namely, index map)
new_im = create_indexmap(V)

[stdout:1] offset=[1]
Rank=1, destinations=[2]
src=[0], src2=[2]
dest=[2], dest2=[0]


[stdout:2] offset=[1, 3]
Rank=2, destinations=[]
src=[1, 0], src2=[]
dest=[], dest2=[1, 0]


[stdout:0] Rank=0, destinations=[1 2]
src=[], src2=[1, 2]
dest=[1, 2], dest2=[]


In [54]:
%%px
# Create a vector and edit its value only on rank 0
# See if the updated values on ghosts dofs are also updated on their respective owner ranks
u = df.Function(V)
u.interpolate(df.Expression("x[0]+x[1]", degree=1))
uu = u.vector().get_local(np.arange(new_im.local_size+len(new_im.ghosts)))
print(uu)

if comm.rank == 0:
    uu[:] = 2405
    u.vector().set_local(uu[:])
    # print(uu)

print(comm.rank)
u.vector().apply("insert")
print(u.vector().get_local(np.arange(new_im.local_size+len(new_im.ghosts))))

[stdout:1] [0.5 0.  1.5 0.5 1.  2. ]
1
[2.405e+03 0.000e+00 2.405e+03 5.000e-01 2.405e+03 2.000e+00]


[stdout:2] [2.  1.  1.5 0.5 1. ]
2
[2.000e+00 2.405e+03 1.500e+00 5.000e-01 1.000e+00]


[stdout:0] [1.  0.5 1.5 1. ]
0
[2405. 2405. 2405. 2405.]


In [55]:
%%px
print(f"{new_im.local_size=}, {new_im.ghosts=}")

[stdout:0] new_im.local_size=1, new_im.ghosts=array([1, 3, 5], dtype=uint64)


[stdout:2] new_im.local_size=5, new_im.ghosts=array([], dtype=uint64)


[stdout:1] new_im.local_size=3, new_im.ghosts=array([7, 5, 4], dtype=uint64)


In [56]:
%%px
# Implement vector addition z = alpha * x + y
def axpy(alpha, x, y):
    xa = x.vector().get_local(np.arange(new_im.local_size))
    ya = y.vector().get_local(np.arange(new_im.local_size))
    # Use own index map for scatter_reverse add values
    z = df.Function(V)
    z.vector().set_local(alpha*xa+ya)
    z.vector().apply("insert")

    return z

def getl(x):
    return x.vector().get_local(np.arange(new_im.local_size+len(new_im.ghosts)))

v = df.Function(V)
v.interpolate(df.Expression("x[0]*x[1]", degree=2))
u = df.Function(V)
u.interpolate(df.Expression("39502", degree=1))

w = axpy(0.1, u, v)
print(f"{comm.rank=}")
print("w=", getl(w))

# Check if we get the same with dolfin
q = df.Function(V)
q.interpolate(df.Expression("0.1*39502 + x[0]*x[1]", degree=2))
print("q=", getl(q))

[stdout:0] comm.rank=0
w= [3950.2  3950.2  3950.7  3950.45]
q= [3950.2  3950.2  3950.7  3950.45]


[stdout:1] comm.rank=1
w= [3950.2  3950.2  3950.7  3950.2  3950.45 3951.2 ]
q= [3950.2  3950.2  3950.7  3950.2  3950.45 3951.2 ]


[stdout:2] comm.rank=2
w= [3951.2  3950.45 3950.7  3950.2  3950.2 ]
q= [3951.2  3950.45 3950.7  3950.2  3950.2 ]


Now that we have vectors and their addition working in parallel, we set up distributed (square) CSR matrices. We will need one index map for both rows and columns (same size).

In [60]:
%%px
u, v = df.TrialFunction(V), df.TestFunction(V)
a = df.inner(u, v) * df.dx + df.inner(df.grad(u), df.grad(v)) * df.dx
A = df.assemble(a)
Amat = df.as_backend_type(A).mat()

In [65]:
%%px
def mv(Amat, x):
    """ 
    assume square matrix A and vectors x,y of same size
    nrow: number of rows
    ncol: number of cols
    nnz: number of nonzeros
    IA: column pointers for each row (size = nrow+1)
    JA: column indices (size = nnz)
    data: values (size = nnz)
    """
    csr = Amat.getValuesCSR()
    
    # local_rows = new_im.local_size
    local_rows = new_im.local_size
    assert local_rows <= Amat.size[0] and local_rows <= Amat.size[1]
    
    row_range = range(local_rows)

    xlocal = x.get_local(row_range)
    temp = np.zeros(local_rows)
    
    for i in row_range:
        for j in range(csr[0][i], csr[0][i+1]):
            temp[i] += csr[2][j]*x[csr[1][j]]
            
    y = df.Function(V)
    y.vector().set_local(temp)
    y.vector().apply("insert")
    
    return y

v = df.Function(V)
v.interpolate(df.Expression("1", degree=1))

w = mv(Amat, v.vector())
print(f"{comm.rank=}")
print("w=", getl(w))

# Check if we get the same with dolfin
q = df.Function(V)
q = A * v
print("q=", getl(q))

[2:execute]
[0;31m---------------------------------------------------------------------------[0m
[0;31mIndexError[0m                                Traceback (most recent call last)
[0;32m/tmp/ipykernel_98769/4120170664.py[0m in [0;36m<module>[0;34m[0m
[1;32m     33[0m [0mv[0m[0;34m.[0m[0minterpolate[0m[0;34m([0m[0mdf[0m[0;34m.[0m[0mExpression[0m[0;34m([0m[0;34m"1"[0m[0;34m,[0m [0mdegree[0m[0;34m=[0m[0;36m1[0m[0;34m)[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m     34[0m [0;34m[0m[0m
[0;32m---> 35[0;31m [0mw[0m [0;34m=[0m [0mmv[0m[0;34m([0m[0mAmat[0m[0;34m,[0m [0mv[0m[0;34m.[0m[0mvector[0m[0;34m([0m[0;34m)[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m     36[0m [0mprint[0m[0;34m([0m[0;34mf"{comm.rank=}"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m     37[0m [0mprint[0m[0;34m([0m[0;34m"w="[0m[0;34m,[0m [0mgetl[0m[0;34m([0m[0mw[0m[0;34m)[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m

[0;

AlreadyDisplayedError: 3 errors

In [45]:
# Stop cluster
cluster.stop_cluster_sync()

engine set stopped 1681394295: {'exit_code': 1, 'pid': 98099, 'identifier': 'ipengine-1681394294-2oc2-1681394295-96128'}
