# Dask Overview

Dask is a flexible library for parallel computing in Python that makes it straightforward to scale out a workflow. On the CPU, Dask uses pandas DataFrames and NumPy arrays to execute operations in parallel on DataFrame and array partitions, respectively.

Dask-cuDF extends Dask where necessary so that its DataFrame partitions are processed as cuDF GPU DataFrames rather than pandas DataFrames. For instance, when you call `dask_cudf.read_csv(…)`, your cluster's GPUs do the work of parsing the CSV file(s) by calling the underlying `cudf.read_csv()`. Dask also supports array-based workflows using CuPy.

## When to use Dask
If your workflow is fast enough on a single GPU, or your data comfortably fits in memory on a single GPU, use cuDF or CuPy directly. If you want to distribute your workflow across multiple GPUs, have more data than fits in memory on a single GPU, or want to analyze data spread across many files at once, use Dask.

One additional benefit of Dask is that it lets us easily spill data between device and host memory. This can be very useful when we need to do work that would otherwise cause out-of-memory errors.

# Creating a Local Cluster

The easiest way to scale workflows on a single node is to use the `LocalCUDACluster` API. This lets us create a GPU cluster, using one worker per GPU by default.

In [None]:
import dask
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster

cluster = LocalCUDACluster(ip="")
client = Client(cluster)
client
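
The device-to-host spilling mentioned earlier is configured when the cluster is created. The following is a minimal sketch rather than a recommended configuration: it assumes `dask_cuda` is installed and at least one NVIDIA GPU is visible, and both the helper name `start_spilling_cluster` and the `"10GB"` threshold are illustrative placeholders. `device_memory_limit` is the `LocalCUDACluster` argument that sets how much device memory a worker may use before it starts spilling to host memory.

```python
def start_spilling_cluster(device_memory_limit="10GB"):
    """Start a one-worker-per-GPU cluster that spills to host memory.

    Hypothetical helper for illustration; the default limit is a placeholder.
    Calling it requires dask_cuda and at least one visible GPU.
    """
    # Imported here so that merely defining the helper does not need a GPU.
    from dask.distributed import Client
    from dask_cuda import LocalCUDACluster

    # Workers begin moving data to host memory once they hold more than
    # `device_memory_limit` of device memory.
    cluster = LocalCUDACluster(device_memory_limit=device_memory_limit)
    client = Client(cluster)
    return cluster, client
```

A sensible threshold depends on the GPU model and on how much headroom the workload needs for temporary allocations, so treat the value above as something to tune.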

## Distributed GPU Arrays

Let's create a random matrix and calculate the singular value decomposition of it. This is a fairly complex calculation, so it's a great introduction to Dask. Dask can use `CuPy` to create random arrays.

In [None]:
import numpy as np
import dask
import dask.array as da
import cupy as cp

In [None]:
rs = da.random.RandomState(RandomState=cp.random.RandomState, seed=12)  # <-- we specify cupy here

x = rs.random((1000000, 1000), chunks=(10000,1000))
x = x.persist() # so quick we don't need to wait

Notice the `persist` call. Like Apache Spark, Dask operations are lazy. Instead of being executed immediately, most operations are added to a task graph, and the actual evaluation is delayed until the result is needed.
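
To make the laziness concrete, here is a small sketch that builds an expression, inspects it before anything has run, and then triggers the computation. It is an assumption-laden stand-in for the GPU example: it uses NumPy-backed chunks so that it runs without a GPU, and the array sizes are arbitrary.

```python
import dask.array as da

# CPU-only stand-in: NumPy-backed chunks instead of CuPy.
x = da.ones((1000, 1000), chunks=(250, 250))   # 4 x 4 = 16 chunks
y = (x + 1).sum()                               # records tasks, runs nothing

print(type(y).__name__)            # still a Dask Array, not a number
print(len(y.__dask_graph__()))     # how many tasks have been recorded

result = y.compute()               # only now does the graph execute
print(float(result))
```

Until `compute` (or `persist`) is called, `y` is only a description of work to be done, which is why building it is nearly instantaneous regardless of the array size.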


Sometimes, though, we want to force the execution of operations. Calling `persist` on a Dask collection fully computes it (or actively computes it in the background), persisting the result into memory. When we're using distributed systems, we may want to wait until `persist` is finished before beginning any downstream operations. We can do this with `wait`: calling `wait` on a persisted collection blocks until all of the tasks behind it have finished.
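
The `persist` and `wait` pattern can be sketched as follows. This is an illustrative example under stated assumptions, not part of the GPU workflow: it starts an in-process CPU cluster with `Client(processes=False)` so that it runs without a GPU, disables the dashboard, and uses an arbitrary array size.

```python
import dask.array as da
from dask.distributed import Client, wait

# In-process CPU cluster so the sketch runs without a GPU (assumption).
client = Client(processes=False, dashboard_address=None)

x = da.random.random((2000, 2000), chunks=(500, 500))
x = x.persist()   # returns immediately; chunks are computed in the background
wait(x)           # blocks until every chunk of x is held in worker memory

# Downstream work reuses the persisted chunks instead of regenerating them.
total = float(x.sum().compute())
print(total)

client.close()
```

Blocking with `wait` is mostly useful for timing a step in isolation or for making sure data is resident before a benchmark starts; Dask would otherwise schedule downstream tasks as soon as their inputs are ready.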

Let's look at our distributed array.

In [None]:
x

Dask's visual class representation shows us some information about this distributed array. We can see the size of the array, and of individual chunks, among other things. Remember, a Dask array is made up of individual CuPy or NumPy arrays.
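
The same information shown in the visual representation is available programmatically. The sketch below assumes a NumPy-backed array of zeros with the same shape and chunking as `x`, so it runs without a GPU; because the array is lazy, no data is allocated when it is created.

```python
import dask.array as da

# Same shape and chunking as the random matrix above, but CPU-backed and lazy.
x = da.zeros((1_000_000, 1000), chunks=(10_000, 1000))

print(x.shape)        # overall array shape
print(x.chunksize)    # shape of the largest chunk
print(x.numblocks)    # number of chunks along each axis
print(x.nbytes / 1e9, "GB in total")

# All chunks are the same size here, so this is the size of each one.
chunk_bytes = x.nbytes // x.npartitions
print(chunk_bytes / 1e6, "MB per chunk")
```

Checking the per-chunk size this way is a quick sanity test that each chunk fits comfortably in a single worker's memory.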

Let's take the SVD now.

In [None]:
u, s, v = da.linalg.svd(x)

In [None]:
u

In [None]:
dask.visualize(u, s, v)

We've just added several hundred tasks to the task graph. We can call `persist` to execute it.

In [None]:
u, s, v = dask.persist(u, s, v)

Now we can take a look at the results.

In [None]:
u[:5, :5].compute()
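
As a sanity check on the decomposition, we can multiply the factors back together and compare the product with the input. The following is a small CPU-only sketch under stated assumptions: the matrix is a NumPy-backed stand-in that is much smaller than the one above, and it keeps the same tall-and-skinny layout (many row chunks, a single column chunk) that `da.linalg.svd` expects.

```python
import numpy as np
import dask.array as da

# Small tall-and-skinny stand-in for the GPU matrix (sizes are arbitrary).
data = np.random.default_rng(0).random((2000, 50))
x = da.from_array(data, chunks=(500, 50))   # one chunk along the columns

u, s, v = da.linalg.svd(x)

# Scaling the columns of u by s is equivalent to u @ diag(s).
x_rebuilt = (u * s) @ v
max_err = float(abs(x_rebuilt - x).max().compute())

print(u.shape, s.shape, v.shape)
print(max_err)   # expected to be at floating-point round-off level
```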

That's all there is to it. Dask lets us take array workloads and scale out to as many GPUs and machines as we have!

# Summary

RAPIDS lets us scale up and take advantage of GPU acceleration. Dask lets us scale out to multiple machines. Dask supports both CuPy arrays and cuDF DataFrames, with generally the same APIs as the single-machine libraries.

We encourage you to read the Dask [documentation](https://docs.dask.org/en/latest/) to learn more, and also to look at our [10 Minute Guide to cuDF and Dask cuDF](https://docs.rapids.ai/api/cudf/nightly/10min.html).