# Lecture 25 : MPI Collective Communication

# Part 1 : Standard Deviation with Collective Communications

## Here is the standard deviation code using parallel message passing.

In [None]:
%%writefile mpi_stdev_v1.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <math.h>
#include <mpi.h>

/*
 * Computes the standard deviation of the integers 1..N in parallel, using
 * hand-written point-to-point (MPI_Send/MPI_Recv) tree reduction and broadcast.
 *
 * Command line: argv[1] is N, which must be a positive integer.
 * The tree communication pattern is only correct when the number of ranks
 * is a power of two, so any other rank count is rejected.
 *
 * Returns 0 on success and 1 on a usage error.
 */
int main (int argc, char** argv) {

    MPI_Init (&argc, &argv);

    // MPI_COMM_WORLD is the default communicator that contains all ranks
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // get N from the command line
    // (assumes the launcher gives every rank the same arguments, so all ranks
    //  take the same exit path; each rank still calls MPI_Finalize before leaving)
    if (argc < 2) {
        if (rank == 0) {
            printf ("Command usage : %s %s\n",argv[0],"N");
        }
        MPI_Finalize();
        return 1;
    }
    long long N = atoll(argv[1]);

    // atoll returns 0 for non-numeric input and N < 1 would make the mean 0/0
    if (N < 1) {
        if (rank == 0) {
            printf ("N must be a positive integer\n");
        }
        MPI_Finalize();
        return 1;
    }

    // with a rank count that is not a power of two, some sends below are never
    // matched by a receive and partial sums are lost, so reject it up front
    if ((size & (size-1)) != 0) {
        if (rank == 0) {
            printf ("number of ranks must be a power of 2\n");
        }
        MPI_Finalize();
        return 1;
    }

    // start the timer
    double start_time, end_time;
    start_time = MPI_Wtime();

    // each rank computes a partial sum
    long long sum = 0;
    for (long long i=1+rank;i<=N;i+=size) {
        sum += i;
    }

    // use parallel message passing to reduce the partial sums with result on rank 0
    // (size = 2^k for some integer k >= 0 was checked above)
    int alive = size;
    while (alive > 1) {
        if (rank < alive/2) {
            // rank is a receiver
            long long rank_sum;
            MPI_Status status;
            int src = rank + alive/2;
            MPI_Recv (&rank_sum, 1, MPI_LONG_LONG, src, 0, MPI_COMM_WORLD, &status);
            sum += rank_sum;
        } else if (rank < alive) {
            // rank is a sender
            int dest = rank - alive/2;
            MPI_Send (&sum, 1, MPI_LONG_LONG, dest, 0, MPI_COMM_WORLD);
        }
        alive = alive/2;
    }

    // use parallel message passing to broadcast the sum on rank 0 to all other ranks
    // (the number of ranks holding the sum doubles on every pass)
    alive = 1;
    while (alive < size) {
        alive = alive*2;
        if (rank < alive/2) {
            // rank is a sender
            int dest = rank + alive/2;
            MPI_Send (&sum, 1, MPI_LONG_LONG, dest, 0, MPI_COMM_WORLD);
        } else if (rank < alive) {
            // rank is a receiver
            MPI_Status status;
            int src = rank - alive/2;
            MPI_Recv (&sum, 1, MPI_LONG_LONG, src, 0, MPI_COMM_WORLD, &status);
        }
    }

    // every rank has the correct sum and can now compute the mean
    double mean = 1.0*sum/N;

    // each rank computes a partial sum of difference squares
    double sum_diff_sq = 0;
    for (long long i=1+rank;i<=N;i+=size) {
        sum_diff_sq += (i-mean)*(i-mean);
    }

    // use parallel message passing to reduce the partial sums of difference squares
    // with result on rank 0
    alive = size;
    while (alive > 1) {
        if (rank < alive/2) {
            // rank is a receiver
            double rank_sum_diff_sq;
            MPI_Status status;
            int src = rank + alive/2;
            MPI_Recv (&rank_sum_diff_sq, 1, MPI_DOUBLE, src, 0, MPI_COMM_WORLD, &status);
            sum_diff_sq += rank_sum_diff_sq;
        } else if (rank < alive) {
            // rank is a sender
            int dest = rank - alive/2;
            MPI_Send (&sum_diff_sq, 1, MPI_DOUBLE, dest, 0, MPI_COMM_WORLD);
        }
        alive = alive/2;
    }

    // only rank 0 has the correct sum of diff sqs
    // and can now compute the correct variance
    // (other ranks will not compute the correct variance)
    double variance = sum_diff_sq/N;

    // calculate the standard deviation
    double stdev = sqrt(variance);

    // stop the timer
    end_time = MPI_Wtime();

    // only rank 0 has the correct standard deviation
    if (rank == 0) {
        printf ("num ranks = %d, elapsed time = %g\n",size,end_time-start_time);
        // N^2 is computed in double because N*N in long long overflows
        // once N exceeds roughly 3.0e9
        printf ("standard deviation is %.3lf, sqrt((N^2-1)/12) is %.3lf\n",
                stdev,sqrt(((double)N*(double)N-1)/12.0));
    }

    MPI_Finalize();
    return 0;
}


## For version 2, we use MPI_Reduce and MPI_Bcast.

In [1]:
%%writefile mpi_stdev_v2.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <math.h>
#include <mpi.h>

/*
 * Computes the standard deviation of the integers 1..N in parallel, using
 * MPI_Reduce followed by MPI_Bcast for the sum and MPI_Reduce for the
 * sum of squared differences.
 *
 * Command line: argv[1] is N, which must be a positive integer.
 * Returns 0 on success and 1 on a usage error.
 */
int main (int argc, char** argv) {

    MPI_Init (&argc, &argv);

    // MPI_COMM_WORLD is the default communicator that contains all ranks
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // get N from the command line
    // (assumes the launcher gives every rank the same arguments, so all ranks
    //  take the same exit path; each rank still calls MPI_Finalize before leaving)
    if (argc < 2) {
        if (rank == 0) {
            printf ("Command usage : %s %s\n",argv[0],"N");
        }
        MPI_Finalize();
        return 1;
    }
    long long N = atoll(argv[1]);

    // atoll returns 0 for non-numeric input and N < 1 would make the mean 0/0
    if (N < 1) {
        if (rank == 0) {
            printf ("N must be a positive integer\n");
        }
        MPI_Finalize();
        return 1;
    }

    // start the timer
    double start_time, end_time;
    start_time = MPI_Wtime();

    // each rank computes a partial sum
    long long sum = 0;
    for (long long i=1+rank;i<=N;i+=size) {
        sum += i;
    }

    // use collective communication to reduce the partial sums with result on rank 0
    // (a separate send buffer is used so the send and receive buffers differ)
    long long rank_sum = sum;
    MPI_Reduce(&rank_sum,&sum,1,MPI_LONG_LONG,MPI_SUM,0,MPI_COMM_WORLD);

    // use collective communication to broadcast sum on rank 0 to all ranks
    MPI_Bcast(&sum,1,MPI_LONG_LONG,0,MPI_COMM_WORLD);

    // every rank has the correct sum and can now compute the mean
    double mean = 1.0*sum/N;

    // each rank computes a partial sum of difference squares
    double sum_diff_sq = 0;
    for (long long i=1+rank;i<=N;i+=size) {
        sum_diff_sq += (i-mean)*(i-mean);
    }

    // use collective communication to reduce the partial sum_diff_sqs with result on rank 0
    double rank_sum_diff_sq = sum_diff_sq;
    MPI_Reduce(&rank_sum_diff_sq,&sum_diff_sq,1,MPI_DOUBLE,MPI_SUM,0,MPI_COMM_WORLD);

    // only rank 0 has the correct sum of diff sqs
    // and can now compute the correct variance
    // (other ranks will not compute the correct variance)
    double variance = sum_diff_sq/N;

    // calculate the standard deviation
    double stdev = sqrt(variance);

    // stop the timer
    end_time = MPI_Wtime();

    // only rank 0 has the correct standard deviation
    if (rank == 0) {
        printf ("num ranks = %d, elapsed time = %g\n",size,end_time-start_time);
        // N^2 is computed in double because N*N in long long overflows
        // once N exceeds roughly 3.0e9
        printf ("standard deviation is %.3lf, sqrt((N^2-1)/12) is %.3lf\n",
                stdev,sqrt(((double)N*(double)N-1)/12.0));
    }

    MPI_Finalize();
    return 0;
}

Writing mpi_stdev_v2.c


## In the third version we use MPI_Allreduce to combine the MPI_Reduce and the MPI_Bcast

In [2]:
%%writefile mpi_stdev_v3.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <math.h>
#include <mpi.h>

/*
 * Computes the standard deviation of the integers 1..N in parallel, using
 * MPI_Allreduce for the sum (result on every rank) and MPI_Reduce for the
 * sum of squared differences (result on rank 0).
 *
 * Command line: argv[1] is N, which must be a positive integer.
 * Returns 0 on success and 1 on a usage error.
 */
int main (int argc, char** argv) {

    MPI_Init (&argc, &argv);

    // MPI_COMM_WORLD is the default communicator that contains all ranks
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // get N from the command line
    // (assumes the launcher gives every rank the same arguments, so all ranks
    //  take the same exit path; each rank still calls MPI_Finalize before leaving)
    if (argc < 2) {
        if (rank == 0) {
            printf ("Command usage : %s %s\n",argv[0],"N");
        }
        MPI_Finalize();
        return 1;
    }
    long long N = atoll(argv[1]);

    // atoll returns 0 for non-numeric input and N < 1 would make the mean 0/0
    if (N < 1) {
        if (rank == 0) {
            printf ("N must be a positive integer\n");
        }
        MPI_Finalize();
        return 1;
    }

    // start the timer
    double start_time, end_time;
    start_time = MPI_Wtime();

    // each rank computes a partial sum
    long long sum = 0;
    for (long long i=1+rank;i<=N;i+=size) {
        sum += i;
    }

    // use collective communication to reduce the partial sums with result on all ranks
    long long rank_sum = sum;
    MPI_Allreduce(&rank_sum,&sum,1,MPI_LONG_LONG,MPI_SUM,MPI_COMM_WORLD);

    // every rank has the correct sum and can now compute the mean
    double mean = 1.0*sum/N;

    // each rank computes a partial sum of difference squares
    double sum_diff_sq = 0;
    for (long long i=1+rank;i<=N;i+=size) {
        sum_diff_sq += (i-mean)*(i-mean);
    }

    // use collective communication to reduce the partial sum_diff_sqs with result on rank 0
    double rank_sum_diff_sq = sum_diff_sq;
    MPI_Reduce(&rank_sum_diff_sq,&sum_diff_sq,1,MPI_DOUBLE,MPI_SUM,0,MPI_COMM_WORLD);

    // only rank 0 has the correct sum of diff sqs
    // and can now compute the correct variance
    // (other ranks will not compute the correct variance)
    double variance = sum_diff_sq/N;

    // calculate the standard deviation
    double stdev = sqrt(variance);

    // stop the timer
    end_time = MPI_Wtime();

    // only rank 0 has the correct standard deviation
    if (rank == 0) {
        printf ("num ranks = %d, elapsed time = %g\n",size,end_time-start_time);
        // N^2 is computed in double because N*N in long long overflows
        // once N exceeds roughly 3.0e9
        printf ("standard deviation is %.3lf, sqrt((N^2-1)/12) is %.3lf\n",
                stdev,sqrt(((double)N*(double)N-1)/12.0));
    }

    MPI_Finalize();
    return 0;
}

Writing mpi_stdev_v3.c


## In our final version 4, we demonstrate the MPI_IN_PLACE option for MPI_Allreduce.

## When using this option we do not need a separate send and receive buffer.

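## Note that **MPI_IN_PLACE is also accepted by MPI_Reduce, but only on the root rank** — the other ranks must still pass their own send buffer, so the code below keeps a separate send buffer for MPI_Reduce.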

In [3]:
%%writefile mpi_stdev_v4.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <math.h>
#include <mpi.h>

/*
 * Computes the standard deviation of the integers 1..N in parallel, using
 * MPI_Allreduce with MPI_IN_PLACE for the sum (result on every rank) and
 * MPI_Reduce for the sum of squared differences (result on rank 0).
 *
 * Command line: argv[1] is N, which must be a positive integer.
 * Returns 0 on success and 1 on a usage error.
 */
int main (int argc, char** argv) {

    MPI_Init (&argc, &argv);

    // MPI_COMM_WORLD is the default communicator that contains all ranks
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // get N from the command line
    // (assumes the launcher gives every rank the same arguments, so all ranks
    //  take the same exit path; each rank still calls MPI_Finalize before leaving)
    if (argc < 2) {
        if (rank == 0) {
            printf ("Command usage : %s %s\n",argv[0],"N");
        }
        MPI_Finalize();
        return 1;
    }
    long long N = atoll(argv[1]);

    // atoll returns 0 for non-numeric input and N < 1 would make the mean 0/0
    if (N < 1) {
        if (rank == 0) {
            printf ("N must be a positive integer\n");
        }
        MPI_Finalize();
        return 1;
    }

    // start the timer
    double start_time, end_time;
    start_time = MPI_Wtime();

    // each rank computes a partial sum
    long long sum = 0;
    for (long long i=1+rank;i<=N;i+=size) {
        sum += i;
    }

    // use collective communication to reduce the partial sums with result on all ranks
    // (MPI_IN_PLACE: sum is both the input contribution and the output buffer)
    MPI_Allreduce(MPI_IN_PLACE,&sum,1,MPI_LONG_LONG,MPI_SUM,MPI_COMM_WORLD);

    // every rank has the correct sum and can now compute the mean
    double mean = 1.0*sum/N;

    // each rank computes a partial sum of difference squares
    double sum_diff_sq = 0;
    for (long long i=1+rank;i<=N;i+=size) {
        sum_diff_sq += (i-mean)*(i-mean);
    }

    // use collective communication to reduce the partial sum_diff_sqs with result on rank 0
    double rank_sum_diff_sq = sum_diff_sq;
    MPI_Reduce(&rank_sum_diff_sq,&sum_diff_sq,1,MPI_DOUBLE,MPI_SUM,0,MPI_COMM_WORLD);

    // only rank 0 has the correct sum of diff sqs
    // and can now compute the correct variance
    // (other ranks will not compute the correct variance)
    double variance = sum_diff_sq/N;

    // calculate the standard deviation
    double stdev = sqrt(variance);

    // stop the timer
    end_time = MPI_Wtime();

    // only rank 0 has the correct standard deviation
    if (rank == 0) {
        printf ("num ranks = %d, elapsed time = %g\n",size,end_time-start_time);
        // N^2 is computed in double because N*N in long long overflows
        // once N exceeds roughly 3.0e9
        printf ("standard deviation is %.3lf, sqrt((N^2-1)/12) is %.3lf\n",
                stdev,sqrt(((double)N*(double)N-1)/12.0));
    }

    MPI_Finalize();
    return 0;
}

Writing mpi_stdev_v4.c


# Part 2 : MPI Extreme with Collective Communications

## We start with version 1 where each rank checks a subset of the pairs and outputs what it thinks the extreme pair might be.

In [4]:
%%writefile mpi_extreme_v1.c
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <mpi.h>
#include "vec.h"

/*
 * Searches a dataset for the pair of points that are farthest apart.
 * Each rank checks the rows i = rank, rank+size, ... against all later rows
 * and prints the best pair it found (no communication between ranks).
 *
 * Command line: argv[1] is the dataset file; its first two integers are the
 * number of points and the dimension, followed by data that is read by
 * vec_read_dataset_file.
 *
 * A rank that finds no pair with a positive distance reports the pair -1 -1.
 * Returns 0 on success; a usage error returns 1 and any file or allocation
 * error aborts the whole MPI job.
 */
int main (int argc, char** argv) {

    MPI_Init (&argc, &argv);

    // MPI_COMM_WORLD is the default communicator that contains all ranks
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // read the filename from the command line
    // (assumes the launcher gives every rank the same arguments, so all ranks
    //  take the same exit path; each rank still calls MPI_Finalize before leaving)
    if (argc < 2) {
        if (rank == 0) {
            printf ("command usage: %s %s\n",argv[0],"filename");
        }
        MPI_Finalize();
        return 1;
    }
    char* filename = argv[1];

    // open the text file for reading
    FILE* fptr = fopen(filename,"r");

    // a file error may hit only some ranks, so abort the whole job
    // instead of letting this rank exit without finalizing MPI
    if (fptr == NULL) {
        printf ("Error opening data file %s.\n",filename);
        MPI_Abort(MPI_COMM_WORLD,1);
        return 1;
    }

    // read the number of points and the dimension of each point
    int len, dim;
    if (fscanf(fptr,"%d %d",&len, &dim) != 2) {
        printf ("error reading the number of points and the dimension\n");
        fclose(fptr);
        MPI_Abort(MPI_COMM_WORLD,1);
        return 1;
    }

    // reject non-positive sizes before they reach malloc and the index arithmetic
    if (len < 1 || dim < 1) {
        printf ("the number of points and the dimension must be positive\n");
        fclose(fptr);
        MPI_Abort(MPI_COMM_WORLD,1);
        return 1;
    }

    // allocate the (len x dim) data matrix on the heap using malloc
    // (the product is formed in size_t so that len*dim cannot overflow int)
    double* data = (double*)malloc((size_t)len*(size_t)dim*sizeof(double));
    if (data == NULL) {
        printf ("malloc failed to allocate data matrix\n");
        fclose(fptr);
        MPI_Abort(MPI_COMM_WORLD,1);
        return 1;
    }
    vec_read_dataset_file (fptr,data,len,dim);

    // close the data file
    fclose(fptr);

    // start the timer
    double start_time, end_time;
    start_time = MPI_Wtime();

    // find the extreme pair among the rows assigned to this rank
    // (extreme stays -1 -1 if no pair with positive distance is found)
    double max_dist_sq = 0;
    int extreme[2] = { -1, -1 };
    for (int i=0+rank;i<len-1;i+=size) {
        double* point_i = data+(size_t)i*dim;
        for (int j=i+1;j<len;j++) {
            double dist_sq = vec_dist_sq(point_i,data+(size_t)j*dim,dim);
            if (dist_sq > max_dist_sq) {
                max_dist_sq = dist_sq;
                extreme[0] = i;
                extreme[1] = j;
            }
        }
    }

    // stop the timer
    end_time = MPI_Wtime();

    // output the results (one printf call per rank keeps each line together)
    printf ("rank %d: time = %.4f seconds, "
            "Extreme Distance = %.2f, "
            "Extreme Pair = %d %d\n",
            rank,end_time-start_time,sqrt(max_dist_sq),extreme[0],extreme[1]);

    // free memory allocated for dataset
    free(data);

    MPI_Finalize();
    return 0;
}

Writing mpi_extreme_v1.c


## In version 2 we do a max reduction on max_dist_sq with result on rank 0.

In [6]:
%%writefile mpi_extreme_v2.c
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <mpi.h>
#include "vec.h"

/*
 * Searches a dataset for the pair of points that are farthest apart.
 * Each rank checks the rows i = rank, rank+size, ... against all later rows,
 * prints the best pair it found, and then an MPI_MAX reduction gives rank 0
 * the overall maximum squared distance.
 *
 * Command line: argv[1] is the dataset file; its first two integers are the
 * number of points and the dimension, followed by data that is read by
 * vec_read_dataset_file.
 *
 * A rank that finds no pair with a positive distance reports the pair -1 -1.
 * Returns 0 on success; a usage error returns 1 and any file or allocation
 * error aborts the whole MPI job.
 */
int main (int argc, char** argv) {

    MPI_Init (&argc, &argv);

    // MPI_COMM_WORLD is the default communicator that contains all ranks
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // read the filename from the command line
    // (assumes the launcher gives every rank the same arguments, so all ranks
    //  take the same exit path; each rank still calls MPI_Finalize before leaving)
    if (argc < 2) {
        if (rank == 0) {
            printf ("command usage: %s %s\n",argv[0],"filename");
        }
        MPI_Finalize();
        return 1;
    }
    char* filename = argv[1];

    // open the text file for reading
    FILE* fptr = fopen(filename,"r");

    // a file error may hit only some ranks; abort the whole job so the
    // remaining ranks are not left waiting in the reduction below
    if (fptr == NULL) {
        printf ("Error opening data file %s.\n",filename);
        MPI_Abort(MPI_COMM_WORLD,1);
        return 1;
    }

    // read the number of points and the dimension of each point
    int len, dim;
    if (fscanf(fptr,"%d %d",&len, &dim) != 2) {
        printf ("error reading the number of points and the dimension\n");
        fclose(fptr);
        MPI_Abort(MPI_COMM_WORLD,1);
        return 1;
    }

    // reject non-positive sizes before they reach malloc and the index arithmetic
    if (len < 1 || dim < 1) {
        printf ("the number of points and the dimension must be positive\n");
        fclose(fptr);
        MPI_Abort(MPI_COMM_WORLD,1);
        return 1;
    }

    // allocate the (len x dim) data matrix on the heap using malloc
    // (the product is formed in size_t so that len*dim cannot overflow int)
    double* data = (double*)malloc((size_t)len*(size_t)dim*sizeof(double));
    if (data == NULL) {
        printf ("malloc failed to allocate data matrix\n");
        fclose(fptr);
        MPI_Abort(MPI_COMM_WORLD,1);
        return 1;
    }
    vec_read_dataset_file (fptr,data,len,dim);

    // close the data file
    fclose(fptr);

    // start the timer
    double start_time, end_time;
    start_time = MPI_Wtime();

    // find the extreme pair among the rows assigned to this rank
    // (extreme stays -1 -1 if no pair with positive distance is found)
    double max_dist_sq = 0;
    int extreme[2] = { -1, -1 };
    for (int i=0+rank;i<len-1;i+=size) {
        double* point_i = data+(size_t)i*dim;
        for (int j=i+1;j<len;j++) {
            double dist_sq = vec_dist_sq(point_i,data+(size_t)j*dim,dim);
            if (dist_sq > max_dist_sq) {
                max_dist_sq = dist_sq;
                extreme[0] = i;
                extreme[1] = j;
            }
        }
    }

    // stop the timer
    end_time = MPI_Wtime();

    // output the results (one printf call per rank keeps each line together)
    printf ("rank %d: time = %.4f seconds, "
            "Extreme Distance = %.2f, "
            "Extreme Pair = %d %d\n",
            rank,end_time-start_time,sqrt(max_dist_sq),extreme[0],extreme[1]);

    // reduce max_dist_sq with result on rank 0
    double rank_max_dist_sq = max_dist_sq;
    MPI_Reduce(&rank_max_dist_sq,&max_dist_sq,1,MPI_DOUBLE,MPI_MAX,0,MPI_COMM_WORLD);

    // only rank 0 has the correct value of max_dist_sq
    if (rank == 0) {
        printf ("Overall Extreme Distance = %.2f\n",sqrt(max_dist_sq));
    }

    // free memory allocated for dataset
    free(data);

    MPI_Finalize();
    return 0;
}

Overwriting mpi_extreme_v2.c


## For version 3, we do an MPI_MAXLOC reduction so that rank 0 learns not only the max_dist_sq but also which rank found it.

In [7]:
%%writefile mpi_extreme_v3.c
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <mpi.h>
#include "vec.h"

/*
 * Searches a dataset for the pair of points that are farthest apart.
 * Each rank checks the rows i = rank, rank+size, ... against all later rows,
 * prints the best pair it found, and then an MPI_MAXLOC reduction gives
 * rank 0 the overall maximum squared distance and the rank that found it.
 *
 * Command line: argv[1] is the dataset file; its first two integers are the
 * number of points and the dimension, followed by data that is read by
 * vec_read_dataset_file.
 *
 * A rank that finds no pair with a positive distance reports the pair -1 -1.
 * Returns 0 on success; a usage error returns 1 and any file or allocation
 * error aborts the whole MPI job.
 */
int main (int argc, char** argv) {

    MPI_Init (&argc, &argv);

    // MPI_COMM_WORLD is the default communicator that contains all ranks
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // read the filename from the command line
    // (assumes the launcher gives every rank the same arguments, so all ranks
    //  take the same exit path; each rank still calls MPI_Finalize before leaving)
    if (argc < 2) {
        if (rank == 0) {
            printf ("command usage: %s %s\n",argv[0],"filename");
        }
        MPI_Finalize();
        return 1;
    }
    char* filename = argv[1];

    // open the text file for reading
    FILE* fptr = fopen(filename,"r");

    // a file error may hit only some ranks; abort the whole job so the
    // remaining ranks are not left waiting in the reduction below
    if (fptr == NULL) {
        printf ("Error opening data file %s.\n",filename);
        MPI_Abort(MPI_COMM_WORLD,1);
        return 1;
    }

    // read the number of points and the dimension of each point
    int len, dim;
    if (fscanf(fptr,"%d %d",&len, &dim) != 2) {
        printf ("error reading the number of points and the dimension\n");
        fclose(fptr);
        MPI_Abort(MPI_COMM_WORLD,1);
        return 1;
    }

    // reject non-positive sizes before they reach malloc and the index arithmetic
    if (len < 1 || dim < 1) {
        printf ("the number of points and the dimension must be positive\n");
        fclose(fptr);
        MPI_Abort(MPI_COMM_WORLD,1);
        return 1;
    }

    // allocate the (len x dim) data matrix on the heap using malloc
    // (the product is formed in size_t so that len*dim cannot overflow int)
    double* data = (double*)malloc((size_t)len*(size_t)dim*sizeof(double));
    if (data == NULL) {
        printf ("malloc failed to allocate data matrix\n");
        fclose(fptr);
        MPI_Abort(MPI_COMM_WORLD,1);
        return 1;
    }
    vec_read_dataset_file (fptr,data,len,dim);

    // close the data file
    fclose(fptr);

    // start the timer
    double start_time, end_time;
    start_time = MPI_Wtime();

    // find the extreme pair among the rows assigned to this rank
    // (extreme stays -1 -1 if no pair with positive distance is found)
    double max_dist_sq = 0;
    int extreme[2] = { -1, -1 };
    for (int i=0+rank;i<len-1;i+=size) {
        double* point_i = data+(size_t)i*dim;
        for (int j=i+1;j<len;j++) {
            double dist_sq = vec_dist_sq(point_i,data+(size_t)j*dim,dim);
            if (dist_sq > max_dist_sq) {
                max_dist_sq = dist_sq;
                extreme[0] = i;
                extreme[1] = j;
            }
        }
    }

    // stop the timer
    end_time = MPI_Wtime();

    // output the results (one printf call per rank keeps each line together)
    printf ("rank %d: time = %.4f seconds, "
            "Extreme Distance = %.2f, "
            "Extreme Pair = %d %d\n",
            rank,end_time-start_time,sqrt(max_dist_sq),extreme[0],extreme[1]);

    // reduce max_dist_sq using MPI_MAXLOC with result on rank 0
    // (MPI_DOUBLE_INT describes a double value followed by an int index)
    struct dist_rank { double dist_sq; int rank; };
    struct dist_rank rank_max_loc = { max_dist_sq, rank };
    struct dist_rank final_max_loc;
    MPI_Reduce(&rank_max_loc,&final_max_loc,1,MPI_DOUBLE_INT,MPI_MAXLOC,0,MPI_COMM_WORLD);

    // only rank 0 has the correct value of max_dist_sq and the rank that found it
    if (rank == 0) {
        printf ("Overall Extreme Distance = %.2f was found by rank %d\n",
                sqrt(final_max_loc.dist_sq),final_max_loc.rank);
    }

    // free memory allocated for dataset
    free(data);

    MPI_Finalize();
    return 0;
}

Writing mpi_extreme_v3.c


## In version 4, we use MPI_Gather to gather up the rank extreme pairs onto rank 0.

In [8]:
%%writefile mpi_extreme_v4.c
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <mpi.h>
#include "vec.h"

/*
 * Searches a dataset for the pair of points that are farthest apart.
 * Each rank checks the rows i = rank, rank+size, ... against all later rows
 * and prints the best pair it found. An MPI_MAXLOC reduction then tells
 * rank 0 which rank holds the overall maximum, and MPI_Gather collects every
 * rank's pair on rank 0 so it can print the overall extreme pair.
 *
 * Command line: argv[1] is the dataset file; its first two integers are the
 * number of points and the dimension, followed by data that is read by
 * vec_read_dataset_file.
 *
 * A rank that finds no pair with a positive distance reports the pair -1 -1.
 * Returns 0 on success; a usage error returns 1 and any file or allocation
 * error aborts the whole MPI job.
 */
int main (int argc, char** argv) {

    MPI_Init (&argc, &argv);

    // MPI_COMM_WORLD is the default communicator that contains all ranks
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // read the filename from the command line
    // (assumes the launcher gives every rank the same arguments, so all ranks
    //  take the same exit path; each rank still calls MPI_Finalize before leaving)
    if (argc < 2) {
        if (rank == 0) {
            printf ("command usage: %s %s\n",argv[0],"filename");
        }
        MPI_Finalize();
        return 1;
    }
    char* filename = argv[1];

    // open the text file for reading
    FILE* fptr = fopen(filename,"r");

    // a file error may hit only some ranks; abort the whole job so the
    // remaining ranks are not left waiting in the collectives below
    if (fptr == NULL) {
        printf ("Error opening data file %s.\n",filename);
        MPI_Abort(MPI_COMM_WORLD,1);
        return 1;
    }

    // read the number of points and the dimension of each point
    int len, dim;
    if (fscanf(fptr,"%d %d",&len, &dim) != 2) {
        printf ("error reading the number of points and the dimension\n");
        fclose(fptr);
        MPI_Abort(MPI_COMM_WORLD,1);
        return 1;
    }

    // reject non-positive sizes before they reach malloc and the index arithmetic
    if (len < 1 || dim < 1) {
        printf ("the number of points and the dimension must be positive\n");
        fclose(fptr);
        MPI_Abort(MPI_COMM_WORLD,1);
        return 1;
    }

    // allocate the (len x dim) data matrix on the heap using malloc
    // (the product is formed in size_t so that len*dim cannot overflow int)
    double* data = (double*)malloc((size_t)len*(size_t)dim*sizeof(double));
    if (data == NULL) {
        printf ("malloc failed to allocate data matrix\n");
        fclose(fptr);
        MPI_Abort(MPI_COMM_WORLD,1);
        return 1;
    }
    vec_read_dataset_file (fptr,data,len,dim);

    // close the data file
    fclose(fptr);

    // start the timer
    double start_time, end_time;
    start_time = MPI_Wtime();

    // find the extreme pair among the rows assigned to this rank
    // (extreme stays -1 -1 if no pair with positive distance is found,
    //  so the gather below never sends uninitialized values)
    double max_dist_sq = 0;
    int extreme[2] = { -1, -1 };
    for (int i=0+rank;i<len-1;i+=size) {
        double* point_i = data+(size_t)i*dim;
        for (int j=i+1;j<len;j++) {
            double dist_sq = vec_dist_sq(point_i,data+(size_t)j*dim,dim);
            if (dist_sq > max_dist_sq) {
                max_dist_sq = dist_sq;
                extreme[0] = i;
                extreme[1] = j;
            }
        }
    }

    // stop the timer
    end_time = MPI_Wtime();

    // output the results (one printf call per rank keeps each line together)
    printf ("rank %d: time = %.4f seconds, "
            "Extreme Distance = %.2f, "
            "Extreme Pair = %d %d\n",
            rank,end_time-start_time,sqrt(max_dist_sq),extreme[0],extreme[1]);

    // reduce max_dist_sq using MPI_MAXLOC with result on rank 0
    // (MPI_DOUBLE_INT describes a double value followed by an int index)
    struct dist_rank { double dist_sq; int rank; };
    struct dist_rank rank_max_loc = { max_dist_sq, rank };
    struct dist_rank final_max_loc;
    MPI_Reduce(&rank_max_loc,&final_max_loc,1,MPI_DOUBLE_INT,MPI_MAXLOC,0,MPI_COMM_WORLD);

    // gather up extreme pairs from each rank and give to rank 0
    // (rank r's pair lands in extreme_pairs[2*r] and extreme_pairs[2*r+1])
    int extreme_pairs[2*size];
    MPI_Gather(extreme,2,MPI_INT,extreme_pairs,2,MPI_INT,0,MPI_COMM_WORLD);

    // only rank 0 has the correct value of max_dist_sq and the rank that found it
    if (rank == 0) {
        printf ("Overall Extreme Distance = %.2f was found by rank %d\n",
                sqrt(final_max_loc.dist_sq),final_max_loc.rank);
        extreme[0] = extreme_pairs[2*final_max_loc.rank];
        extreme[1] = extreme_pairs[2*final_max_loc.rank+1];
        printf ("Overall Extreme Pair = %d %d\n",extreme[0],extreme[1]);
    }

    // free memory allocated for dataset
    free(data);

    MPI_Finalize();
    return 0;
}

Writing mpi_extreme_v4.c
