# Dask DataFrame with cuDF joins

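This shows how to use Dask DataFrame with cuDF on an eight-GPU machine (a DGX). Note that the cluster created below is restricted to four of those GPUs (devices 0–3).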

This makes three points:

1.  Joins work
2.  They're slow due to communication
3.  Workflows that are agnostic to pandas vs. cuDF provide usability gains

## Use a DGX

In [None]:
import dask
import distributed
import cudf
import os

In [None]:
# Show the filesystem location each library was imported from, one per line.
print('\n'.join(
    f'{label}: {module.__file__}'
    for label, module in (
        ('Dask', dask),
        ('Distributed', distributed),
        ('cuDF', cudf),
    )
))

In [None]:
# Environment settings applied to this process: UCX options plus the GPU ids
# to expose.
# NOTE(review): presumably these must be in place before the cluster in the
# next cell is created so that its workers pick them up — confirm.
base_env = dict(
    UCX_RNDV_SCHEME="put_zcopy",
    UCX_MEMTYPE_CACHE="n",
    UCX_TLS="rc,cuda_copy",
    CUDA_VISIBLE_DEVICES="0,1,2,3",
)
os.environ.update(base_env)

In [None]:
from dask.distributed import Client, wait
from dask_cuda import DGX

# Start a DGX cluster limited to GPUs 0-3 and serve its dashboard on a fixed
# address, then connect a client to it.
# NOTE(review): the dashboard address is hard-coded to one specific host.
cluster = DGX(
    dashboard_address='10.33.227.165:8789',
    CUDA_VISIBLE_DEVICES=[0, 1, 2, 3],
)
client = Client(cluster)

# Alternative kept for reference: connect to an already-running scheduler
# over UCX instead of the cluster created above.
# NOTE(review): this address (10.33.225.165) differs from the dashboard host
# above (10.33.227.165) — confirm which one is intended.
# client = Client('ucx://10.33.225.165:42841')
client

## Create Random Dataset

In [None]:
import dask.array as da
import dask.dataframe as dd

n_rows = 500_000_000  # rows in the left table
n_keys = 5_000_000    # exclusive upper bound for the random integer `id` values

# Left table: a random float column `x` next to a random integer column `id`,
# concatenated column-wise and then persisted.
left = dd.concat(
    [
        da.random.random(n_rows).to_dask_dataframe(columns='x'),
        da.random.randint(0, n_keys, size=n_rows).to_dask_dataframe(columns='id'),
    ],
    axis=1,
)
left = left.persist()
left

In [None]:
# Number of partitions in the `left` collection.
left.npartitions

In [None]:
# NOTE: this rebinds the global `n_rows`; the right table gets 50 million rows
# (a tenth of the left table) and draws ids from the same `n_keys` range.
n_rows = 50_000_000

# Right table: a random float column `y` next to a random integer column `id`,
# concatenated column-wise and then persisted.
right = dd.concat(
    [
        da.random.random(n_rows).to_dask_dataframe(columns='y'),
        da.random.randint(0, n_keys, size=n_rows).to_dask_dataframe(columns='id'),
    ],
    axis=1,
)
right = right.persist()
right

In [None]:
# Number of partitions in the `right` collection.
right.npartitions

## Convert data to GPU and persist in device memory

In [None]:
# Convert each partition of `left` from pandas to cuDF via `cudf.from_pandas`.
gleft = left.map_partitions(cudf.from_pandas)

In [None]:
# Partition count of the cuDF-backed left table (before it is persisted).
gleft.npartitions

In [None]:
# Convert `right` to cuDF partitions and give it the same number of
# partitions as `gleft`.
gright = (
    right
    .map_partitions(cudf.from_pandas)
    .repartition(npartitions=gleft.npartitions)
)
# Persist both tables together so the data is held in device memory.
gleft, gright = dask.persist(gleft, gright)

In [None]:
# Partition count of `gleft` after persisting.
gleft.npartitions

In [None]:
# Row count of `gleft`.
len(gleft)

In [None]:
# Display the cluster's worker specification.
cluster.worker_spec

In [None]:
# Time a sum over the whole `x` column of the cuDF-backed left table.
%time gleft.x.sum().compute()

In [None]:
# NOTE(review): scratch cell; nothing else in the notebook uses its result —
# candidate for removal.
1+1

In [None]:
from distributed.utils import format_bytes
# Rough size estimate for `gleft`: rows * 8 * 2, formatted as a byte count.
# NOTE(review): assumes 8-byte values in each of the 2 columns (`x`, `id`) —
# confirm dtypes.
format_bytes(len(gleft) * 8 * 2)  # TODO: cudf needs `.memory_usage()` method

In [None]:
# Same rough size estimate for `gright` (rows * 8 * 2; assumes 8-byte values
# in 2 columns — confirm dtypes).
format_bytes(len(gright) * 8 * 2)

## Join on the ID column

In [None]:
# Join the two cuDF-backed tables on the shared `id` column. Only the task
# graph is built here; the join itself runs when `out` is persisted.
# NOTE(review): the join type is whatever `merge` defaults to for this
# collection type — confirm.
out = gleft.merge(gright, on=['id'], left_index=False)  # this is lazy
out

In [None]:
import time

# Record the wall-clock time just before the join runs; the task-stream query
# issued after the join uses it as its `start` bound.
#
# A `client.get_task_stream(start=start, filename=...)` call used to follow
# here. It was removed because it had no effect: `plot` was not set, so the
# `filename` argument was ignored and no file was written, and the query window
# began at "now", so there were no task records to return (the result was
# discarded anyway).
start = time.time()

In [None]:
# Start the join by persisting its result, then time how long it takes for
# the persisted result to finish.
out = out.persist()
%time _ = wait(out)

In [None]:
# Fetch the task stream recorded since `start`; with plot='save' the plot is
# written to the given HTML file. The return value is discarded.
_ = client.get_task_stream(start=start, plot='save', filename='dask-join-ucx-task.html')

## Inspect output

In [None]:
# Row count of the joined result.
len(out)

In [None]:
# Preview the first rows of the result, converted to pandas for display.
out.head().to_pandas()

In [None]:
# Re-imported here so this cell can run on its own.
from distributed.utils import format_bytes
# Rough size estimate for the joined result: rows * 8 * 3.
# NOTE(review): assumes 3 columns of 8-byte values (presumably `x`, `id`,
# `y`) — confirm against the actual output schema.
format_bytes(len(out) * 8 * 3)  # TODO: cudf needs `.memory_usage()` method