# Dask Overview

Dask is a flexible library for parallel computing in Python that makes scaling out your workflow smooth and simple. On the CPU, Dask uses Pandas (NumPy) to execute operations in parallel on DataFrame (array) partitions.

Dask-cuDF extends Dask where necessary so that its DataFrame partitions are processed as cuDF GPU DataFrames instead of pandas DataFrames. For instance, when you call `dask_cudf.read_csv(…)`, your cluster’s GPUs do the work of parsing the CSV file(s) with the underlying `cudf.read_csv()`. Dask also supports array-based workflows using CuPy.

## When to use Dask
If your workflow is fast enough on a single GPU, or your data comfortably fits in memory on a single GPU, use cuDF or CuPy directly. If you want to distribute your workflow across multiple GPUs, have more data than fits in memory on a single GPU, or want to analyze data spread across many files at once, use Dask.

Dask also makes it easy to spill data between device (GPU) and host (CPU) memory. This lets us run work that would otherwise fail with out-of-memory errors.

In this brief notebook, you'll walk through an example of using Dask on a small local GPU cluster (two GPUs on one machine). Because we're using Dask, the same code would work on one, eight, 16, or hundreds of GPUs.

# Creating a Local Cluster

The easiest way to scale workflows on a single node is to use the `LocalCUDACluster` API. This lets us create a GPU cluster, using one worker per GPU by default.

In this case, we'll pass the following arguments:

- `CUDA_VISIBLE_DEVICES`, to limit our cluster to specific GPUs (here GPUs 0 and 1, for demonstration purposes).
- `device_memory_limit`, to illustrate how we can spill data between GPU and CPU memory. Once a worker's GPU memory use passes this limit, it starts moving data to host memory. An artificially low limit like this reduces performance when we don't need it, but lets us accomplish much larger tasks when we do.
- `rmm_pool_size`, to have the RAPIDS Memory Manager (RMM) allocate one large chunk of GPU memory per worker up front, rather than having our operations call `cudaMalloc` repeatedly under the hood. This improves performance and is generally a best practice.
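To make the `rmm_pool_size` trade-off concrete, here is a toy model in plain Python, not RMM's actual allocator. A hypothetical `PoolAllocator` reserves one large chunk up front and hands out pieces of it with a simple bump pointer, while a hypothetical `NaiveAllocator` calls the simulated upstream allocator (standing in for `cudaMalloc`) on every request. RMM's real pool also frees and reuses memory, which this sketch omits.

```python
class UpstreamAllocator:
    """Simulated expensive allocator (a stand-in for cudaMalloc); counts its calls."""

    def __init__(self):
        self.calls = 0

    def allocate(self, nbytes):
        self.calls += 1
        return bytearray(nbytes)


class NaiveAllocator:
    """Goes to the upstream allocator for every single request."""

    def __init__(self, upstream):
        self.upstream = upstream

    def allocate(self, nbytes):
        return self.upstream.allocate(nbytes)


class PoolAllocator:
    """Reserves one big chunk up front, then sub-allocates with a bump pointer."""

    def __init__(self, upstream, pool_size):
        self.pool = upstream.allocate(pool_size)  # the only upstream call
        self.offset = 0

    def allocate(self, nbytes):
        if self.offset + nbytes > len(self.pool):
            raise MemoryError("pool exhausted")
        view = memoryview(self.pool)[self.offset:self.offset + nbytes]
        self.offset += nbytes
        return view


naive_upstream = UpstreamAllocator()
naive = NaiveAllocator(naive_upstream)
pool_upstream = UpstreamAllocator()
pool = PoolAllocator(pool_upstream, pool_size=8 * 1024 * 1024)

for _ in range(1000):
    naive.allocate(1024)
    pool.allocate(1024)

print(naive_upstream.calls, pool_upstream.calls)  # 1000 1
```

The pool pays the upstream cost once; every later request is just pointer arithmetic.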

```python
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
from dask.utils import parse_bytes
import dask

# One worker per visible GPU; each worker spills to host memory past 4 GB
# and pre-allocates an 8 GB RMM pool.
cluster = LocalCUDACluster(
    CUDA_VISIBLE_DEVICES="0,1",
    device_memory_limit=parse_bytes("4GB"),
    rmm_pool_size=parse_bytes("8GB"),
)

client = Client(cluster)
client
```

| Client | Cluster |
|---|---|
| Scheduler: tcp://127.0.0.1:39763 | Workers: 2 |
| Dashboard: http://127.0.0.1:8787/status | Cores: 2 |
| | Memory: 1.62 TB |


Click the **Dashboard** link above to view your Dask dashboard. 

## cuDF DataFrames to Dask DataFrames

Dask lets us scale our cuDF workflows. We'll walk through a couple of examples below, and then also highlight how Dask lets us spill data from GPU to CPU memory.

First, we'll create a DataFrame with CPU-backed Dask and then move it to the GPU.

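```python
import cudf
import dask_cudf
```

```python
# Build a demo timeseries with pandas-backed Dask, then convert every partition to cuDF
ddf = dask_cudf.from_dask_dataframe(dask.datasets.timeseries())
ddf.head()
```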

| timestamp | id | name | x | y |
|---|---|---|---|---|
| 2000-01-01 00:00:00 | 1019 | Bob | 0.682646 | 0.667236 |
| 2000-01-01 00:00:01 | 1030 | Frank | 0.96429 | 0.642943 |
| 2000-01-01 00:00:02 | 977 | Ursula | 0.36498 | 0.569051 |
| 2000-01-01 00:00:03 | 957 | Oliver | 0.28646 | -0.423077 |
| 2000-01-01 00:00:04 | 999 | Tim | -0.76839 | -0.695013 |


### Example One: Groupby-Aggregations

```python
ddf.groupby(["id", "name"]).agg({"x": ["sum", "mean"]}).head()
```

| id | name | x (sum) | x (mean) |
|---|---|---|---|
| 1096 | Sarah | -1.777609 | -0.177761 |
| 990 | Hannah | 37.040702 | 0.031659 |
| 955 | Quinn | -2.622262 | -0.005789 |
| 930 | Oliver | -3.321265 | -0.040015 |
| 1036 | Quinn | -10.153046 | -0.015199 |


Run the code above again.

If you look at the task stream in the dashboard, you'll notice that we're creating the data every time. That's because Dask is lazy: each computation re-runs the whole task graph, including the step that generates the data. We need to `persist` the data if we want to cache it in memory.
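A rough mental model of why persisting helps, written as a toy in plain Python rather than with Dask itself: a hypothetical `LazyFrame` stores the recipe for its data and re-runs it on every `compute`, while `persist` runs the recipe once and keeps the result. The call counter stands in for the repeated data-creation tasks in the task stream. (Real Dask `persist` starts the work in the background on the workers, which is why the next cell calls `wait`.)

```python
class LazyFrame:
    """Toy lazy collection that stores a recipe instead of data (not Dask's implementation)."""

    def __init__(self, recipe, cached=None):
        self.recipe = recipe    # zero-argument function that builds the data
        self.cached = cached    # materialized result, if persisted

    def compute(self):
        if self.cached is not None:
            return self.cached  # persisted: reuse the stored result
        return self.recipe()    # lazy: rebuild from scratch on every call

    def persist(self):
        # Run the recipe exactly once and keep the result around.
        return LazyFrame(self.recipe, cached=self.recipe())


calls = {"create": 0}


def create_data():
    calls["create"] += 1
    return list(range(10))


lazy = LazyFrame(create_data)
for _ in range(2):
    lazy.compute()
print(calls["create"])  # 2: the data was rebuilt for each computation

cached = lazy.persist()
for _ in range(2):
    cached.compute()
print(calls["create"])  # 3: persist built it once; later computes reuse it
```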

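```python
# Compute the data once and keep it in distributed GPU memory
ddf = ddf.persist()
wait(ddf);  # block until every partition is in memory
```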

```python
ddf.groupby(["id", "name"]).agg({"x": ["sum", "mean"]}).head()
```

| id | name | x (sum) | x (mean) |
|---|---|---|---|
| 1096 | Sarah | -1.777609 | -0.177761 |
| 990 | Hannah | 37.040702 | 0.031659 |
| 955 | Quinn | -2.622262 | -0.005789 |
| 930 | Oliver | -3.321265 | -0.040015 |
| 1036 | Quinn | -10.153046 | -0.015199 |


This is the same API as cuDF, except it works across many GPUs.
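A distributed groupby-aggregation can't see all rows at once. A common strategy, and roughly the idea behind Dask's reductions, is to compute partial results per partition and then combine them. The plain-Python sketch below (hypothetical `chunk` and `combine` helpers, toy data) computes per-partition sums and counts, merges them, and derives the mean only at the end. Averaging per-partition means would be wrong: for Bob below that gives (2.0 + 5.0) / 2 = 3.5 instead of 3.0.

```python
from collections import defaultdict

# Two toy "partitions" of (id, name, x) rows.
partitions = [
    [(1, "Bob", 1.0), (2, "Tim", 2.0), (1, "Bob", 3.0)],
    [(2, "Tim", 4.0), (1, "Bob", 5.0)],
]


def chunk(rows):
    """Per-partition step: partial [sum, count] for each (id, name) key."""
    partial = defaultdict(lambda: [0.0, 0])
    for id_, name, x in rows:
        partial[(id_, name)][0] += x
        partial[(id_, name)][1] += 1
    return partial


def combine(partials):
    """Merge partial results from all partitions, then finish sum and mean."""
    totals = defaultdict(lambda: [0.0, 0])
    for partial in partials:
        for key, (s, n) in partial.items():
            totals[key][0] += s
            totals[key][1] += n
    return {key: {"sum": s, "mean": s / n} for key, (s, n) in totals.items()}


result = combine(chunk(p) for p in partitions)
print(result[(1, "Bob")])  # {'sum': 9.0, 'mean': 3.0}
print(result[(2, "Tim")])  # {'sum': 6.0, 'mean': 3.0}
```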

### Example Two: Rolling Windows

We can also do things like rolling window calculations with Dask and GPUs.

```python
ddf.head()
```

| timestamp | id | name | x | y |
|---|---|---|---|---|
| 2000-01-01 00:00:00 | 1019 | Bob | 0.682646 | 0.667236 |
| 2000-01-01 00:00:01 | 1030 | Frank | 0.96429 | 0.642943 |
| 2000-01-01 00:00:02 | 977 | Ursula | 0.36498 | 0.569051 |
| 2000-01-01 00:00:03 | 957 | Oliver | 0.28646 | -0.423077 |
| 2000-01-01 00:00:04 | 999 | Tim | -0.76839 | -0.695013 |


```python
rolling = ddf[["x", "y"]].rolling(window=3)
type(rolling)
```

dask.dataframe.rolling.Rolling

```python
rolling.mean().head()
```

| timestamp | x | y |
|---|---|---|
| 2000-01-01 00:00:00 | | |
| 2000-01-01 00:00:01 | | |
| 2000-01-01 00:00:02 | 0.670638545 | 0.626410029 |
| 2000-01-01 00:00:03 | 0.538576576 | 0.262972605 |
| 2000-01-01 00:00:04 | -0.038983675 | -0.18301256 |
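Rolling windows are trickier than they look once data is split into partitions, because the first rows of each partition need values from the end of the previous one. Dask roughly handles this by sharing a small overlap between neighboring partitions. The stdlib-only sketch below, with hypothetical helper names, borrows the last `window - 1` values from the preceding data, computes the rolling mean locally, and matches a rolling mean over the unpartitioned data.

```python
def rolling_mean(values, window):
    """Trailing rolling mean; None until a full window is available."""
    out = []
    for i in range(len(values)):
        if i + 1 < window:
            out.append(None)
        else:
            out.append(sum(values[i + 1 - window:i + 1]) / window)
    return out


def partitioned_rolling_mean(partitions, window):
    """Rolling mean per partition, borrowing up to window - 1 trailing rows as overlap."""
    result = []
    previous_tail = []
    for part in partitions:
        extended = previous_tail + part            # prepend the overlap
        means = rolling_mean(extended, window)
        result.extend(means[len(previous_tail):])  # drop rows that belong to earlier partitions
        previous_tail = extended[-(window - 1):] if window > 1 else []
    return result


data = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]
partitions = [data[:3], data[3:5], data[5:]]
print(partitioned_rolling_mean(partitions, window=3))
# [None, None, 2.0, 3.0, 4.0, 5.0, 6.0]
```

Because the overlap is taken from the extended list, it also carries across partitions shorter than the window.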


## Larger than GPU Memory Workflows

What if we needed to scale up even more but didn't have enough GPU memory? Dask handles spilling for us. The `device_memory_limit` parameter we passed to `LocalCUDACluster` determines when each worker starts spilling. In this case, a worker starts moving data to CPU memory once it holds about 4 GB in GPU memory.
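Conceptually, spilling behaves like a two-tier cache. The sketch below is a toy model in plain Python, not dask-cuda's implementation: a hypothetical `SpillBuffer` keeps values in a "device" dict until their total size passes a limit, then moves the least recently used ones to a "host" dict, and moves a value back to the device when it is read again.

```python
from collections import OrderedDict


class SpillBuffer:
    """Toy two-tier store: 'device' up to a byte limit, overflow spills to 'host'."""

    def __init__(self, device_limit):
        self.device_limit = device_limit
        self.device = OrderedDict()  # ordered from least to most recently used
        self.host = {}

    def _device_bytes(self):
        return sum(len(v) for v in self.device.values())

    def _spill_if_needed(self):
        # Evict least recently used entries until back under the limit,
        # always keeping the most recent entry on the device.
        while self._device_bytes() > self.device_limit and len(self.device) > 1:
            key, value = self.device.popitem(last=False)
            self.host[key] = value

    def __setitem__(self, key, value):
        self.host.pop(key, None)
        self.device[key] = value
        self.device.move_to_end(key)
        self._spill_if_needed()

    def __getitem__(self, key):
        if key in self.device:
            self.device.move_to_end(key)  # mark as recently used
            return self.device[key]
        value = self.host.pop(key)        # "unspill" back to the device
        self.device[key] = value
        self._spill_if_needed()
        return value


buf = SpillBuffer(device_limit=4)
buf["a"] = b"xx"
buf["b"] = b"yy"
buf["c"] = b"zz"   # 6 bytes > 4, so "a" spills to host
print(sorted(buf.device), sorted(buf.host))  # ['b', 'c'] ['a']
value = buf["a"]   # reading "a" brings it back and spills "b"
print(sorted(buf.device), sorted(buf.host))  # ['a', 'c'] ['b']
```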

Let's create a larger dataframe to use as an example.

```python
# 365 days of one-second data: 365 * 86,400 = 31,536,000 rows
ddf = dask_cudf.from_dask_dataframe(
    dask.datasets.timeseries(start="2000-01-01", end="2000-12-31")
)

ddf = ddf.persist()
len(ddf)
```

31536000

```python
print(f"{ddf.memory_usage(deep=True).sum().compute() / 1e9} GB of data")
```

1.303887943 GB of data


```python
ddf.head()
```

| timestamp | id | name | x | y |
|---|---|---|---|---|
| 2000-01-01 00:00:00 | 1027 | Tim | -0.107746 | 0.452806 |
| 2000-01-01 00:00:01 | 972 | Tim | 0.00176 | -0.799538 |
| 2000-01-01 00:00:02 | 974 | Michael | -0.71728 | 0.572829 |
| 2000-01-01 00:00:03 | 991 | Yvonne | 0.237424 | -0.712032 |
| 2000-01-01 00:00:04 | 1028 | Yvonne | -0.879625 | -0.798144 |


Let's imagine we have some downstream operations that require all the data for a given unique identifier to be in the same partition. We can repartition our data based on the `id` column using the `shuffle` API.

Repartitioning our 31-million-row DataFrame will push GPU memory use above 4 GB, so we'll need to spill to CPU memory.
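The core idea of shuffling on a column is hash partitioning: every row goes to the output partition given by a hash of its key, so all rows sharing a key land together no matter which input partition they started in. Below is a plain-Python sketch with a hypothetical `hash_shuffle` helper. It uses Python's built-in `hash` (deterministic for small ints); Dask uses its own hashing and must also move the data between workers, which this toy ignores.

```python
def hash_shuffle(partitions, key, npartitions):
    """Route each row to output partition hash(row[key]) % npartitions."""
    out = [[] for _ in range(npartitions)]
    for part in partitions:
        for row in part:
            out[hash(row[key]) % npartitions].append(row)
    return out


partitions = [
    [{"id": 1, "x": 0.1}, {"id": 2, "x": 0.2}],
    [{"id": 1, "x": 0.3}, {"id": 3, "x": 0.4}],
    [{"id": 2, "x": 0.5}, {"id": 3, "x": 0.6}],
]
shuffled = hash_shuffle(partitions, key="id", npartitions=2)

# Every id now appears in exactly one output partition.
for ident in (1, 2, 3):
    holders = [i for i, part in enumerate(shuffled) if any(r["id"] == ident for r in part)]
    print(ident, holders)
# 1 [1]
# 2 [0]
# 3 [1]
```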

```python
# Hash-partition rows on "id" so all rows with the same id share a partition
ddf = ddf.shuffle(on="id")
ddf = ddf.persist()

len(ddf)
```

31536000

Watch the Dask dashboard while this runs. You should see many tasks in the task stream like `disk-read` and `disk-write`. Setting a `device_memory_limit` tells Dask to spill to CPU memory and, if CPU memory is also overwhelmed, to disk. This lets us run large computations even when we're almost out of GPU memory (though in this case the limit is artificial).

# Dask Custom Functions

Dask DataFrames also provide a `map_partitions` API, which is useful for applying custom logic that doesn't fit neatly into the Dask DataFrame API or that only needs to run on each partition independently. Dask will `map` the function to every partition of the distributed DataFrame.

Now that all the rows for each `id` are collected in the same partition, what if we just wanted to sort **within each partition**? Avoid global sorts when possible: they require moving data across all partitions and are very expensive.
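What `map_partitions` does can be modeled in a few lines: apply a function to each partition independently and leave the partitioning alone. The plain-Python sketch below uses a hypothetical `map_partitions` helper over lists of rows (not Dask's API). Because the shuffle already placed each `id` in a single partition, sorting locally makes each id's rows contiguous without any data movement, even though the overall order is not globally sorted.

```python
def map_partitions(func, partitions):
    """Apply func to every partition independently (toy model, not Dask's API)."""
    return [func(part) for part in partitions]


# Partitions as they might look after shuffling on "id": each id lives in one partition.
partitions = [
    [{"id": 2, "x": 0.5}, {"id": 4, "x": 0.1}, {"id": 2, "x": 0.2}],
    [{"id": 3, "x": 0.4}, {"id": 1, "x": 0.3}, {"id": 3, "x": 0.6}],
]

sorted_parts = map_partitions(lambda rows: sorted(rows, key=lambda r: r["id"]), partitions)
print([[r["id"] for r in part] for part in sorted_parts])  # [[2, 2, 4], [1, 3, 3]]
```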

```python
%%time
sorted_ddf = ddf.map_partitions(lambda part: part.sort_values("id"))
len(sorted_ddf)
```

```
CPU times: user 996 ms, sys: 0 ns, total: 996 ms
Wall time: 1.57 s
```

31536000

We could also do something more complicated and then wrap it into a function. Let's do a rolling window on the two value columns after sorting by the id column.

```python
def sort_and_rolling_mean(df):
    # Sort this partition by id, then take a 3-row rolling mean of x and y
    df = df.sort_values("id")
    df = df.rolling(3)[["x", "y"]].mean()
    return df
```

```python
result = ddf.map_partitions(sort_and_rolling_mean)
result = result.persist()
wait(result);
```

```python
# look at one arbitrary partition
result.partitions[89].head()
```

| timestamp | x | y |
|---|---|---|
| 2000-01-15 12:31:08 | | |
| 2000-01-12 01:56:23 | | |
| 2000-02-02 20:08:14 | -0.110677502 | 0.141042225 |
| 2000-01-22 16:25:46 | 0.277448249 | 0.527516849 |
| 2000-01-27 12:33:05 | 0.066509847 | 0.233281992 |


# Dask Delayed

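Dask also provides a `delayed` API, which is useful for parallelizing custom Python functions that don't fit into the DataFrame API at all. Wrapping a function call in `delayed` records it as a task instead of running it, so we can build up many independent calls (for example, complex single-machine logic run with many different inputs) and then execute them in parallel across the cluster.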
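To see the idea behind `delayed`, here is a tiny stand-in written with the standard library rather than Dask: a hypothetical `lazy` decorator turns each function call into a `Node` that records the function and its arguments, and a hypothetical `compute_all` later runs the independent nodes concurrently on a thread pool. Real `dask.delayed` builds a full task graph (including dependencies between calls) and runs it with Dask's schedulers; this sketch only handles a flat list of independent calls.

```python
from concurrent.futures import ThreadPoolExecutor


class Node:
    """A deferred function call: nothing runs until compute_all is called."""

    def __init__(self, func, args):
        self.func = func
        self.args = args


def lazy(func):
    """Hypothetical decorator in the spirit of dask.delayed."""
    def wrapper(*args):
        return Node(func, args)
    return wrapper


def compute_all(nodes, max_workers=4):
    """Run independent deferred calls in parallel and return results in order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(node.func, *node.args) for node in nodes]
        return [f.result() for f in futures]


@lazy
def score(features):
    # Stand-in for an expensive model fit; here just a cheap deterministic value.
    return features, sum(len(name) for name in features)


nodes = [score(pair) for pair in [("x0", "x1"), ("x0", "x2"), ("x1", "x2")]]
print(type(nodes[0]).__name__)  # Node: building the list did no work yet
print(compute_all(nodes))
```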

Let's imagine we wanted to run thousands of regression models on different combinations of features. We can set up this experiment easily with `dask.delayed`.

```python
from cuml.linear_model import LinearRegression
from dask import delayed
import dask
import numpy as np
from itertools import combinations
```

```python
# Set up toy data: five random feature columns and a random target
np.random.seed(12)

nrows = 10000
ncols = 5
df = cudf.DataFrame({f"x{i}": np.random.randn(nrows) for i in range(ncols)})
df["y"] = np.random.randn(nrows)
```

```python
feature_combinations = list(combinations(df.columns.drop("y"), 2))
feature_combinations
```

```
[('x0', 'x1'),
 ('x0', 'x2'),
 ('x0', 'x3'),
 ('x0', 'x4'),
 ('x1', 'x2'),
 ('x1', 'x3'),
 ('x1', 'x4'),
 ('x2', 'x3'),
 ('x2', 'x4'),
 ('x3', 'x4')]
```

```python
# Fit one OLS model on a subset of features; Dask will run many of these in parallel
def fit_ols(df, feature_cols, target_col="y"):
    clf = LinearRegression()
    clf.fit(df[list(feature_cols)], df[target_col])
    return feature_cols, clf.coef_, clf.intercept_
```

```python
# Scatter the data to every worker beforehand
data_future = client.scatter(df, broadcast=True)
```

```python
results = []

for features in feature_combinations:
    # Pass the scattered data future rather than embedding the DataFrame in every task
    res = delayed(fit_ols)(data_future, features)
    results.append(res)

# dask.compute returns a tuple with one entry per argument; we passed a single list
res = dask.compute(results)
res = res[0]

print("Features\tCoefficients\t\t\tIntercept")
for i in range(5):
    print(res[i][0], res[i][1].values, res[i][2], sep="\t")
```

| Features | Coefficients | Intercept |
|---|---|---|
| ('x0', 'x1') | [-0.00055602  0.01070128] | -0.004087908525406975 |
| ('x0', 'x2') | [-0.00048411 -0.02332247] | -0.004045267357105907 |
| ('x0', 'x3') | [-0.0006734 -0.0026353] | -0.0041057561740846385 |
| ('x0', 'x4') | [-0.00065388  0.00277832] | -0.0040876786937341135 |
| ('x1', 'x2') | [ 0.01056059 -0.02325919] | -0.004061406231079496 |


# Handling Parquet Files

```python
ddf.to_parquet("ddf.parquet")
```

```python
del ddf
```

```python
ddf = dask_cudf.read_parquet("ddf.parquet/*.parquet")
ddf
```

| npartitions=365 | id | name | x | y |
|---|---|---|---|---|
| | int64 | object | float64 | float64 |
| | ... | ... | ... | ... |
| ... | ... | ... | ... | ... |
| | ... | ... | ... | ... |
| | ... | ... | ... | ... |


```python
ddf = dask_cudf.read_parquet("ddf.parquet/")
ddf
```

| npartitions=214 | id | name | x | y |
|---|---|---|---|---|
| | int64 | object | float64 | float64 |
| | ... | ... | ... | ... |
| ... | ... | ... | ... | ... |
| | ... | ... | ... | ... |
| | ... | ... | ... | ... |


```python
dask_cudf.read_parquet("ddf.parquet/")
```

| npartitions=213 | id | name | x | y |
|---|---|---|---|---|
| | int64 | object | float64 | float64 |
| | ... | ... | ... | ... |
| ... | ... | ... | ... | ... |
| | ... | ... | ... | ... |
| | ... | ... | ... | ... |


```python
ddf.repartition(npartitions=1).to_parquet("big_ddf.parquet")
```

```python
dask_cudf.read_parquet("big_ddf.parquet/")
```

| npartitions=32 | id | name | x | y |
|---|---|---|---|---|
| | int64 | object | float64 | float64 |
| | ... | ... | ... | ... |
| ... | ... | ... | ... | ... |
| | ... | ... | ... | ... |
| | ... | ... | ... | ... |


```python
dask_cudf.read_parquet("big_ddf.parquet/", split_row_groups=False)
```

| npartitions=1 | id | name | x | y |
|---|---|---|---|---|
| | int64 | object | float64 | float64 |
| | ... | ... | ... | ... |


```python
dask_cudf.read_parquet("big_ddf.parquet/", split_row_groups=True)
```

| npartitions=32 | id | name | x | y |
|---|---|---|---|---|
| | int64 | object | float64 | float64 |
| | ... | ... | ... | ... |
| ... | ... | ... | ... | ... |
| | ... | ... | ... | ... |
| | ... | ... | ... | ... |
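The last two cells suggest that `split_row_groups` controls whether each Parquet row group or each whole file becomes a Dask partition. The toy model below sketches that mapping with a hypothetical `plan_partitions` helper; it assumes `big_ddf.parquet` holds a single file with 32 row groups, which is inferred from the `npartitions=32` output above rather than checked.

```python
def plan_partitions(row_groups_per_file, split_row_groups):
    """Return (file_index, row_group_indices) for each output partition (toy model)."""
    partitions = []
    for file_index, n_row_groups in enumerate(row_groups_per_file):
        if split_row_groups:
            # One partition per row group.
            partitions.extend((file_index, [rg]) for rg in range(n_row_groups))
        else:
            # One partition per file, holding all of its row groups.
            partitions.append((file_index, list(range(n_row_groups))))
    return partitions


# Assumed layout: one file with 32 row groups.
print(len(plan_partitions([32], split_row_groups=True)))   # 32
print(len(plan_partitions([32], split_row_groups=False)))  # 1
```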


# Understanding Persist and Compute

Most Dask operations are lazy. This is a common pattern in distributed computing, but is likely unfamiliar to those who come from primarily single-machine libraries like pandas and cuDF. As a result, you'll usually need to call an **eager** operation like `len` or `persist` to actually trigger work.

In general, you should avoid calling `compute` on large results. When we created the `Client` object above, we connected our Python session (the client process) to the cluster's scheduler and workers. Calling `compute` brings all of the results back into a single cuDF DataFrame in the client process, not in any of the worker processes. That DataFrame lives outside the workers' memory pools and must fit on a single GPU, so we could easily run out of memory if we're not careful.

For those of you with Spark experience, you can think of `persist` as triggering work and caching the DataFrame in distributed memory, and `compute` as collecting the data or results into a single GPU DataFrame (cuDF) on the driver.


### Should I Persist My Data?

Persisting is generally a good idea if the data needs to be accessed multiple times, to avoid repeated computation. However, if the size of your data would lead to memory pressure, this could cause spilling, which hurts performance. As a best practice, we recommend persisting only when necessary or when you're using an eager operation in the middle of your workflow (to avoid repeating computation).

# Summary

RAPIDS lets us scale up and take advantage of GPU acceleration. Dask lets us scale out to multiple machines. Dask supports both CuPy arrays and cuDF DataFrames, with generally the same APIs as the single-machine libraries.

We encourage you to read the Dask [documentation](https://docs.dask.org/en/latest/) to learn more, and also look at our [10 Minute Guide to cuDF and Dask cuDF](https://docs.rapids.ai/api/cudf/nightly/10min.html).