# Dask Overview

Dask is a flexible library for parallel computing in Python that makes scaling out your workflow smooth and simple. On the CPU, Dask uses Pandas (NumPy) to execute operations in parallel on DataFrame (array) partitions.

Dask.distributed is a lightweight library for distributed computing in Python. It extends the Dask APIs to moderate-sized clusters. Dask-CUDA provides GPU-specific specializations for Dask.distributed clusters. Its features include automatic instantiation of per-GPU workers, memory spilling from GPU to host memory, allocation of GPU memory, and more; see https://docs.rapids.ai/api/dask-cuda/stable/api.html for details.

Dask-cuDF extends Dask where necessary so that its DataFrame partitions are processed as cuDF GPU DataFrames instead of Pandas DataFrames. For instance, when you call `dask_cudf.read_csv(...)`, your cluster's GPUs do the work of parsing the CSV file(s) with the underlying `cudf.read_csv()`. Dask also supports array-based workflows using CuPy.

## When to use Dask
If your workflow is fast enough on a single GPU or your data comfortably fits in memory on a single GPU, you would want to use cuDF or CuPy. If you want to distribute your workflow across multiple GPUs, have more data than you can fit in memory on a single GPU, or want to analyze data spread across many files at once, you would want to use Dask.

One additional benefit Dask provides is that it lets us easily spill data between device (GPU) and host (CPU) memory. This can be very useful when we need to do work that would otherwise cause out-of-memory errors.

In this brief notebook, you'll walk through an example of using Dask on a single machine (here, with two GPUs). Because we're using Dask, the same code would also work on eight, 16, or hundreds of GPUs.

# Creating a Local Cluster

The easiest way to scale workflows on a single node is to use the `LocalCUDACluster` API. This lets us create a GPU cluster, using one worker per GPU by default.

In this case, we'll pass the following arguments:

- `CUDA_VISIBLE_DEVICES`, to limit our cluster to two GPUs (for demonstration purposes).
- `device_memory_limit`, to illustrate how we can spill data between GPU and CPU memory. Artificial memory limits like this reduce our performance if we don't actually need them, but can let us accomplish much larger tasks when we do.
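- `rmm_pool_size`, to use the RAPIDS Memory Manager (RMM) to allocate one big chunk of memory upfront rather than having our operations call `cudaMalloc` all the time under the hood. This improves performance, and is generally a best practice.
- `interface`, to bind the cluster's network communication to a specific network interface. `enp1s0f0` is the interface name on the machine where this notebook was run; change or remove it to match your system.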
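The cell below uses `dask.utils.parse_bytes` to turn size strings into byte counts. Binary suffixes such as `GiB` are powers of 1024, so the 4 GiB spill threshold is 4,294,967,296 bytes, slightly more than 4 GB. The helper `to_bytes` below is a hypothetical, simplified stand-in written for illustration, not Dask's implementation, and it only understands the suffixes listed in `UNITS`.

```python
# Hypothetical, simplified stand-in for dask.utils.parse_bytes.
# Only the suffixes in UNITS are supported.
UNITS = {
    "B": 1,
    "KB": 10**3, "MB": 10**6, "GB": 10**9,       # decimal (SI) units
    "KIB": 2**10, "MIB": 2**20, "GIB": 2**30,    # binary (IEC) units
}

def to_bytes(size: str) -> int:
    """Convert strings like '4GiB' or '16 GB' to an integer byte count."""
    text = size.strip().upper().replace(" ", "")
    # Try longer suffixes first so 'GIB' is not mistaken for plain 'B'.
    for suffix in sorted(UNITS, key=len, reverse=True):
        if text.endswith(suffix):
            return int(float(text[: -len(suffix)]) * UNITS[suffix])
    return int(float(text))  # a bare number is already in bytes

print(to_bytes("4GiB"))   # 4294967296  (device_memory_limit in the cell below)
print(to_bytes("16GiB"))  # 17179869184 (rmm_pool_size in the cell below)
print(to_bytes("4GB"))    # 4000000000
```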

In [1]:
from dask.distributed import Client, fire_and_forget, wait
from dask_cuda import LocalCUDACluster
from dask.utils import parse_bytes
import dask


cluster = LocalCUDACluster(
    CUDA_VISIBLE_DEVICES="0,3",
    device_memory_limit=parse_bytes("4GiB"),
    rmm_pool_size=parse_bytes("16GiB"),
    interface="enp1s0f0",
)

client = Client(cluster)
client

2022-06-17 00:33:45,442 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
2022-06-17 00:33:45,443 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize


**Client** (connection method: Cluster object; cluster type: `dask_cuda.LocalCUDACluster`)
Dashboard: http://10.33.227.163:8787/status

| Cluster | |
|---|---|
| Status | running |
| Workers | 2 |
| Total threads | 2 |
| Total memory | 0.98 TiB |
| Using processes | True |
| Scheduler comm | tcp://10.33.227.163:43769 |
| Started | Just now |

| Worker comm | Nanny | Dashboard | Threads | Memory | GPU | GPU memory |
|---|---|---|---|---|---|---|
| tcp://10.33.227.163:34819 | tcp://10.33.227.163:44365 | http://10.33.227.163:46133/status | 1 | 503.90 GiB | Tesla V100-SXM2-32GB | 31.75 GiB |
| tcp://10.33.227.163:41013 | tcp://10.33.227.163:38087 | http://10.33.227.163:39167/status | 1 | 503.90 GiB | Tesla V100-SXM2-32GB | 31.75 GiB |

Worker local directories: /datasets/pentschev/hpc-ai/dask-worker-space/worker-scbddony and /datasets/pentschev/hpc-ai/dask-worker-space/worker-fjxp2w0n


We are using an RMM pool, but CuPy has its own default allocator, so we replace it with RMM's allocator on every worker (`client.run` executes the function on each worker).

In [2]:
import cupy as cp
import rmm

client.run(cp.cuda.set_allocator, rmm.rmm_cupy_allocator);

Click the **Dashboard** link above to view your Dask dashboard. 

## Distributed GPU Arrays

Let's create a random matrix and calculate the singular value decomposition of it. This is a fairly complex calculation, so it's a great introduction to Dask. Dask can use `CuPy` to create random arrays.

In [3]:
import cupy as cp
import dask
import dask.array as da

In [4]:
rs = da.random.RandomState(RandomState=cp.random.RandomState, seed=12)  # <-- we specify cupy here

x = rs.random((100000, 1000), chunks=(1000,1000))
x = x.persist() # so quick we don't need to wait

Notice the `persist` call. Like Apache Spark, Dask operations are lazy. Instead of being executed immediately, most operations are added to a task graph, and evaluation is delayed until the result is needed.


Sometimes, though, we want to force the execution of operations. Calling `persist` on a Dask collection starts computing it and keeps the result in memory; with the distributed scheduler, `persist` returns immediately while the work continues in the background. When we're using distributed systems, we may want to wait until `persist` has finished before beginning any downstream operations. We can enforce this with `wait`, which blocks until all of the futures behind a persisted collection have finished.
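To make laziness concrete, here is a toy task graph loosely modeled on Dask's graph specification, where keys map to values or to tasks written as `(function, *arguments)` tuples. The `get` and `persist` helpers are hypothetical sketches: building the graph runs nothing, every `get` recomputes from scratch, and `persist` swaps tasks for their results so later computations reuse them. Unlike the real `persist` on a distributed cluster, this toy version blocks until it is done rather than running in the background.

```python
from operator import add

calls = []  # records every time the "expensive" load step runs

def load(i):
    calls.append(i)
    return list(range(i * 3, i * 3 + 3))

# Keys map to tasks (function, *args); string args that name keys are dependencies.
# Building this dict executes nothing.
graph = {
    "part-0": (load, 0),
    "part-1": (load, 1),
    "sum-0": (sum, "part-0"),
    "sum-1": (sum, "part-1"),
    "total": (add, "sum-0", "sum-1"),
}

def get(graph, key):
    """Recursively evaluate `key`, resolving arguments that name other keys."""
    task = graph[key]
    if isinstance(task, tuple) and task and callable(task[0]):
        func, *args = task
        resolved = [get(graph, a) if isinstance(a, str) and a in graph else a for a in args]
        return func(*resolved)
    return task  # already a concrete value

def persist(graph, keys):
    """Return a copy of the graph with `keys` replaced by their computed values."""
    cached = dict(graph)
    for k in keys:
        cached[k] = get(graph, k)
    return cached

print(get(graph, "total"))      # 15, runs load for both parts
print(get(graph, "total"))      # 15, runs load again: nothing was cached
persisted = persist(graph, ["part-0", "part-1"])
print(get(persisted, "total"))  # 15, without calling load
print(len(calls))               # 6: two gets and one persist, two loads each
```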

Let's look at our distributed array.

In [5]:
x

| | Array | Chunk |
|---|---|---|
| Bytes | 762.94 MiB | 7.63 MiB |
| Shape | (100000, 1000) | (1000, 1000) |
| Count | 100 Tasks | 100 Chunks |
| Type | float64 | cupy._core.core.ndarray |


Dask's HTML representation shows us some information about this distributed array: the size of the whole array and of individual chunks, the number of tasks and chunks, and the chunk type. Remember, a Dask array is made up of individual CuPy or NumPy arrays; here, each chunk is a CuPy array.
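The sizes in that table follow directly from the shape, the chunk shape, and 8 bytes per float64 element. The small helper below, `describe`, is a hypothetical illustration that reproduces those numbers, and the same arithmetic gives the 2.24 GiB reported later for the 300,000-row array.

```python
import math

def describe(shape, chunks, itemsize=8):
    """Total bytes, bytes per full chunk, and chunk grid for a chunked array."""
    total = math.prod(shape) * itemsize
    per_chunk = math.prod(chunks) * itemsize
    # Ceiling division: a ragged last chunk still counts as a chunk.
    grid = tuple(-(-n // c) for n, c in zip(shape, chunks))
    return total, per_chunk, grid

MiB, GiB = 2**20, 2**30

total, per_chunk, grid = describe((100_000, 1_000), (1_000, 1_000))
print(f"{total / MiB:.2f} MiB, {per_chunk / MiB:.2f} MiB, {grid}, {math.prod(grid)}")
# 762.94 MiB, 7.63 MiB, (100, 1), 100

total, _, grid = describe((300_000, 1_000), (1_000, 1_000))
print(f"{total / GiB:.2f} GiB, {math.prod(grid)} chunks")
# 2.24 GiB, 300 chunks
```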

Let's take the SVD now.

In [6]:
u, s, v = da.linalg.svd(x)

In [7]:
u

| | Array | Chunk |
|---|---|---|
| Bytes | 762.94 MiB | 7.63 MiB |
| Shape | (100000, 1000) | (1000, 1000) |
| Count | 814 Tasks | 100 Chunks |
| Type | float64 | cupy._core.core.ndarray |


Nothing happened? Right. Dask is lazy. We've just added several hundred tasks to the task graph. We can call `persist` to execute it.
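Why can a matrix that is chunked only along its rows be decomposed in parallel? According to the Dask documentation, `da.linalg.svd` uses a TSQR-based algorithm for tall-and-skinny arrays like `x`. The NumPy sketch below illustrates that idea on the CPU, assuming a single level of reduction (Dask's version can reduce in a tree and runs each block as a separate task): factor each row block with QR independently, stack the small R factors and factor them again, take the SVD of the final small R, and assemble the left singular vectors from the Q factors. `tsqr_svd` is a hypothetical helper, not Dask's implementation.

```python
import numpy as np

def tsqr_svd(blocks):
    """SVD of a tall-and-skinny matrix given as a list of row blocks.

    Each block must have at least as many rows as the matrix has columns.
    """
    n = blocks[0].shape[1]
    # Stage 1: independent QR of each block (these could run in parallel).
    qrs = [np.linalg.qr(b) for b in blocks]
    # Stage 2: stack the small n x n R factors and factor them again.
    q2, r = np.linalg.qr(np.vstack([r_i for _, r_i in qrs]))
    # Stage 3: SVD of the final small n x n R factor.
    ur, s, vt = np.linalg.svd(r)
    # Left singular vectors for block i: Q1_i @ (block i rows of Q2) @ U_R.
    u_blocks = [q1 @ q2[i * n:(i + 1) * n] @ ur for i, (q1, _) in enumerate(qrs)]
    return np.vstack(u_blocks), s, vt

rng = np.random.default_rng(12)
x = rng.random((10_000, 20))
u, s, vt = tsqr_svd(np.split(x, 10))   # ten row blocks of 1000 x 20
print(np.allclose((u * s) @ vt, x))    # True: the factors reconstruct x
```

Only the small R factors (n x n each) have to be gathered in the second stage, which is what makes this approach a good fit for arrays with many rows and few columns.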

In [8]:
u, s, v = dask.persist(u, s, v)
_ = wait(u)

Now we can take a look at the results.

In [9]:
u[:5, :5].compute()

array([[ 3.14079572e-03,  3.48997036e-03, -2.00519999e-03,
        -6.57514797e-04,  5.73214976e-05],
       [ 3.17991031e-03, -9.31773216e-04,  8.70805285e-03,
        -9.45197306e-05, -4.56058156e-03],
       [ 3.06961919e-03,  6.17969354e-04, -3.16351892e-03,
         1.39421050e-03, -4.04469215e-03],
       [ 3.15257148e-03,  8.28795664e-04,  1.95901717e-03,
         3.54105925e-03, -5.78993610e-03],
       [ 3.14729734e-03, -2.37407522e-03,  7.71839010e-04,
        -2.75247437e-03,  4.19811818e-04]])

That's all there is to it. Dask lets us take array workloads and scale them out to as many GPUs and machines as we have!

## Larger than GPU Memory Workflows

What if we needed to scale up even more, but didn't have enough GPU memory? Dask handles spilling for us, so we don't need to manage it manually. The `device_memory_limit` parameter we passed when creating the `LocalCUDACluster` determines when each worker starts spilling. In this case, each worker starts spilling once it holds about 4 GiB of data in GPU memory.
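Conceptually, threshold-based spilling keeps recently used chunks in device memory and moves the least recently used ones to host memory once a limit is exceeded. The `ToySpillBuffer` class below is a hypothetical, simplified model of that policy, using plain Python dictionaries as stand-ins for GPU and host memory. Dask-CUDA's real implementation also deals with serialization, a disk tier, and actual GPU objects, none of which are modeled here.

```python
from collections import OrderedDict

class ToySpillBuffer:
    """Hypothetical LRU spilling model (not Dask-CUDA's implementation)."""

    def __init__(self, device_limit):
        self.device_limit = device_limit
        self.device = OrderedDict()  # key -> (value, nbytes), oldest first
        self.host = {}               # spilled items
        self.device_bytes = 0

    def __setitem__(self, key, item):
        value, nbytes = item
        if key in self.device:                 # replacing: drop old size
            self.device_bytes -= self.device[key][1]
        self.host.pop(key, None)               # discard any stale spilled copy
        self.device[key] = (value, nbytes)
        self.device.move_to_end(key)           # newest = most recently used
        self.device_bytes += nbytes
        self._maybe_spill()

    def __getitem__(self, key):
        if key in self.host:                   # "unspill" back to device
            self[key] = self.host.pop(key)
        self.device.move_to_end(key)           # mark as recently used
        return self.device[key][0]

    def _maybe_spill(self):
        # Move least recently used items to host until under the limit,
        # always keeping at least the newest item on the device.
        while self.device_bytes > self.device_limit and len(self.device) > 1:
            key, (value, nbytes) = self.device.popitem(last=False)
            self.host[key] = (value, nbytes)
            self.device_bytes -= nbytes

buf = ToySpillBuffer(device_limit=20)
for i in range(4):
    buf[f"chunk-{i}"] = (f"data-{i}", 8)     # each chunk "costs" 8 bytes
print(list(buf.device), sorted(buf.host))   # ['chunk-2', 'chunk-3'] ['chunk-0', 'chunk-1']
print(buf["chunk-0"])                       # data-0 (unspilled; chunk-2 spills instead)
```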

Let's create a larger array to use as an example.

In [10]:
rs = da.random.RandomState(RandomState=cp.random.RandomState, seed=12)  # <-- we specify cupy here

x = rs.random((300000, 1000), chunks=(1000,1000))
x = x.persist() # so quick we don't need to wait

In [11]:
x

| | Array | Chunk |
|---|---|---|
| Bytes | 2.24 GiB | 7.63 MiB |
| Shape | (300000, 1000) | (1000, 1000) |
| Count | 300 Tasks | 300 Chunks |
| Type | float64 | cupy._core.core.ndarray |


In [12]:
u, s, v = da.linalg.svd(x)

In [13]:
u

| | Array | Chunk |
|---|---|---|
| Bytes | 2.24 GiB | 7.63 MiB |
| Shape | (300000, 1000) | (1000, 1000) |
| Count | 2414 Tasks | 300 Chunks |
| Type | float64 | cupy._core.core.ndarray |


In [14]:
u, s, v = dask.persist(u, s, v)
_ = wait(u)

Watch the Dask dashboard while this runs. You should see many tasks in the task stream such as `disk-read` and `disk-write`. Setting `device_memory_limit` tells Dask to spill to CPU memory, and potentially to disk if CPU memory is overwhelmed. This lets us do large computations even when we're almost out of GPU memory (though in this case, we faked it with an artificial limit).

## cuDF DataFrames to Dask DataFrames

Dask lets us scale our cuDF workflows. We'll walk through a couple of examples below, and then also highlight how Dask lets us spill data from GPU to CPU memory.

First, we'll create a DataFrame with CPU Dask and then send it to the GPU.

In [15]:
import cudf
import dask_cudf

In [16]:
ddf = dask_cudf.from_dask_dataframe(dask.datasets.timeseries())
ddf.head()

| timestamp | id | name | x | y |
|---|---|---|---|---|
| 2000-01-01 00:00:00 | 998 | Dan | -0.437451 | -0.716565 |
| 2000-01-01 00:00:01 | 1027 | Jerry | -0.209488 | 0.049667 |
| 2000-01-01 00:00:02 | 1011 | Ingrid | -0.061314 | 0.974116 |
| 2000-01-01 00:00:03 | 957 | Jerry | 0.287851 | 0.980581 |
| 2000-01-01 00:00:04 | 1005 | Xavier | 0.439404 | -0.707489 |


### Example: Groupby-Aggregations

In [17]:
ddf.groupby(["id", "name"]).agg({"x":['sum', 'mean']}).head()

| id | name | (x, sum) | (x, mean) |
|---|---|---|---|
| 960 | Alice | 20.614146 | 0.03748 |
| 882 | Bob | 0.036091 | 0.018046 |
| 1112 | Bob | -0.615592 | -0.615592 |
| 1090 | Yvonne | 2.561932 | 0.150702 |
| 1042 | Victor | 6.956916 | 0.013509 |
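A mean can't be computed by averaging per-partition means, because partitions hold different numbers of rows for each group. A distributed groupby can instead carry a partial sum and count per group, merge them, and divide once at the end; Dask's groupby aggregations follow a broadly similar chunk, combine, and aggregate pattern. The plain-Python sketch below uses hypothetical data and helper names to show the idea.

```python
from collections import defaultdict

# Two hypothetical partitions of (id, name, x) rows; a group can span both.
partitions = [
    [(960, "Alice", 1.0), (882, "Bob", 2.0), (960, "Alice", 3.0)],
    [(882, "Bob", 4.0), (1112, "Bob", -0.5)],
]

def chunk(rows):
    """Per-partition step: partial [sum, count] for each (id, name) group."""
    out = defaultdict(lambda: [0.0, 0])
    for id_, name, x in rows:
        out[(id_, name)][0] += x
        out[(id_, name)][1] += 1
    return dict(out)

def combine(partials):
    """Merge the partial results from all partitions."""
    total = defaultdict(lambda: [0.0, 0])
    for part in partials:
        for key, (s, n) in part.items():
            total[key][0] += s
            total[key][1] += n
    return total

def finalize(total):
    """Turn each group's (sum, count) into the requested sum and mean."""
    return {key: {"sum": s, "mean": s / n} for key, (s, n) in total.items()}

result = finalize(combine(chunk(p) for p in partitions))
print(result[(882, "Bob")])  # {'sum': 6.0, 'mean': 3.0}
```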


Run the code above again.

If you look at the task stream in the dashboard, you'll notice that we're creating the data every time. That's because Dask is lazy. We need to `persist` the data if we want to cache it in memory.

In [18]:
ddf = ddf.persist()
wait(ddf);

# Dask Custom Functions

Dask DataFrames also provide a `map_partitions` API, which is very useful for parallelizing custom logic that doesn't map cleanly onto the Dask DataFrame API. Dask applies the function to every partition of the distributed DataFrame.

What if we just wanted to sort **within each partition**, rather than across the whole DataFrame? Avoiding global sorts is usually a good idea when possible, since they require moving data between partitions and are very expensive operations.
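The difference in cost is easy to see on toy data. Sorting within partitions touches each partition independently, while a global sort must first route every row to the partition that owns its value range (a shuffle) and only then sort. The partitions and the split point in this plain-Python sketch are hypothetical; in practice, boundaries are usually estimated from sampled quantiles.

```python
# Hypothetical partitions of `id` values (e.g. one partition per day of data).
partitions = [[1005, 998, 1027], [957, 1011, 1003]]

# Within-partition sort, as with map_partitions: every row stays where it is.
local = [sorted(p) for p in partitions]
flat = [v for p in local for v in p]
print(local)                 # [[998, 1005, 1027], [957, 1003, 1011]]
print(flat == sorted(flat))  # False: the concatenation is not globally sorted

# Global sort into two output partitions: route rows by range, then sort.
split = 1004                 # hypothetical boundary between output partitions

def owner(v):
    """Index of the output partition responsible for value v."""
    return 0 if v < split else 1

shuffled = [[v for p in partitions for v in p if owner(v) == k] for k in range(2)]
global_sorted = [sorted(p) for p in shuffled]
moved = sum(owner(v) != i for i, p in enumerate(partitions) for v in p)
print(global_sorted)         # [[957, 998, 1003], [1005, 1011, 1027]]
print(moved)                 # 4: four of the six rows changed partition
```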

In [19]:
sorted_ddf = ddf.map_partitions(lambda x: x.sort_values("id"))
len(sorted_ddf)

2592000

We could also do something more complicated and wrap it into a function. Let's compute a three-row rolling mean of the two value columns after sorting by the `id` column.

In [20]:
def sort_and_rolling_mean(df):
    df = df.sort_values("id")
    df = df.rolling(3)[["x", "y"]].mean()
    return df

In [21]:
result = ddf.map_partitions(sort_and_rolling_mean)
result = result.persist()
wait(result);

In [22]:
# let's look at an arbitrary partition
result.partitions[12].head()

| timestamp | x | y |
|---|---|---|
| 2000-01-13 14:57:27 | | |
| 2000-01-13 22:02:24 | | |
| 2000-01-13 22:23:58 | 0.198054578 | 0.019415675 |
| 2000-01-13 05:50:49 | 0.323313895 | -0.062037341 |
| 2000-01-13 11:40:00 | 0.15772071 | -0.036333738 |


Pretty cool. When we're using `map_partitions`, the function is executing on the individual cuDF DataFrames that make up our Dask DataFrame. This means we can do any cuDF operation, run CuPy array manipulations, or anything else we want.
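Because the function passed to `map_partitions` just receives an ordinary DataFrame, and cuDF largely mirrors the pandas API, a cheap way to check `sort_and_rolling_mean` is to run it on a tiny pandas DataFrame first. This assumes the function only uses operations both libraries support, and the toy data below is hypothetical. It also explains the partition output above: sorting by `id` reorders the timestamp index before the rolling mean is taken, and the first two rows have fewer than three values in their window, so they come out empty (null).

```python
import pandas as pd

def sort_and_rolling_mean(df):
    # Same body as the cell above: sort by id, then 3-row rolling mean of x and y.
    df = df.sort_values("id")
    df = df.rolling(3)[["x", "y"]].mean()
    return df

toy = pd.DataFrame(
    {"id": [3, 1, 2, 4], "x": [30.0, 10.0, 20.0, 40.0], "y": [3.0, 1.0, 2.0, 4.0]},
    index=pd.to_datetime([
        "2000-01-01 00:00:00", "2000-01-01 00:00:01",
        "2000-01-01 00:00:02", "2000-01-01 00:00:03",
    ]),
)
out = sort_and_rolling_mean(toy)
print(out)  # first two rows are NaN; timestamps are now in id order, not time order
```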

# Dask Delayed

Dask also provides a `delayed` API, which is useful for parallelizing custom logic that doesn't quite fit into the DataFrame API.

Let's imagine we wanted to run more than a thousand regression models on different combinations of two features. We can do this experiment easily with `dask.delayed`.
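Each feature pair is an independent fit, so the problem is embarrassingly parallel, and the number of tasks equals the number of pairs: C(50, 2) = 1225 for 50 features. The CPU sketch below shows the same shape with swapped-in tools: NumPy's `lstsq` replaces cuML's `LinearRegression`, a standard-library `ThreadPoolExecutor` replaces `dask.delayed`, and the data and the `fit_pair` helper are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import combinations
from math import comb

import numpy as np

print(comb(50, 2))  # 1225 feature pairs for 50 columns

rng = np.random.default_rng(12)
nrows, ncols = 10_000, 6
X = rng.standard_normal((nrows, ncols))
# Hypothetical target that depends only on x0 and x1, plus a little noise.
y = 2.0 * X[:, 0] - 1.0 * X[:, 1] + 0.5 + 0.01 * rng.standard_normal(nrows)

def fit_pair(cols):
    """Ordinary least squares of y on the selected columns plus an intercept."""
    design = np.column_stack([X[:, list(cols)], np.ones(nrows)])
    beta, *_ = np.linalg.lstsq(design, y, rcond=None)
    return cols, beta[:-1], beta[-1]

pairs = list(combinations(range(ncols), 2))  # 15 independent tasks here
with ThreadPoolExecutor() as pool:
    results = list(pool.map(fit_pair, pairs))  # map keeps input order

cols, coef, intercept = results[0]  # the (0, 1) pair: coef near [2, -1], intercept near 0.5
print(cols, np.round(coef, 2), round(float(intercept), 2))
```

Like scattering `df` to the workers in the notebook, the sketch shares one copy of `X` and `y` across all tasks instead of passing the data into each call.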

In [23]:
from cuml.linear_model import LinearRegression
from dask import delayed
import dask
import numpy as np
from itertools import combinations

In [24]:
# Setup data
np.random.seed(12)

nrows = 1000000
ncols = 50
df = cudf.DataFrame({f"x{i}": np.random.randn(nrows) for i in range(ncols)})
df['y'] = np.random.randn(nrows)

In [25]:
feature_combinations = list(combinations(df.columns.drop("y"), 2))
feature_combinations[:10]

[('x0', 'x1'),
 ('x0', 'x2'),
 ('x0', 'x3'),
 ('x0', 'x4'),
 ('x0', 'x5'),
 ('x0', 'x6'),
 ('x0', 'x7'),
 ('x0', 'x8'),
 ('x0', 'x9'),
 ('x0', 'x10')]

In [26]:
len(feature_combinations)

1225

In [27]:
# Many calls to linear regression, parallelized with Dask
@delayed
def fit_ols(df, feature_cols, target_col="y"):
    clf = LinearRegression()
    clf.fit(df[list(feature_cols)], df[target_col])
    return feature_cols, clf.coef_, clf.intercept_

In [28]:
# scatter the data to the workers beforehand
data_future = client.scatter(df, broadcast=True)

In [29]:
results = []

for features in feature_combinations:
    # note how I'm passing the scattered data future
    res = fit_ols(data_future, features)
    results.append(res)

res = dask.compute(results)
res = res[0]

print("Features\t\tCoefficients\t\t\tIntercept")
for i in range(5):
    print(res[i][0], res[i][1].values, res[i][2], sep="\t")

Features		Coefficients			Intercept
('x0', 'x1')	[ 0.00161101 -0.00083978]	-0.0002777121165950758
('x0', 'x2')	[ 1.61176583e-03 -1.30786507e-05]	-0.0002780736068258237
('x0', 'x3')	[ 0.00161252 -0.0009235 ]	-0.0002781912466837933
('x0', 'x4')	[ 0.00161228 -0.00112238]	-0.00027828041670714264
('x0', 'x5')	[ 0.00161209 -0.00047165]	-0.000279080271034221


# Understanding Persist and Compute

Before we close, it's worth coming back to the concepts of `persist` and `compute`. We've seen them several times, but haven't gone into depth.

Most Dask operations are lazy. This is a common pattern in distributed computing, but is likely unfamiliar to those who primarily use single-machine libraries like pandas and cuDF. As a result, you'll usually need to call an **eager** operation like `len` or `persist` to actually trigger work.

In general, you should avoid calling `compute` except when collecting small datasets or scalars. When we spin up a cluster, we interact with it from a Python process called the client; creating the `Client` object above is what connected that process to the cluster. Calling `compute` brings all of the results back into the client process as a single, non-distributed object (for example, a single-GPU cuDF DataFrame) instead of leaving them spread across the worker processes. The client does not share the workers' memory pools, so computing a large result can run out of memory if we're not careful.
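A quick size check shows why this matters. For the 300,000 x 1,000 SVD above, `s` holds only 1,000 float64 values, so `s.compute()` moves a few kilobytes to the client, whereas `u.compute()` would have to materialize about 2.24 GiB in the client process. The arithmetic below assumes 8 bytes per element and the output shapes `(300000, 1000)`, `(1000,)`, and `(1000, 1000)`; `human` is a hypothetical formatting helper.

```python
def human(nbytes):
    """Format a byte count with binary (1024-based) units."""
    n = float(nbytes)
    for unit in ["B", "KiB", "MiB", "GiB"]:
        if n < 1024 or unit == "GiB":
            return f"{n:.2f} {unit}"
        n /= 1024

itemsize = 8  # float64
sizes = {
    "u": 300_000 * 1_000 * itemsize,  # left singular vectors
    "s": 1_000 * itemsize,            # singular values
    "v": 1_000 * 1_000 * itemsize,    # right singular vectors
}
for name, nbytes in sizes.items():
    print(name, human(nbytes))
# u 2.24 GiB
# s 7.81 KiB
# v 7.63 MiB
```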

For those of you with Spark experience, you can think of `persist` as triggering work and caching the dataframe in distributed memory and `compute` as collecting the data or results into a single GPU dataframe (cuDF) on the driver.


### Should I Persist My Data?

Persisting is generally a good idea if the data needs to be accessed multiple times, to avoid repeated computation. However, if the size of your data would lead to memory pressure, this could cause spilling, which hurts performance. As a best practice, we recommend persisting only when necessary or when you're using an eager operation in the middle of your workflow (to avoid repeating computation).

Note that calling `df.head()` is an eager operation, which triggers some computation. If you're going to be doing exploratory data analysis or visually inspecting the data, you would want to persist beforehand.

# Summary

RAPIDS lets us scale up and take advantage of GPU acceleration. Dask lets us scale out to multiple machines. Dask supports both cuDF DataFrames and CuPy arrays, with generally the same APIs as the single-machine libraries.

We encourage you to read the Dask [documentation](https://docs.dask.org/en/latest/) to learn more, and also look at our [10 Minute Guide to cuDF and Dask cuDF](https://docs.rapids.ai/api/cudf/nightly/10min.html).