# **Multi-GPU Numerical Computing: Dask cuDF + Numba CUDA**

## **Prerequisites**

This tutorial assumes proficiency in Python and the following libraries:

* pandas/cuDF
* NumPy/CuPy
* Numba

Demo System - Benchmarking was performed on a DGX Station A100 320GB

## **Why Dask cuDF + Numba**

[Numba CUDA Python](https://numba.readthedocs.io/en/stable/cuda/index.html) lets you GPU-accelerate code without leaving Python. This is compelling for anyone doing rapid prototyping or who prefers to stay in Python. Most examples and applications of Numba target a single GPU. This notebook demonstrates achieving higher kernel performance on a compute-intensive workload by using multiple GPUs in a local cluster managed by Dask.

**When to consider this programming pattern:**
1. You have a larger-than-memory data problem
2. You are fully saturating a single GPU
3. Streaming data needs to be loaded and rapidly processed by an expensive kernel
4. You do not want to develop your own job scheduling software

**Note:**
1. Sending data between devices implies an I/O penalty and overhead -- performance improvements will be most pronounced if already saturating a single GPU (for smaller problems, overheads could dominate performance)
2. Dask cuDF will be more difficult to use with many n-dimensional array problems -- also consider Dask Arrays and cuNumeric


## **Problem Overview**

We use a common nearest-neighbor problem in ND-array computing to explore these programming paradigms. More efficient techniques exist for this problem, but we add an interesting constraint: our reference points change after each calculation, which makes precomputed indexing strategies difficult to apply. As a result, we focus on the brute-force technique. It is easier to grasp, and because it increases the number of calculations and comparisons we need to perform, its arithmetic intensity will challenge each method. We'll use a data generation script to simulate geospatial points in radians and use the haversine great-circle distance as our distance metric. This is a popular technique for calculating the distance between two points on Earth.
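
To make the brute-force approach concrete, here is a minimal CPU-only sketch in plain Python. The names `haversine` and `nearest_brute_force` and the sample points are illustrative and are not part of this notebook's `src` package. Points are assumed to be `(lat, lon)` pairs in radians, and the distance is measured on a unit sphere (multiply by Earth's mean radius, roughly 6371 km, to get kilometres).

```python
import math

def haversine(lat1, lon1, lat2, lon2):
    """Great-circle distance on a unit sphere; all inputs in radians."""
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = (math.sin(dlat / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2)
    # Clamp to 1.0 so rounding error near antipodal points cannot break asin.
    return 2 * math.asin(math.sqrt(min(1.0, a)))

def nearest_brute_force(obs, ref):
    """For each observation, return (index, distance) of its closest reference point."""
    results = []
    for lat_o, lon_o in obs:
        best_idx, best_dist = -1, math.inf
        for j, (lat_r, lon_r) in enumerate(ref):
            d = haversine(lat_o, lon_o, lat_r, lon_r)
            if d < best_dist:
                best_idx, best_dist = j, d
        results.append((best_idx, best_dist))
    return results

# Hypothetical sample points, (lat, lon) in radians.
ref = [(0.0, 0.0), (0.5, 0.5), (-0.5, 1.0)]
obs = [(0.1, 0.1), (-0.4, 0.9)]
nearest = nearest_brute_force(obs, ref)
```

Every observation is compared against every reference point, so the work grows as the product of the two set sizes. That product is what the GPU kernels later in this notebook parallelize.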

<center><img src="./media/haversine-graphic.png" alt="RAPIDS Logo" style="width: 150;"/></center></br>

The graphic below illustrates the dynamic nature of the problem we are solving and implies the need for compute efficiency as the timestep approaches zero.

<center><img src="./media/DynamicDecisionBoundaries.png" alt="RAPIDS Logo" style="width: 1000;"/></center>

In this notebook, we will evaluate running our Numba CUDA kernel (from our single CPU/GPU notebook) on all the GPUs in our demo system, using a problem scaled up by 2048x to 8.8T observation-reference pairs.

**Spoiler Alert -- This multi-GPU technique outperforms the multi-CPU technique by orders of magnitude.**

<center><img src="./media/AllScaledCpuGpuPerfTable.png" alt="RAPIDS Logo" style="width: 1000;"/></center>

# **Multi-GPU Experiment**

In [None]:
from dask_cuda import LocalCUDACluster
from dask.distributed import Client

import cudf
import dask_cudf
import cupy as cp
from numba import cuda

from src.solvers import (block_min_reduce,
                         global_min_reduce)

from src.simulator import generate_geos
from src.utils import check_accuracy

import numpy as np
import math

Define constants for the size of our experiment and evaluation criteria.

In [None]:
N_OBS, N_REF = 2**27, 2**16  # full-scale multi-GPU benchmark
N_OBS_VAL, N_REF_VAL = 500, 200 # check accuracy
print("Problem Size (N_OBS * N_REF): {:.2f}T".format(N_OBS * N_REF * 1e-12))
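
The arithmetic behind the printed problem size can be sketched as follows. It also shows why the full distance matrix is never materialized. The 4-byte storage size per distance is an assumption made for illustration.

```python
N_OBS, N_REF = 2**27, 2**16

pairs = N_OBS * N_REF            # one haversine evaluation per (observation, reference) pair
baseline_pairs = pairs // 2048   # problem size before the 2048x scale-up

# Assumption for illustration: every distance stored as a 4-byte float32.
full_matrix_tib = pairs * 4 / 2**40

print(f"pairs          = 2**{pairs.bit_length() - 1} = {pairs:,}")
print(f"baseline pairs = 2**{baseline_pairs.bit_length() - 1} = {baseline_pairs:,}")
print(f"full float32 distance matrix = {full_matrix_tib:.0f} TiB")
```

A 32 TiB matrix is far beyond the 320 GB of GPU memory on the demo system, so distances have to be reduced to per-observation minima as they are computed rather than stored.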

## **Start Dask CUDA Cluster**

With a few lines of code, we can spin up a local Dask cluster of GPUs that can be scheduled to complete our workload.

In [None]:
cluster = LocalCUDACluster()
client = Client(cluster)
client

## **Define ```map_partitions``` function**

The ```_get_nearest_part``` function performs the double Numba CUDA kernel launch pattern outlined in the single GPU/CPU notebook. Here we launch these kernels on the chunks of our data partitioned by Dask.

In [None]:
def _get_nearest_part(part_df, coord2=None):
    
    coord1 = part_df[["LAT_RAD", "LON_RAD"]].to_cupy()
    coord2 = coord2.to_cupy()

    block_idx_mat = cp.empty(
        (coord1.shape[0], 32), 
        dtype=np.uint32
    )
    
    block_dist_mat = cp.empty(
        (coord1.shape[0], 32),
        dtype=np.float32
    )        
    
    out_idx = cp.empty(
        (coord1.shape[0]), 
        dtype=np.uint32
    )
    
    out_dist = cp.empty(
        (coord1.shape[0]), 
        dtype=np.float32
    )    
    
    bpg = 32, 108
    tpb = 32, 16    
    
    block_min_reduce[bpg, tpb](
        coord2, 
        coord1, 
        block_idx_mat,
        block_dist_mat
    )   

    bpg = (1, 108*20)
    tpb = (32, 16)    
    
    global_min_reduce[bpg, tpb](
        block_dist_mat, 
        block_idx_mat, 
        out_dist, 
        out_idx
    )      
    
    cuda.synchronize()
    
    part_df["out_idx"] = out_idx
    part_df["out_dist"] = out_dist
            
    return (part_df)
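
The two launches above follow a two-stage minimum reduction. The plain-Python sketch below mirrors that idea on the CPU. It assumes that each of 32 blocks scans its own slice of the reference points; this is inferred from the `(n, 32)` shape of the intermediate arrays, not from the kernel source. The function names and the toy distance are illustrative only.

```python
import math

N_BLOCKS = 32  # matches the second dimension of block_idx_mat / block_dist_mat

def block_min_stage(obs, ref, dist_fn, n_blocks=N_BLOCKS):
    """Stage 1: each block keeps the best (index, distance) within its slice of ref."""
    chunk = math.ceil(len(ref) / n_blocks)
    block_idx = [[-1] * n_blocks for _ in obs]
    block_dist = [[math.inf] * n_blocks for _ in obs]
    for i, o in enumerate(obs):
        for b in range(n_blocks):
            for j in range(b * chunk, min((b + 1) * chunk, len(ref))):
                d = dist_fn(o, ref[j])
                if d < block_dist[i][b]:
                    block_idx[i][b], block_dist[i][b] = j, d
    return block_idx, block_dist

def global_min_stage(block_idx, block_dist):
    """Stage 2: reduce the per-block candidates to a single winner per observation."""
    out_idx, out_dist = [], []
    for idx_row, dist_row in zip(block_idx, block_dist):
        b = min(range(len(dist_row)), key=dist_row.__getitem__)
        out_idx.append(idx_row[b])
        out_dist.append(dist_row[b])
    return out_idx, out_dist

# Toy 1-D data, with absolute difference standing in for the haversine distance.
ref = [float(v) for v in range(0, 1000, 10)]  # 100 reference values
obs = [3.0, 502.0, 987.0]
idx, dist = global_min_stage(*block_min_stage(obs, ref, lambda a, b: abs(a - b)))
```

Splitting the reduction this way keeps the intermediate state at 32 candidates per observation, which is far smaller than a full distance matrix.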

## **Define Multi-GPU Function**

Define a function that distributes our dataset across the local CUDA cluster and maps our ```_get_nearest_part``` function to each data partition. The final results are gathered into a cuDF DataFrame residing on the default GPU and returned as CuPy arrays.

Note - we did not have to develop our own job scheduling mechanism; Dask handles this for us!

In [None]:
def get_nearest(d_obs, d_ref):

    gdf_ref = cudf.DataFrame()
    gdf_obs = cudf.DataFrame()
    
    gdf_ref["LAT_RAD"] = d_ref[:,0]
    gdf_ref["LON_RAD"] = d_ref[:,1]
    gdf_obs["LAT_RAD"] = d_obs[:,0]
    gdf_obs["LON_RAD"] = d_obs[:,1]
    
    ddf = dask_cudf.from_cudf(gdf_obs, npartitions=4)
    
    gdf_result = ddf.map_partitions(
        _get_nearest_part, 
        coord2=gdf_ref,
    ).compute()
    
    return (cp.asarray(gdf_result["out_idx"]),
            cp.asarray(gdf_result["out_dist"]))
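
Conceptually, the function above splits the rows into partitions, applies the same function to each partition, and concatenates the results. Here is a CPU-only sketch of that pattern, using plain lists in place of DataFrame partitions. The helper names are hypothetical, and the even contiguous split is an assumption about how the rows are divided.

```python
def split_rows(rows, npartitions):
    """Split rows into contiguous, near-equal chunks."""
    size, extra = divmod(len(rows), npartitions)
    parts, start = [], 0
    for p in range(npartitions):
        stop = start + size + (1 if p < extra else 0)
        parts.append(rows[start:stop])
        start = stop
    return parts

def map_parts(parts, fn, **kwargs):
    """Apply fn to every partition; on the cluster each call can run on a different GPU worker."""
    return [fn(part, **kwargs) for part in parts]

def nearest_in_part(part, ref=None):
    """Stand-in for _get_nearest_part: nearest reference value for each row of one partition."""
    return [min(ref, key=lambda r: abs(r - x)) for x in part]

rows = [1, 12, 27, 33, 48, 51, 69, 75, 88, 94]
parts = split_rows(rows, npartitions=4)
results = map_parts(parts, nearest_in_part, ref=[0, 25, 50, 75, 100])
combined = [value for part in results for value in part]  # same row order as the input
```

Each partition receives the complete reference set through a keyword argument, just as `coord2=gdf_ref` is passed to every partition in `get_nearest`.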
                      

## **Generate Dataset**

Let's generate a scaled-up synthetic dataset and a validation dataset using an included utility function. These datasets represent the following:

* ```d_obs``` contains ```N_OBS``` geospatial observations in radians on the GPU, used for our full scale benchmark
* ```d_ref``` contains ```N_REF``` geospatial reference points in radians on the GPU, used for our full scale benchmark
* ```d_obs_val``` contains ```N_OBS_VAL``` geospatial observation points in radians on the GPU, used to validate accuracy
* ```d_ref_val``` contains ```N_REF_VAL``` geospatial reference points in radians on the GPU, used to validate accuracy

In [None]:
d_ref = generate_geos(N_REF, random_state=1)
d_obs = generate_geos(N_OBS, random_state=2)

d_ref_val = generate_geos(N_REF_VAL, random_state=1)
d_obs_val = generate_geos(N_OBS_VAL, random_state=2)

## **Validate Accuracy**

Verify our multi-GPU implementation is producing the correct results.

In [None]:
d_val_idx, d_val_dist = get_nearest(
    d_obs_val, 
    d_ref_val
)

print("Accuracy - Dask Numba CUDA Multi-GPU:", 
      check_accuracy(
          d_obs_val, 
          d_ref_val,
          d_val_idx, 
          d_val_dist)
     )
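
The implementation of `check_accuracy` lives in `src.utils` and is not shown here. One plausible way to score a result, sketched below under that caveat, is to compare it against a trusted brute-force answer and report the fraction of matching rows. The function name, the tolerances, and the sample values are all hypothetical.

```python
import math

def fraction_matching(pred_idx, pred_dist, true_idx, true_dist, rel_tol=1e-4):
    """Share of rows whose index matches and whose distance agrees within rel_tol."""
    if not true_idx:
        return 1.0
    hits = sum(
        1
        for p_i, p_d, t_i, t_d in zip(pred_idx, pred_dist, true_idx, true_dist)
        if p_i == t_i and math.isclose(p_d, t_d, rel_tol=rel_tol, abs_tol=1e-7)
    )
    return hits / len(true_idx)

# Hypothetical values: the third row disagrees on the index.
true_idx, true_dist = [4, 1, 7, 2], [0.0123, 0.2040, 0.0510, 0.3300]
pred_idx, pred_dist = [4, 1, 6, 2], [0.0123001, 0.2040, 0.0512, 0.3300]
score = fraction_matching(pred_idx, pred_dist, true_idx, true_dist)
```

A tolerance is used for the distances because the kernel outputs are `float32`, so bit-exact agreement with a higher-precision reference should not be expected.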

## **Benchmark Performance**

We observe our kernel completes in roughly 14.7ms on our demo system, ~511x faster than the multi-CPU alternative!

In [None]:
%%timeit
d_val_idx, d_val_dist = get_nearest(
    d_obs, 
    d_ref
)
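
The `%%timeit` cell magic only works inside a notebook or IPython session. In a plain script, the standard library's `timeit` module gives a comparable measurement. The sketch below wraps it in a hypothetical helper, `best_seconds`, and times a stand-in workload; in practice you would pass `lambda: get_nearest(d_obs, d_ref)` instead.

```python
import timeit

def best_seconds(fn, repeat=5, number=1):
    """Return the best per-call wall time of fn() over `repeat` rounds of `number` calls."""
    totals = timeit.repeat(fn, repeat=repeat, number=number)
    return min(totals) / number

def stand_in_workload():
    # Placeholder for: get_nearest(d_obs, d_ref)
    return sum(i * i for i in range(10_000))

elapsed = best_seconds(stand_in_workload, repeat=3, number=10)
print(f"best of 3: {elapsed * 1e3:.3f} ms per call")
```

Note that `%%timeit` reports a mean and standard deviation across runs, whereas this helper returns the minimum, so the two numbers are related but not identical.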

# **Summarize Results**

In summary, we observe that our multi-GPU technique solves our scaled-up problem orders of magnitude faster than the parallel CPU alternative. This implementation required less developer effort than the tailor-made implementation, but achieved slightly lower performance for this use case. We also acknowledge that it might be less appropriate for many n-dimensional array problems (e.g. non-DataFrame operations).

<img src="./media/MultiScaledCpuGpuPerfTable.png" alt="RAPIDS Logo" style="width: 150;"/>

<br>
<div align="left"><h2><b>Please Restart the Kernel</b></h2></div>

In [None]:
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)