# Distributed Parallel Programming Patterns using Open MPI and Java
Java adaptation done by Ruth Kurniawati (Westfield State University) using source code from [CSInParallel](https://github.com/csinparallel/CSinParallel.git)

Modified from mpi4py notebook originally written by Libby Shoop, Macalester College

Welcome!

This book contains examples illustrating the fundamental concepts of distributed computing using Java code. The type of computing these examples illustrate is called *message passing*. Message passing is a form of programming that is based on processes that communicate with each other to coordinate their work. Message passing can be used on a single multicore computer or with a cluster of computers.

### Software Patterns

Patterns in software are common implementations that practitioners have used over and over to accomplish tasks. As practitioners use them repeatedly, the community begins to give them names and catalog them, often turning them into reusable library functions. The examples you will see in this book are based on documented patterns that have been used to solve different problems using message passing between processes.

In many of these examples, the pattern's name is part of the Java code file's name. You will also see that the MPI library functions often take on the name of the pattern, and the implementation of those functions itself contains the pattern that practitioners found themselves using often. The pattern code examples we show you here, dubbed patternlets, are based on original work by Joel Adams:

Adams, Joel C. "Patternlets: A Teaching Tool for Introducing Students to Parallel Design Patterns." 2015 IEEE International Parallel and Distributed Processing Symposium Workshop. IEEE, 2015.

To run these examples, first you will need to install the Java mpi.jar library by running this code (this will usually take a while to install the first time):

In [None]:
!wget https://wsu-courses.s3.amazonaws.com/openmpi411/mpi.jar

### New to Colab and Jupyter notebooks?

If you have not used this type of notebook before, it is split into *cells*. The cell you are reading is a text cell. A cell with [ ] to the left of it is a code cell, which contains Java code or commands that run as if you were in a Linux shell. Shell commands always begin with an exclamation point, !, as in the cell above, which uses wget to download the mpi.jar file.

You should execute code cells as you follow along in this notebook. Some are designed for you to re-run after changing them. You can run a cell by hovering over the [ ] and clicking on the arrow symbol.

The symbol in the upper left that looks like three .__ symbols toggles the table of contents. Revealing this enables you to navigate to different pattern examples.

The triangle next to some text cells below enables collapsing of sections for faster scrolling.

# Program structure patterns

## Single Program, Multiple Data

This code forms the basis of all of the other examples that follow. It is the fundamental way we structure parallel programs today.


In [None]:
%%writefile Spmd.java
/* Spmd.java
 * ... illustrates the single program multiple data
 *      (SPMD) pattern using basic MPI commands
 *      and OpenMPI's Java interface.
 *
 * Joel Adams, Calvin University, November 2019.
 *
 * Usage: mpirun -np 4 java ./Spmd
 *
 */

import mpi.*;

public class Spmd {

 public static void main(String [] args) throws MPIException {
    MPI.Init(args);

    int id           = MPI.COMM_WORLD.getRank();
    int numProcesses = MPI.COMM_WORLD.getSize();
    String hostName  = MPI.getProcessorName();

    String message   = "Greetings from process #" + id
                         + " of " + numProcesses 
                         + " on " + hostName + "\n";
    System.out.print(message);

    MPI.Finalize();
  }
}

Let's carefully examine the values set up in the first few lines of main().

1. *MPI.COMM_WORLD* The fundamental notion with this type of computing is a *process* running independently on the computer. With one single program like this, we can specify that we want to start several processes, each of which can **communicate**. The mechanism for communication is initialized when the program starts up, and the object that represents the means of communication between processes is called MPI.COMM_WORLD. Later examples store it in a variable named *comm*.

2. *id* Every process can identify itself with a number. We get that number by asking MPI.COMM_WORLD for it using getRank().

3. *numProcesses* It is helpful to know how many processes have started up, because this can be specified differently every time you run this type of program. We ask MPI.COMM_WORLD for it using getSize().

4. *hostName* When you run this code on a cluster of computers, it is sometimes useful to know which computer is running a certain piece of code. A particular computer is often called a 'host', which is why we call this variable hostName. We get it by calling MPI.getProcessorName().

These four items appear in nearly every MPI program. The first three are often needed for writing correct programs, and the fourth one is often used for debugging and analysis of where certain computations are running.

Next we see how we can compile and use the mpirun program to execute the above Java code using 4 processes. The value after -np is the number of processes to use when running the Java file saved by executing the previous code cell.

In [None]:
!javac -cp ./mpi.jar Spmd.java 
!mpirun --allow-run-as-root -np 4 java -cp ./mpi.jar Spmd

The fundamental idea of message passing programs can be illustrated like this:

![picture](https://drive.google.com/uc?id=1wpQaFiaubIcQBV9Lw_jwOU0y2-K-EChW)

Each process is set up within a communication network to be able to communicate with every other process via communication links. Each process is set up to have its own number, or id, which starts at 0.

**Note:** Each process holds its own copies of the above 4 data variables. **So even though there is one single program, it is running multiple times in separate processes, each holding its own data values.** This is the reason for the name of the pattern this code represents: single program, multiple data. The print line at the end of main() represents the multiple different data output produced by each process.


## Master-Worker
This is another very common pattern in parallel and distributed programming. Here is a small illustrative example. Review it and answer this question: what is different between this example and the previous one?


In [None]:
%%writefile MasterWorker.java
/* MasterWorker.java
 * ... illustrates the master-worker pattern
 *      using basic MPI commands
 *      and OpenMPI's Java interface.
 *
 * Joel Adams, Calvin University, November 2019.
 *
 * Usage: mpirun -np 4 java ./MasterWorker
 *
 */

import mpi.*;

public class MasterWorker {

 public static final int MASTER = 0;

 public static void main(String [] args) throws MPIException {
    MPI.Init(args);

    int id           = MPI.COMM_WORLD.getRank();
    int numProcesses = MPI.COMM_WORLD.getSize();
    String hostName  = MPI.getProcessorName();
    String message   = "Greetings from ";

    if (id == MASTER) {
       message += "the master, #" + id
                   + " (" + hostName + ")"
                   + " of " + numProcesses + "\n";
    } else {
       message += "a worker, #" + id
                   + " (" + hostName + ")"
                   + " of " + numProcesses + "\n";
    }

    System.out.print(message);

    MPI.Finalize();
  }
}


The answer to the above question illustrates what we can do with this pattern: based on the process id, we can have one process carry out something different than the others. This concept is used a lot as a means to coordinate activities, where one process, often called the master, has the responsibility of handing out work and keeping track of results. We will see this in later examples.

**Note:** By convention, the master coordinating process is usually the process number 0.

In [None]:
!javac -cp ./mpi.jar MasterWorker.java 
!mpirun --allow-run-as-root -np 4 java -cp ./mpi.jar MasterWorker

### Exercises:

- Rerun, using varying numbers of processes from 1 through 8 (i.e., vary the argument after -np).
- Explain what stays the same and what changes as the number of processes changes.

# Decomposition using parallel for loop patterns

The most common way to complete a repeated task in any programming language is a loop. We use loops because we want to do a certain number of tasks, very often because we want to work on a set of data elements found in a Buffer, an array, or some other data structure. If the work to be done in each iteration is independent of previous iterations, we can use separate processes to do parts of the loop independently. This program structure pattern is called the parallel for loop pattern, which is an implementation strategy for decomposition of the work to be done into smaller parts.

## Parallel Loop Split into Equal Sized Chunks

In the code below, notice the use of the variable called `REPS`. This is designed to be the total amount of work, or repetitions, that the for loop is accomplishing. This particular code is designed so that if those repetitions do not divide evenly among the processes, then the master process prints a warning message and no process performs any iterations.

Remember that because this is still an SPMD program, every process evaluates the same if statement; when the check passes, each process runs the else branch. Each process has its own id, and we can determine how many processes there are, so we can choose which of the `REPS` iterations of the loop each process will execute.
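
The index arithmetic can be tried in plain Java, without MPI, before running the real program. The following is only a sketch under stated assumptions: the class `ChunkRanges` and the method `chunkRange` are hypothetical names, not part of the patternlets. The handling of a remainder (giving one extra iteration to the lowest ids) is one possible approach, which the notebook's code does not implement.

```java
/* Plain-Java sketch (no MPI needed): which iterations would each id get? */
class ChunkRanges {

    /* Returns {start, stop} for the given id; stop is exclusive.
     * When reps divides evenly, start equals id * chunkSize.
     * Otherwise the first (reps % numProcesses) ids each take one extra
     * iteration. That remainder rule is an assumption of this sketch. */
    static int[] chunkRange(int id, int numProcesses, int reps) {
        int chunkSize = reps / numProcesses;
        int remainder = reps % numProcesses;
        int start = id * chunkSize + Math.min(id, remainder);
        int stop  = start + chunkSize + (id < remainder ? 1 : 0);
        return new int[] { start, stop };
    }

    public static void main(String[] args) {
        int reps = 8;            // same value as REPS in the notebook
        int numProcesses = 4;    // stands in for the value given to -np
        for (int id = 0; id < numProcesses; id++) {
            int[] range = chunkRange(id, numProcesses, reps);
            System.out.println("Process " + id + " would start at " + range[0]
                               + " and stop before " + range[1]);
        }
    }
}
```

Changing `reps` or `numProcesses` in `main` shows how the ranges shift; with an evenly divisible `reps` the ranges are all the same size.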

In [None]:
%%writefile ParallelLoopEqualChunks.java
/* ParallelLoopEqualChunks.java
 * ... illustrates the parallel loop pattern in OpenMPI+Java,
 *      in which processes perform the loop's iterations in equal-sized 'chunks'
 *      (preferable when loop iterations access memory/cache locations) ...
 *
 * Joel Adams, Calvin University, November 2019.
 *  with error-handling logic by Libby Shoop, Macalester College, 2017
 *
 * Usage: mpirun -np 4 java ./ParallelLoopEqualChunks
 *
 * Exercise:
 * - Compile and run, varying N: 1, 2, 4, and 8
 * - Change REPS to 16, save, recompile, rerun, varying N again.
 * - Explain how this pattern divides the iterations of the loop
 *    among the processes.
 * - What if REPS is not evenly divisible by N?
 *    What would be a better way to handle that case?
 */

import mpi.*;

public class ParallelLoopEqualChunks {

 public static final int REPS = 8;
 public static final int MASTER = 0;

 public static void main(String [] args) throws MPIException {
    MPI.Init(args);

    int id           = MPI.COMM_WORLD.getRank();
    int numProcesses = MPI.COMM_WORLD.getSize();
    String message   = "";

    // For this first example, ensure that the REPS can be evenly divided by the
    // number of processes and that the number of processes doesn't exceed REPS.
    // If that is not the case, have the master print an error msg and stop.
    if ((REPS % numProcesses) > 0 || numProcesses > REPS) {
      if (id == MASTER) {
          System.out.print("\nPlease run with -np that evenly divides and is less than or equal to "
                  +  REPS + "\n\n");
      }
    } else {
      int chunkSize = REPS / numProcesses;      // find chunk size
      int start = id * chunkSize;               // find starting index
      int stop = start + chunkSize;             // find stopping index

      for (int i = start; i < stop; i++) {      // iterate through our range
          message = "Process " + id + " is performing iteration " + i + "\n";
          System.out.print(message);
      }
    }

    MPI.Finalize();
  }
}


In [None]:
!javac -cp ./mpi.jar ParallelLoopEqualChunks.java 
!mpirun --allow-run-as-root -np 4 java -cp ./mpi.jar ParallelLoopEqualChunks

### Exercises

- Run, using these numbers of processes, N: 1, 2, 4, and 8 (i.e., vary the  argument to -np).
- Change REPS to 16 in the code and rerun it. Then rerun with mpirun, varying N again.
- Explain how this pattern divides the iterations of the loop among the processes.

Which of the following is the correct assignment of loop iterations to processes for this code, when REPS is 8 and numProcesses is 4?


![picture](https://drive.google.com/uc?id=1eUsjxYdWXWqThO_rdLO91HaLqBLAAh_S)

## Parallel for Loop Program Structure: chunks of 1

In the code below, we again use the variable called `REPS` for the total amount of work, or repetitions, that the for loop is accomplishing. This particular code is designed so that the number of repetitions should be more than or equal to the number of processes requested.

**Note:** Typically in real problems, the number of repetitions is much higher than the number of processes. We keep it small here to illustrate what is happening.

As in the last example, when the check passes every process runs the else branch of the if statement. In the for loop in this case, the process whose id is 0 starts at iteration 0, then skips to 0 + numProcesses for its next iteration, and so on. Similarly, process 1 starts at iteration 1, skips next to 1 + numProcesses, and continues until `REPS` is reached. Each process performs single 'slices' or 'chunks of size 1' of the whole loop.
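
The striding loop can also be traced in plain Java without MPI. This is a minimal sketch: the class `CyclicIterations` and the method `iterationsFor` are hypothetical names introduced only for illustration, and the ids are simply looped over in one program rather than run as separate processes.

```java
import java.util.ArrayList;
import java.util.List;

/* Plain-Java sketch (no MPI needed) of the 'chunks of size 1' assignment. */
class CyclicIterations {

    /* Collects the iterations that the process with this id would perform:
     * id, id + numProcesses, id + 2*numProcesses, ... while below reps. */
    static List<Integer> iterationsFor(int id, int numProcesses, int reps) {
        List<Integer> mine = new ArrayList<>();
        for (int i = id; i < reps; i += numProcesses) {
            mine.add(i);
        }
        return mine;
    }

    public static void main(String[] args) {
        int reps = 8;            // same value as REPS in the notebook
        int numProcesses = 3;    // need not divide reps evenly
        for (int id = 0; id < numProcesses; id++) {
            System.out.println("Process " + id + " would perform iterations "
                               + iterationsFor(id, numProcesses, reps));
        }
    }
}
```

Because each id just keeps stepping by `numProcesses`, no divisibility check is needed; some ids simply end up with one more iteration than others.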


In [None]:
%%writefile ParallelLoopChunksOf1.java
/* ParallelLoopChunksOf1.java
 * ... illustrates the parallel loop pattern in OpenMPI+Java,
 *      in which processes perform the loop's iterations in 'chunks'
 *      of size 1 (simple, and useful when loop iterations
 *      do not access memory/cache locations) ...
 * Note this is much simpler than the 'equal chunks' loop.
 *
 * Joel Adams, Calvin University, November 2019.
 *
 * Usage: mpirun -np 4 java ./ParallelLoopChunksOf1
 *
 * Exercise:
 * - Compile and run, varying N: 1, 2, 3, 4, 5, 6, 7, 8
 * - Change REPS to 16, save, recompile, rerun, varying N again.
 * - Explain how this pattern divides the iterations of the loop
 *    among the processes.
 */

import mpi.*;

public class ParallelLoopChunksOf1 {

 public static final int REPS = 8;
 public static final int MASTER = 0;

 public static void main(String [] args) throws MPIException {
    MPI.Init(args);

    int id           = MPI.COMM_WORLD.getRank();
    int numProcesses = MPI.COMM_WORLD.getSize();
    String message   = "";

    if (numProcesses > REPS) {
      if (id == MASTER) {
          System.out.print("\nPlease run with -np less than or equal to "
                  +  REPS + "\n\n");
      }
    } else {
      for (int i = id; i < REPS; i += numProcesses) { 
          message = "Process " + id + " is performing iteration " + i + "\n";
          System.out.print(message);
      }
    }

    MPI.Finalize();
  }
}


In [None]:
!javac -cp ./mpi.jar ParallelLoopChunksOf1.java 
!mpirun --allow-run-as-root -np 4 java -cp ./mpi.jar ParallelLoopChunksOf1

### Exercises
- Run, using these numbers of processes, N: 1, 2, 4, and 8
- Compare source code to output.
- Change REPS to 16, save, rerun, varying N again.
- Explain how this pattern divides the iterations of the loop among the processes.

Which of the following is the correct assignment of loop iterations to processes for this code, when REPS is 8 and numProcesses is 4?


![picture](https://drive.google.com/uc?id=1eUsjxYdWXWqThO_rdLO91HaLqBLAAh_S)

# Point to point communication: the message passing pattern

The fundamental basis of coordination between independent processes is point-to-point communication through the communication links in MPI.COMM_WORLD. This form of communication is called message passing, where one process **sends** data to another one, which in turn must **receive** it from the sender. This is illustrated as follows:

![picture](https://drive.google.com/uc?id=1WJcOXq6Dn5TKF9Lng8r18_y2b23tHFe8)

## Message Passing Pattern: Key Problem

The following code represents a common error that many programmers have inadvertently placed in their code. The concept behind this program is that we wish to use communication between pairs of processes, like this:

![picture](https://drive.google.com/uc?id=1UJ2acj6XzphD2W6gnutNF2YGp6wU529Z)

For message passing to work between a pair of processes, one must send and the other must receive. If we wish to **exchange** data, then each process will need to perform both a send and a receive.
The idea is that process 0 will send data to process 1, who will receive it from process 0. Process 1 will also send some data to process 0, who will receive it from process 1. Similarly, processes 2 and 3 will exchange messages: process 2 will send data to process 3, who will receive it from process 2. Process 3 will also send some data to process 2, who will receive it from process 3.

If we have more processes, we still want to pair up processes together to exchange messages. The mechanism for doing this is to know your process id. If your id is odd (1, 3 in the above diagram), you will send and receive from your neighbor whose id is id - 1. If your id is even (0, 2), you will send and receive from your neighbor whose id is id + 1. This should work even if we add more than 4 processes, as long as the number of processes is divisible by 2.
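
The pairing rule can be checked in plain Java before any messages are sent. This is a small sketch: `PairPartner`, `partnerOf`, and `partnerExists` are hypothetical names, while `odd` mirrors the helper used in the notebook's code. It also shows why an odd number of processes is a problem: the last even id has no partner.

```java
/* Plain-Java sketch (no MPI needed) of the odd/even pairing rule. */
class PairPartner {

    static boolean odd(int number) { return number % 2 != 0; }

    /* Odd ids pair with id - 1; even ids pair with id + 1. */
    static int partnerOf(int id) {
        return odd(id) ? id - 1 : id + 1;
    }

    /* A partner only exists if its id is below the number of processes. */
    static boolean partnerExists(int id, int numProcesses) {
        return partnerOf(id) < numProcesses;
    }

    public static void main(String[] args) {
        int numProcesses = 5;    // deliberately odd to expose the problem
        for (int id = 0; id < numProcesses; id++) {
            System.out.println("Process " + id + " pairs with " + partnerOf(id)
                               + (partnerExists(id, numProcesses)
                                      ? "" : " (no such process)"));
        }
    }
}
```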

![warning sign](https://drive.google.com/uc?id=1SEqDBTBSKwNVXzn-zueWa7fBCm5b1_MB)
**Warning** There is a problem with the following code called *deadlock*. This happens when every process is waiting on an action from another process. The program cannot complete. **To stop the program, choose the small square that appears after you choose to run the mpirun cell.**


In [None]:
%%writefile MessagePassingDeadlock.java
/* MessagePassingDeadlock.java
 * ... illustrates how use of MPI's send and receive commands
 *      can lead to deadlock.
 *
 * Goal: Have MPI processes pair up and exchange their id numbers.
 *
 * Note: Values are sent/received in Java using arrays or buffers.
 *       Buffers are preferred b/c they work for all communication calls.
 *
 * Joel Adams, Calvin University, November 2019,
 *  with error-handling from Hannah Sonsalla, Macalester College 2017.
 *
 * Usage: mpirun -np 4 java ./MessagePassingDeadlock
 *
 * Exercise:
 * - Compile, then run using 1 process, then 2 processes.
 *    (Use Ctrl-C to terminate.)
 * - Use source code to trace execution.
 * - Why does this fail?

 */

import mpi.*;
import java.nio.IntBuffer;

public class MessagePassingDeadlock {

 public static void main(String [] args) throws MPIException {
    MPI.Init(args);

    Comm comm         = MPI.COMM_WORLD;
    int numProcesses  = comm.getSize();
    int id            = comm.getRank();

    if ( numProcesses <= 1 || (numProcesses % 2) != 0)  {
        if (id == MASTER) {
            System.out.print("\nPlease run this program using -np N where N is positive and even.\n\n");
        }
    } else {
        IntBuffer sendBuf = MPI.newIntBuffer(1);
        sendBuf.put(id);
        IntBuffer receiveBuf = MPI.newIntBuffer(1);

        if ( odd(id) ) { // odd processes receive from their 'left' neighbor, then send
            comm.recv(receiveBuf, 1, MPI.INT, id-1, 0); 
            comm.send(sendBuf, 1, MPI.INT, id-1, 0);
        } else {         // even processes receive from their 'right' neighbor, then send
            comm.recv(receiveBuf, 1, MPI.INT, id+1, 0); 
            comm.send(sendBuf, 1, MPI.INT, id+1, 0);
        }

        String message = "Process " + id + " sent '" + sendBuf.get(0)
                         + "' and received '" + receiveBuf.get(0) + "'\n";
        System.out.print(message);
    }

    MPI.Finalize();
  }

  public static boolean odd(int number) { return number % 2 != 0; }

  private static final int MASTER = 0;
}


In [None]:
!javac -cp ./mpi.jar MessagePassingDeadlock.java 
!mpirun --allow-run-as-root -np 4 java -cp ./mpi.jar MessagePassingDeadlock

![warning sign](https://drive.google.com/uc?id=1SEqDBTBSKwNVXzn-zueWa7fBCm5b1_MB) Remember: **To stop the program, choose the small square that appears after you choose to run the mpirun cell.**

#### What causes the deadlock?

Each process, regardless of its id, will execute a receive request first. In this model, recv is a **blocking** function: it will not return until it gets data from a matching send. So every process is blocked waiting to receive a message, and no process ever reaches its send.

#### Can you think of how to fix this problem?

Since recv is a **blocking** function, we need to have some processes send first, while others correspondingly recv first from those who send first. This provides coordinated exchanges.

Go to the next example to see the solution.


## Message Passing Patterns: avoiding deadlock

Let's look at a few more correct message passing examples.

### Fix the Deadlock

To fix deadlock of the previous example, we coordinate the communication between pairs of processes so that there is an ordering of sends and receives between them.

![Important symbol](https://drive.google.com/uc?id=1AWRLAqeaqi7SG7PHyOVywZRuMDK9Z2_s)**Important:** The new code corrects deadlock with a simple change: odd process sends first, even process receives first. *This is the proper pattern for exchanging data between pairs of processes.*
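
The ordering rule can be tried without MPI by using two Java threads in place of two processes. This is only an analogy, sketched under assumptions: `OrderedExchange` and `exchange` are hypothetical names, and `java.util.concurrent.SynchronousQueue` stands in for the communication link. A `SynchronousQueue` is stricter than MPI, since its `put` always waits for a matching `take`, whereas an MPI send is allowed to return before the matching receive starts.

```java
import java.util.concurrent.SynchronousQueue;

/* Thread-based analogy (no MPI needed) of the send-first / receive-first rule. */
class OrderedExchange {

    /* Returns {value received by the even side, value received by the odd side}. */
    static int[] exchange(int evenId, int oddId) {
        SynchronousQueue<Integer> toEven = new SynchronousQueue<>();
        SynchronousQueue<Integer> toOdd  = new SynchronousQueue<>();
        int[] received = new int[2];

        Thread oddSide = new Thread(() -> {
            try {
                toEven.put(oddId);               // odd side sends first ...
                received[1] = toOdd.take();      // ... then receives
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread evenSide = new Thread(() -> {
            try {
                received[0] = toEven.take();     // even side receives first ...
                toOdd.put(evenId);               // ... then sends
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        oddSide.start();
        evenSide.start();
        try {
            oddSide.join();                      // wait for both sides to finish
            evenSide.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return received;
    }

    public static void main(String[] args) {
        int[] r = exchange(0, 1);
        System.out.println("Process 0 received '" + r[0] + "'");
        System.out.println("Process 1 received '" + r[1] + "'");
    }
}
```

If both sides called `take()` first, neither would ever reach its `put()`, which is the same situation as the deadlocked example.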

In [None]:
%%writefile MessagePassing.java
/* MessagePassing.java
 * ... illustrates the use of MPI's send and receive commands,
 *      using OpenMPI's Java interface.
 *
 * Goal: Have MPI processes pair up and exchange their id numbers.
 *
 * Note: Values are sent/received in Java using arrays or buffers.
 *       Buffers are preferred b/c they work for all communication calls.
 *
 * Joel Adams, Calvin University, November 2019,
 *  with error-handling from Hannah Sonsalla, Macalester College 2017.
 *
 * Usage: mpirun -np 4 java ./MessagePassing
 *
 * Exercise:
 * - Compile and run, using N = 4, 6, 8, and 10 processes.
 * - Use source code to trace execution.
 * - Explain what each process:
 * -- sends
 * -- receives
 * -- outputs.
 * - Run using N = 5 processes. What happens?
 */

import mpi.*;
import java.nio.IntBuffer;

public class MessagePassing {

 public static void main(String [] args) throws MPIException {
    MPI.Init(args);

    Comm comm         = MPI.COMM_WORLD;
    int numProcesses  = comm.getSize();
    int id            = comm.getRank();

    if ( numProcesses <= 1 || (numProcesses % 2) != 0)  {
        if (id == MASTER) {
            System.out.print("\nPlease run this program using -np N where N is positive and even.\n\n");
        }
    } else {
        IntBuffer sendBuf = MPI.newIntBuffer(1);
        sendBuf.put(id);
        IntBuffer receiveBuf = MPI.newIntBuffer(1);

        if ( odd(id) ) { // odd processes send, then receive
            comm.send(sendBuf, 1, MPI.INT, id-1, 0);
            comm.recv(receiveBuf, 1, MPI.INT, id-1, 0); 
        } else {         // even processes receive then send
            comm.recv(receiveBuf, 1, MPI.INT, id+1, 0); 
            comm.send(sendBuf, 1, MPI.INT, id+1, 0);
        }

        String message = "Process " + id + " sent '" + sendBuf.get(0)
                         + "' and received '" + receiveBuf.get(0) + "'\n";
        System.out.print(message);
    }

    MPI.Finalize();
  }

  public static boolean odd(int number) { return number % 2 != 0; }

  private static final int MASTER = 0;
}


In [None]:
!javac -cp ./mpi.jar MessagePassing.java 
!mpirun --allow-run-as-root -np 4 java -cp ./mpi.jar MessagePassing

### Exercise

- Run, using N = 4, 6, 8, and 10 processes. (Note what happens if you use an odd number instead.)


## Sending data structures
This next example illustrates that we can exchange different Buffers of data between processes.


In [None]:
%%writefile MessagePassing2.java
/* MessagePassing2.java
 * ... illustrates the use of MPI's send and receive commands
 *      to send Strings via CharBuffers, using OpenMPI's Java interface.
 *
 * Goal: Have MPI processes pair up and exchange their host-names.
 *
 * Note: Values are sent/received in Java using arrays or buffers.
 *       Buffers are preferred as they work for both blocking and
 *        non-blocking communication calls.
 *       This example uses chars but the same approach works with numbers.
 *
 * Joel Adams, Calvin University, November 2019;
 *  error-handling adapted from Hannah Sonsalla, Macalester College 2017.
 *
 * Usage: mpirun -np 4 java ./MessagePassing2
 *
 * Exercise:
 * - Compile and run, using N = 1, 2, 4, and 8 processes.
 * - Use source code to trace execution.
 * - Compare to MessagePassing.java; note send-receive differences.
 */

import mpi.*;
import java.nio.CharBuffer;

public class MessagePassing2 {

 public static void main(String [] args) throws MPIException {
    MPI.Init(args);

    Comm comm         = MPI.COMM_WORLD;
    int numProcesses  = comm.getSize();
    int id            = comm.getRank();

    if ( numProcesses <= 1 || (numProcesses % 2) != 0)  {
        if (id == MASTER) {
            System.out.print("\nPlease run this program using -np N where N is positive and even.\n\n");
        }
        MPI.Finalize();
        System.exit(0);
    } 

    String hostName   = MPI.getProcessorName();
    CharBuffer sendBuf = MPI.newCharBuffer(BUFFER_SIZE);
    //sendBuf.put(hostName);  // this builds and is supposed to work but doesn't,
                               // (UTF-16 vs UTF-8?) so we'll do it the long way
    for (int i = 0; i < hostName.length(); ++i) {
         sendBuf.put(i, hostName.charAt(i));
    }

    CharBuffer receiveBuf = MPI.newCharBuffer(BUFFER_SIZE);
    Status status;

    if ( odd(id) ) { // odd processes send, then receive
        comm.send(sendBuf, hostName.length(), MPI.CHAR, id-1, 0);
        status = comm.recv(receiveBuf, BUFFER_SIZE, MPI.CHAR, id-1, 0); 
    } else {         // even processes receive then send
        status = comm.recv(receiveBuf, BUFFER_SIZE, MPI.CHAR, id+1, 0); 
        comm.send(sendBuf, hostName.length(), MPI.CHAR, id+1, 0);
    }

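    // Keep only the characters actually received; calling toString() on the
    // whole buffer would also include its unused trailing capacity.
    int charsReceived = status.getCount(MPI.CHAR);
    String receivedString = receiveBuf.toString().substring(0, charsReceived);
    String message = "Process " + id + " sent '" + hostName
                      + "' and received '" + receivedString + "'\n";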
    System.out.print(message);

    MPI.Finalize();
  }

  private static boolean odd(int number) { return number % 2 != 0; }

  private static final int MASTER = 0;
  private static final int BUFFER_SIZE = 256;
}


In [None]:
!javac -cp ./mpi.jar MessagePassing2.java 
!mpirun --allow-run-as-root -np 4 java -cp ./mpi.jar MessagePassing2

### Exercise

- Run, using N = 4, 6, 8, and 10 processes. 
- In the above code, locate where the Buffer of elements to be sent is being made by each process. What is different about each Buffer per process?


## Ring of passed messages
Another pattern that appears in message passing programs is to use a ring of processes, where messages get sent in this fashion:

![picture of ring of message passing](https://drive.google.com/uc?id=16VMF9t8nD3JcVehFvs4dbzIiU5eDuZbG)

When we have 4 processes, the idea is that process 0 will send data to process 1, who will receive it from process 0 and then send it to process 2, who will receive it from process 1 and then send it to process 3, who will receive it from process 2 and then send it back around to process 0.
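
The order of visits around the ring can be traced in plain Java. This sketch does not send any messages: it walks the ring in a single loop, and the names `RingTrace`, `next`, and `passAround` are hypothetical, introduced only for illustration. The modulo in `next` is what wraps the last process back to process 0.

```java
import java.util.ArrayList;
import java.util.List;

/* Plain-Java sketch (no MPI needed) that traces the ring of passed messages. */
class RingTrace {

    /* The neighbor each process sends to; the modulo wraps N-1 back to 0. */
    static int next(int id, int numProcesses) {
        return (id + 1) % numProcesses;
    }

    /* Returns the ids collected by the time the buffer arrives back at process 0. */
    static List<Integer> passAround(int numProcesses) {
        List<Integer> buffer = new ArrayList<>();
        buffer.add(0);                        // process 0 puts its id in the buffer
        int id = next(0, numProcesses);       // and sends it to its neighbor
        while (id != 0) {
            buffer.add(id);                   // each later process appends its id
            id = next(id, numProcesses);      // and forwards the buffer
        }
        return buffer;
    }

    public static void main(String[] args) {
        System.out.println("Process 0 would receive: " + passAround(4));
    }
}
```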

In [None]:
%%writefile MessagePassing3.java
/* MessagePassing3.java
 * ... illustrates the use of MPI's send and receive commands
 *      in combination with the master-worker pattern.
 *
 * Goal: The master process sends its id to process 1
 *        and receives a buffer of ids from process N-1.
 *       Every other process i receives a buffer of ids from process i-1,
 *        appends its id to the buffer, and sends the buffer to process (i+1)%N.
 *
 * Joel Adams, Calvin University, November 2019,
 *
 * Usage: mpirun -np 4 java ./MessagePassing3
 *
 * Exercise:
 * - Compile and run, varying N from 1-8.
 * - Explain the behavior you observe.
 */

import mpi.*;
import java.nio.IntBuffer;

public class MessagePassing3 {

 public static void main(String [] args) throws MPIException {
    MPI.Init(args);

    Comm comm         = MPI.COMM_WORLD;
    int numProcesses  = comm.getSize();
    int id            = comm.getRank();

    if ( numProcesses <= 1 )  {
        if (id == MASTER) {
            System.out.print("\nPlease run this program using -np N where N is at least 2.\n\n");
        }
        MPI.Finalize();
        System.exit(0);
    } 

    IntBuffer sendBuf = MPI.newIntBuffer(BUFFER_SIZE);
    IntBuffer receiveBuf = MPI.newIntBuffer(BUFFER_SIZE);
    Status status;

    if ( id == MASTER ) {                              // MASTER:
        sendBuf.put(0, id);                            // 1. put id in buffer
        comm.send(sendBuf,                             // 2. send: buffer,
                        1,                             //          number of values,
                        MPI.INT,                       //          type of values,
                        id+1,                          //          destination id,
                        0);                            //          tag.
        status = comm.recv(receiveBuf,                 // 3. recv: buffer,
                            BUFFER_SIZE,               //          buffer capacity,
                            MPI.INT,                   //          type of values,
                            numProcesses-1,            //          sender id,
                            0);                        //          tag.
        int valuesReceived = status.getCount(MPI.INT); // 4. how many did we get?
        output(receiveBuf, valuesReceived);            // 5. output what we got.
    } else {                                           // WORKERS:
        status = comm.recv(receiveBuf,                 // 1. receive: buffer,
                            BUFFER_SIZE,               //             buffer capacity,
                            MPI.INT,                   //             type of values,
                            id-1,                      //             sender id,
                             0);                       //             tag.
        int valuesReceived = status.getCount(MPI.INT); // 2. how many did we get?
        output(receiveBuf, valuesReceived);            // 3. output what we got.
        receiveBuf.put(valuesReceived, id);            // 4. append id to buffer
        comm.send(receiveBuf,                          // 5. send: buffer,
                   valuesReceived+1,                   //          number of values,
                   MPI.INT,                            //          type of values,
                   (id+1) % numProcesses,              //          destination id,
                    0);                                //          tag.
    }

    MPI.Finalize();
  }

  /* utility to print an IntBuffer with descriptive labels.
   * @param: buf, an IntBuffer.
   * @param: size, the number of ints in IntBuffer
   *          (b/c IntBuffer has no length() method,
   *            whose bright idea was that?).
   * POST: The ints in buf have been displayed on System.out,
   *        preceded by spaces, and with a newline at the end.
   */
  private static void output(IntBuffer buf, int size) throws MPIException {
      System.out.printf("Process %d of %d received:",
                          MPI.COMM_WORLD.getRank(),
                          MPI.COMM_WORLD.getSize());
      for (int i = 0; i < size; ++i) {
          System.out.print( " " );
          System.out.print( buf.get(i) );
      }
      System.out.print("\n");
  }

  private static final int MASTER = 0;
  private static final int BUFFER_SIZE = 256;
}


In [None]:
!javac -cp ./mpi.jar MessagePassing3.java 
!mpirun --allow-run-as-root -np 4 java -cp ./mpi.jar MessagePassing3

### Exercises
- Run, using N = from 1 through 8 processes.
- Make sure that you can trace how the code generates the output that you see.
- How is the finishing of the 'ring' completed, where the last process determines that it should send back to process 0?
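The wrap-around asked about in the last exercise comes from the modulus in the destination expression `(id+1) % numProcesses`. The following plain-Java sketch makes no MPI calls and only evaluates that arithmetic for every rank. The class name `RingNeighbors` and both helper methods are made up for illustration, and the `predecessor` formula is an assumption about how a wrap-around sender id could be written, not code taken from the example.

```java
// Plain-Java illustration of ring neighbors; no MPI required.
// RingNeighbors, successor, and predecessor are hypothetical names.
class RingNeighbors {

    // Rank that `id` sends to: the last rank wraps around to rank 0.
    static int successor(int id, int numProcesses) {
        return (id + 1) % numProcesses;
    }

    // Rank that `id` receives from: rank 0 wraps around to the last rank.
    static int predecessor(int id, int numProcesses) {
        return (id - 1 + numProcesses) % numProcesses;
    }

    public static void main(String[] args) {
        int numProcesses = 4;
        for (int id = 0; id < numProcesses; ++id) {
            System.out.println("Process " + id
                + " receives from " + predecessor(id, numProcesses)
                + " and sends to " + successor(id, numProcesses));
        }
    }
}
```

With 4 processes, rank 3 computes `(3+1) % 4`, which is 0, so the message returns to the master without any special case in the code.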

# Collective Communication: Broadcast pattern
There are many cases when a master process obtains or creates data that needs to be sent to all of the other processes. There is a special pattern for this called **broadcast**. You will see examples of the master sending different types of data to each of the other processes.

## Broadcast from master to workers

We will look at three types of data that can be created in the master and sent to the workers. Rather than use send and receive, we will use a special new function called bcast.

![Important symbol](https://drive.google.com/uc?id=1AWRLAqeaqi7SG7PHyOVywZRuMDK9Z2_s) **Note:** In each code example, note how the master does one thing, and the workers do another, but **all of the processes execute the bcast function.**
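To see what a broadcast accomplishes before reading the MPI versions, here is a small plain-Java simulation. It is only a sketch: there are no MPI calls, one array slot stands in for each process's buffer, and the names `BroadcastSim` and `bcast` are invented for this illustration.

```java
// Plain-Java simulation of what a broadcast accomplishes; no MPI required.
// BroadcastSim and its method are hypothetical names for illustration.
class BroadcastSim {

    // values[r] stands in for the one-int buffer owned by rank r.
    // After the call, every rank holds the root's value.
    static void bcast(int[] values, int root) {
        for (int rank = 0; rank < values.length; ++rank) {
            values[rank] = values[root];
        }
    }

    public static void main(String[] args) {
        int[] values = new int[4];   // every rank starts with 0
        values[0] = 42;              // only the master (rank 0) has data
        System.out.println("BEFORE: " + java.util.Arrays.toString(values));
        bcast(values, 0);
        System.out.println("AFTER:  " + java.util.Arrays.toString(values));
    }
}
```

In real MPI code there is no shared array; each process owns its buffer, which is why every process has to make the bcast call itself.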


### Broadcast an integer

Find the place in this code where the data is being broadcast to all of the processes. Match the prints to the output you observe when you run it.

In [None]:
%%writefile Broadcast.java
/* Broadcast.java
 * ... illustrates the use of MPI's broadcast command with a scalar value.
 *
 * Note: This version uses an IntBuffer of length 1 to store the scalar.
 *
 * Goal: The master process stores an 'answer' value in its buffer
 *        and broadcasts it to all the other processes.
 *       Each process outputs its 'answer' value before and after
 *        the broadcast.
 *
 * Joel Adams, Calvin University, November 2019,
 *
 * Usage: mpirun -np 4 java ./Broadcast
 *
 * Exercise:
 * - Compile, then run several times,
 *     using 2, 4, and 8 processes
 * - Use source code to trace execution and output;
 * - Explain behavior/effect of MPI_Bcast().
 */

import java.nio.IntBuffer;
import mpi.*;

public class Broadcast {

 public static void main(String [] args) throws MPIException {
    MPI.Init(args);

    Comm comm         = MPI.COMM_WORLD;
    int id            = comm.getRank();

    IntBuffer answerBuf = MPI.newIntBuffer(1);

    if ( id == MASTER ) {                      // MASTER: set the answer
        int answer = 42;
        answerBuf.put(answer);
    }

    String beforeMsg = "BEFORE the broadcast, process " + id
                       + "'s answer is: " + answerBuf.get(0) + "\n";
    System.out.print(beforeMsg);               // all: output 'before' values

    printSeparator("----", id);

    comm.bcast(answerBuf, 1, MPI.INT, 0);      // all: participate in broadcast

    String afterMsg = "AFTER the broadcast, process " + id
                       + "'s answer is: " + answerBuf.get(0) + "\n";
    System.out.print(afterMsg);                // all: output 'after' values

    MPI.Finalize();
  }

  /* utility to print a separator string between the 'before' and 'after' parts.
   * @param: separator, a String.
   * @param: id, the rank of this MPI process.
   * POST: the master has printed the separator to System.out.
   */
  public static void printSeparator(String separator, int id) throws MPIException {
     MPI.COMM_WORLD.barrier();
     if (id == MASTER) { System.out.println(separator); }
     MPI.COMM_WORLD.barrier();
  }

  private static final int MASTER = 0;
}


In [None]:
!javac -cp ./mpi.jar Broadcast.java 
!mpirun --allow-run-as-root -np 8 java -cp ./mpi.jar Broadcast

#### Exercise
- Run, using N = from 1 through 8 processes.
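The Broadcast example stores its value with `answerBuf.put(answer)` and reads it back with `answerBuf.get(0)`. These are two different kinds of access on a java.nio IntBuffer: the first is relative to the buffer's current position, and the second uses an explicit index. The sketch below shows the difference without any MPI calls. The class name `BufferAccessDemo` and the helper `newDirectIntBuffer` are made up, and the direct buffer it allocates is only assumed to behave like the one returned by `MPI.newIntBuffer(1)`.

```java
// Relative vs. absolute access on a java.nio.IntBuffer; no MPI required.
// BufferAccessDemo is a hypothetical name; the direct buffer allocated here
// is a stand-in (an assumption) for the one returned by MPI.newIntBuffer(1).
class BufferAccessDemo {

    static java.nio.IntBuffer newDirectIntBuffer(int size) {
        return java.nio.ByteBuffer.allocateDirect(size * Integer.BYTES)
                                  .order(java.nio.ByteOrder.nativeOrder())
                                  .asIntBuffer();
    }

    public static void main(String[] args) {
        java.nio.IntBuffer buf = newDirectIntBuffer(1);
        System.out.println("initial value:      " + buf.get(0)); // zero-filled
        buf.put(42);               // relative put: writes at the position, then advances it
        System.out.println("position after put: " + buf.position());
        System.out.println("absolute get(0):    " + buf.get(0)); // ignores the position
        buf.put(0, 7);             // absolute put: the position is unchanged
        System.out.println("after put(0, 7):    " + buf.get(0));
    }
}
```

This is why the workers print 0 before the broadcast: their buffers were allocated but never written.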

### Broadcast user input

The following program will take extra input that will get broadcast to all processes.

In [None]:
%%writefile BroadcastUserInput.java
/* BroadcastUserInput.java
 * ... illustrates the use of MPI's broadcast command 
 *     to broadcast a value entered from the commandline.
 *
 * Note: This version uses an IntBuffer of length 1 to store the scalar.
 *
 * Goal: The master process "reads" an input value from the commandline
 *        and broadcasts it to all the other processes.
 *       Each process outputs its value before and after
 *        the broadcast.
 *
 * Original C version by Hannah Sonsalla, Macalester College 2017,
 * Java version by Joel Adams, Calvin University, November 2019.
 *
 * Usage: mpirun -np 4 java ./BroadcastUserInput <integer>
 *
 * Exercise:
 * - Compile and run several times, varying the number of 
 *    MPI processes and input value.
 * - Explain behavior you observe.
 */

import java.nio.IntBuffer;
import mpi.*;

public class BroadcastUserInput {

 public static void main(String [] args) throws MPIException {
    MPI.Init(args);

    Comm comm         = MPI.COMM_WORLD;
    int id            = comm.getRank();
    int numProcesses  = comm.getSize();
    String hostName   = MPI.getProcessorName();

    IntBuffer answerBuf = MPI.newIntBuffer(1); // allocate buffer

    if (id == MASTER) {
        getInput(args, answerBuf);             // MASTER: fill buffer
    }

    String beforeMsg = "BEFORE the broadcast, the answer of process " + id
                       + " on host " + hostName 
                       + " is " + answerBuf.get(0) + "\n";
    System.out.print(beforeMsg);               // all: output 'before' values

    comm.bcast(answerBuf, 1, MPI.INT, 0);      // all: participate in broadcast

    printSeparator("----", id);

    String afterMsg = "AFTER the broadcast, the answer of process " + id
                       + " on host " + hostName 
                       + " is: " + answerBuf.get(0) + "\n";
    System.out.print(afterMsg);                // all: output 'after' values

    MPI.Finalize();
  }

  /* utility to hide details of having the master read an int value
   *  from the commandline (can be adapted to read from anywhere else).
   *
   * @param: args, a String array containing the commandline arguments.
   * @param: buf, an IntBuffer in which the input value is to be stored.
   *
   * PRE: args[0] contains an integer input value (as a String).
   * POST: buf contains the integer from args[0], or else a default value.
   */
   private static void getInput(String [] args, IntBuffer buf) {
        int result = 0;
        if (args.length >= 1) {
            result = Integer.parseInt( args[0] );
        } else {
            System.err.println("\nUsage: mpirun -np <N> java BroadcastUserInput <integer>\n");
        }
        buf.put(result);
    } 

  /* utility to print a separator between the 'before' and 'after' parts.
   * @param: separator, a String.
   * @param: id, the rank of this MPI process.
   * POST: separator has been printed to System.out.
   */
  public static void printSeparator(String separator, int id) throws MPIException {
     MPI.COMM_WORLD.barrier();
     if (id == MASTER) { System.out.println(separator); }
     MPI.COMM_WORLD.barrier();
  }


  private static final int MASTER = 0;
}


![warning sign](https://drive.google.com/uc?id=1SEqDBTBSKwNVXzn-zueWa7fBCm5b1_MB)
**Warning** Unlike the other examples, this program takes an extra command-line argument: the integer to broadcast, placed after the class name as shown below.

In [None]:
!javac -cp ./mpi.jar BroadcastUserInput.java 
!mpirun --allow-run-as-root -np 4 java -cp ./mpi.jar BroadcastUserInput 6222021

#### Exercise
- Run, using N = from 1 through 8 processes, with an integer of your choosing.
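The getInput utility in BroadcastUserInput calls `Integer.parseInt`, which throws a NumberFormatException if the argument is not an integer (for example, if you type a word). A more forgiving variant might look like the sketch below. It is not part of the original example; `InputParsing` and `parseOrDefault` are hypothetical names, and falling back to a default value is just one possible design choice.

```java
// A defensive way to read an int from the command line; no MPI required.
// InputParsing and parseOrDefault are hypothetical names for illustration.
class InputParsing {

    // Returns args[0] as an int, or defaultValue if it is missing or malformed.
    static int parseOrDefault(String[] args, int defaultValue) {
        if (args.length < 1) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(args[0].trim());
        } catch (NumberFormatException e) {
            return defaultValue;   // not an integer: fall back rather than crash
        }
    }

    public static void main(String[] args) {
        System.out.println(parseOrDefault(new String[] {"6222021"}, 0));
        System.out.println(parseOrDefault(new String[] {"hello"}, 0));
        System.out.println(parseOrDefault(new String[] {}, 0));
    }
}
```

Only the master would need to do this parsing, since the broadcast delivers the result to every other process.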

### Broadcast an array

This one more example shows that a whole array of values, not just a single scalar, can be broadcast from the master to all worker processes. Here the data is a plain Java int array rather than an IntBuffer.

In [None]:
%%writefile BroadcastArray.java
/* BroadcastArray.java
 * ... illustrates the use of MPI's broadcast command with multiple values.
 *
 * Note: This version uses an array to store the values.
 *
 * Goal: The master process fills an array with values
 *        and broadcasts it to all the other processes.
 *       Each process outputs its array before and after
 *        the broadcast.
 *
 * Joel Adams, Calvin University, November 2019,
 *
 * Usage: mpirun -np 4 java ./BroadcastArray
 *
 * Exercise:
 * - Compile, then run several times,
 *     using 2, 4, and 8 processes
 * - Use source code to trace execution and output
 * - Explain behavior/effect of MPI_Bcast().
 */

import mpi.*;

public class BroadcastArray {

 public static void main(String [] args) throws MPIException {
    MPI.Init(args);

    Comm comm         = MPI.COMM_WORLD;
    int id            = comm.getRank();

    int [] array = new int[ARRAY_SIZE];            // all: allocate array 

    if ( id == MASTER ) {                          // MASTER: fill its array
        fill(array);
    }

    print("BEFORE", id, array);                    // all: print buffers before

    printSeparator("----", id, comm);

    comm.bcast(array, array.length, MPI.INT, 0);   // all: participate in broadcast

    print("AFTER", id, array);                     // all: print buffers after

    MPI.Finalize();
  }


  /* utility to fill an array with some values.
   * @param: a, an int array.
   * POST: a has been filled with int values.
   */
  private static void fill(int [] a) {
     for (int i = 0; i < a.length; ++i) {
         a[i] = i + 11;
     }
  }

  /* utility to print a buffer with descriptive labels.
   * @param: label, a String.
   * @param: id, this process's MPI rank.
   * @param: a, an int array.
   * POST: label, id, and a have been displayed via System.out.
   */
  private static void print(String label, int id, int [] a) {
    String msg = label + " the broadcast, process " + id
                       + "'s array contains:";
    for (int i = 0; i < a.length; ++i) {
      msg += (" " + a[i]);
    }
    msg += "\n";
    System.out.print(msg); 
  }
 
  /* utility to print a separator string between the 'before' and 'after' parts.
   * @param: separator, a String.
   * @param: id, the rank of this MPI process.
   * @param: comm, the Communicator for the processes involved.
   * POST: the master has printed the separator to System.out.
   */
  public static void printSeparator(String separator, int id, Comm comm) 
                                      throws MPIException {
     comm.barrier();
     if (id == MASTER) { System.out.println(separator); }
     comm.barrier();
  }

  private static final int MASTER = 0;
  private static final int ARRAY_SIZE = 8;
}


In [None]:
!javac -cp ./mpi.jar BroadcastArray.java 
!mpirun --allow-run-as-root -np 4 java -cp ./mpi.jar BroadcastArray

#### Exercise
- Run, using N = from 1 through 8 processes.


# Collective Communication: reduction pattern

There are often cases when every process computes a partial result of an overall computation. For example, if you want to sum a large set of numbers into one value (i.e. *reduce* a set of numbers into one value, its sum), you could do this faster by having each process compute a partial sum and then having the processes communicate to add their partial sums together.

This is so common in parallel processing that there is a special collective communication function called **reduce** that does just this.
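One reason a library reduce can beat a hand-written loop of sends and receives is that partial results can be combined in pairs, so the number of communication rounds grows with the logarithm of the number of processes. The plain-Java sketch below simulates that idea on an array, with one slot per process. It is an illustration only: real MPI libraries choose their own reduction algorithms, and `TreeReduceSim` and `reduceInPlace` are made-up names.

```java
// Plain-Java simulation of pairwise (tree) reduction; no MPI required.
// TreeReduceSim and reduceInPlace are hypothetical names for illustration.
class TreeReduceSim {

    // values[r] stands in for the partial result held by rank r.
    // Combines everything into values[0] and returns the number of rounds used.
    static int reduceInPlace(int[] values, java.util.function.IntBinaryOperator op) {
        int rounds = 0;
        for (int stride = 1; stride < values.length; stride *= 2) {
            // In a parallel run, all the pairs of one round would combine at the same time.
            for (int r = 0; r + stride < values.length; r += 2 * stride) {
                values[r] = op.applyAsInt(values[r], values[r + stride]);
            }
            ++rounds;
        }
        return rounds;
    }

    public static void main(String[] args) {
        int[] partials = {1, 4, 9, 16, 25, 36, 49, 64};   // one value per simulated process
        int rounds = reduceInPlace(partials, Integer::sum);
        System.out.println("total = " + partials[0] + " after " + rounds + " rounds");
    }
}
```

With 8 simulated processes the total is reached in 3 rounds rather than 7 sequential additions.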

## Collective Communication: reduce function

The reduction of many values down to one can use different operators on the set of values computed by each process, such as sum, maximum, minimum, or product.


### Reduce all values using sum and max
In this example, every process computes the square of (id+1). Then all those values are summed together and also the maximum function is applied.

In [None]:
%%writefile Reduction.java
/* Reduction.java
 * ... illustrates how to use the reduction pattern 
 *     (which combines distributed values in O(lg(P)) time)
 *      using OpenMPI's Java interface.
 *
 * Joel Adams, Calvin University, November 2019.
 *
 * Usage: mpirun -np 4 java ./Reduction
 *
 * Exercise:
 * - Compile and run, varying N = 1, 2, 3, 4, 6, 8, 10.
 * - Explain behavior of the reduce operation.
 */

import mpi.*;
import java.nio.IntBuffer;

public class Reduction {

 public static void main(String [] args) throws MPIException {
    MPI.Init(args);

    Comm comm        = MPI.COMM_WORLD;
    int id           = comm.getRank();
    int numProcesses = comm.getSize();

    int square       = (id+1) * (id+1);
    IntBuffer squareBuffer = MPI.newIntBuffer(BUFFER_SIZE);
    squareBuffer.put(square);

    IntBuffer sumSquaresBuffer = MPI.newIntBuffer(BUFFER_SIZE);
    comm.reduce(squareBuffer, sumSquaresBuffer, BUFFER_SIZE,
                 MPI.INT, MPI.SUM, MASTER);

    IntBuffer maxBuffer = MPI.newIntBuffer(BUFFER_SIZE);
    comm.reduce(squareBuffer, maxBuffer, BUFFER_SIZE,
                 MPI.INT, MPI.MAX, MASTER);

    if ( id == MASTER) {
        String squareMsg = "\nThe sum of the squares from 1 to "
                            + numProcesses + " is " 
                            + sumSquaresBuffer.get(0) + "\n\n";
        String maxMsg    = "The max of the squares from 1 to "
                            + numProcesses + " is " 
                            + maxBuffer.get(0) + "\n\n";
        System.out.print(squareMsg);
        System.out.print(maxMsg);
    }

    MPI.Finalize();
  }

  private static int BUFFER_SIZE = 1;
  private static int MASTER      = 0;
}


In [None]:
!javac -cp ./mpi.jar Reduction.java
!mpirun --allow-run-as-root -np 4 java -cp ./mpi.jar Reduction

#### Exercises
- Run, using N = from 1 through 8 processes.
- Try replacing MPI.MAX with MPI.MIN (minimum) and/or replacing MPI.SUM with MPI.PROD (product). Then save and run the code again.
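When you try the different operators, it helps to know what answer to expect. The plain-Java sketch below computes, without MPI, the result of combining the squares 1, 4, 9, ... that the processes in Reduction.java contribute. `ExpectedReduction` and `combineSquares` are hypothetical names. The sketch uses `long` so the product cannot overflow for small N, whereas the MPI example works with 32-bit ints.

```java
// Computes the values a reduction over the squares should produce; no MPI required.
// ExpectedReduction and combineSquares are hypothetical names for illustration.
class ExpectedReduction {

    // Combines (id+1)*(id+1) for id = 0 .. numProcesses-1 using op.
    static long combineSquares(int numProcesses, java.util.function.LongBinaryOperator op) {
        long result = 1;                       // the square contributed by rank 0
        for (int id = 1; id < numProcesses; ++id) {
            long square = (long) (id + 1) * (id + 1);
            result = op.applyAsLong(result, square);
        }
        return result;
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 8; ++n) {
            System.out.println("N=" + n
                + " sum=" + combineSquares(n, Long::sum)
                + " max=" + combineSquares(n, Math::max)
                + " min=" + combineSquares(n, Math::min)
                + " prod=" + combineSquares(n, (a, b) -> a * b));
        }
    }
}
```

For N = 4 this gives a sum of 30 and a max of 16, which you can compare with the output of the MPI program.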


### Reduction on a Buffer of values

Reduction also works on a Buffer that holds several values. In the following example, each process fills a Buffer of five values, and the reduce combines the Buffers from all of the processes into one Buffer on the master.


In [None]:
%%writefile Reduction2.java
/* Reduction2.java
 * ... illustrates the reduction pattern on multiple values,
 *      using buffers in OpenMPI's Java interface.
 *
 * Joel Adams, Calvin University, November 2019.
 *
 * Usage: mpirun -np 4 java ./Reduction2
 *
 * Exercise:
 * - Compile, then run with N = 1, 2, 3, 4, 
 *     comparing output to source code.
 * - Explain behavior of reduce() in terms of
 *     srcBuf and destBuf.
 */

import mpi.*;
import java.nio.IntBuffer;

public class Reduction2 {

 public static void main(String [] args) throws MPIException {
    MPI.Init(args);

    Comm comm        = MPI.COMM_WORLD;
    int id           = comm.getRank();
    int numProcesses = comm.getSize();

    IntBuffer srcBuf = MPI.newIntBuffer(BUFFER_SIZE);
    IntBuffer destBuf = MPI.newIntBuffer(BUFFER_SIZE);

    if (id == MASTER) {
        System.out.print("\nBefore reduction: ");
        printBuf(id, "destBuf", destBuf); 
    }

    for (int i = 0; i < BUFFER_SIZE; ++i) {
        srcBuf.put(i, id * i);
    }

    printSeparator("", id);
    printBuf(id, "srcBuf", srcBuf);
    printSeparator("----", id);

    comm.reduce(srcBuf, destBuf, BUFFER_SIZE,
                 MPI.INT, MPI.SUM, MASTER);

    if ( id == MASTER) {
        System.out.print("After reduction: ");
        printBuf(id, "destBuf", destBuf);
        System.out.println();
    }

    MPI.Finalize();
  }

  /* utility to display the contents of an IntBuffer.
   * @param: id, the int MPI rank of this process.
   * @param: bufName, a String that is the name of the buffer.
   * @param: buf, the IntBuffer.
   * POST: buf's contents have been displayed via System.out.
   */
  private static void printBuf(int id, String bufName, IntBuffer buf) {
      String msg = "Process " + id + ", " + bufName + ": [";
      int size = buf.capacity();
      int sizeLessOne = size - 1;
      for (int i = 0; i < size; ++i) {
         msg += buf.get(i);
         if (i < sizeLessOne ) {
             msg += ",";
         }
      }
      msg += "]\n";
      System.out.print(msg);
  }

  /* utility to print a separator between before and after sections.
   * @param: separator, a String.
   * @param: id, the MPI rank of this process. 
   * POST: separator has been printed by the master process.
   */
  private static void printSeparator(String separator, int id) throws MPIException {
     MPI.COMM_WORLD.barrier();
     if (id == MASTER) { System.out.println(separator); }
     MPI.COMM_WORLD.barrier();
  }

  private static int BUFFER_SIZE = 5;
  private static int MASTER      = 0;
}


In [None]:
!javac -cp ./mpi.jar Reduction2.java 
!mpirun --allow-run-as-root -np 4 java -cp ./mpi.jar Reduction2

#### Exercises
- Run, using N = from 1 through 4 processes.
- Compare each srcBuf with the final destBuf on the master, and explain how MPI.SUM combines Buffers that hold several values.

![Important symbol](https://drive.google.com/uc?id=1AWRLAqeaqi7SG7PHyOVywZRuMDK9Z2_s) **Note:** There are two things you might expect a sum of Buffers from each process to do: 1) concatenate the elements together, or 2) sum the element at each location from each process and place the sum in that location in a new Buffer. The reduce function with MPI.SUM does the latter, so the new Buffer is the same length as the original Buffer on each process.
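When reduce is given Buffers that hold several values, MPI.SUM adds the values found at the same index on every process. The plain-Java sketch below reproduces that arithmetic for the data used in Reduction2.java, where process `id` stores `id * i` at index `i`. There are no MPI calls; `ElementwiseSumSim` and its methods are made-up names.

```java
// Plain-Java simulation of an element-by-element sum reduction; no MPI required.
// ElementwiseSumSim, makeSrcBufs, and sumByElement are hypothetical names.
class ElementwiseSumSim {

    // Builds one srcBuf per simulated process, filled as in Reduction2: id * i.
    static int[][] makeSrcBufs(int numProcesses, int bufferSize) {
        int[][] bufs = new int[numProcesses][bufferSize];
        for (int id = 0; id < numProcesses; ++id) {
            for (int i = 0; i < bufferSize; ++i) {
                bufs[id][i] = id * i;
            }
        }
        return bufs;
    }

    // Returns a buffer of the same length whose slot i is the sum of every srcBuf's slot i.
    static int[] sumByElement(int[][] srcBufs) {
        int[] dest = new int[srcBufs[0].length];
        for (int[] src : srcBufs) {
            for (int i = 0; i < dest.length; ++i) {
                dest[i] += src[i];
            }
        }
        return dest;
    }

    public static void main(String[] args) {
        int[] destBuf = sumByElement(makeSrcBufs(4, 5));
        System.out.println(java.util.Arrays.toString(destBuf));   // [0, 6, 12, 18, 24]
    }
}
```

With 4 processes, index i receives (0 + 1 + 2 + 3) * i = 6 * i, and the result has 5 values, not 20.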


# Collective Communication: scatter and gather pattern

There are often cases when each process can work on some portion of a larger data structure. This can be carried out by having the master process maintain the larger structure and send parts to each of the worker processes, keeping part of the structure on the master. Each process then works on its portion of the data, and then the master can get the completed portions back.

This is so common in message passing parallel processing that there are two special collective communication functions called **scatter** and **gather** that handle this.


## Collective Communication: scatter and gather Buffers

When several processes need to work on portions of a data structure, such as a buffer of buffers or a 1-d or 2-d array, at various points in a program, a way to do this is to have one node, usually the master, divide the data structure and send portions to each of the other processes, often keeping one portion for itself. Each process then works on that portion of the data, and then the master can get the completed portions back. This type of coordination is so common that MPI has special patterns for it called **scatter** and **gather**.


### Scatter Buffers
The following diagrams illustrate how scattering a Buffer of Buffers works. The master contains a Buffer of Buffers and all processes participate in the scatter:

![scatter lists diagram](https://drive.google.com/uc?id=1QDRW2JeAa_TelKxZTphCPF393Bxn_BbL)

After the scatter is completed, each process has one of the smaller Buffers to work on, like this:

![after scatter lists diagram](https://drive.google.com/uc?id=1xA2NRtm1k4_g16tJTWBFCVArKLfIEurc)

In this next code example, the master fills one Buffer of 8 values, and the scatter gives each process an equal-sized chunk of 8 / N values, where N is the number of processes.

![Important symbol](https://drive.google.com/uc?id=1AWRLAqeaqi7SG7PHyOVywZRuMDK9Z2_s) **Note:** In the code below, note how all processes must call the scatter function.
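Before reading the MPI version, it may help to see the chunking as ordinary array slicing. The sketch below is plain Java with no MPI calls: it cuts one array into equal chunks, one per simulated process, using the same data as Scatter.java. `ScatterSim` and its `scatter` method are hypothetical names for this illustration.

```java
// Plain-Java simulation of an equal-chunk scatter; no MPI required.
// ScatterSim and scatter are hypothetical names for illustration.
class ScatterSim {

    // Returns chunks[r], the values that rank r would receive.
    // Any values left over after integer division are not delivered to anyone.
    static int[][] scatter(int[] sendBuf, int numProcesses) {
        int chunkSize = sendBuf.length / numProcesses;
        int[][] chunks = new int[numProcesses][];
        for (int rank = 0; rank < numProcesses; ++rank) {
            chunks[rank] = java.util.Arrays.copyOfRange(
                    sendBuf, rank * chunkSize, (rank + 1) * chunkSize);
        }
        return chunks;
    }

    public static void main(String[] args) {
        int[] sendBuf = new int[8];
        for (int i = 0; i < sendBuf.length; ++i) {
            sendBuf[i] = (i + 1) * 11;        // same fill as Scatter.java
        }
        int[][] chunks = scatter(sendBuf, 4);
        for (int rank = 0; rank < chunks.length; ++rank) {
            System.out.println("Process " + rank + ", recvBuf: "
                    + java.util.Arrays.toString(chunks[rank]));
        }
    }
}
```

In the simulation one loop hands out every chunk; in MPI each process obtains its own chunk only by calling scatter itself.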

In [None]:
%%writefile Scatter.java
/* Scatter.java
 * ... illustrates the basic scatter pattern 
 *      using buffers in OpenMPI's Java interface.
 *
 * Note: If the number of values being scattered is not
 *       evenly divisible by the number of processes,
 *       use scatterv() instead of scatter.
 *
 * Joel Adams, Calvin University, November 2019.
 *
 * Usage: mpirun -np 4 java ./Scatter
 *
 * Exercise:
 * - Compile, then run with N = 1, 2, 4, 8. 
 * - Trace execution through source code. 
 * - Explain behavior/effect of scatter. 
 * - What if BUFFER_SIZE is not evenly divisible by N?
 */

import mpi.*;
import java.nio.IntBuffer;

public class Scatter {

 public static void main(String [] args) throws MPIException {
    MPI.Init(args);

    Comm comm         = MPI.COMM_WORLD;
    int id            = comm.getRank();
    int numProcesses  = comm.getSize();

    if (numProcesses > BUFFER_SIZE) {
        if (id == MASTER) {
            System.out.println("\nPlease run this program with N <= 8 processes\n");
        }
        MPI.Finalize();
        System.exit(0);
    }
    IntBuffer sendBuf = null;
    IntBuffer recvBuf = null;

    if (id == MASTER) {
        sendBuf = MPI.newIntBuffer(BUFFER_SIZE);
        for (int i = 0; i < BUFFER_SIZE; ++i) {
           sendBuf.put(i, (i+1) * 11);
        }
        System.out.print("\nBefore scatter: ");
        printBuf(id, "sendBuf", sendBuf);
    }
 
    int numSent = BUFFER_SIZE / numProcesses;
       
    comm.barrier();                    // see comment on next barrier

    recvBuf = MPI.newIntBuffer(numSent);
    printBuf(id, "recvBuf", recvBuf);

    printSeparator("----", id);

    comm.scatter(sendBuf, numSent, MPI.INT, 
                  recvBuf, numSent, MPI.INT, MASTER); 

    if (id == MASTER) {
        System.out.print("After scatter:\n");
    }
    comm.barrier();                    // all of these barriers are here
    printBuf(id, "recvBuf", recvBuf);  //  just to make the output easier
    comm.barrier();                    //  to read; no effect on correctness
    if (id == MASTER) {
        System.out.println();
    }

    MPI.Finalize();
  }

  /* utility to display the contents of an IntBuffer.
   * @param: id, the int MPI rank of this process.
   * @param: bufName, a String that is the name of the buffer.
   * @param: buf, the IntBuffer.
   * POST: buf's contents have been displayed via System.out.
   */
  private static void printBuf(int id, String bufName, IntBuffer buf) {
      String msg = "Process " + id + ", " + bufName + ": [";
      int size = buf.capacity();
      int sizeLessOne = size - 1;
      for (int i = 0; i < size; ++i) {
         msg += buf.get(i);
         if (i < sizeLessOne ) {
             msg += ",";
         }
      }
      msg += "]\n";
      System.out.print(msg);
  }

  /* utility to print a separator between before and after sections.
   * @param: separator, a String.
   * @param: id, the MPI rank of this process. 
   * POST: separator has been printed by the master process.
   */
  private static void printSeparator(String separator, int id) throws MPIException {
     MPI.COMM_WORLD.barrier();
     if (id == MASTER) { System.out.print(separator + "\n"); }
     MPI.COMM_WORLD.barrier();
  }

  private static int BUFFER_SIZE = 8;
  private static int MASTER      = 0;
}


In [None]:
!javac -cp ./mpi.jar Scatter.java 
!mpirun --allow-run-as-root -np 4 java -cp ./mpi.jar Scatter

#### Exercises
- Run, using N = from 2 through 8 processes.
- Run with N = 3, which does not divide BUFFER_SIZE evenly, and work out which values in sendBuf are never delivered to any process.
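The header comment of Scatter.java recommends scatterv() when the values cannot be divided evenly. In the MPI standard, the scatterv operation is told, for each process, how many values to send (a count) and where that process's chunk starts in the send buffer (a displacement). The sketch below only computes those two arrays in plain Java and does not call MPI. `UnevenChunks` and its methods are hypothetical names, and giving the extra values to the lowest ranks is one common convention, assumed here.

```java
// Computes per-process counts and displacements for an uneven split; no MPI required.
// UnevenChunks, counts, and displacements are hypothetical names for illustration.
class UnevenChunks {

    // Every rank gets total / numProcesses values; the first (total % numProcesses)
    // ranks get one extra value each (an assumed convention).
    static int[] counts(int total, int numProcesses) {
        int base = total / numProcesses;
        int extra = total % numProcesses;
        int[] c = new int[numProcesses];
        for (int rank = 0; rank < numProcesses; ++rank) {
            c[rank] = base + (rank < extra ? 1 : 0);
        }
        return c;
    }

    // displacement[r] is the index in the send buffer where rank r's chunk begins.
    static int[] displacements(int[] counts) {
        int[] d = new int[counts.length];
        for (int rank = 1; rank < counts.length; ++rank) {
            d[rank] = d[rank - 1] + counts[rank - 1];
        }
        return d;
    }

    public static void main(String[] args) {
        int[] c = counts(8, 3);
        System.out.println("counts:        " + java.util.Arrays.toString(c));                 // [3, 3, 2]
        System.out.println("displacements: " + java.util.Arrays.toString(displacements(c))); // [0, 3, 6]
    }
}
```

The counts always add up to the total, so no value is left behind the way it is with an equal-chunk scatter.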


### Gather Buffers
Once several processes have their own Buffers of data, those Buffers can also be gathered back together into a Buffer of Buffers, usually in the master process. All processes participate in a gather, like this:

![before gather diagram](https://drive.google.com/uc?id=1OWHNMKCEKsGpExJCO6l5czW9QFyMZiT6)

The gather creates a Buffer of Buffers in the master, like this:

![after gather diagram](https://drive.google.com/uc?id=1W9lky1LY0L0K6iyA00jsNV4hAnmmbvP2)

In this example, each process fills one very small Buffer. Then a gather is used to collect those small Buffers into one larger Buffer (the Buffer of Buffers pictured above) on the master process.

![Important symbol](https://drive.google.com/uc?id=1AWRLAqeaqi7SG7PHyOVywZRuMDK9Z2_s) **Note:** In the code below, note how all processes must call the gather function.
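As with scatter, the effect of a gather can be pictured with ordinary arrays. The plain-Java sketch below copies each simulated process's small array into its slot of a larger array, in rank order, using the same values that Gather.java computes (`id * 10 + i`). No MPI calls are made, and `GatherSim` and `gather` are invented names.

```java
// Plain-Java simulation of a gather; no MPI required.
// GatherSim and gather are hypothetical names for illustration.
class GatherSim {

    // computeBufs[r] stands in for rank r's computeBuf; all have the same length.
    // Returns the combined buffer the master would hold after the gather.
    static int[] gather(int[][] computeBufs) {
        int chunkSize = computeBufs[0].length;
        int[] gatherBuf = new int[chunkSize * computeBufs.length];
        for (int rank = 0; rank < computeBufs.length; ++rank) {
            System.arraycopy(computeBufs[rank], 0, gatherBuf, rank * chunkSize, chunkSize);
        }
        return gatherBuf;
    }

    public static void main(String[] args) {
        int numProcesses = 4, bufferSize = 3;
        int[][] computeBufs = new int[numProcesses][bufferSize];
        for (int id = 0; id < numProcesses; ++id) {
            for (int i = 0; i < bufferSize; ++i) {
                computeBufs[id][i] = id * 10 + i;   // same fill as Gather.java
            }
        }
        System.out.println(java.util.Arrays.toString(gather(computeBufs)));
    }
}
```

Rank r's values always land at positions r * chunkSize onward, regardless of which process finishes first.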


In [None]:
%%writefile Gather.java
/* Gather.java
 * ... illustrates the basic gather pattern 
 *      using buffers in OpenMPI's Java interface.
 *
 * Note: If the number of values being gathered is not
 *       evenly divisible by the number of processes,
 *       use gatherv() instead of gather.
 *
 * Joel Adams, Calvin University, November 2019.
 *
 * Usage: mpirun -np 4 java ./Gather
 *
 * Exercise:
 * - Compile, then run with N = 1, 2, 3, 4, 5. 
 * - Trace execution through source code. 
 * - Explain behavior/effect of gather. 
 */

import mpi.*;
import java.nio.IntBuffer;

public class Gather {

 public static void main(String [] args) throws MPIException {
    MPI.Init(args);

    Comm comm         = MPI.COMM_WORLD;
    int id            = comm.getRank();
    int numProcesses  = comm.getSize();

    IntBuffer gatherBuf = null;

    if (id == MASTER) {
        int valuesToGather = BUFFER_SIZE * numProcesses;
        gatherBuf = MPI.newIntBuffer(valuesToGather);
        System.out.print("\nBefore gather: ");
        printBuf(id, "gatherBuf", gatherBuf);
    }
 
    IntBuffer computeBuf = MPI.newIntBuffer(BUFFER_SIZE);
    for (int i = 0; i < BUFFER_SIZE; ++i) {
        computeBuf.put(i, id * 10 + i);
    }   
    comm.barrier();                          // These barriers are just here
    printBuf(id, "computeBuf", computeBuf);  // to make the output easier to read;
    comm.barrier();                          // no effect on functional correctness

    printSeparator("----", id);

    comm.gather(computeBuf, BUFFER_SIZE, MPI.INT, 
                  gatherBuf, BUFFER_SIZE, MPI.INT, MASTER); 

    if (id == MASTER) {
        System.out.print("After gather: ");
        printBuf(id, "gatherBuf", gatherBuf); 
        System.out.println();
    }

    MPI.Finalize();
  }

  /* utility to display the contents of an IntBuffer.
   * @param: id, the int MPI rank of this process.
   * @param: bufName, a String that is the name of the buffer.
   * @param: buf, the IntBuffer.
   * POST: buf's contents have been displayed via System.out.
   */
  private static void printBuf(int id, String bufName, IntBuffer buf) {
      String msg = "Process " + id + ", " + bufName + ": [";
      int size = buf.capacity();
      int sizeLessOne = size - 1;
      for (int i = 0; i < size; ++i) {
         msg += buf.get(i);
         if (i < sizeLessOne ) {
             msg += ",";
         }
      }
      msg += "]\n";
      System.out.print(msg);
  }

  /* utility to print a separator between before and after sections.
   * @param: separator, a String.
   * @param: id, the MPI rank of this process. 
   * POST: separator has been printed by the master process.
   */
  private static void printSeparator(String separator, int id) throws MPIException {
     MPI.COMM_WORLD.barrier();
     if (id == MASTER) { System.out.print(separator + "\n"); }
     MPI.COMM_WORLD.barrier();
  }

  private static int BUFFER_SIZE = 3;
  private static int MASTER      = 0;
}


In [None]:
!javac -cp ./mpi.jar Gather.java
!mpirun --allow-run-as-root -np 4 java -cp ./mpi.jar Gather

#### Exercises
- Run, using N = from 2 through 8 processes.
- Try different values of BUFFER_SIZE, perhaps changing the printing of the result for readability.


## Collective Communication:  scatter and gather arrays

The OpenMPI Java bindings offer several collective communication functions that are designed to work with *Buffers* (CharBuffer, IntBuffer, etc.) from the java.nio package.

These Buffers behave like 1-dimensional arrays, where each value in the array is at a particular index. The MPI scatter function can be used to send portions of a larger Buffer on the master to the workers, like this:

![alt text](https://drive.google.com/uc?id=1n2YmY12tBrTxtJK6MFpBX9nWmopQGT_s)

The result of doing this then looks like this, where each process has a portion of the original that they can then work on:

![alt text](https://drive.google.com/uc?id=19GNbTWWJEOU16wNjwzHon4jpC5fXKj_1)

The reverse of this process can be done using the gather function.

In this example, a 1-D Buffer is created by the master, then scattered to the workers. After the smaller Buffer used by each process is changed, the MPI gather function brings the changed small Buffers back to the master, where they are combined into a single larger Buffer.

![Important symbol](https://drive.google.com/uc?id=1AWRLAqeaqi7SG7PHyOVywZRuMDK9Z2_s) **Note:** In the code below, note how all processes must call the scatter and gather functions.
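The whole scatter, compute, gather cycle can be traced with plain arrays before running it under MPI. The sketch below is a sequential simulation, with no MPI calls, of what the next example does: fill a buffer with i + 11, hand out equal chunks, double each chunk, and put the chunks back in rank order. `ScatterLoopGatherSim` and `run` are hypothetical names.

```java
// Sequential plain-Java simulation of scatter, double, gather; no MPI required.
// ScatterLoopGatherSim and run are hypothetical names for illustration.
class ScatterLoopGatherSim {

    static int[] run(int bufferSize, int numProcesses) {
        if (numProcesses <= 0 || bufferSize % numProcesses != 0) {
            throw new IllegalArgumentException("numProcesses must divide bufferSize evenly");
        }
        int[] scatterBuffer = new int[bufferSize];
        for (int i = 0; i < bufferSize; ++i) {
            scatterBuffer[i] = i + 11;                      // the master's fill
        }
        int chunkSize = bufferSize / numProcesses;
        int[] gatherBuffer = new int[bufferSize];
        for (int rank = 0; rank < numProcesses; ++rank) {   // one pass per simulated process
            int[] chunk = java.util.Arrays.copyOfRange(     // scatter: take this rank's chunk
                    scatterBuffer, rank * chunkSize, (rank + 1) * chunkSize);
            for (int i = 0; i < chunk.length; ++i) {
                chunk[i] *= 2;                              // the work done on the chunk
            }
            System.arraycopy(chunk, 0, gatherBuffer,        // gather: copy back in rank order
                    rank * chunkSize, chunkSize);
        }
        return gatherBuffer;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run(8, 4)));
    }
}
```

The final buffer is the same for every valid number of processes; only the amount of work each process does changes.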

In [None]:
%%writefile ScatterLoopGather.java
/* ScatterLoopGather.java
 * ... uses MPI to scatter a buffer of values into chunks, 
 *      a loop to process those chunks, and 
 *      a gather to combine the piecemeal values.
 *
 * Goal: The master process fills a buffer with values
 *        and scatters it to all the other processes.
 *       Each process doubles the values in its buffer-chunk.
 *       All processes then gather the chunks back to the master.
 *
 * Joel Adams, Calvin University, November 2019.
 *
 * Note: This assumes BUFFER_SIZE is evenly divisible by N.
 *
 * Usage: mpirun -np 4 java ./ScatterLoopGather
 *
 * Exercise:
 * - Compile, then run, using 1, 2, 4, and 8 processes
 * - Use source code to trace execution and output
 * - Explain behavior/effect of the scatter and gather.
 * - Optional: change BUFFER_SIZE to be another multiple of 8, such as 16
 */

import java.nio.IntBuffer;
import mpi.*;

public class ScatterLoopGather {

 public static void main(String [] args) throws MPIException {
    MPI.Init(args);

    Comm comm               = MPI.COMM_WORLD;
    int id                  = comm.getRank();
    int numProcesses        = comm.getSize();
    IntBuffer scatterBuffer = null;
    IntBuffer chunkBuffer   = null;
    IntBuffer gatherBuffer  = null;

    if ( BUFFER_SIZE % numProcesses != 0 || numProcesses > BUFFER_SIZE ) {
        String errorMsg = "\nPlease run this program with -np N where N is\n"
                         + " <= " + BUFFER_SIZE + " and divides evenly into "
                         + BUFFER_SIZE + "\n\n";
        System.err.println(errorMsg);
        MPI.Finalize();
        System.exit(0);
    }

    if ( id == MASTER ) { 
        scatterBuffer = MPI.newIntBuffer(BUFFER_SIZE);
        fill(scatterBuffer);
        gatherBuffer = MPI.newIntBuffer(BUFFER_SIZE);
    }

    printBuffers("BEFORE the scatter", id, scatterBuffer, chunkBuffer, gatherBuffer);

    int chunkSize = BUFFER_SIZE / numProcesses;
    chunkBuffer = MPI.newIntBuffer(chunkSize);

    comm.scatter(scatterBuffer, chunkSize, MPI.INT, 
                  chunkBuffer, chunkSize, MPI.INT, MASTER);

    printSeparator("----", id);
    printBuffers("AFTER the scatter", id, scatterBuffer, chunkBuffer, gatherBuffer);

    doubleChunk(chunkBuffer);

    printSeparator("----", id);
    printBuffers("AFTER the doubling", id, scatterBuffer, chunkBuffer, gatherBuffer);

    comm.gather(chunkBuffer, chunkSize, MPI.INT, 
                 gatherBuffer, chunkSize, MPI.INT, MASTER);

    printSeparator("----", id);
    printBuffers("AFTER the gather:", id, scatterBuffer, chunkBuffer, gatherBuffer);

    MPI.Finalize();
  }

  /* utility to fill a Buffer with some values.
   * @param: buf, an IntBuffer.
   * POST: buf has been filled with int values.
   */
  private static void fill(IntBuffer buf) {
     for (int i = 0; i < buf.capacity(); ++i) {
         buf.put(i, i + 11);
     }
  }

  /* utility to print a buffer with labels.
   * @param: label, a String.
   * @param: id, an int containing the MPI rank of this process.
   * @param: sBuf, the IntBuffer the master fills and scatters.
   * @param: cBuf, the IntBuffer for storing a process's chunk.
   * @param: gBuf, the IntBuffer for storing the gathered results.
   * POST: The buffers' contents have been printed, with labels.
   */
  private static void printBuffers(String label, int id, 
                                    IntBuffer sBuf, IntBuffer cBuf, IntBuffer gBuf) {
    String msg = label + ", process " + id
                       + "'s scatterBuffer is: [";
    if (sBuf != null) {
        for (int i = 0; i < sBuf.capacity(); ++i) {
            msg += sBuf.get(i);
            if (i < sBuf.capacity()-1) msg += ",";
        }
    }
    msg += "]\n\t\t\t\tchunkBuffer is: [";
    if (cBuf != null) {
        for (int i = 0; i < cBuf.capacity(); ++i) {
            msg += cBuf.get(i);
            if (i < cBuf.capacity()-1) msg += ",";
        }
    }
    msg += "]\n\t\t\t\tgatherBuffer is: [";
    if (gBuf != null) {
        for (int i = 0; i < gBuf.capacity(); ++i) {
            msg += gBuf.get(i);
            if (i < gBuf.capacity()-1) msg += ",";
        }
    }
    msg += "]\n";
    System.out.print(msg);
  }

  /* utility to double the values in a chunk.
   * @param: chunkBuf, an IntBuffer holding this process's chunk.
   * POST: every value in chunkBuf has been doubled in place.
   */
  private static void doubleChunk(IntBuffer chunkBuf) {
      for (int i = 0; i < chunkBuf.capacity(); ++i) {
          int value = chunkBuf.get(i);
          chunkBuf.put(i, value * 2);
      }
  }

  /* utility to print a separator string between the 'before' and 'after' parts.
   * @param: separator, a String.
   * @param: id, the rank of this MPI process.
   * POST: the master has printed the separator to System.out.
   */
  public static void printSeparator(String separator, int id) throws MPIException {
     MPI.COMM_WORLD.barrier();
     if (id == MASTER) { System.out.println(separator); }
     MPI.COMM_WORLD.barrier();
  }

  private static final int MASTER      = 0;
  private static final int BUFFER_SIZE = 8;
}


In [None]:
!javac -cp ./mpi.jar ScatterLoopGather.java 
!mpirun --allow-run-as-root -np 4 java -cp ./mpi.jar ScatterLoopGather

#### Exercises
- Run using N = 2 through 8 processes.
- If you want to study how the data is created, look at fill(), which stores the values 11, 12, ... in the buffer with IntBuffer.put(index, value).
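
To predict what the master's gatherBuffer should hold at the end of a run, it can help to trace the scatter, double, and gather steps without MPI. The following is a minimal single-process sketch, not the program above: the class name `ScatterGatherSketch` and its methods are made up for illustration, plain `int[]` arrays stand in for the `IntBuffer`s, and it assumes the buffer size divides evenly by the number of processes.

```java
import java.util.Arrays;

public class ScatterGatherSketch {
    // Split 'full' into one equal chunk per rank, as a scatter would.
    // Assumes full.length is divisible by numProcesses.
    public static int[][] scatter(int[] full, int numProcesses) {
        int chunkSize = full.length / numProcesses;
        int[][] chunks = new int[numProcesses][];
        for (int rank = 0; rank < numProcesses; rank++) {
            chunks[rank] = Arrays.copyOfRange(full, rank * chunkSize, (rank + 1) * chunkSize);
        }
        return chunks;
    }

    // The work each rank does on its own chunk.
    public static void doubleChunk(int[] chunk) {
        for (int i = 0; i < chunk.length; i++) {
            chunk[i] *= 2;
        }
    }

    // Reassemble the chunks in rank order, as a gather would.
    public static int[] gather(int[][] chunks) {
        int chunkSize = chunks[0].length;
        int[] result = new int[chunks.length * chunkSize];
        for (int rank = 0; rank < chunks.length; rank++) {
            System.arraycopy(chunks[rank], 0, result, rank * chunkSize, chunkSize);
        }
        return result;
    }

    public static int[] run(int bufferSize, int numProcesses) {
        int[] full = new int[bufferSize];
        for (int i = 0; i < bufferSize; i++) {
            full[i] = i + 11; // same values that fill() writes
        }
        int[][] chunks = scatter(full, numProcesses);
        for (int[] chunk : chunks) {
            doubleChunk(chunk);
        }
        return gather(chunks);
    }

    public static void main(String[] args) {
        // prints [22, 24, 26, 28, 30, 32, 34, 36]
        System.out.println(Arrays.toString(run(8, 4)));
    }
}
```

In the MPI version each chunk lives in a different process, so the loop over chunks happens in parallel rather than one after another.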


# When the amount of work varies: balancing the load

In some algorithms, the master assigns tasks to workers by sending them data and receiving results back as each worker completes a task (or after the worker completes all of its tasks). In many of these cases, the computation time a worker needs can vary dramatically from one task to the next. This is the situation where **dynamic load balancing** can help.

In this example we combine the master-worker pattern with message passing. The master has many tasks that need to be completed. The master starts by sending some data needed to complete a task to each worker process. Then the master loops and waits to hear back from each worker by receiving a message from any of them. When the master receives a message from a worker, it sends that worker more data for its next task, unless there are no more tasks to complete, in which case it sends a special message to the worker to stop running.
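
The same hand-out protocol can be tried without MPI by letting threads play the workers. The sketch below is only an analogy under stated assumptions: the class `MasterWorkerThreads`, the `STOP` sentinel, and the queue names are hypothetical, each worker's inbox queue stands in for messages from the master, and the shared `finished` queue stands in for receiving from any worker. Work times are in milliseconds so it runs quickly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class MasterWorkerThreads {
    // Hypothetical sentinel that plays the role of the "stop" message.
    static final int STOP = -1;

    // Returns how many tasks each worker thread completed.
    public static int[] run(int[] workTimes, int numWorkers) {
        List<BlockingQueue<Integer>> inboxes = new ArrayList<>();
        BlockingQueue<Integer> finished = new LinkedBlockingQueue<>();
        Thread[] workers = new Thread[numWorkers];
        for (int w = 0; w < numWorkers; w++) {
            final int workerId = w;
            final BlockingQueue<Integer> inbox = new LinkedBlockingQueue<>();
            inboxes.add(inbox);
            workers[w] = new Thread(() -> {
                try {
                    while (true) {
                        int task = inbox.take();   // wait for the master
                        if (task == STOP) return;  // told to stop
                        Thread.sleep(task);        // simulate work (milliseconds)
                        finished.put(workerId);    // report back, asking for more
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[w].start();
        }

        int[] completed = new int[numWorkers];
        int next = 0, received = 0;
        try {
            // first round: one task per worker
            for (int w = 0; w < numWorkers && next < workTimes.length; w++) {
                inboxes.get(w).put(workTimes[next++]);
            }
            // answer each result with the next task, while any remain
            while (received < workTimes.length) {
                int workerId = finished.take();
                received++;
                completed[workerId]++;
                if (next < workTimes.length) {
                    inboxes.get(workerId).put(workTimes[next++]);
                }
            }
            // all results are in: tell every worker to stop
            for (BlockingQueue<Integer> inbox : inboxes) inbox.put(STOP);
            for (Thread t : workers) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return completed;
    }

    public static void main(String[] args) {
        int[] workTimes = {50, 10, 10, 10, 80, 20}; // made-up values, in milliseconds
        System.out.println(Arrays.toString(run(workTimes, 3)));
    }
}
```

The per-worker counts can differ from run to run because they depend on thread timing, but they always add up to the number of tasks.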

In this simple example, each worker is sent the number of seconds it should 'sleep', which can vary from 1 to 8. This simulates workloads of varying size. Because the code is so simple, the number of tasks each worker completes doesn't vary by much. In some real applications, the time for one task may be quite different from the time for another, and the outcome can differ: some workers complete several short tasks while others are still busy with long ones.

This approach can improve on assigning an equal number of tasks to every process, particularly when task times are uneven, because a worker that finishes early picks up more work instead of sitting idle.
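
A small calculation can make the difference concrete. The sketch below is an idealized model, not a measurement of the MPI program: the class `LoadBalanceCompare` and the task list are invented for illustration, communication time is ignored, and the static split assumes the number of tasks divides evenly among the workers. It compares the finish time when each worker gets an equal-sized block of tasks against the finish time when each task goes to whichever worker becomes free first.

```java
import java.util.PriorityQueue;

public class LoadBalanceCompare {
    // Static split: worker w gets a contiguous block of equal size.
    // The run finishes when the slowest worker finishes.
    public static int staticMakespan(int[] tasks, int numWorkers) {
        int perWorker = tasks.length / numWorkers; // assumes even division
        int worst = 0;
        for (int w = 0; w < numWorkers; w++) {
            int sum = 0;
            for (int i = w * perWorker; i < (w + 1) * perWorker; i++) {
                sum += tasks[i];
            }
            worst = Math.max(worst, sum);
        }
        return worst;
    }

    // Dynamic hand-out: the next task goes to the worker that is free earliest.
    public static int dynamicMakespan(int[] tasks, int numWorkers) {
        PriorityQueue<Integer> freeAt = new PriorityQueue<>();
        for (int w = 0; w < numWorkers; w++) {
            freeAt.add(0); // every worker is free at time 0
        }
        int finish = 0;
        for (int t : tasks) {
            int start = freeAt.poll(); // earliest time any worker is free
            int end = start + t;
            finish = Math.max(finish, end);
            freeAt.add(end);
        }
        return finish;
    }

    public static void main(String[] args) {
        // made-up sleep times in seconds: four long tasks first, then short ones
        int[] tasks = {8, 8, 8, 8, 1, 1, 1, 1, 1, 1, 1, 1};
        // prints static: 32, dynamic: 16
        System.out.println("static: " + staticMakespan(tasks, 3)
                + ", dynamic: " + dynamicMakespan(tasks, 3));
    }
}
```

With this task list the static split gives all four long tasks to one worker, while the dynamic hand-out spreads them out. With evenly sized tasks the two results would be much closer.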

Note in this case how the master, whose id is 0, handles the assignment of tasks, while the workers simply do what they are sent until they are told to stop.

In [None]:
%%writefile DynamicLoadBalance.java
/* DynamicLoadBalance.java
 * ... illustrates how to use the dynamic load balancing pattern 
 *     using OpenMPI's Java interface.
 * This code is based on the dynamicLoadBalance.py script,
 * originally written by Libby Shoop (Macalester College).
 * 
 * Ruth Kurniawati, Westfield State University, June 2021.
 *
 * Usage: mpirun -np 4 java -cp ./mpi.jar DynamicLoadBalance
 *
 * Exercise:
 * - Compile and run, varying N = 4, 8.
 * - Explain behavior of the dynamic load balancing of the available work
 */

import java.util.Arrays;
import java.util.Random;

import mpi.*;
import java.nio.IntBuffer;

public class DynamicLoadBalance {
    public static final int MASTER = 0;

    // tags that can be applied to messages
    public static final int WORKTAG = 1;
    public static final int DIETAG = 2;

    public static void main(String [] args) throws MPIException {
       MPI.Init(args);
   
       int id           = MPI.COMM_WORLD.getRank();
       int numProcesses = MPI.COMM_WORLD.getSize();
       //String hostName  = MPI.getProcessorName();
   
       if (id == MASTER) {
            // create an arbitrary array of numbers for how long each
            // worker task will 'work', by sleeping that number of seconds
            int numTasks = (numProcesses-1) * 4; // avg 4 tasks per worker process
            int[] workTimes = genTasks(numTasks);
            System.out.println("master created " + workTimes.length + " values for sleep times:" + Arrays.toString(workTimes));
            handOutWork(MPI.COMM_WORLD, workTimes, numProcesses);
            
       } else {
            worker(MPI.COMM_WORLD);
       }   
       MPI.Finalize();
    }

    private static int[] genTasks(int numTasks) {
        int[] tasks = new int[numTasks];
        Random r = new Random(1000); // use the same seed
        for(int i = 0; i < numTasks; i++) {
            tasks[i] = r.nextInt(8) + 1;
        }
        return tasks;
    }

    private static void worker(Comm comm) throws MPIException {
        // keep receiving messages and do work, unless tagged to 'die'
        IntBuffer buf = MPI.newIntBuffer(1);
        while(true) {
            Status stat = comm.recv(buf, 1, MPI.INT, MASTER, MPI.ANY_TAG);
            int waitTime = buf.get(0);
            System.out.println("worker "+comm.getRank()+" got "+ waitTime);
            if (stat.getTag() == DIETAG) {
                System.out.println("worker "+comm.getRank()+" dying");
                return;
            }
            // simulate work by sleeping
            try {
                Thread.sleep(1000*waitTime); // sleep for waitTime seconds
            } catch (InterruptedException e) {
                e.printStackTrace();
            } 

            // indicate done with work by sending to Master
            //System.out.println("worker "+comm.getRank()+" completed work!");
            buf.put(0, waitTime);
            comm.send(buf, 1, MPI.INT, MASTER, WORKTAG);
        }
    }

    private static void handOutWork(Comm comm, int[] workTimes, int numProcesses) throws MPIException {
        int totalWork = workTimes.length;
        int workCount = 0, recvCount = 0;
        System.out.println("master sending first tasks");
        IntBuffer sendBuf = MPI.newIntBuffer(1);
         
        for(int id = 1; id < numProcesses; id++) {
            int work = workTimes[workCount++];
            sendBuf.put(0, work);
            comm.send(sendBuf, 1, MPI.INT, id, WORKTAG);
            System.out.println("master sent "+ work +" to "+id);
        }

        // while there is still work,
        // receive result from a worker, which also
        // signals they would like some new work
        IntBuffer recvBuf = MPI.newIntBuffer(1);
        while (workCount < totalWork) {
            // System.out.println("Master workcount " + workCount + ", total "+ totalWork);
            // receive next finished result
            Status stat = comm.recv(recvBuf, 1, MPI.INT, MPI.ANY_SOURCE, WORKTAG);
            recvCount++;
            int workerId = stat.getSource();
            int completedWorkTime = recvBuf.get(0);
            System.out.println("master received "+completedWorkTime+" from "+ workerId);
            // send next work
            int newWorkTime = workTimes[workCount++];
            sendBuf.put(0, newWorkTime);
            comm.send(sendBuf, 1, MPI.INT, workerId, WORKTAG);
            System.out.println("master sent "+newWorkTime+" to "+ workerId);
        }
        // Receive results for outstanding work requests.
        while (recvCount < totalWork) {
            Status stat = comm.recv(recvBuf, 1, MPI.INT, MPI.ANY_SOURCE, WORKTAG);
            recvCount++;
            int workerId = stat.getSource();
            int completedWorkTime = recvBuf.get(0);
            System.out.println("end: master received "+completedWorkTime+" from "+ workerId);
        }

        // Tell all workers to stop
        sendBuf.put(0, -1);
        for(int id = 1; id < numProcesses; id++) {
            comm.send(sendBuf, 1, MPI.INT, id, DIETAG);
        }
    }       
}


In [None]:
!javac -cp ./mpi.jar DynamicLoadBalance.java 
!mpirun --allow-run-as-root -np 4 java -cp ./mpi.jar DynamicLoadBalance

## Exercises
- Run, using N = 4 processes
- Study the execution carefully. Note that with 4 processes, 3 are workers. The total number of tasks is 3*4, or 12. Which process does the most work? You can count by looking for the lines that end with "... from X", where X is a worker process id.
- Try with N = 8 (7 workers).
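
Counting the "... from X" lines by hand gets tedious with 7 workers. One possible helper is sketched below, assuming the program's output has been captured as a single string and that each line arrives intact. The class `CountTasks` and the sample text are hypothetical; only the "master received ... from ..." wording is taken from the program above.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CountTasks {
    // Matches both "master received 3 from 1" and "end: master received 3 from 1".
    private static final Pattern RECEIVED =
            Pattern.compile("master received (\\d+) from (\\d+)\\s*$");

    // Maps each worker id to the number of results the master received from it.
    public static Map<Integer, Integer> tasksPerWorker(String output) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String line : output.split("\\R")) {
            Matcher m = RECEIVED.matcher(line);
            if (m.find()) {
                int workerId = Integer.parseInt(m.group(2));
                counts.merge(workerId, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // made-up sample, not real program output
        String sample = "master sent 3 to 1\n"
                + "master received 3 from 1\n"
                + "master sent 5 to 1\n"
                + "master received 2 from 2\n"
                + "end: master received 5 from 1\n";
        // prints {1=2, 2=1}
        System.out.println(tasksPerWorker(sample));
    }
}
```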