# Dask Overview

### CPU Array Processing

In [None]:
# NOTE(review): this flag appears to be needed only on NumPy 1.16, where NEP-18
# `__array_function__` dispatch (used when dask.array wraps CuPy chunks below)
# was opt-in; later NumPy releases enable it by default — confirm pinned version.
%env NUMPY_EXPERIMENTAL_ARRAY_FUNCTION=1
# Expose GPUs 0-7 to this kernel and to any processes it starts. Presumably
# LocalCUDACluster (GPU section) uses this list to decide how many GPU workers
# to launch — adjust it to the hardware actually present.
%env CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7

In [None]:
from dask.distributed import Client, LocalCluster

# Local CPU cluster: ten workers, each limited to a single thread.
cluster = LocalCluster(n_workers=10, threads_per_worker=1)
client = Client(cluster)
client

In [None]:
import numpy as np
import dask
import dask.array as da

# Seeded generator whose chunks are drawn with NumPy's RandomState, so the
# CPU results are reproducible from run to run.
rs = da.random.RandomState(RandomState=np.random.RandomState, seed=12)

# 1,000,000 x 1,000 random array in row-blocks of 10,000 rows (one block spans
# all columns), computed once and kept in cluster memory for the cells below.
x = rs.random((1_000_000, 1_000), chunks=(10_000, 1_000)).persist()
x

In [None]:
x[:5, :5].compute()

In [None]:
u, s, v = da.linalg.svd(x)

In [None]:
u, s, v = dask.persist(u, s, v)

In [None]:
u[:5, :5].compute()

In [None]:
client.close()
cluster.close()

### GPU Array Processing

In [None]:
import cupy as cp
import dask
import dask.array as da

In [None]:
from dask_cuda import LocalCUDACluster

# GPU-backed local cluster. `Client` is the one imported from dask.distributed
# in the CPU section. NOTE(review): presumably one worker per GPU listed in
# CUDA_VISIBLE_DEVICES — confirm against the dask_cuda documentation.
cluster = LocalCUDACluster()
client = Client(cluster)
client

In [None]:
# Same seeded generator as the CPU section, except that each chunk is drawn
# with CuPy's RandomState, so the blocks of x are CuPy (GPU) arrays.
rs = da.random.RandomState(RandomState=cp.random.RandomState, seed=12)

x = rs.random((1_000_000, 1_000), chunks=(10_000, 1_000)).persist()
x

In [None]:
u, s, v = da.linalg.svd(x)

In [None]:
u

In [None]:
u, s, v = dask.persist(u, s, v)

In [None]:
u[:5, :5].compute()

In [None]:
del u, s, v

## From cuDF DataFrames to Dask DataFrames

In [None]:
import cudf
import dask_cudf

In [None]:
def make_random_cudf_dataframe(nrows=100_000_000, seed=None):
    """Build a cuDF DataFrame of ``nrows`` random rows on the current GPU.

    Columns ``a`` and ``b`` hold integers drawn from [0, 1000) (``high`` is
    exclusive); columns ``c`` and ``d`` hold floats from ``random_sample``.

    Parameters
    ----------
    nrows : int
        Number of rows to generate.
    seed : int or None
        Seed for a private CuPy ``RandomState``. ``None`` (the default) draws
        from CuPy's global random state, exactly as before.

    Returns
    -------
    cudf.DataFrame
        Frame with columns ``a``, ``b``, ``c``, ``d``.
    """
    # cp.random.randint / cp.random.random delegate to the global state returned
    # by get_random_state(), so the seed=None path is unchanged.
    rng = cp.random.get_random_state() if seed is None else cp.random.RandomState(seed)
    df = cudf.DataFrame()
    df['a'] = rng.randint(low=0, high=1000, size=nrows)
    df['b'] = rng.randint(low=0, high=1000, size=nrows)
    df['c'] = rng.random_sample(size=nrows)
    df['d'] = rng.random_sample(size=nrows)
    return df

# Two partitions per worker. Giving each partition its own seed makes the data
# reproducible across runs, like the seeded arrays above, while keeping the
# partitions different from one another.
n_partitions = len(cluster.workers) * 2
delayed_cudf_dataframes = [
    dask.delayed(make_random_cudf_dataframe)(seed=i) for i in range(n_partitions)
]

In [None]:
ddf = dask_cudf.from_delayed(delayed_cudf_dataframes).persist()

In [None]:
ddf

In [None]:
len(ddf)

### Example One: Groupby-Aggregations

In [None]:
# Sum and mean of column "c" for every distinct (a, b) pair; .head() shows
# only the first few result rows.
(
    ddf.groupby(["a", "b"])
    .agg({"c": ['sum', 'mean']})
    .head()
)

### Example Two: Rolling Windows

In [None]:
rolling = ddf[['c','d']].rolling(window=3)
type(rolling)

In [None]:
rolling.mean().head()

RAPIDS lets us scale up (faster processing on each GPU), and Dask lets us scale out (spreading the same work across many GPUs or machines).