# Large-to-Small joins with Dask and cuDF

This notebook joins a large, distributed dask-cudf dataframe with a small, single cudf dataframe. This is a simple but very common case.

## Start a local GPU cluster (e.g. on a DGX)

In [None]:
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster

# Start a local GPU cluster and connect a client to it. The dashboard is
# served on port 9000; `dashboard_address` replaces the deprecated
# `diagnostics_port` keyword that older versions of this notebook used.
cluster = LocalCUDACluster(dashboard_address=':9000')
client = Client(cluster)
client

## Create Random Dataset

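The data is generated on the CPU (pandas partitions for `left`, NumPy arrays for `right`), so this step is a little slow.

Also, cudf doesn't handle datetime indexes well yet, so we call `reset_index()` to move the datetime index into an ordinary column, leaving a plain integer index.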

In [None]:
import dask
import cudf
import numpy as np

lam = 1000000

# Large side: a year of synthetic time-series rows from
# dask.datasets.timeseries, split into one partition per two days. The
# partitions are pandas objects (they are converted with cudf.from_pandas
# further down). `id_lam` is forwarded to the generator of the `id` column.
# reset_index() turns the datetime index into a regular column, and
# persist() keeps the result in memory.
left = (
    dask.datasets.timeseries(
        start='2000',
        end='2001',
        freq='10ms',
        partition_freq='2d',
        dtypes={'id': int, 'x': float, 'y': float},
        id_lam=lam,
    )
    .reset_index()
    .persist()
)

# Small side: one cudf dataframe with n rows of random integer ids drawn
# from [0, lam) and a random float column `z`.
# NOTE(review): per the dask.datasets docs, `id_lam` is a Poisson mean, so
# left ids cluster around lam while these ids are all below lam; only a
# fraction of rows can match. Confirm this is intended.
n = 1000000
right = cudf.DataFrame({
    'id': np.random.randint(0, lam, size=n),
    'z': np.random.random(n),
})

## Convert the data to the GPU and persist it in device memory

In [None]:
# Turn every pandas partition of `left` into a cudf DataFrame, then persist
# so that the converted partitions are kept in memory on the workers.
gleft = left.map_partitions(cudf.from_pandas)
gleft = gleft.persist()

## Join on the `id` column

The large, distributed `gleft` is inner-joined with the small, single cudf dataframe `right` on the `id` column. Because `right` is small and not partitioned, it can be merged against each partition of `gleft` independently, so the large dataframe does not need to be shuffled.

In [None]:
import dask.dataframe as dd

# Inner-join the distributed GPU dataframe with the single cudf dataframe on
# their shared `id` column. The result is a lazy dask collection; it is
# persisted in a later cell.
out = dd.merge(left=gleft, right=right, on=['id'], how='inner')
out

In [None]:
import time

# Wall-clock reference point. It is passed to client.profile(start=...)
# later, so the saved profile covers cluster work from here onward.
start = time.time()

In [None]:
# Start computing the join on the cluster and keep the result in worker
# memory; `out` now refers to the persisted collection.
out = out.persist()
# Block until every partition of `out` is finished. %time reports only the
# duration of this wait, not of the persist() call above.
%time _ = wait(out)

In [None]:
# Row count of the joined result; displayed as this cell's output.
len(out)

In [None]:
# Fetch the cluster's profiling data recorded since `start` and write it to
# an HTML report; the returned value is discarded.
_ = client.profile(
    filename='dask-cudf-join-small-profile.html',
    start=start,
)

## Inspect input and output sizes

In [None]:
from distributed.utils import format_bytes

In [None]:
# Rough size of `left`: rows x columns x 8 bytes, i.e. it assumes every
# column takes 8 bytes per row (64-bit ints/floats/timestamps; confirm dtypes).
# NOTE(review): `left` is pandas-backed, so `left.memory_usage()` may already
# be usable here.
# TODO: cudf needs `.memory_usage()` method
format_bytes(8 * len(left) * len(left.columns))

In [None]:
# Rough size of the joined result: rows x columns x 8 bytes, i.e. it assumes
# every column takes 8 bytes per row (confirm dtypes).
# TODO: cudf needs `.memory_usage()` method
format_bytes(8 * len(out) * len(out.columns))