# Collective Operations

### Sources

- [Wikipedia: Collective operation](https://en.wikipedia.org/wiki/Collective_operation)
- [Wikipedia: Broadcast parallel pattern](https://en.wikipedia.org/wiki/Broadcast_(parallel_pattern))
- [MPI Tutorial](https://mpitutorial.com/tutorials/mpi-reduce-and-allreduce)
- [Python MPI library](https://materials.jeremybejarano.com/MPIwithPython/collectiveCom.html)
- [Python MPI tutorial](https://nyu-cds.github.io/python-mpi/)

### Notation

$$
\begin{align}
p          & = \textrm{Number of processing units}                                             && \\
\mathbf{p} & = (p_0, p_1,...,p_{p-1}) & p_i & = \textrm{Processing unit / node }i                 \\
\mathbf{n} & = (n_0, n_1,...,n_{p-1}) & n_i & = \textrm{Input message size for processing unit }i \\
\end{align}
$$
- In cases where we have initial messages on more than one node we assume that all local messages are of the same size.
- If we do not have an equal distribution, i.e. node $p_i$ has a message of size $n_i$, we get an upper bound for the runtime by setting $n=\max(n_{0},n_{1},\dots ,n_{p-1})$.
- A distributed memory model is assumed where each processing unit has it's own associated memory.

## 1. Broadcast

- The broadcast pattern is used to distribute data from one processing unit to all processing units
- Broadcast can be interpreted as an inverse version of the reduce pattern
- Initially only root $r$ with $id=0$ stores message $m$.
- During broadcast $m$ is sent to the remaining processing units, so that eventually $m$ is available to all processing units.
- Since an implementation by means of a sequential for-loop with $p-1$ iterations becomes a bottleneck, divide-and-conquer approaches are common.

![fig](figures/Broadcast.png)
By RenderFlamingo - Own work, CC BY-SA 4.0, [curid=86786881](https://commons.wikimedia.org/w/index.php?curid=86786881).

### 1.1. Binomial Tree Broadcast

- $p$ has to be a power of two.
- When a processing unit is responsible for sending $m$ to processing units $[i,j]$,
- it sends $m$ to processing unit $(i+j)/2$ and delegates responsibility for the processing units $\left\lceil (i+j)/2\right\rceil ..\left\lceil (i+j)-1\right\rceil$ to it,
- while its own responsibility is cut down to $i..\left\lceil (i+j)/2\right\rceil -1$.
- Binomial trees have a problem with long messages $m$. The receiving unit of $m$ can only propagate the message to other units, after it received the whole message. In the meantime, the communication network is not utilized.
- Broadcast on balanced binary tree is possible in $\mathrm{O}(\alpha \log p)$, where $\alpha$ is the latency.

![fig](figures/Binomial_Tree_Broadcast.gif)
By 13hannes1 - Own work, CC BY-SA 4.0, [curid=77379340](https://commons.wikimedia.org/w/index.php?curid=77379340)

In [1]:
# All library imports here

import math
import random

In [2]:
n    = 4
msg  = [1,1]
k    = len(msg)

p    = [[0]*k for i in range(n)]
p[0] = msg

print('Binary broadcast with simple recursive algorithm')
print()
print('i j s')
print('- -', sum([sum(p[m]) for m in range(n)]), '\t',p)

def broadcast(i, j, msg):
    if j-i == 1:
        p[j] = msg
        print(i,j, sum([sum(p[m]) for m in range(n)]), '\t',p)
        return
    else:
        newi = int((i+j)/2)+1
        p[newi] = msg
        print(i,j, sum([sum(p[m]) for m in range(n)]), '\t',p)
        broadcast(i, newi-1, msg)
        broadcast(newi, j, msg)

broadcast(0, n-1, msg)

### 1.2. Linear Broadcast

- The message is $k$ pieces long and you can only copy over one piece at a time
- Pipelined broadcast on balanced binary tree is possible in $\mathrm{O}(\alpha (p+k))$.

![fig](figures/Pipeline_Broadcast.gif)

By 13hannes1 - Own work, CC BY-SA 4.0, https://commons.wikimedia.org/w/index.php?curid=77379385

In [3]:
p    = [[0]*k for i in range(n)]
p[0] = msg

print('Linear pipeline broadcast with simple recursive algorithm')
print()
print('n', 'k', 's')
def linear_broadcast(node, mi, p):
    n, k = len(p), len(p[0])
    if mi==k:
        return
    if node==n and mi==k:
        return
    print(node, mi, sum([sum(p[i]) for i in range(n)]), '\t', p)
    if node<n-1:
        p[node+1][mi]=p[node][mi]
        if mi<k-1:
            linear_broadcast(node, mi+1, p)
            linear_broadcast(node+1, mi, p)
    return
    
linear_broadcast(0, 0, p)

Linear pipeline broadcast with simple recursive algorithm

n k s
0 0 2 	 [[1, 1], [0, 0], [0, 0], [0, 0]]
0 1 3 	 [[1, 1], [1, 0], [0, 0], [0, 0]]
1 0 4 	 [[1, 1], [1, 1], [0, 0], [0, 0]]
1 1 5 	 [[1, 1], [1, 1], [1, 0], [0, 0]]
2 0 6 	 [[1, 1], [1, 1], [1, 1], [0, 0]]
2 1 7 	 [[1, 1], [1, 1], [1, 1], [1, 0]]
3 0 8 	 [[1, 1], [1, 1], [1, 1], [1, 1]]


### 1.3. Pipelined Fibonacci Tree Broadcast

$$
\begin{align}
\alpha     & = \textrm{Latency}                     \\
\beta      & = \textrm{Communication cost per word} \\
\end{align}
$$

- Therefore pipelining on binary trees is used, where $m$ is split into an array of $k$ packets of size $\left\lceil n/k\right\rceil$.
- The packets are then broadcast one after another, so that data is distributed fast in the communication network.
- Pipelined broadcast on balanced binary tree is possible in $\mathcal{O}(\alpha \log p+\beta n)$.

- <p style="color:red">Shouldn't this be $\mathcal{O}(\alpha(\log p+k))$?!</p>

![fig](figures/Fibonacci_Tree_Pipeline.gif)

By 13hannes1 - Own work, CC BY-SA 4.0, [curid=77401990](https://commons.wikimedia.org/w/index.php?curid=77401990)

In [4]:
print('Fibonacci tree pipeline broadcast with simple recursive algorithm')
print()

msg  = [1, 1]
k    = len(msg)
fibs = [1]*(k+3)
for i in range (2,k+3):
    fibs[i]=fibs[i-1]+fibs[i-2]
n = fibs[k+2]-1
lastleft  = fibs[k+1]-2
lastright = fibs[k]-2
print('n =', n, 'k =', k, 'fibs =', fibs)
print('LL =', lastleft, 'LR =', lastright)
print()

index2depth = []
for i in range(k+1):
    temp = [i]*fibs[i]
    index2depth.extend(temp)

left   = [i+fibs[index2depth[i]]   for i in range(lastleft+1) ]
right  = [i+fibs[index2depth[i]+3] for i in range(lastright+1)] + [0]*(n-lastright-1)
parent = [0]*n
for i in range(1,len(left)):
    parent[left[i]] = i
    if right[i] > 0:
        parent[right[i]] = i
left  += [0]*(n-lastleft-1)

print(' i:\t', [i for i in range(n)])
print('di:\t', index2depth)
print('Lt:\t', left)
print('Rt:\t', right)
print('Pt:\t', parent)
print()

p     = [[0]*k for i in range(n)]
p[0]  = msg
ncalls = [[0]*k]

print('n', 'k', 'c')
def fibonacci_broadcast(node, ki, p, ncalls):
    n, k = len(p), len(p[0])
    if len(ncalls) - 1 < node:
        ncalls.append([0]*k)
    if index2depth[node]>k-1 or ki>k-1 or (node > 0 and ncalls[parent[node]][ki] == 0):
        return
    ncalls[node][ki] += 1
    print(node, ki, ncalls)
    if left[node]>0:
        p[left[node]][ki]  = p[node][ki]
        fibonacci_broadcast(left[node], ki, p, ncalls)
    if right[node]>0:
        p[right[node]][ki] = p[node][ki]
        fibonacci_broadcast(right[node], ki, p, ncalls)
    if index2depth[node]<k-1:
        fibonacci_broadcast(node, ki+1, p, ncalls)
    
fibonacci_broadcast(0, 0, p, ncalls)

Fibonacci tree pipeline broadcast with simple recursive algorithm

n = 4 k = 2 fibs = [1, 1, 2, 3, 5]
LL = 1 LR = 0

 i:	 [0, 1, 2, 3]
di:	 [0, 1, 2, 2]
Lt:	 [1, 2, 0, 0]
Rt:	 [3, 0, 0, 0]
Pt:	 [0, 0, 1, 0]

n k c
0 0 [[1, 0]]
1 0 [[1, 0], [1, 0]]
0 1 [[1, 1], [1, 0], [0, 0], [0, 0]]
1 1 [[1, 1], [1, 1], [0, 0], [0, 0]]


| Broadcast                     | Reduce                     | All-Reduce                    |
|-------------------------------|----------------------------|-------------------------------|
| ![fig](figures/Broadcast.png) | ![fig](figures/Reduce.png) | ![fig](figures/AllReduce.png) |
| [curid=86786881](https://commons.wikimedia.org/w/index.php?curid=86786881)                 |                         [curid=86786880](https://commons.wikimedia.org/w/index.php?curid=86786880)                 |                         [curid=86786874](https://commons.wikimedia.org/w/index.php?curid=86786874)                 |

By RenderFlamingo - Own work, CC BY-SA 4.0

## 2. Reduce

- Reduce is a function the takes an array, combines them using an operator values to give a single value

- The reduce pattern is used to collect data or partial results from different processing units and to combine them into a global result by a chosen operator.
- Given $p$ processing units, message $m_i$ is on processing unit $p_i$ initially. All $m_i$ are aggregated by $\otimes$  and the result is eventually stored on $p_0$. The reduction operator $\otimes$  must be associative at least. Some algorithms require a commutative operator with a neutral element. Operators like $sum$, $min$ and $max$ are common.
- Implementation considerations are similar to broadcast. For pipelining on binary trees the message must be representable as a vector of smaller object for component-wise reduction.
- Pipelined reduce on a balanced binary tree is possible in ${\mathcal {O}}(\alpha \log p+\beta n)$.

## 3. All-Reduce

- The all-reduce pattern (also called allreduce) is used if the result of a reduce operation must be distributed to all processing units. Given $p$ processing units, message $m_i$ is on processing unit $p_i$ initially. All $m_i$ are aggregated by an operator $\otimes$  and the result is eventually stored on all $p_i$. Analog to the reduce operation, the operator $\otimes$  must be at least associative.
- All-reduce can be interpreted as a reduce operation with a subsequent broadcast. For long messages a corresponding implementation is suitable, whereas for short messages, the latency can be reduced by using a hypercube (Hypercube (communication pattern) § All-Gather/ All-Reduce) topology, if $p$ is a power of two.
- All-reduce is possible in ${\mathcal {O}}(\alpha \log p+\beta n)$, since reduce and broadcast are possible in ${\mathcal {O}}(\alpha \log p+\beta n)$ with pipelining on balanced binary trees.

## 4. Putting it All Together

### 4.1. MPI Python Library Function: Bcast

```
Comm.Bcast( buffer,  # array of size k, i.e. a single message
            root=0 ) # data location (root is the processor/node id)
```

![fig](figures/Bcast.png)

### 4.2. MPI Python Library Function 

```
Comm.Reduce( sendbuff,   # send buffer address i.e. word index?
             recvbuff,   # recieve buffer address, i.e., only relevant at root=0
             op = 'SUM', # reduce operation takes send_buffer
             root = 0 )  # data location (root is the processor/node id)
```

![fig](figures/Red.png)

### 4.3. MPI Python Library Function 

```
Comm.Allreduce( sendbuff,    # send buffer address i.e. word index?
                recvbuff,    # recieve buffer address, i.e., only relevant at root=0
                op = 'SUM' ) # reduce operation takes send_buffer
```
![fig](figures/AllRed.png)

Reduces values on all processes to a single value onto all processes.

### 4.4. Communicator class

- In mpi4py, ranks are essential to learning about other processes. A rank is the process’s id within a communicator. A process can be part of more than one communicator at any given time.
- When `Comm.Get_rank()` is called in your program, it gets called by every process in the communicator variable `comm`, and the rank of each respective process is stored into the variable pointed to by rank.
- Remember, rank points to a local variable, which is unique for every calling process because each process has its own separate copy of local variables.

In [33]:
import math

class Node(object):
    def __init__(self, rank, node_size):
        self.rank = rank
        self.size = node_size # k
        self._values = [None]*node_size

    def set_value(self, index, value):
        if (self._values):
            self._values[index] = value
        else:
            print('Node:', self.index, 'values size must be set')

    def get_value(self, index):
        return self._values[index]

    
class Cluster(object):
    def __init__(self, num_nodes, node_size):
        self._nodes = [Node(i, node_size) for i in range(num_nodes)]
        self.size = num_nodes
        
    def get_node(self, index):
        return self._nodes[index]

    
class Comm(object):
    def __init__(self, num_nodes, node_size, bcast_method = 'linear'):
        self.ncalls = None
        self.fibs = None
        self.index2depth = None
        self.left = None
        self.right = None
        self.parent = None
        self.lastleft = None
        self.lastright = None
        self._check_sizes(num_nodes, node_size, bcast_method)
        self.size = num_nodes
        self.node_size = node_size
        self.bcast_method = bcast_method
        self.cluster = Cluster(num_nodes, node_size)
        self.rank = 0 # rank is the node that comm is pointing to
    
    def bcast(self, buffer, root = 0):
        """Sends buffer to the root process"""
        self.rank = root
        if self.bcast_method == 'linear':
            for ki in range(self.node_size):
                self.ncalls = [0]*self.size
                self._binary_bcast(root, root + self.size - 1, ki)
        if self.bcast_method == 'fibonacci':
            self.ncalls = [[0]*self.node_size]
            self._fibonacci_bcast(root, 0, 0)
    
    def reduce(self, send_ki, recv_ki, op = 'SUM', root = 0):
        """Reduces values on all processes to a single value onto the root process.
        """
        self.rank = root
        temp = [self.cluster.get_node(i).get_value(send_ki) for i in range(self.size)]
        result = self.reduce_operation(temp, op)
        self.cluster.get_node(root).set_value(recv_ki, result)
        return result
    
    def allreduce(self, send_ki, recv_ki, op = 'SUM'):
        """Reduces values on all processes to a single value onto all processes."""
        self.bcast(self.reduce(self, send_ki, recv_ki, op, self.rank), self.rank)
    
    def reduce_operation(self, values, op = 'SUM'):
        if op=='SUM':  return sum(values)
        if op=='AVG':  return sum(values) / len(values)

    def _check_sizes(self, num_nodes, node_size, bcast_method):
        if bcast_method == 'linear':
            assert (math.log(num_nodes)/math.log(2)).is_integer()
            print('node:', [i for i in range(num_nodes)])
            print('i j k ncalls')
        if bcast_method == 'fibonacci':
            self._fibonacci_numbers(node_size)
            assert num_nodes == self.fibs[node_size + 2] - 1
            self._setup_fibonacci_tree(num_nodes, node_size)
            print('i r k ncalls')
    
    def _binary_bcast(self, inode, jnode, ki):
        """Uses divide and conquer to send data to address ki on all processors"""
        if ki == self.node_size - 1:
            return
        self.ncalls[inode] += 1
        word = self.cluster.get_node(inode).get_value(ki)
        if jnode - inode == 1:
            self.cluster.get_node(jnode%(self.size - 1)).set_value(ki, word)
            print(inode, jnode, ki, self.ncalls)
            return
        else:
            newinode = int((inode + jnode) / 2) + 1
            self.cluster.get_node(newinode%(self.size - 1)).set_value(ki, word)
            print(inode, jnode, ki, self.ncalls)
            self._binary_bcast(inode, newinode-1, ki)
            self._binary_bcast(newinode, jnode, ki)
    
    def _fibonacci_numbers(self, node_size):
        self.fibs = [1]*(node_size + 3)
        for i in range (2, node_size + 3):
            self.fibs[i] = self.fibs[i - 1] + self.fibs[i - 2]
    
    def _setup_fibonacci_tree(self, num_nodes, node_size):
        self.lastleft  = self.fibs[node_size + 1] - 2
        self.lastright = self.fibs[node_size] - 2
        print('n =', num_nodes, 'k =', node_size, 'fibs =', self.fibs)
        print('LL =', self.lastleft, 'LR =', self.lastright)
        print()

        self.index2depth = []
        for i in range(node_size + 1):
            self.index2depth.extend([i]*self.fibs[i])
            
        self.left   = [i + self.fibs[self.index2depth[i]    ] for i in range(self.lastleft  + 1)]
        self.right  = [i + self.fibs[self.index2depth[i] + 3] for i in range(self.lastright + 1)] \
        + [0]*(num_nodes - self.lastright - 1)
        self.parent = [0]*num_nodes
        for i in range(1,len(self.left)):
            self.parent[self.left[i]] = i
            if self.right[i] > 0:
                self.parent[self.right[i]] = i
        self.left  += [0]*(num_nodes - self.lastleft - 1)

        print(' i:\t', [i for i in range(num_nodes)])
        print('di:\t', self.index2depth)
        print('Lt:\t', self.left)
        print('Rt:\t', self.right)
        print('Pt:\t', self.parent)
        print()
        
    def _fibonacci_bcast(self, root, inode, ki):
        if len(self.ncalls) - 1 < inode:
            self.ncalls.append([0]*self.node_size)
        if self.index2depth[inode] > self.node_size - 1 \
        or ki > self.node_size - 1 \
        or (inode > 0 and self.ncalls[self.parent[inode]][ki] == 0):
            return
        self.ncalls[inode][ki] += 1
        word = self.cluster.get_node(root).get_value(ki)
        print(inode, root, ki, self.ncalls)
        if self.left[inode] > 0:
            self.cluster.get_node((self.left[inode] + root) % self.size).set_value(ki, word)
            self._fibonacci_bcast(root, self.left[inode], ki)
        if self.right[inode] > 0:
            self.cluster.get_node((self.right[inode] + root) % self.size).set_value(ki, word)
            self._fibonacci_bcast(root, self.right[inode], ki)
        if self.index2depth[inode] < self.node_size - 1:
            self._fibonacci_bcast(root, inode, ki+1)

n    = 4
msg  = [1,1]
k = len(msg)
comm = Comm(n, k, 'fibonacci')
# Put the message at node
start_node = 1
for i, m in enumerate(msg):
    comm.cluster.get_node(start_node).set_value(i, m)
comm.bcast(comm.cluster.get_node(start_node), start_node)

n = 4 k = 2 fibs = [1, 1, 2, 3, 5]
LL = 1 LR = 0

 i:	 [0, 1, 2, 3]
di:	 [0, 1, 2, 2]
Lt:	 [1, 2, 0, 0]
Rt:	 [3, 0, 0, 0]
Pt:	 [0, 0, 1, 0]

i r k ncalls
0 1 0 [[1, 0]]
1 1 0 [[1, 0], [1, 0]]
0 1 1 [[1, 1], [1, 0], [0, 0], [0, 0]]
1 1 1 [[1, 1], [1, 1], [0, 0], [0, 0]]


## 5. Examples

### 5.1. Calculate the Mean Using Reduce

- Each process creates random numbers and makes a local_sum calculation.
- The local_sum is then reduced to the root process using MPI_SUM.
- The global average is then global_sum / (world_size * num_elements_per_proc).

In [36]:
random.seed(6)
values = [[random.random() for i in range(k)] for j in range(n)]

lsums = [sum(values[i]) for i in range(len(values))]

gsum = sum(lsums)
gavg = gsum / (n*k)

lsumsq = [sum([values[i][j]*values[i][j] for j in range(k)]) for i in range(n)]
gsumsq = sum(lsumsq)

gstd = math.sqrt(gsumsq / (n*k))

print('mean:  \t', gavg)
print('stddev:\t', gstd)

for i in range(n):
    node = comm.cluster.get_node(i).set_value(j, lsums[i])

mean:  	 0.5319006758610212
stddev:	 0.5964855071963291


In [35]:
for i in range(n):
    node_sum = comm.reduce_operation(comm.cluster.get_node(i)._values, 'SUM')
    comm.cluster.get_node(i).set_value(0, node_sum)
print('rmean:', comm.reduce(0,0,'SUM',0)/(n*k))

rmean: 0.5319006758610212


### 5.2. Calculate the Standard Deviation Using AllReduce

- Each process computes the local_sum of elements and sums them using MPI_Allreduce.
- After the global sum is available on all processes, the mean is computed so that local_sq_diff can be computed.
- Once all of the local squared differences are computed, global_sq_diff is found by using MPI_Reduce.
- The root process can then compute the standard deviation by taking the square root of the mean of the global squared differences.