# Collective Communication Operations with NCCL on Multi-GPU Systems - Multiple GPUs with Peer-to-Peer Communications

In this notebook we will introduce direct peer-to-peer memory access across GPUs, and refactor the multi-GPU code from the previous notebook to use it.

## Objectives

By the time you complete this notebook you will:

- Understand how to check for and enable direct peer-to-peer memory access for applications running on multiple GPUs.
- Understand the NCCL API.

## Collective Communication Operations

Most parallel applications need communication operations that involve multiple computational resources. These operations can be implemented with point-to-point calls, but this approach is not very efficient for the programmer. Parallel and distributed solutions based on collective operations have therefore long been preferred for these applications. The MPI standard provides very efficient routines for collective operations, which take better advantage of the computing power of the available resources. Likewise, with the advent of new computational resources, similar routines have appeared for multi-GPU systems, for example [NCCL (NVIDIA Collective Communications Library)](https://docs.nvidia.com/deeplearning/sdk/nccl-developer-guide/docs/index.html). This notebook covers these routines for multi-GPU environments, comparing them throughout with the MPI standard and showing the differences and similarities between the two execution environments.

Two of the main characteristics of these operations are their symmetry and asynchrony, considering factors such as sending and receiving. For collective operations, symmetry means that all involved resources perform the same function with similar parameters. Asynchrony means that a resource does not have to wait for the others to finish before continuing its own execution. Since collective operations are communication patterns that affect all computational resources of an execution group, the sections below compare them on multiprocessor (MPI) and multi-GPU (NCCL) systems.

### Broadcast

`Broadcast` is a collective operation in which one computational resource sends the same information to all other members of an execution group. In NCCL, this operation is performed by __ncclBcast__, which is practically equivalent to the __MPI_Bcast__ function in MPI.

```cpp
    ncclBcast(void* buff,                        
              size_t count,                      
              ncclDataType_t datatype,                     
              int root,                                    
              ncclComm_t comm,                             
              cudaStream_t stream                          
              );
```

The signature above shows the parameters of __ncclBcast__, which is practically identical to __MPI_Bcast__. In both cases, every resource in the __comm__ communicator group must invoke the function. The call sends the data stored in __buff__ on the __root__ resource to every other member of the execution group. The __count__ and __datatype__ parameters specify how many elements of which type the __root__ sends, and therefore how much space the other resources must reserve to store the received message. The only difference between the two interfaces is the last parameter, __stream__, which is the CUDA stream on which NCCL enqueues the operation; the call returns once the operation has been enqueued, and the data is guaranteed to have arrived only after that stream has been synchronized.

The `Broadcast` collective in NCCL, implemented by __ncclBcast__, disseminates data from the root (GPU 0 in the `ncclBcast.cu` example) to every GPU in the communicator. It takes the same parameters as __MPI_Bcast__ (__count__, __datatype__, __root__ and __comm__) and adds the __stream__ parameter.
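
The effect of a broadcast on the participating buffers can be sketched on the host alone, without any GPU. The function below is only an illustrative model: `broadcast_model` and the vector-of-vectors layout (one inner vector per rank) are hypothetical names chosen for this sketch and are not part of NCCL.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Host-only model of a broadcast (hypothetical helper, not an NCCL call).
// ranks[r] stands in for the device buffer owned by rank r.
// After the call, every rank holds the root's first `count` elements,
// which is the state ncclBcast leaves behind once its stream is synchronized.
void broadcast_model(std::vector<std::vector<int>>& ranks,
                     std::size_t count,
                     std::size_t root)
{
    assert(root < ranks.size());
    const std::vector<int> source = ranks[root]; // snapshot of the root buffer
    assert(count <= source.size());

    for (std::size_t r = 0; r < ranks.size(); ++r)
    {
        if (r == root)
            continue;                            // the root already owns the data
        assert(count <= ranks[r].size());        // receivers must reserve enough space
        for (std::size_t i = 0; i < count; ++i)
            ranks[r][i] = source[i];
    }
}
```

Calling `broadcast_model(ranks, 3, 0)` on three buffers of length 3 leaves all of them equal to the buffer of rank 0, which is the same end state the NCCL example reaches on the GPUs.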

In [None]:
%%writefile ncclBcast.cu
#include <stdio.h>
#include <stdlib.h>
#include <cuda_runtime.h>
#include <nccl.h>
 
__global__ void kernel(int *a) 
{
  int index = threadIdx.x;
  a[index] *= 2;
  printf("%d\t", a[index]);
}
 
void printVector(int *in, int n)
{
 printf("\nThis is the host\n");
 for(int i = 0; i < n; i++)
  printf("%d\t", in[i]);
 printf("\n");
}

int main(int argc, char* argv[]) 
{
  /*Variables*/
  int dataSize = 8;
  int nGPUs = 0;
  cudaGetDeviceCount(&nGPUs); 
  printf("nGPUs = %d\n",nGPUs);
    
  int *h_data       = (int*)  malloc (dataSize  * sizeof(int));
  int **d_data      = (int**) malloc (nGPUs     * sizeof(int*));
  
  int *DeviceList   = (int *) malloc (nGPUs     * sizeof(int));
  for(int i = 0; i < nGPUs; i++)
      DeviceList[i] = i;
  
  /*Initializing NCCL with Multiple Devices per Thread*/
  ncclComm_t* comms      = (ncclComm_t*)  malloc(sizeof(ncclComm_t)  * nGPUs);  
  cudaStream_t* stream   = (cudaStream_t*)malloc(sizeof(cudaStream_t)* nGPUs);
  ncclCommInitAll(comms, nGPUs, DeviceList);
  
  /*Populating the data vector*/
  for(int i = 0; i < dataSize; i++)
      h_data[i] = rand()%(10-2)*2;
 
  printVector(h_data, dataSize);
      
  for(int g = 0; g < nGPUs; g++) 
  {
      cudaSetDevice(g);
      cudaStreamCreate(&stream[g]);
      cudaMalloc(&d_data[g], dataSize * sizeof(int));
     
      if(g == 0)  /*Copy from Host to Device*/
         cudaMemcpy(d_data[g], h_data, dataSize * sizeof(int), cudaMemcpyHostToDevice);
  }
        
  ncclGroupStart();
  for(int g = 0; g < nGPUs; g++) 
  {
    cudaSetDevice(DeviceList[g]);
    ncclBcast(d_data[g], dataSize, ncclInt, 0, comms[g], stream[g]); /*Broadcasting it to all*/
  }
  ncclGroupEnd();       

  for(int g = 0; g < nGPUs; g++) 
  {
    cudaSetDevice(DeviceList[g]);
    printf("\nThis is the device [%d]\n", g);
    kernel <<< 1 , dataSize >>> (d_data[g]); /*Call the CUDA Kernel*/
    cudaDeviceSynchronize();             
  }

  printf("\n");
    
  for(int g = 0; g < nGPUs; g++)  /*Synchronizing CUDA Streams*/
  {                                
    cudaSetDevice(DeviceList[g]);
    cudaStreamSynchronize(stream[g]);
  }
 
  for(int g = 0; g < nGPUs; g++)   /*Destroy CUDA Streams*/
  {                                
    cudaSetDevice(DeviceList[g]);
    cudaStreamDestroy(stream[g]);
  }

  for(int g = 0; g < nGPUs; g++)   /*Finalizing NCCL*/
     ncclCommDestroy(comms[g]);
  
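  /*Freeing memory*/
  for(int g = 0; g < nGPUs; g++)   /*Device buffers are released with cudaFree*/
  {
    cudaSetDevice(DeviceList[g]);
    cudaFree(d_data[g]);
  }

  free(h_data); 
  free(DeviceList);
  free(comms);    /*Host arrays allocated with malloc are released with free*/
  free(stream);
  free(d_data);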

  return 0;
}

This function broadcasts data to all GPUs in the same execution group, following the scheme shown in the figure below:

<center><img src="images/bcast.png" width="1000"></center>

#### Run the Code

##### Compile with Shell Script

In [None]:
%%writefile howtocompile.sh
#!/bin/bash

usage()
{
 echo "howtocompile.sh: wrong number of input parameters. Exiting."
 echo -e "Usage: bash howtocompile.sh <supercomputer>"
 echo -e "  e.g.: bash howtocompile.sh sdumont"
}

sdumont()
{
 module load nccl/2.13_cuda-11.2
 nvcc ncclBcast.cu -o ncclBcast -lnccl $CPPFLAGS $LDFLAGS
}

#args in command line
if [ "$#" ==  0 ]; then
 usage
 exit
fi

#sdumont
if [[ $1 == "sdumont" ]];then
 sdumont
fi

In [None]:
!bash howtocompile.sh sdumont

##### Execute with Shell Script

In [None]:
%%writefile v100-ncclBcast.sh
#!/bin/bash

#SBATCH --job-name=ncclBcast                   # Job name
#SBATCH --nodes=1                              # Run on 1 node  
#SBATCH --partition=sequana_gpu_dev            # Partition SDUMONT
#SBATCH --output=out_v100_%j-ncclBcast.log     # Standard output and error log
#SBATCH --ntasks-per-node=1                    # 1 job per node

module load nccl/2.13_cuda-11.2
./ncclBcast

In [None]:
%%writefile howtoexecute.sh
#!/bin/bash

usage()
{
 echo "howtoexecute.sh: wrong number of input parameters. Exiting."
 echo -e "Usage: bash howtoexecute.sh <supercomputer>"
 echo -e "  e.g.: bash howtoexecute.sh sdumont"
}

sdumont()
{
 sbatch v100-ncclBcast.sh
}

#args in command line
if [ "$#" ==  0 ]; then
 usage
 exit
fi

#sdumont
if [[ $1 == "sdumont" ]];then
 sdumont
fi

In [None]:
!bash howtoexecute.sh sdumont

#### Print output in log file

In [None]:
!cat *-ncclBcast.log

### Reduce

`Reduce` is a collective operation in which each participating computational resource contributes an operand to the global computation of an associative and commutative operation (e.g., maximum, minimum, sum, or product). In NCCL, this operation is performed by __ncclReduce__, which is equivalent to the __MPI_Reduce__ function in MPI.

```cpp
     ncclReduce(const void* sendbuff,           
                void* recvbuff,                            
                size_t count,                              
                ncclDataType_t datatype,                   
                ncclRedOp_t op,                            
                int root,                                  
                ncclComm_t comm,                           
                cudaStream_t stream                      
                );
```

The signature above shows the parameters of __ncclReduce__, which closely mirrors __MPI_Reduce__. In the `Reduce` operation, the operator __op__ is applied element-wise to the data in each resource's send buffer (__sendbuff__), and the result is written to the receive buffer (__recvbuff__) of the __root__ resource only; __recvbuff__ is not meaningful on the other resources. The __count__ and __datatype__ parameters specify the number and type of elements in each send buffer, and therefore the space the __root__ must reserve for the result. As before, the difference from MPI is the last parameter, __stream__, which is the CUDA stream on which the operation is enqueued.

To illustrate `Reduce`, we use the [dot product](https://en.wikipedia.org/wiki/Dot_product) of two vectors *x* and *y*. In the code below, every GPU holds a copy of both vectors; __ncclReduce__ sums them element-wise across the GPUs with `ncclSum`, leaving the result on the root resource (GPU 0), and the `Dev_dot` kernel then operates on the reduced vectors.
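
A common way to distribute a dot product is to let each rank compute a partial sum over its own slice of the vectors and then combine the partial sums with a sum reduction on the root. The host-only sketch below models that pattern; `partial_dot` and `dot_with_reduce` are hypothetical names used for illustration, the slicing scheme is an assumption, and the final loop merely stands in for what a sum `Reduce` would do across devices.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Partial dot product over the slice [begin, end) owned by one rank.
double partial_dot(const std::vector<double>& x,
                   const std::vector<double>& y,
                   std::size_t begin,
                   std::size_t end)
{
    double sum = 0.0;
    for (std::size_t i = begin; i < end; ++i)
        sum += x[i] * y[i];
    return sum;
}

// Host-only model (hypothetical helper, not an NCCL call):
// each of nRanks ranks computes a partial result, then a sum reduction
// delivers the total to the root, as a Reduce with a sum operator would.
double dot_with_reduce(const std::vector<double>& x,
                       const std::vector<double>& y,
                       std::size_t nRanks)
{
    assert(x.size() == y.size());
    assert(nRanks > 0);
    const std::size_t n = x.size();

    // Step 1: every rank works on its own contiguous slice.
    std::vector<double> partial(nRanks, 0.0);
    for (std::size_t r = 0; r < nRanks; ++r)
    {
        const std::size_t begin = r * n / nRanks;
        const std::size_t end   = (r + 1) * n / nRanks;
        partial[r] = partial_dot(x, y, begin, end);
    }

    // Step 2: the reduction; only the root ends up with the total.
    double rootResult = 0.0;
    for (double p : partial)
        rootResult += p;
    return rootResult;
}
```

With *x* holding eight ones and *y* holding eight twos, as in the notebook's host vectors, `dot_with_reduce(x, y, 4)` returns 16 regardless of how many ranks share the work.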

In [None]:
%%writefile ncclReduce.cu
#include <stdio.h>
#include <stdlib.h>
#include <cuda_runtime.h>
#include <nccl.h>

__global__ void Dev_dot(double *x, double *y, int n) 
{   
   __shared__ double tmp[512];

   int i = threadIdx.x;
   int t = blockDim.x * blockIdx.x + threadIdx.x;
   
   /*Each thread stores one product; out-of-range threads contribute 0*/
   tmp[i] = (t < n) ? x[t] * y[t] : 0.0;
   
   __syncthreads();

   for (int stride = blockDim.x / 2; stride >  0; stride /= 2) 
   {

      if (i < stride)
         tmp[i] += tmp[i + stride];

      __syncthreads();

   }

   if (threadIdx.x == 0) 
   {
      y[blockIdx.x] = tmp[0];
      printf("dot(x,y) = %1.2f\n", y[blockIdx.x]); 
   }
}    

__global__ void Dev_print(double *x) 
{   
  int i = threadIdx.x;
  printf("%1.2f\t", x[i]);
}

void printVector(double *in, int n)
{
  for(int i=0; i < n; i++)
    printf("%1.2f\t", in[i]);
  printf("\n");
}


int main(int argc, char* argv[]) 
{
  /*Variables*/
  int nGPUs = 0;
  cudaGetDeviceCount(&nGPUs);
  printf("nGPUs = %d\n",nGPUs);
    
  int dataSize = 8;
  double *x          = (double*)    malloc(dataSize  * sizeof(double));
  double *y          = (double*)    malloc(dataSize  * sizeof(double)); 
  double **x_d_data  = (double**)   malloc(nGPUs     * sizeof(double*));
  double **y_d_data  = (double**)   malloc(nGPUs     * sizeof(double*));
  double **Sx_d_data = (double**)   malloc(nGPUs     * sizeof(double*));
  double **Sy_d_data = (double**)   malloc(nGPUs     * sizeof(double*));
 
  int *DeviceList = (int *) malloc (nGPUs * sizeof(int));  
  for(int i = 0; i < nGPUs; ++i)
      DeviceList[i] = i;
  
  /*Initializing NCCL with Multiple Devices per Thread*/
  ncclComm_t* comms      = (ncclComm_t*)  malloc(sizeof(ncclComm_t)  * nGPUs);  
  cudaStream_t* stream   = (cudaStream_t*)malloc(sizeof(cudaStream_t)* nGPUs);
  ncclCommInitAll(comms, nGPUs, DeviceList);
      
  /*Populating the vectors*/
  for(int i = 0; i < dataSize; i++)
  { 
    x[i] = 1;                
    y[i] = 2;
  }                
  
  printf("\nThis is the host\n");      
  printVector(x, dataSize); 
  printVector(y, dataSize);
    
  for(int g = 0; g < nGPUs; g++) 
  {
    cudaSetDevice(DeviceList[g]);
    cudaStreamCreate(&stream[g]);

    cudaMalloc(&x_d_data[g],    dataSize * sizeof(double));
    cudaMalloc(&y_d_data[g],    dataSize * sizeof(double));  
    cudaMalloc(&Sx_d_data[g],   dataSize * sizeof(double));
    cudaMalloc(&Sy_d_data[g],   dataSize * sizeof(double));
     
    cudaMemcpy(x_d_data[g],  x, dataSize * sizeof(double), cudaMemcpyHostToDevice); /*Copy x from Host to Devices*/
    cudaMemcpy(y_d_data[g],  y, dataSize * sizeof(double), cudaMemcpyHostToDevice); /*Copy y from Host to Devices*/      
  }
      
  ncclGroupStart(); 
  for(int g = 0; g < nGPUs; g++) 
  {
    cudaSetDevice(DeviceList[g]);
    ncclReduce(x_d_data[g], Sx_d_data[g], dataSize, ncclDouble, ncclSum, 0, comms[g], stream[g]); /*Reducing x vector*/
    ncclReduce(y_d_data[g], Sy_d_data[g], dataSize, ncclDouble, ncclSum, 0, comms[g], stream[g]); /*Reducing y vector*/
  }
  ncclGroupEnd(); 

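  /*Only the root (GPU 0) holds the reduced vectors, so the kernel runs there*/
  cudaSetDevice(DeviceList[0]);
  cudaStreamSynchronize(stream[0]);  /*Wait for the reductions before using their results*/
  printf("\nThis is the device [%d]\n", 0);
  Dev_dot <<< 1, dataSize >>> (Sy_d_data[0], Sx_d_data[0], dataSize); /*Call the CUDA Kernel*/
  cudaDeviceSynchronize();  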
  
  for(int g = 0; g < nGPUs; g++)  /*Synchronizing CUDA Streams*/
  { 
    cudaSetDevice(DeviceList[g]);
    cudaStreamSynchronize(stream[g]);
  }
  
  for(int g = 0; g < nGPUs; g++)  /*Destroy CUDA Streams*/
  {  
    cudaSetDevice(DeviceList[g]);
    cudaStreamDestroy(stream[g]);
  }

  for(int g = 0; g < nGPUs; g++)  /*Finalizing NCCL*/
     ncclCommDestroy(comms[g]);
  
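  /*Freeing memory*/
  for(int g = 0; g < nGPUs; g++)  /*Device buffers are released with cudaFree*/
  {
    cudaSetDevice(DeviceList[g]);
    cudaFree(x_d_data[g]);
    cudaFree(y_d_data[g]);
    cudaFree(Sx_d_data[g]);
    cudaFree(Sy_d_data[g]);
  }

  free(x);
  free(y);
  free(DeviceList);
  free(comms);      /*Host arrays allocated with malloc are released with free*/
  free(stream);  
  free(x_d_data);
  free(y_d_data);
  free(Sx_d_data);
  free(Sy_d_data);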

  return 0;
}

This function reduces data from all GPUs in the same execution group onto the root, following the scheme shown in the figure below:

<center><img src="images/reduce.png" width="1000"></center>

#### Run the Code

##### Compile with Shell Script

In [None]:
%%writefile howtocompile.sh
#!/bin/bash

usage()
{
 echo "howtocompile.sh: wrong number of input parameters. Exiting."
 echo -e "Usage: bash howtocompile.sh <supercomputer>"
 echo -e "  e.g.: bash howtocompile.sh sdumont"
}

sdumont()
{
 module load nccl/2.13_cuda-11.2
 nvcc ncclReduce.cu -o ncclReduce -lnccl $CPPFLAGS $LDFLAGS
}

#args in command line
if [ "$#" ==  0 ]; then
 usage
 exit
fi

#sdumont
if [[ $1 == "sdumont" ]];then
 sdumont
fi

In [None]:
!bash howtocompile.sh sdumont

##### Execute with Shell Script

In [None]:
%%writefile v100-ncclReduce.sh
#!/bin/bash

#SBATCH --job-name=ncclReduce                  # Job name
#SBATCH --nodes=1                              # Run on 1 node  
#SBATCH --partition=sequana_gpu_dev            # Partition SDUMONT
#SBATCH --output=out_v100_%j-ncclReduce.log    # Standard output and error log
#SBATCH --ntasks-per-node=1                    # 1 job per node

module load nccl/2.13_cuda-11.2
./ncclReduce

In [None]:
%%writefile howtoexecute.sh
#!/bin/bash

usage()
{
 echo "howtoexecute.sh: wrong number of input parameters. Exiting."
 echo -e "Usage: bash howtoexecute.sh <supercomputer>"
 echo -e "  e.g.: bash howtoexecute.sh sdumont"
}

sdumont()
{
 sbatch v100-ncclReduce.sh
}

#args in command line
if [ "$#" ==  0 ]; then
 usage
 exit
fi

#sdumont
if [[ $1 == "sdumont" ]];then
 sdumont
fi

In [None]:
!bash howtoexecute.sh sdumont

#### Print output in log file

In [None]:
!cat *-ncclReduce.log

### Gather

`Gather` is a collective operation in which one computational resource collects information from a set of resources. From one point of view, `Gather` is the inverse of `Scatter`: instead of distributing blocks from one resource to many, it combines the blocks of many resources in the memory of a single receiving resource. On GPUs, the example below uses __ncclAllGather__, which implements the related `AllGather` operation: it is equivalent to performing *n* `Gather` calls, with a different resource acting as __root__ each time, so every resource ends up with the complete gathered data.

```cpp
      ncclAllGather(const void* sendbuff,           
                    void* recvbuff,                               
                    size_t sendcount,                             
                    ncclDataType_t datatype,                      
                    ncclComm_t comm,                              
                    cudaStream_t stream                           
                    );
```

In __MPI_Gather__, all processes of the communicator __comm__ send the contents of __sendbuff__ to the process with identifier __root__, which concatenates the received data in order of sender identifier, starting at the position pointed to by __recvbuff__: the data of process 0 is stored before that of process 1, and so on. The MPI receive arguments (__recvbuff__, __recvcount__, __recvtype__) are meaningful only on the __root__. __ncclAllGather__ has no __root__ parameter: every resource sends __sendcount__ elements and receives the concatenation of all contributions, so each __recvbuff__ must hold __sendcount__ times the number of resources. The receive count and type are implicit in the function arguments, and a __stream__ argument is added as before.
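
The rank-ordered concatenation described above can be modelled on the host as follows. This is a sketch under stated assumptions: `allgather_model` is a hypothetical helper, each inner vector plays the role of one rank's send buffer, and the returned vectors play the role of the receive buffers.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Host-only model of AllGather (hypothetical helper, not an NCCL call).
// send[r] is the send buffer of rank r; the first `sendcount` elements
// of each are concatenated in rank order, and every rank receives a copy.
std::vector<std::vector<float>> allgather_model(
    const std::vector<std::vector<float>>& send,
    std::size_t sendcount)
{
    const std::size_t nRanks = send.size();

    std::vector<float> gathered;
    gathered.reserve(nRanks * sendcount);
    for (std::size_t r = 0; r < nRanks; ++r)
    {
        assert(send[r].size() >= sendcount);
        // Data of rank 0 comes first, then rank 1, and so on.
        for (std::size_t i = 0; i < sendcount; ++i)
            gathered.push_back(send[r][i]);
    }

    // Each receive buffer holds sendcount * nRanks elements.
    return std::vector<std::vector<float>>(nRanks, gathered);
}
```

With four ranks contributing the single values 10, 20, 30 and 40, every returned buffer holds `{10, 20, 30, 40}`, which matches the pattern the notebook's example is built to show on four GPUs.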

In [None]:
%%writefile ncclAllGather.cu
#include <stdio.h>
#include <stdlib.h>
#include <cuda_runtime.h>
#include <nccl.h>

__global__ void Dev_print(float *x) 
{   
   int i = threadIdx.x; 
   printf("%1.2f\t", x[i]); 
}

void printVector(float *in, int n)
{
  for(int i=0; i < n; i++)
    if(in[i])
     printf("%1.2f\t", in[i]);
}

int main(int argc, char* argv[])
{
  /*Variables*/
  int nGPUs = 0;
  cudaGetDeviceCount(&nGPUs);
  printf("nGPUs = %d\n",nGPUs);  
  int sendcount = 1;
  int size      = nGPUs;
      
  int *DeviceList = (int *) malloc (nGPUs * sizeof(int));
  for(int i = 0; i < nGPUs; ++i)
      DeviceList[i] = i;
    
 /*Initializing NCCL with Multiple Devices per Thread*/
  ncclComm_t* comms      = (ncclComm_t*)  malloc(sizeof(ncclComm_t)  * nGPUs);  
  cudaStream_t* stream   = (cudaStream_t*)malloc(sizeof(cudaStream_t)* nGPUs);
  ncclCommInitAll(comms, nGPUs, DeviceList);

  /*Allocating and initializing device buffers*/
  float** sendbuff = (float**) malloc(nGPUs * sizeof(float*));
  float** recvbuff = (float**) malloc(nGPUs * sizeof(float*));

  /*Host vectors*/ 
  float host_x0[4] = { 10,   0,  0,  0};
  float host_x1[4] = {  0,  20,  0,  0};
  float host_x2[4] = {  0,   0, 30,  0};
  float host_x3[4] = {  0,   0,  0,  40};
    
  printf("\nThis is the host\n");      
  printVector(host_x0, size); 
  printVector(host_x1, size);
  printVector(host_x2, size);
  printVector(host_x3, size);  
  printf("\n");  

  for(int i = 0; i < nGPUs; ++i) 
  {
    cudaSetDevice(i);
    cudaMalloc(&sendbuff[i],  size * sizeof(float));
    cudaMalloc(&recvbuff[i],  size * sizeof(float));

    switch(i) /*Copy from host to devices*/
    {
      case 0 : cudaMemcpy(sendbuff[i], host_x0,   size * sizeof(float), cudaMemcpyHostToDevice); break; 
      case 1 : cudaMemcpy(sendbuff[i], host_x1,   size * sizeof(float), cudaMemcpyHostToDevice); break; 
      case 2 : cudaMemcpy(sendbuff[i], host_x2,   size * sizeof(float), cudaMemcpyHostToDevice); break; 
      case 3 : cudaMemcpy(sendbuff[i], host_x3,   size * sizeof(float), cudaMemcpyHostToDevice); break; 
    }

    cudaStreamCreate(stream+i);
  } 

  ncclGroupStart();        
  for(int g = 0; g < nGPUs; g++) 
  {
    cudaSetDevice(g);
    ncclAllGather(sendbuff[g] + g, recvbuff[g], sendcount, ncclFloat, comms[g], stream[g]); /*All Gathering the data on GPUs*/
  }
  ncclGroupEnd();

  for(int g = 0; g < nGPUs; g++) 
  {
    cudaSetDevice(g); 
    printf("\nThis is the device [%d]\n", g);
    Dev_print <<< 1, size >>> (recvbuff[g]); /*Call the CUDA Kernel*/
    cudaDeviceSynchronize();    
  }
  printf("\n");

  for(int i = 0; i < nGPUs; ++i) /*Synchronizing CUDA Streams*/
  {                                  
   cudaSetDevice(i);
   cudaStreamSynchronize(stream[i]);
  }

  for(int i = 0; i < nGPUs; ++i) /*Destroy CUDA Streams and free device buffers*/ 
  { 
   cudaSetDevice(i);
   cudaStreamDestroy(stream[i]);
   cudaFree(sendbuff[i]);
   cudaFree(recvbuff[i]);
  }

  for(int i = 0; i < nGPUs; ++i)   /*Finalizing NCCL*/
    ncclCommDestroy(comms[i]);

 /*Freeing memory*/
  free(DeviceList);  
  free(comms);     /*Host arrays allocated with malloc are released with free*/
  free(stream);  
  free(sendbuff);
  free(recvbuff);

  return 0;
}

This function gathers data from all GPUs in the same execution group onto every GPU, following the scheme shown in the figure below:

<center><img src="images/allgather.png" width="1000"></center>

#### Run the Code

##### Compile with Shell Script

In [None]:
%%writefile howtocompile.sh
#!/bin/bash

usage()
{
 echo "howtocompile.sh: wrong number of input parameters. Exiting."
 echo -e "Usage: bash howtocompile.sh <supercomputer>"
 echo -e "  e.g.: bash howtocompile.sh sdumont"
}

sdumont()
{
 module load nccl/2.13_cuda-11.2
 nvcc ncclAllGather.cu -o ncclAllGather -lnccl $CPPFLAGS $LDFLAGS
}

#args in command line
if [ "$#" ==  0 ]; then
 usage
 exit
fi

#sdumont
if [[ $1 == "sdumont" ]];then
 sdumont
fi

In [None]:
!bash howtocompile.sh sdumont

##### Execute with Shell Script

In [None]:
%%writefile v100-ncclAllGather.sh
#!/bin/bash

#SBATCH --job-name=ncclAllGather               # Job name
#SBATCH --nodes=1                              # Run on 1 node  
#SBATCH --partition=sequana_gpu_dev            # Partition SDUMONT
#SBATCH --output=out_v100_%j-ncclAllGather.log # Standard output and error log
#SBATCH --ntasks-per-node=1                    # 1 job per node

module load nccl/2.13_cuda-11.2
./ncclAllGather

In [None]:
%%writefile howtoexecute.sh
#!/bin/bash

usage()
{
 echo "howtoexecute.sh: wrong number of input parameters. Exiting."
 echo -e "Usage: bash howtoexecute.sh <supercomputer>"
 echo -e "  e.g.: bash howtoexecute.sh sdumont"
}

sdumont()
{
 sbatch v100-ncclAllGather.sh
}

#args in command line
if [ "$#" ==  0 ]; then
 usage
 exit
fi

#sdumont
if [[ $1 == "sdumont" ]];then
 sdumont
fi

In [None]:
!bash howtoexecute.sh sdumont

#### Print output in log file

In [None]:
!cat *-ncclAllGather.log

### ReduceScatter

`ReduceScatter` is a collective operation in NCCL that merges two operations into one: a `Reduce` followed by a `Scatter`. The data is reduced across all computational resources, and the result is split into blocks that are distributed among the resources according to their rank index.


```cpp
      ncclReduceScatter(const void* sendbuff,
                       void* recvbuff, 
                       size_t recvcount, 
                       ncclDataType_t datatype, 
                       ncclRedOp_t op, 
                       ncclComm_t comm, 
                       cudaStream_t stream
                       );
```

Every resource provides a __sendbuff__ with __recvcount__ elements for each resource in the communicator, that is, __recvcount__ times the number of resources in total. The operator __op__ is applied element-wise across all send buffers, and resource *i* receives the *i*-th block of __recvcount__ reduced elements in its __recvbuff__. Unlike `Reduce`, there is no __root__: each resource ends up with only its own block of the reduced result.
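
The block distribution can be made concrete with a small host-only model that uses a sum as the reduction operator. The helper name `reduce_scatter_sum_model` and the container layout are assumptions made for this sketch; they are not part of NCCL.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Host-only model of ReduceScatter with a sum operator
// (hypothetical helper, not an NCCL call).
// send[r] is the send buffer of rank r and must hold nRanks * recvcount
// elements. Rank dst receives block dst of the element-wise sum.
std::vector<std::vector<float>> reduce_scatter_sum_model(
    const std::vector<std::vector<float>>& send,
    std::size_t recvcount)
{
    const std::size_t nRanks = send.size();
    for (std::size_t r = 0; r < nRanks; ++r)
        assert(send[r].size() == nRanks * recvcount);

    std::vector<std::vector<float>> recv(
        nRanks, std::vector<float>(recvcount, 0.0f));

    for (std::size_t dst = 0; dst < nRanks; ++dst)          // receiving rank
        for (std::size_t i = 0; i < recvcount; ++i)         // element in its block
            for (std::size_t src = 0; src < nRanks; ++src)  // contributing rank
                recv[dst][i] += send[src][dst * recvcount + i];

    return recv;
}
```

Fed with the four host vectors of the notebook's example and `recvcount = 1`, the model gives rank 0 the value 100 (10 + 20 + 30 + 40), rank 1 the value 260, rank 2 the value 420, and rank 3 the value 580.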

In [None]:
%%writefile ncclReduceScatter.cu
#include <stdio.h>
#include <stdlib.h>
#include <cuda_runtime.h>
#include <nccl.h>

__global__ void Dev_print(float *x) 
{   
   int i = threadIdx.x; 
   printf("%1.2f\t", x[i]); 
}

void printVector(float *in, int n)
{
  for(int i=0; i < n; i++)
   printf("%1.2f\t", in[i]);
  printf("\n");
}

int main(int argc, char* argv[])
{
 /*Variables*/
  int nGPUs = 0;
  cudaGetDeviceCount(&nGPUs);
  printf("nGPUs = %d\n",nGPUs);  
  int recvcount = 1;
  int size      = nGPUs;   
    
  int *DeviceList = (int *) malloc (nGPUs * sizeof(int));
  for(int i = 0; i < nGPUs; ++i)
      DeviceList[i] = i;
    
 /*Initializing NCCL with Multiple Devices per Thread*/
  ncclComm_t* comms      = (ncclComm_t*)  malloc(sizeof(ncclComm_t)  * nGPUs);  
  cudaStream_t* stream   = (cudaStream_t*)malloc(sizeof(cudaStream_t)* nGPUs);
  ncclCommInitAll(comms, nGPUs, DeviceList);

  /*Allocating and initializing device buffers*/
  float** sendbuff = (float**) malloc(nGPUs * sizeof(float*));
  float** recvbuff = (float**) malloc(nGPUs * sizeof(float*));

  /*Host vectors*/ 
  float host_x0[4] = { 10,  50,  90,   130};
  float host_x1[4] = { 20,  60,  100,  140};
  float host_x2[4] = { 30,  70,  110,  150};
  float host_x3[4] = { 40,  80,  120,  160};
    
  printf("\nThis is the host\n");    
 
  if(nGPUs == 4)  
  {
    printVector(host_x0, size); 
    printVector(host_x1, size);
    printVector(host_x2, size);
    printVector(host_x3, size); 
  }else //nGPUs == 3
  {
    printVector(host_x0, size); 
    printVector(host_x1, size);
    printVector(host_x2, size);
  }  
    
  for(int i = 0; i < nGPUs; ++i) 
  {
    cudaSetDevice(i);
    cudaMalloc(&sendbuff[i],  size * sizeof(float));
    cudaMalloc(&recvbuff[i],  size * sizeof(float));

    switch(i)  /*Copy from host to devices*/
    {
      case 0 : cudaMemcpy(sendbuff[i], host_x0,   size * sizeof(float), cudaMemcpyHostToDevice); break; 
      case 1 : cudaMemcpy(sendbuff[i], host_x1,   size * sizeof(float), cudaMemcpyHostToDevice); break; 
      case 2 : cudaMemcpy(sendbuff[i], host_x2,   size * sizeof(float), cudaMemcpyHostToDevice); break; 
      case 3 : cudaMemcpy(sendbuff[i], host_x3,   size * sizeof(float), cudaMemcpyHostToDevice); break; 
    }
      
    cudaStreamCreate(stream+i);
  } 

  ncclGroupStart();      
  for(int g = 0; g < nGPUs; g++) 
  {
   cudaSetDevice(g);
   ncclReduceScatter(sendbuff[g], recvbuff[g], recvcount, ncclFloat, ncclSum, comms[g], stream[g]); /*All Reducing and Scattering the data on GPUs*/   
  }
  ncclGroupEnd();

  for(int g = 0; g < nGPUs; g++) 
  {
    cudaSetDevice(g); 
    printf("\nThis is the device [%d]\n", g);
    Dev_print <<< 1, recvcount >>> (recvbuff[g]); /*Only recvcount elements of each receive buffer are valid*/
    cudaDeviceSynchronize();    
  }
  printf("\n");

  for (int i = 0; i < nGPUs; ++i)  /*Synchronizing CUDA Streams*/
  {                                 
   cudaSetDevice(i);
   cudaStreamSynchronize(stream[i]);
  }

  for (int i = 0; i < nGPUs; ++i)  /*Destroy CUDA Streams and free device buffers*/
  { 
   cudaSetDevice(i);
   cudaStreamDestroy(stream[i]);
   cudaFree(sendbuff[i]);
   cudaFree(recvbuff[i]);
  }

  for(int i = 0; i < nGPUs; ++i)   /*Finalizing NCCL*/
    ncclCommDestroy(comms[i]);

 /*Freeing memory*/
  free(DeviceList);
  free(comms);     /*Host arrays allocated with malloc are released with free*/
  free(stream);  
  free(sendbuff);
  free(recvbuff);

  return 0;
}

This function reduces data across all GPUs in the same execution group and scatters the result among them, following the scheme shown in the figure below:

<center><img src="images/reducescatter.png" width="1000"></center>

#### Run the Code

##### Compile with Shell Script

In [None]:
%%writefile howtocompile.sh
#!/bin/bash

usage()
{
 echo "howtocompile.sh: wrong number of input parameters. Exiting."
 echo -e "Usage: bash howtocompile.sh <supercomputer>"
 echo -e "  e.g.: bash howtocompile.sh sdumont"
}

sdumont()
{
 module load nccl/2.13_cuda-11.2
 nvcc ncclReduceScatter.cu -o ncclReduceScatter -lnccl $CPPFLAGS $LDFLAGS
}

#args in command line
if [ "$#" ==  0 ]; then
 usage
 exit
fi

#sdumont
if [[ $1 == "sdumont" ]];then
 sdumont
fi

In [None]:
!bash howtocompile.sh sdumont

##### Execute with Shell Script

In [None]:
%%writefile v100-ncclReduceScatter.sh
#!/bin/bash

#SBATCH --job-name=ncclReduceScatter               # Job name
#SBATCH --nodes=1                                  # Run on 1 node  
#SBATCH --partition=sequana_gpu_dev                # Partition SDUMONT
#SBATCH --output=out_v100_%j-ncclReduceScatter.log # Standard output and error log
#SBATCH --ntasks-per-node=1                        # 1 job per node

module load nccl/2.13_cuda-11.2
./ncclReduceScatter

In [None]:
%%writefile howtoexecute.sh
#!/bin/bash

usage()
{
 echo "howtoexecute.sh: wrong number of input parameters. Exiting."
 echo -e "Usage: bash howtoexecute.sh <supercomputer>"
 echo -e "  e.g.: bash howtoexecute.sh sdumont"
}

sdumont()
{
 sbatch v100-ncclReduceScatter.sh
}

#args in command line
if [ "$#" ==  0 ]; then
 usage
 exit
fi

#sdumont
if [[ $1 == "sdumont" ]];then
 sdumont
fi

In [None]:
!bash howtoexecute.sh sdumont

#### Print output in log file

In [None]:
!cat *-ncclReduceScatter.log

### Peer-to-Peer Communications (P2P)

Peer-to-peer communication can be used to express any communication pattern between multiple GPUs. Every peer-to-peer exchange needs two NCCL calls: one to send the message (__ncclSend__) and one to receive it (__ncclRecv__), and the two calls must use the same count and data type. Multiple calls to __ncclSend__ and __ncclRecv__ can be combined between __ncclGroupStart__ and __ncclGroupEnd__ to form more complex communication patterns, with different sizes, data types, and buffers per peer, for example scatter, gather, or communication between neighbors in N-dimensional spaces. The syntax of __ncclSend__ and __ncclRecv__, the counterparts of __MPI_Send__ and __MPI_Recv__, is shown below.

Point-to-point calls within a group block until the group call completes, but calls within a group can be seen as progressing independently, so they should never block each other. Analogous to MPI, a point-to-point operation can be expressed as follows:

```cpp
   ncclSend(const void* sendbuff,                               ncclRecv(void* recvbuff,
            size_t sendcount,                                            size_t recvcount,                            
            ncclDataType_t datatype,                                     ncclDataType_t datatype, 
            int peer,                                                    int peer,             
            ncclComm_t comm,                                             ncclComm_t comm,                           
            cudaStream_t stream                                          cudaStream_t stream 
            );                                                           );                                        
```
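
As an illustration of how a collective pattern can be composed from point-to-point pairs, the host-only sketch below models a scatter: the root "sends" one chunk to each peer, and each peer "receives" a chunk of the same size from the root. The function name `scatter_with_send_recv` is hypothetical, and the comments that mention `ncclSend` and `ncclRecv` only indicate where such calls would sit inside a `ncclGroupStart`/`ncclGroupEnd` pair; no NCCL call is made here.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Host-only model of a scatter built from matched send/receive pairs
// (hypothetical helper, not an NCCL call).
// rootData holds nRanks chunks of `count` elements on the root;
// the result holds one receive buffer per rank.
std::vector<std::vector<int>> scatter_with_send_recv(
    const std::vector<int>& rootData,
    std::size_t nRanks,
    std::size_t count)
{
    assert(rootData.size() == nRanks * count);
    std::vector<std::vector<int>> recv(nRanks, std::vector<int>(count));

    for (std::size_t peer = 0; peer < nRanks; ++peer)
    {
        // Send side: on a GPU system the root would issue one ncclSend
        // per peer, starting at offset peer * count of its send buffer.
        const int* chunk = rootData.data() + peer * count;

        // Receive side: rank `peer` would issue one ncclRecv from the root
        // with the same count and data type.
        std::copy(chunk, chunk + count, recv[peer].begin());
    }
    return recv;
}
```

For a root buffer `{1, 2, 3, 4, 5, 6, 7, 8}` split across four ranks with `count = 2`, rank 0 receives `{1, 2}` and rank 3 receives `{7, 8}`. The point of the sketch is that every send has exactly one matching receive with the same count and type.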

In [None]:
%%writefile ncclSendRecv.cu
#include <stdio.h>
#include <stdlib.h>
#include <cuda_runtime.h>
#include <nccl.h>

__global__ void kernel(int *a, int rank) 
{ 
  if(rank == 0)
    printf("%d\t", a[threadIdx.x]); 
  else
    printf("%d\t", a[threadIdx.x] * 10); 
}
 
void showAll(int *in, int n)
{
  printf("\nThis is the host\n");
  for(int i = 0; i < n; i++)
    printf("%d\t", in[i]);     
  printf("\n");
}

int main(int argc, char* argv[]) 
{
  /*Variables*/  
  int size = 8;
  int nGPUs = 0;
  cudaGetDeviceCount(&nGPUs);
  printf("nGPUs = %d\n",nGPUs);
  
  int *host       = (int*) malloc(size      * sizeof(int));
  int **sendbuff  = (int**)malloc(nGPUs     * sizeof(int*));
  int **recvbuff  = (int**)malloc(nGPUs     * sizeof(int*));
    
  int *DeviceList = (int *) malloc ( nGPUs * sizeof(int));
  for(int i = 0; i < nGPUs; ++i)
      DeviceList[i] = i;
  
  /*Initializing NCCL with Multiple Devices per Thread*/
  ncclComm_t* comms         = (ncclComm_t*)  malloc(sizeof(ncclComm_t)  * nGPUs);  
  cudaStream_t* stream      = (cudaStream_t*)malloc(sizeof(cudaStream_t)* nGPUs);
  ncclCommInitAll(comms, nGPUs, DeviceList); 

  /*Populating the vector*/
  for(int i = 0; i < size; i++)
      host[i] = i + 1;

  showAll(host, size);

  for(int g = 0; g < nGPUs; g++) 
  {
      cudaSetDevice(DeviceList[g]);
      cudaStreamCreate(&stream[g]);
      cudaMalloc(&sendbuff[g], size * sizeof(int));
      cudaMalloc(&recvbuff[g], size * sizeof(int));
     
      if(g == 0) /*Copy from host to devices*/
        cudaMemcpy(sendbuff[g], host, size * sizeof(int),cudaMemcpyHostToDevice);    
  }
  
   
  ncclGroupStart();        
  for(int g = 0; g < nGPUs; g++) 
  {
      /*Rank 0 sends its buffer to peer g; rank g receives from peer 0*/
      ncclSend(sendbuff[0], size, ncclInt, g, comms[0], stream[0]);
      ncclRecv(recvbuff[g], size, ncclInt, 0, comms[g], stream[g]);
  }
  ncclGroupEnd();          
    
  for(int g = 0; g < nGPUs; g++) /*Synchronizing CUDA Streams before reading the buffers*/
  {
    cudaSetDevice(DeviceList[g]);
    cudaStreamSynchronize(stream[g]);
  }

  for(int g = 0; g < nGPUs; g++) 
  {
      cudaSetDevice(DeviceList[g]);
      printf("\nThis is the device [%d]\n", g);
      
      if(g == 0)
        kernel <<< 1 , size >>> (sendbuff[g], 0); 
      else
        kernel <<< 1 , size >>> (recvbuff[g], g); 
 
      cudaDeviceSynchronize();
  }
  printf("\n");
 
  for(int g = 0; g < nGPUs; g++) /*Destroy CUDA Streams*/
  {
    cudaSetDevice(DeviceList[g]);
    cudaStreamDestroy(stream[g]);
  }

  for(int g = 0; g < nGPUs; g++) /*Finalizing NCCL*/
     ncclCommDestroy(comms[g]);
  
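  /*Freeing memory*/
  for(int g = 0; g < nGPUs; g++) /*Device buffers are released with cudaFree*/
  {
    cudaSetDevice(DeviceList[g]);
    cudaFree(sendbuff[g]);
    cudaFree(recvbuff[g]);
  }

  /*Host arrays allocated with malloc are released with free*/
  free(host);
  free(DeviceList); 
  free(comms);
  free(stream);
  free(sendbuff);
  free(recvbuff);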

  return 0;
}

#### Run the Code

##### Compile with Shell Script

In [None]:
%%writefile howtocompile.sh
#!/bin/bash

usage()
{
 echo "howtocompile.sh: wrong number of input parameters. Exiting."
 echo -e "Usage: bash howtocompile.sh <supercomputer>"
 echo -e "  e.g.: bash howtocompile.sh sdumont"
}

sdumont()
{
 module load nccl/2.13_cuda-11.2
 nvcc ncclSendRecv.cu -o ncclSendRecv -lnccl $CPPFLAGS $LDFLAGS
}

#args in command line
if [ "$#" -eq 0 ]; then
 usage
 exit
fi

#sdumont
if [[ $1 == "sdumont" ]];then
 sdumont
fi

In [None]:
!bash howtocompile.sh sdumont

##### Execute with Shell Script

In [None]:
%%writefile v100-ncclSendRecv.sh
#!/bin/bash

#SBATCH --job-name=ncclSendRecv                # Job name
#SBATCH --nodes=1                              # Run on 1 node  
#SBATCH --partition=sequana_gpu_dev            # Partition SDUMONT
#SBATCH --output=out_v100_%j-ncclSendRecv.log  # Standard output and error log
#SBATCH --ntasks-per-node=1                    # 1 task per node

module load nccl/2.13_cuda-11.2
./ncclSendRecv

In [None]:
%%writefile howtoexecute.sh
#!/bin/bash

usage()
{
 echo "howtoexecute.sh: wrong number of input parameters. Exiting."
 echo -e "Usage: bash howtoexecute.sh <supercomputer>"
 echo -e "  e.g.: bash howtoexecute.sh sdumont"
}

sdumont()
{
 sbatch v100-ncclSendRecv.sh
}

#args in command line
if [ "$#" -eq 0 ]; then
 usage
 exit
fi

#sdumont
if [[ $1 == "sdumont" ]];then
 sdumont
fi

In [None]:
!bash howtoexecute.sh sdumont

#### Print the Output from the Log File

In [None]:
!cat *-ncclSendRecv.log

## Exercise 1: Calculation of PI Number using the Riemann Integral

Starting from the following MPI code, write a parallel program using NCCL that uses the collective communication functions `Broadcast` and `Reduce` to calculate $\pi$ by integrating $\frac{1}{1+x^2}$ over $[0, 1]$. Since $\int_0^1 \frac{1}{1+x^2}\,dx = \frac{\pi}{4}$, four times a Riemann sum of the integrand approximates $\pi$.

```c++
#include <stdio.h>
#include <mpi.h>

int main(int argc, char **argv) 
{ 
  int master = 0, size, myrank, npoints, npointslocal, i;
  double delta, add, addlocal, x;

  MPI_Init( &argc, &argv );
  MPI_Comm_size( MPI_COMM_WORLD, &size );
  MPI_Comm_rank( MPI_COMM_WORLD, &myrank );

  if(myrank == master)
  {
    npoints = 1000;
    printf("\nNumber of division points (%d):\n",npoints);
  }

  MPI_Bcast(&npoints, 1, MPI_INT, master, MPI_COMM_WORLD);

  delta = 1.0/((double) npoints);
  npointslocal = npoints/size;

  printf("===================> %d %d %d\n", myrank, npoints, npointslocal);

  addlocal = 0;

  x = myrank * npointslocal * delta;

  for(i = 1; i <= npointslocal; ++i)
  {
    addlocal = addlocal + 1.0/(1+x*x);
    x = x + delta;
  }

  MPI_Reduce(&addlocal, &add, 1, MPI_DOUBLE, MPI_SUM, master, MPI_COMM_WORLD);
    
  if(myrank == master)
  {
     add = 4.0 * delta * add;
     printf("Pi = %20.16lf\n", add);
  }
 
  MPI_Finalize(); 
  
  return 0;  
}   
```

## Exercise 2: AllReduce on Multi-GPU Systems

Using the definition of the __ncclAllReduce__ function below, write a parallel program using NCCL that uses the `AllReduce` collective communication function.

```cpp
ncclResult_t ncclAllReduce(const void* sendbuff,
                           void* recvbuff,
                           size_t count,
                           ncclDataType_t datatype,
                           ncclRedOp_t op,
                           ncclComm_t comm,
                           cudaStream_t stream
                          );
```

## Exercise 3: Matrix-Vector Multiply on Multi-GPU Systems

Create a parallel program for multi-GPU systems that computes the matrix-vector product $y=Ax+b$ using a data distribution in which the matrix $A$ and the result vector $y$ are distributed by blocks of rows of size $b$, and the vector $x$ is replicated in full on all computational resources. Once the partial products have been computed, the computational resource $P_i$ holds only the block $y_i$ of $y$. The simplest way to assemble the complete vector $y$ on every resource is an `AllGather` of the blocks stored across the group. Below is the MPI solution, where $y$ has $n$ elements and $y_{distr}$ is the block held by each resource of the group.

```cpp
#include <mpi.h>

void function(int n, double *y_distr, double *y) 
{ 
 int P;                             //number of resources 'P'
 MPI_Comm_size(MPI_COMM_WORLD, &P); //Getting the number of resources 'P'
 int send_count = n / P;            //elements per resource (assumes 'P' divides 'n')

 MPI_Allgather(y_distr, send_count, MPI_DOUBLE, y, send_count, MPI_DOUBLE, MPI_COMM_WORLD); 
}  
```

## Clear the Memory

Before moving on, please execute the following cell to clear up the CPU memory. This is required to move on to the next notebook.

In [None]:
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)

## Next

In the next notebook, [_3-SDumont-NCCL-P2P.ipynb_](3-SDumont-NCCL-P2P.ipynb), we will look at CUDA-aware MPI, which will give us the benefits of the SPMD programming model.