10 Minutes to cuDF and Dask-cuDF
=======================

Modeled after 10 Minutes to Pandas, this is a short introduction to cuDF and Dask-cuDF, geared mainly toward new users.

### What are these Libraries?

[cuDF](https://github.com/rapidsai/cudf) is a Python GPU DataFrame library (built on the Apache Arrow columnar memory format) for loading, joining, aggregating, filtering, and otherwise manipulating tabular data using a DataFrame style API.

[Dask](https://dask.org/) is a flexible library for parallel computing in Python that makes scaling out your workflow smooth and simple. On the CPU, Dask uses Pandas to execute operations in parallel on DataFrame partitions.

[Dask-cuDF](https://github.com/rapidsai/cudf/tree/main/python/dask_cudf) extends Dask where necessary to allow its DataFrame partitions to be processed by cuDF GPU DataFrames as opposed to Pandas DataFrames. For instance, when you call `dask_cudf.read_csv(...)`, your cluster’s GPUs do the work of parsing the CSV file(s) with the underlying `cudf.read_csv()`.


### When to use cuDF and Dask-cuDF

If your workflow is fast enough on a single GPU or your data comfortably fits in memory on a single GPU, you would want to use cuDF. If you want to distribute your workflow across multiple GPUs, have more data than you can fit in memory on a single GPU, or want to analyze data spread across many files at once, you would want to use Dask-cuDF.

In [None]:
import os

import cupy as cp
import pandas as pd
import cudf
import dask_cudf

cp.random.seed(12)

#### Portions of this were borrowed and adapted from the
#### cuDF cheatsheet, existing cuDF documentation,
#### and 10 Minutes to Pandas.

Object Creation
---------------

Creating a `cudf.Series` and `dask_cudf.Series`.

In [None]:
s = cudf.Series([1,2,3,None,4])
s

In [None]:
ds = dask_cudf.from_cudf(s, npartitions=2) 
ds.compute()

Creating a `cudf.DataFrame` and a `dask_cudf.DataFrame` by specifying values for each column.

In [None]:
df = cudf.DataFrame({'a': list(range(20)),
                     'b': list(reversed(range(20))),
                     'c': list(range(20))
                    })
df

In [None]:
ddf = dask_cudf.from_cudf(df, npartitions=2) 
ddf.compute()
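To make `npartitions` more concrete, here is a plain-Python sketch of splitting rows into contiguous chunks. The helper `split_into_partitions` is hypothetical and only illustrates the idea. Dask's actual division logic may pick different boundaries, and each real partition would be a cuDF DataFrame rather than a list.

```python
def split_into_partitions(rows, npartitions):
    """Split a list into at most `npartitions` contiguous chunks (illustrative only)."""
    # Ceiling division, with a floor of 1 so an empty input does not produce a zero step.
    chunk = max(1, -(-len(rows) // npartitions))
    return [rows[i:i + chunk] for i in range(0, len(rows), chunk)]


rows = list(range(20))  # stands in for the 20 rows of `df`
partitions = split_into_partitions(rows, npartitions=2)

for i, part in enumerate(partitions):
    print(f"partition {i}: {part}")
```

Concatenating the chunks in order gives back the original rows, which is conceptually what `compute()` does when it gathers the partitions into a single result.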

Creating a `cudf.DataFrame` from a pandas `DataFrame` and a `dask_cudf.DataFrame` from a `cudf.DataFrame`.

*Note that best practice for using Dask-cuDF is to read data directly into a `dask_cudf.DataFrame` with something like `read_csv` (discussed below).*

In [None]:
pdf = pd.DataFrame({'a': [0, 1, 2, 3],'b': [0.1, 0.2, None, 0.3]})
gdf = cudf.DataFrame.from_pandas(pdf)
gdf

In [None]:
dask_gdf = dask_cudf.from_cudf(gdf, npartitions=2)
dask_gdf.compute()

Viewing Data
-------------

Viewing the top rows of a GPU dataframe.

In [None]:
df.head(2)

In [None]:
ddf.head(2)

Sorting by values.

In [None]:
df.sort_values(by='b')

In [None]:
ddf.sort_values(by='b').compute()

Selection
------------

## Getting

Selecting a single column, which initially yields a `cudf.Series` or `dask_cudf.Series`. Calling `compute` results in a `cudf.Series` (equivalent to `df.a`).

In [None]:
df['a']

In [None]:
ddf['a'].compute()

## Selection by Label

Selecting rows from index 2 to index 5 from columns 'a' and 'b'.

In [None]:
df.loc[2:5, ['a', 'b']]

In [None]:
ddf.loc[2:5, ['a', 'b']].compute()

## Selection by Position

Selecting via integers and integer slices, like numpy/pandas. Note that this functionality is not available for Dask-cuDF DataFrames.

In [None]:
df.iloc[0]

In [None]:
df.iloc[0:3, 0:2]

You can also select elements of a `DataFrame` or `Series` with direct index access.

In [None]:
df[3:5]

In [None]:
s[3:5]

## Boolean Indexing

Selecting rows in a `DataFrame` or `Series` by direct Boolean indexing.

In [None]:
df[df.b > 15]

In [None]:
ddf[ddf.b > 15].compute()
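Boolean indexing can be thought of as two steps: build a mask of `True`/`False` values, then keep the rows where the mask is `True`. The sketch below mimics this with plain Python lists for column `b`. It is a conceptual illustration only, not how cuDF evaluates the expression on the GPU.

```python
# Column 'b' from the example DataFrame: 19, 18, ..., 0
b = list(reversed(range(20)))

# Step 1: the comparison produces one Boolean per row (the mask).
mask = [value > 15 for value in b]

# Step 2: keep only the row positions where the mask is True.
selected_rows = [i for i, keep in enumerate(mask) if keep]
selected_values = [b[i] for i in selected_rows]

print(selected_rows)
print(selected_values)
```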

Selecting values from a `DataFrame` where a Boolean condition is met, via the `query` API.

In [None]:
df.query("b == 3")

In [None]:
ddf.query("b == 3").compute()

You can also pass local variables to Dask-cuDF queries, via the `local_dict` keyword. With standard cuDF, you may either use the `local_dict` keyword or reference the variable directly by prefixing its name with `@`. Supported logical operators include `>`, `<`, `>=`, `<=`, `==`, and `!=`.

In [None]:
cudf_comparator = 3
df.query("b == @cudf_comparator")

In [None]:
dask_cudf_comparator = 3
ddf.query("b == @val", local_dict={'val':dask_cudf_comparator}).compute()

Using the `isin` method for filtering.

In [None]:
df[df.a.isin([0, 5])]

## MultiIndex

cuDF supports hierarchical indexing of DataFrames using MultiIndex. Grouping hierarchically (see `Grouping` below) automatically produces a DataFrame with a MultiIndex.

In [None]:
arrays = [['a', 'a', 'b', 'b'], [1, 2, 3, 4]]
tuples = list(zip(*arrays))
idx = cudf.MultiIndex.from_tuples(tuples)
idx

This index can back either axis of a DataFrame.

In [None]:
gdf1 = cudf.DataFrame({'first': cp.random.rand(4), 'second': cp.random.rand(4)})
gdf1.index = idx
gdf1

In [None]:
gdf2 = cudf.DataFrame({'first': cp.random.rand(4), 'second': cp.random.rand(4)}).T
gdf2.columns = idx
gdf2

Accessing values of a DataFrame with a MultiIndex. Note that slicing is not yet supported.

In [None]:
gdf1.loc[('b', 3)]

Missing Data
------------

Missing data can be replaced by using the `fillna` method.

In [None]:
s.fillna(999)

In [None]:
ds.fillna(999).compute()

Operations
------------

## Stats

Calculating descriptive statistics for a `Series`.

In [None]:
s.mean(), s.var()

In [None]:
ds.mean().compute(), ds.var().compute()

## Applymap

Applying functions to a `Series`. Note that applying user-defined functions directly with Dask-cuDF is not yet implemented. For now, you can use [map_partitions](http://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.map_partitions.html) to apply a function to each partition of the distributed dataframe.

In [35]:
def add_ten(num):
    return num + 10

df['a'].applymap(add_ten)



0     10
1     11
2     12
3     13
4     14
5     15
6     16
7     17
8     18
9     19
10    20
11    21
12    22
13    23
14    24
15    25
16    26
17    27
18    28
19    29
Name: a, dtype: int64

In [None]:
ddf['a'].map_partitions(add_ten).compute()
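Conceptually, `map_partitions` calls the function once per partition and stitches the results back together. The following plain-Python sketch shows that pattern with lists standing in for partitions. The names `map_partitions_sketch` and `add_ten_to_partition` are hypothetical, and the real Dask implementation builds a lazy task graph instead of running eagerly.

```python
def map_partitions_sketch(partitions, func):
    """Apply `func` to each partition independently (illustrative, eager version)."""
    return [func(part) for part in partitions]


def add_ten_to_partition(part):
    # The function receives a whole partition, not a single element.
    return [value + 10 for value in part]


# Two partitions covering column 'a' (values 0..19).
partitions = [list(range(0, 10)), list(range(10, 20))]

mapped = map_partitions_sketch(partitions, add_ten_to_partition)
combined = [value for part in mapped for value in part]
print(combined)
```

Because the function sees an entire partition at a time, vectorized operations such as `num + 10` work unchanged on a cuDF `Series` partition.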

## Histogramming

Counting the number of occurrences of each unique value of a variable.

In [None]:
df.a.value_counts()

In [None]:
ddf.a.value_counts().compute()

## String Methods

Like pandas, cuDF provides string processing methods in the `str` attribute of `Series`. Full documentation of string methods is a work in progress. Please see the cuDF API documentation for more information.

In [None]:
s = cudf.Series(['A', 'B', 'C', 'Aaba', 'Baca', None, 'CABA', 'dog', 'cat'])
s.str.lower()

In [None]:
ds = dask_cudf.from_cudf(s, npartitions=2)
ds.str.lower().compute()

## Concat

Concatenating `Series` and `DataFrames` row-wise.

In [None]:
s = cudf.Series([1, 2, 3, None, 5])
cudf.concat([s, s])

In [None]:
ds2 = dask_cudf.from_cudf(s, npartitions=2)
dask_cudf.concat([ds2, ds2]).compute()

## Join

Performing SQL style merges. Note that the dataframe order is not maintained, but may be restored post-merge by sorting by the index.

In [None]:
df_a = cudf.DataFrame()
df_a['key'] = ['a', 'b', 'c', 'd', 'e']
df_a['vals_a'] = [float(i + 10) for i in range(5)]

df_b = cudf.DataFrame()
df_b['key'] = ['a', 'c', 'e']
df_b['vals_b'] = [float(i+100) for i in range(3)]

merged = df_a.merge(df_b, on=['key'], how='left')
merged

In [None]:
ddf_a = dask_cudf.from_cudf(df_a, npartitions=2)
ddf_b = dask_cudf.from_cudf(df_b, npartitions=2)

merged = ddf_a.merge(ddf_b, on=['key'], how='left').compute()
merged
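To clarify what `how='left'` means, here is a minimal plain-Python sketch of a left join on the same keys and values. It assumes the right-hand keys are unique and uses `None` where cuDF would show a null. The function `left_join` is a hypothetical helper, not part of cuDF.

```python
# (key, vals_a) rows from df_a and a key -> vals_b lookup built from df_b.
left_rows = [("a", 10.0), ("b", 11.0), ("c", 12.0), ("d", 13.0), ("e", 14.0)]
right_lookup = {"a": 100.0, "c": 101.0, "e": 102.0}


def left_join(left_rows, right_lookup):
    """Keep every left row; fill in the right value when the key matches, else None."""
    return [(key, val_a, right_lookup.get(key)) for key, val_a in left_rows]


merged_rows = left_join(left_rows, right_lookup)
for row in merged_rows:
    print(row)
```

Every key from the left side survives, and keys `b` and `d` get a missing value because they have no match on the right.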

## Append

Appending values from another `Series` or array-like object.

In [45]:
s.append(s)



0       1
1       2
2       3
3    <NA>
4       5
0       1
1       2
2       3
3    <NA>
4       5
dtype: int64

In [None]:
ds2.append(ds2).compute()

## Grouping

Like pandas, cuDF and Dask-cuDF support the Split-Apply-Combine groupby paradigm.

In [None]:
df['agg_col1'] = [1 if x % 2 == 0 else 0 for x in range(len(df))]
df['agg_col2'] = [1 if x % 3 == 0 else 0 for x in range(len(df))]

ddf = dask_cudf.from_cudf(df, npartitions=2)

Grouping and then applying the `sum` function to the grouped data.

In [None]:
df.groupby('agg_col1').sum()

In [None]:
ddf.groupby('agg_col1').sum().compute()

Grouping hierarchically then applying the `sum` function to grouped data.

In [None]:
df.groupby(['agg_col1', 'agg_col2']).sum()

In [None]:
ddf.groupby(['agg_col1', 'agg_col2']).sum().compute()

Grouping and applying statistical functions to specific columns, using `agg`.

In [None]:
df.groupby('agg_col1').agg({'a':'max', 'b':'mean', 'c':'sum'})

In [None]:
ddf.groupby('agg_col1').agg({'a':'max', 'b':'mean', 'c':'sum'}).compute()
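The Split-Apply-Combine idea can be sketched in a few lines of plain Python. The example below groups column `a` by `agg_col1` and sums each group. It is only a conceptual illustration, not the GPU algorithm cuDF uses.

```python
from collections import defaultdict

# Rebuild the two relevant columns: 'a' is 0..19, 'agg_col1' flags even row numbers.
rows = [{"a": x, "agg_col1": 1 if x % 2 == 0 else 0} for x in range(20)]

# Split: bucket the values of 'a' by their group key.
groups = defaultdict(list)
for row in rows:
    groups[row["agg_col1"]].append(row["a"])

# Apply + combine: reduce each bucket and collect the results by key.
sums = {key: sum(values) for key, values in groups.items()}
print(sums)
```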

## Transpose

Transposing a dataframe, using either the `transpose` method or `T` property. Currently, all columns must have the same type. Transposing is not currently implemented in Dask-cuDF.

In [None]:
sample = cudf.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})
sample

In [None]:
sample.transpose()

Time Series
------------

`DataFrames` support `datetime` typed columns, which allow users to interact with and filter data based on specific timestamps.

In [None]:
import datetime as dt

date_df = cudf.DataFrame()
date_df['date'] = pd.date_range('11/20/2018', periods=72, freq='D')
date_df['value'] = cp.random.sample(len(date_df))

search_date = dt.datetime.strptime('2018-11-23', '%Y-%m-%d')
date_df.query('date <= @search_date')

In [None]:
date_ddf = dask_cudf.from_cudf(date_df, npartitions=2)
date_ddf.query('date <= @search_date', local_dict={'search_date':search_date}).compute()
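As a sanity check on what the query selects, the same filter can be reproduced with the standard library alone. This sketch rebuilds the 72 daily timestamps starting on 2018-11-20 and keeps those on or before the search date. It uses plain `datetime` objects rather than a GPU column.

```python
import datetime as dt

# 72 consecutive days starting 2018-11-20, mirroring the date range above.
start = dt.datetime(2018, 11, 20)
dates = [start + dt.timedelta(days=i) for i in range(72)]

search_date = dt.datetime.strptime("2018-11-23", "%Y-%m-%d")

# Equivalent of the 'date <= @search_date' filter.
matches = [d for d in dates if d <= search_date]
for d in matches:
    print(d.date())
```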

Categoricals
------------

`DataFrames` support categorical columns.

In [None]:
gdf = cudf.DataFrame({"id": [1, 2, 3, 4, 5, 6], "grade":['a', 'b', 'b', 'a', 'a', 'e']})
gdf['grade'] = gdf['grade'].astype('category')
gdf

In [None]:
dgdf = dask_cudf.from_cudf(gdf, npartitions=2)
dgdf.compute()

Accessing the categories of a column. Note that this is currently not supported in Dask-cuDF.

In [None]:
gdf.grade.cat.categories

Accessing the underlying code values of each categorical observation.

In [None]:
gdf.grade.cat.codes

In [None]:
dgdf.grade.cat.codes.compute()
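The relationship between categories and codes can be sketched in plain Python. This assumes the categories are the sorted unique values, which is how pandas infers them for unordered data. The sketch only illustrates the encoding and does not reflect cuDF's internal storage.

```python
grades = ["a", "b", "b", "a", "a", "e"]

# Assumption: categories are the sorted unique values of the column.
categories = sorted(set(grades))

# Each observation is stored as the integer position of its category.
code_of = {category: i for i, category in enumerate(categories)}
codes = [code_of[g] for g in grades]

print(categories)
print(codes)

# Decoding the codes recovers the original values.
decoded = [categories[c] for c in codes]
```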

Converting Data Representation
--------------------------------

## Pandas

Converting a cuDF and Dask-cuDF `DataFrame` to a pandas `DataFrame`.

In [None]:
df.head().to_pandas()

In [None]:
ddf.compute().head().to_pandas()

## Numpy

Converting a cuDF or Dask-cuDF `DataFrame` to a numpy `ndarray`.

In [None]:
df.to_numpy()

In [None]:
ddf.compute().to_numpy()

Converting a cuDF or Dask-cuDF `Series` to a numpy `ndarray`.

In [None]:
df['a'].to_numpy()

In [None]:
ddf['a'].compute().to_numpy()

## Arrow

Converting a cuDF or Dask-cuDF `DataFrame` to a PyArrow `Table`.

In [None]:
df.to_arrow()

In [None]:
ddf.compute().to_arrow()

Getting Data In/Out
------------------------

## CSV

Writing to a CSV file.

In [None]:
if not os.path.exists('example_output'):
    os.mkdir('example_output')
    
df.to_csv('example_output/foo.csv', index=False)

In [None]:
ddf.compute().to_csv('example_output/foo_dask.csv', index=False)

Reading from a CSV file.

In [None]:
df = cudf.read_csv('example_output/foo.csv')
df

In [None]:
ddf = dask_cudf.read_csv('example_output/foo_dask.csv')
ddf.compute()

Reading all CSV files in a directory into a single `dask_cudf.DataFrame`, using the star wildcard.

In [None]:
ddf = dask_cudf.read_csv('example_output/*.csv')
ddf.compute()

## Parquet

Writing to parquet files, using the CPU via PyArrow.

In [None]:
df.to_parquet('example_output/temp_parquet')

Reading parquet files with a GPU-accelerated parquet reader.

In [None]:
df = cudf.read_parquet('example_output/temp_parquet')
df

Writing to parquet files from a `dask_cudf.DataFrame` using PyArrow under the hood.

In [None]:
ddf.to_parquet('example_files')  

## ORC

Reading ORC files.

In [80]:
import os
from pathlib import Path
current_dir = os.path.dirname(os.path.realpath("__file__"))
cudf_root = Path(current_dir).parents[3]
file_path = os.path.join(cudf_root, "python", "cudf", "cudf", "tests", "data", "orc", "TestOrcFile.test1.orc")
file_path

'/home/mmccarty/sandbox/rapids/cudf/python/cudf/cudf/tests/data/orc/TestOrcFile.test1.orc'

In [81]:
df2 = cudf.read_orc(file_path)
df2

Unnamed: 0,boolean1,byte1,short1,int1,long1,float1,double1,bytes1,string1,middle,list,map
0,False,1,1024,65536,9223372036854775807,1.0,-15.0,�,hi,"{'list': [{'int1': 1, 'string1': 'bye'}, {'int...","[{'int1': 3, 'string1': 'good'}, {'int1': 4, '...",[]
1,True,100,2048,65536,9223372036854775807,2.0,-5.0,,bye,"{'list': [{'int1': 1, 'string1': 'bye'}, {'int...","[{'int1': 100000000, 'string1': 'cat'}, {'int1...","[{'key': 'chani', 'value': {'int1': 5, 'string..."


Dask Performance Tips
--------------------------------

Like Apache Spark, Dask operations are [lazy](https://en.wikipedia.org/wiki/Lazy_evaluation). Instead of being executed at that moment, most operations are added to a task graph and the actual evaluation is delayed until the result is needed.
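To illustrate lazy evaluation, here is a toy task graph in plain Python. Operations are recorded as `(function, *arguments)` tuples in a dictionary, and nothing runs until a result is requested. The `evaluate` function is a hypothetical, minimal evaluator written for this sketch. It is not Dask's scheduler, which also handles parallelism, caching, and data movement.

```python
from operator import add, mul

# Building the graph only records work; no arithmetic happens here.
graph = {
    "a": 1,
    "b": 2,
    "sum": (add, "a", "b"),
    "result": (mul, "sum", 10),
}


def evaluate(graph, key):
    """Recursively compute `key`, resolving string arguments that name other tasks."""
    task = graph[key]
    if isinstance(task, tuple):
        func, *args = task
        resolved = [
            evaluate(graph, arg) if isinstance(arg, str) and arg in graph else arg
            for arg in args
        ]
        return func(*resolved)
    return task  # a literal value


# Evaluation is deferred until this call, similar in spirit to `.compute()`.
value = evaluate(graph, "result")
print(value)
```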

Sometimes, though, we want to force the execution of operations. Calling `persist` on a Dask collection fully computes it (or actively computes it in the background), persisting the result into memory. When we're using distributed systems, we may want to wait until `persist` is finished before beginning any downstream operations. We can enforce this contract by using `wait`. Wrapping an operation with `wait` will ensure it doesn't begin executing until all necessary upstream operations have finished.

The snippets below provide basic examples, using `LocalCUDACluster` to create one dask-worker per GPU on the local machine. For more detailed information about `persist` and `wait`, please see the Dask documentation for [persist](https://docs.dask.org/en/latest/api.html#dask.persist) and [wait](https://docs.dask.org/en/latest/futures.html#distributed.wait). `wait` relies on the concept of Futures, which is beyond the scope of this tutorial. For more information on Futures, see the Dask [Futures](https://docs.dask.org/en/latest/futures.html) documentation. For more information about multi-GPU clusters, please see the [dask-cuda](https://github.com/rapidsai/dask-cuda) library (documentation is in progress).

First, we set up a GPU cluster. With our `client` set up, Dask-cuDF computation will be distributed across the GPUs in the cluster.

In [82]:
import time

from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster

cluster = LocalCUDACluster()
client = Client(cluster)
client



Connection method: Cluster object
Cluster type: dask_cuda.LocalCUDACluster
Dashboard: http://127.0.0.1:8787/status

Dashboard: http://127.0.0.1:8787/status
Workers: 2
Total threads: 2
Total memory: 125.65 GiB
Status: running
Using processes: True

Comm: tcp://127.0.0.1:39755
Workers: 2
Dashboard: http://127.0.0.1:8787/status
Total threads: 2
Started: Just now
Total memory: 125.65 GiB

Comm: tcp://127.0.0.1:33491
Total threads: 1
Dashboard: http://127.0.0.1:34333/status
Memory: 62.82 GiB
Nanny: tcp://127.0.0.1:43093
Local directory: /home/mmccarty/sandbox/rapids/cudf/docs/cudf/source/user_guide/dask-worker-space/worker-jsuvfju4
GPU: NVIDIA RTX A6000
GPU memory: 47.51 GiB

Comm: tcp://127.0.0.1:44033
Total threads: 1
Dashboard: http://127.0.0.1:45225/status
Memory: 62.82 GiB
Nanny: tcp://127.0.0.1:46529
Local directory: /home/mmccarty/sandbox/rapids/cudf/docs/cudf/source/user_guide/dask-worker-space/worker-zlsacw8_
GPU: NVIDIA RTX A6000
GPU memory: 47.54 GiB


### Persisting Data
Next, we create our Dask-cuDF DataFrame and apply a transformation, storing the result as a new column.

In [83]:
nrows = 10000000

df2 = cudf.DataFrame({'a': cp.arange(nrows), 'b': cp.arange(nrows)})
ddf2 = dask_cudf.from_cudf(df2, npartitions=5)
ddf2['c'] = ddf2['a'] + 5
ddf2

Unnamed: 0_level_0,a,b,c
npartitions=5,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,int64,int64,int64
2000000,...,...,...
...,...,...,...
8000000,...,...,...
9999999,...,...,...


In [84]:
!nvidia-smi

Thu Apr 21 10:11:07 2022       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 495.29.05    Driver Version: 495.29.05    CUDA Version: 11.5     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  NVIDIA RTX A6000    On   | 00000000:01:00.0  On |                  Off |
| 30%   48C    P2    83W / 300W |   2970MiB / 48651MiB |      7%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
|   1  NVIDIA RTX A6000    On   | 00000000:02:00.0 Off |                  Off |
| 30%   36C    P2    25W / 300W |    265MiB / 48685MiB |      5%      Default |
|       

Because Dask is lazy, the computation has not yet occurred. We can see that there are twenty tasks in the task graph and we've used about 800 MB of memory. We can force computation by using `persist`. By forcing execution, the result is now explicitly in memory and our task graph only contains one task per partition (the baseline).

In [85]:
ddf2 = ddf2.persist()
ddf2

Unnamed: 0_level_0,a,b,c
npartitions=5,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,int64,int64,int64
2000000,...,...,...
...,...,...,...
8000000,...,...,...
9999999,...,...,...


In [86]:
!nvidia-smi

Thu Apr 21 10:11:08 2022       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 495.29.05    Driver Version: 495.29.05    CUDA Version: 11.5     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  NVIDIA RTX A6000    On   | 00000000:01:00.0  On |                  Off |
| 30%   48C    P2    84W / 300W |   2970MiB / 48651MiB |      3%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
|   1  NVIDIA RTX A6000    On   | 00000000:02:00.0 Off |                  Off |
| 30%   36C    P2    37W / 300W |    265MiB / 48685MiB |      0%      Default |
|       

Because we forced computation, we now have a larger object in distributed GPU memory.

### Wait
Depending on our workflow or distributed computing setup, we may want to `wait` until all upstream tasks have finished before proceeding with a specific function. This section shows an example of this behavior, adapted from the Dask documentation.

First, we create a new Dask DataFrame and define a function that we'll map to every partition in the dataframe.

In [87]:
import random

nrows = 10000000

df1 = cudf.DataFrame({'a': cp.arange(nrows), 'b': cp.arange(nrows)})
ddf1 = dask_cudf.from_cudf(df1, npartitions=100)

def func(df):
    time.sleep(random.randint(1, 60))
    return (df + 5) * 3 - 11

This function will do a basic transformation of every column in the dataframe, but the time spent in the function will vary due to the `time.sleep` statement randomly adding 1-60 seconds of time. We'll run this on every partition of our dataframe using `map_partitions`, which adds the task to our task-graph, and store the result. We can then call `persist` to force execution.

In [88]:
results_ddf = ddf2.map_partitions(func)
results_ddf = results_ddf.persist()

However, some partitions will be done **much** sooner than others. If we had downstream processes that should wait for all partitions to be completed, we can enforce that behavior using `wait`.

In [89]:
wait(results_ddf)

DoneAndNotDoneFutures(done={<Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-cec36d97aab9d38423f8023d1b43b6d3', 0)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-cec36d97aab9d38423f8023d1b43b6d3', 2)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-cec36d97aab9d38423f8023d1b43b6d3', 3)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-cec36d97aab9d38423f8023d1b43b6d3', 1)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-cec36d97aab9d38423f8023d1b43b6d3', 4)>}, not_done=set())
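The same wait-for-everything pattern exists in Python's standard library, which may help build intuition without a GPU cluster. The sketch below uses `concurrent.futures.wait` with a thread pool. It is an analogy only: the sleep times are shortened, `slow_transform` is a hypothetical stand-in for `func`, and the values are plain integers rather than DataFrame partitions.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor, wait


def slow_transform(value):
    # Simulate uneven task durations, as `func` does with time.sleep.
    time.sleep(random.uniform(0.01, 0.05))
    return (value + 5) * 3 - 11


with ThreadPoolExecutor(max_workers=4) as pool:
    # Submitting returns futures immediately; tasks finish at different times.
    futures = [pool.submit(slow_transform, v) for v in range(5)]

    # Block until every future has completed.
    done, not_done = wait(futures)

results = sorted(f.result() for f in done)
print(results)
print(not_done)
```

As in the Dask output above, the returned pair separates finished futures from unfinished ones, and an empty `not_done` set means downstream steps can start safely.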

## With `wait`, we can safely proceed with our workflow.