10 Minutes to cuDF and Dask-cuDF
=======================

Modeled after 10 Minutes to Pandas, this is a short introduction to cuDF and Dask-cuDF, geared mainly for new users.

### What are these libraries?

[cuDF](https://github.com/rapidsai/cudf) is a Python GPU DataFrame library (built on the Apache Arrow columnar memory format) for loading, joining, aggregating, filtering, and otherwise manipulating data.

[Dask](https://dask.org/) is a flexible library for parallel computing in Python that makes scaling out your workflow smooth and simple.

[Dask-cuDF](https://github.com/rapidsai/dask-cudf) is a library that provides a partitioned, GPU-backed dataframe, using Dask.


### When to use cuDF and Dask-cuDF

Use cuDF if your workflow is fast enough on a single GPU or your data comfortably fits in the memory of a single GPU. Use Dask-cuDF if you want to distribute your workflow across multiple GPUs, have more data than fits in the memory of a single GPU, or want to analyze data spread across many files at once.

In [1]:
import os

import numpy as np
import pandas as pd
import cudf
import dask_cudf

np.random.seed(12)

#### Portions of this were borrowed and adapted from the
#### cuDF cheatsheet, existing cuDF documentation,
#### and 10 Minutes to Pandas.

Object Creation
---------------

Creating a `cudf.Series` and `dask_cudf.Series`.

In [2]:
s = cudf.Series([1,2,3,None,4])
print(s)

0    1
1    2
2    3
3     
4    4
dtype: int64


In [3]:
ds = dask_cudf.from_cudf(s, npartitions=2) 
print(ds.compute())

0    1
1    2
2    3
3     
4    4
dtype: int64
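`npartitions=2` asks Dask-cuDF to split the five-row series into two partitions, each of which is itself a `cudf.Series`. Calling `compute()` stitches them back together. The plain-Python sketch below mimics that split into contiguous, roughly equal chunks. The helpers `split_into_partitions` and `compute` are hypothetical, and Dask's actual partition boundaries may differ.

```python
def split_into_partitions(values, npartitions):
    """Split a list into `npartitions` contiguous, roughly equal chunks (hypothetical helper)."""
    size, remainder = divmod(len(values), npartitions)
    partitions, start = [], 0
    for i in range(npartitions):
        # The first `remainder` chunks each get one extra element.
        stop = start + size + (1 if i < remainder else 0)
        partitions.append(values[start:stop])
        start = stop
    return partitions


def compute(partitions):
    """Concatenate partitions back into one list, as .compute() does conceptually."""
    return [x for part in partitions for x in part]


data = [1, 2, 3, None, 4]
parts = split_into_partitions(data, 2)
print(parts)           # [[1, 2, 3], [None, 4]]
print(compute(parts))  # [1, 2, 3, None, 4]
```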


Creating a `cudf.DataFrame` and a `dask_cudf.DataFrame` by specifying values for each column.

In [4]:
df = pd.DataFrame([('a', list(range(20))),('b', list(reversed(range(20)))),('c', list(range(20)))])
df

   0                                                  1
0  a  [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13,...
1  b  [19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8,...
2  c  [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13,...


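Note that pandas initializes from this input differently: it treats each tuple as a row, whereas the cuDF constructor below treats each tuple as a `(column name, values)` pair.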

In [5]:
df = cudf.DataFrame([('a', list(range(20))),('b', list(reversed(range(20)))),('c', list(range(20)))])
print(df)

   a   b  c
0  0  19  0
1  1  18  1
2  2  17  2
3  3  16  3
4  4  15  4
5  5  14  5
6  6  13  6
7  7  12  7
8  8  11  8
9  9  10  9
[10 more rows]
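The two readings of the same list of tuples can be spelled out in plain Python. pandas reads it row-wise (three rows of two fields), while the cuDF version shown here reads it column-wise. Converting the pairs to a dict of columns is a suggestion for an input that both libraries interpret the same way, since both `pd.DataFrame` and `cudf.DataFrame` accept a dict mapping column names to values.

```python
pairs = [
    ("a", list(range(20))),
    ("b", list(reversed(range(20)))),
    ("c", list(range(20))),
]

# Row-oriented reading (what pandas does): one row per tuple, two fields per row.
rows = [list(t) for t in pairs]
print(len(rows), len(rows[0]))  # 3 2

# Column-oriented reading (what this cuDF version does): column name -> values.
columns = dict(pairs)
print(list(columns), len(columns["a"]))  # ['a', 'b', 'c'] 20
```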


In [7]:
ddf = dask_cudf.from_cudf(df, npartitions=5) 
print(ddf.compute())

   a   b  c
0  0  19  0
1  1  18  1
2  2  17  2
3  3  16  3
4  4  15  4
5  5  14  5
6  6  13  6
7  7  12  7
8  8  11  8
9  9  10  9
[10 more rows]


Creating a `cudf.DataFrame` from a pandas `DataFrame`, and a `dask_cudf.DataFrame` from a pandas-backed Dask `DataFrame`.

*Note that best practice for using Dask-cuDF is to read data directly into a `dask_cudf.DataFrame` with something like `read_csv` (discussed below).*

In [8]:
pdf = pd.DataFrame({'a': [0, 1, 2, 3], 'b': [0.1, 0.2, None, 0.3]})
gdf = cudf.DataFrame.from_pandas(pdf)
print(gdf)

   a    b
0  0  0.1
1  1  0.2
2  2     
3  3  0.3


In [9]:
import dask.dataframe as dd

dask_df = dd.from_pandas(pdf, npartitions=2)
dask_gdf = dask_cudf.from_dask_dataframe(dask_df)
print(dask_gdf.compute())

   a    b
0  0  0.1
1  1  0.2
2  2     
3  3  0.3


Viewing Data
-------------

Viewing the top rows of a GPU dataframe.

In [10]:
print(df.head(2))

   a   b  c
0  0  19  0
1  1  18  1


In [16]:
print(ddf.head(2), type(ddf), type(ddf.compute()))

   a   b  c
0  0  19  0
1  1  18  1 <class 'dask_cudf.core.DataFrame'> <class 'cudf.dataframe.dataframe.DataFrame'>


Sorting by values.

In [13]:
print(df.sort_values(by='b'))

    a  b   c
19  19  0  19
18  18  1  18
17  17  2  17
16  16  3  16
15  15  4  15
14  14  5  14
13  13  6  13
12  12  7  12
11  11  8  11
10  10  9  10
[10 more rows]


In [14]:
print(ddf.sort_values(by='b').compute())

    a  b   c
0  19  0  19
1  18  1  18
2  17  2  17
3  16  3  16
4  15  4  15
5  14  5  14
6  13  6  13
7  12  7  12
8  11  8  11
9  10  9  10
[10 more rows]


Selection
------------

## Getting

Selecting a single column yields a `cudf.Series` from a cuDF `DataFrame` and a `dask_cudf.Series` from a Dask-cuDF `DataFrame`; calling `compute` on the latter returns a `cudf.Series`. `df['a']` is equivalent to `df.a`.

In [15]:
print(df['a'])

0    0
1    1
2    2
3    3
4    4
5    5
6    6
7    7
8    8
9    9
[10 more rows]
Name: a, dtype: int64


In [16]:
print(ddf['a'].compute())

0    0
1    1
2    2
3    3
4    4
5    5
6    6
7    7
8    8
9    9
[10 more rows]
Name: a, dtype: int64


## Selection by Label

Selecting rows with index labels 2 through 5 (both endpoints included) from columns 'a' and 'b'.

In [17]:
print(df.loc[2:5, ['a', 'b']])

   a   b
2  2  17
3  3  16
4  4  15
5  5  14


In [18]:
print(ddf.loc[2:5, ['a', 'b']].compute())

   a   b
2  2  17
3  3  16
4  4  15
5  5  14


## Selection by Position

Selecting via integers and integer slices, as in NumPy and pandas. Note that this functionality is not available for Dask-cuDF DataFrames.

In [17]:
print(df.iloc[0])

a     0
b    19
c     0
Name: 0, dtype: int64


In [19]:
print(df.iloc[0:3, 0:3])

   a   b  c
0  0  19  0
1  1  18  1
2  2  17  2
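The two selection styles use different endpoint rules: label-based slicing with `loc` includes both endpoints (`loc[2:5]` returned four rows), whereas positional slicing with `iloc` excludes the stop position, as ordinary Python slicing does. A plain-Python sketch of both rules, using hypothetical `loc_slice` and `iloc_slice` helpers and assuming a unique index:

```python
def loc_slice(index, start_label, stop_label):
    """Label-based slice: both endpoints included. Returns row positions."""
    start = index.index(start_label)
    stop = index.index(stop_label)
    return list(range(start, stop + 1))


def iloc_slice(index, start, stop):
    """Positional slice: stop excluded, like ordinary Python slicing."""
    return list(range(len(index)))[start:stop]


index = list(range(20))                        # the default 0..19 index used above
print(loc_slice(index, 2, 5))                  # [2, 3, 4, 5]
print(iloc_slice(index, 0, 3))                 # [0, 1, 2]
print(loc_slice(["w", "x", "y", "z"], "x", "z"))  # [1, 2, 3]
```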


You can also select rows of a `DataFrame` or elements of a `Series` by slicing them directly.

In [20]:
print(df[3:5])

   a   b  c
3  3  16  3
4  4  15  4


In [21]:
print(s[3:5])

3     
4    4
dtype: int64


## Boolean Indexing

Selecting rows in a `DataFrame` or `Series` by direct Boolean indexing.

In [22]:
print(df[df.b > 15])

   a   b  c
0  0  19  0
1  1  18  1
2  2  17  2
3  3  16  3


In [23]:
print(ddf[ddf.b > 15].compute())

   a   b  c
0  0  19  0
1  1  18  1
2  2  17  2
3  3  16  3
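Conceptually, `df.b > 15` produces a Boolean mask with one entry per row, and indexing with that mask keeps the rows where it is `True`. With Dask-cuDF the comparison and the selection are applied partition by partition. A plain-Python sketch of the mask step:

```python
b = list(reversed(range(20)))       # column 'b' from the example: 19, 18, ..., 0
mask = [value > 15 for value in b]  # one Boolean per row
selected_rows = [i for i, keep in enumerate(mask) if keep]
print(selected_rows)                # [0, 1, 2, 3]
```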


Selecting values from a `DataFrame` where a Boolean condition is met, via the `query` API.

In [24]:
df.query??

[0;31mSignature:[0m [0mdf[0m[0;34m.[0m[0mquery[0m[0;34m([0m[0mexpr[0m[0;34m,[0m [0mlocal_dict[0m[0;34m=[0m[0;34m{[0m[0;34m}[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mSource:[0m   
    [0;32mdef[0m [0mquery[0m[0;34m([0m[0mself[0m[0;34m,[0m [0mexpr[0m[0;34m,[0m [0mlocal_dict[0m[0;34m=[0m[0;34m{[0m[0;34m}[0m[0;34m)[0m[0;34m:[0m[0;34m[0m
[0;34m[0m        [0;34m"""[0m
[0;34m        Query with a boolean expression using Numba to compile a GPU kernel.[0m
[0;34m[0m
[0;34m        See pandas.DataFrame.query.[0m
[0;34m[0m
[0;34m        Parameters[0m
[0;34m        ----------[0m
[0;34m[0m
[0;34m        expr : str[0m
[0;34m            A boolean expression. Names in expression refer to columns.[0m
[0;34m[0m
[0;34m            Names starting with `@` refer to Python variables[0m
[0;34m[0m
[0;34m        local_dict : dict[0m
[0;34m            Containing the local variable to be used in query.[0m
[0;34m[0m
[0;34m  

In [21]:
print(df.query("b == 3"))  

    a  b   c
16  16  3  16


In [25]:
print(ddf.query("b == 3").compute())  

    a  b   c
16  16  3  16


You can also pass local variables to Dask-cuDF queries via the `local_dict` keyword, referring to them in the expression with the `@` prefix. With standard cuDF, you can either use the `local_dict` keyword or refer to a variable in scope directly with `@`.

In [26]:
cudf_comparator = 3
print(df.query("b == @cudf_comparator"))

    a  b   c
16  16  3  16


In [24]:
dask_cudf_comparator = 3
print(ddf.query("b == @val", local_dict={'val':dask_cudf_comparator}).compute())  

    a  b   c
16  16  3  16


Supported comparison operators include `>`, `<`, `>=`, `<=`, `==`, and `!=`.
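To make the `@` convention concrete, here is a deliberately tiny, hypothetical evaluator for expressions of the form `<column> <operator> <operand>`, where the operand is an integer literal or an `@name` looked up in `local_dict`. It illustrates only the name-resolution rule and the operators listed above; it is not cuDF's implementation, which compiles the expression into a GPU kernel.

```python
import operator
import re

# Comparison operators mapped to Python functions.
OPS = {
    "==": operator.eq, "!=": operator.ne,
    ">=": operator.ge, "<=": operator.le,
    ">": operator.gt, "<": operator.lt,
}
PATTERN = re.compile(r"\s*(\w+)\s*(==|!=|>=|<=|>|<)\s*(@?\w+)\s*")


def tiny_query(columns, expr, local_dict=None):
    """Return row positions where `expr` holds (hypothetical helper, not cuDF's API)."""
    match = PATTERN.fullmatch(expr)
    if match is None:
        raise ValueError(f"unsupported expression: {expr!r}")
    name, op, operand = match.groups()
    if operand.startswith("@"):
        value = (local_dict or {})[operand[1:]]  # resolve @name from local_dict
    else:
        value = int(operand)                     # integer literal
    compare = OPS[op]
    return [i for i, x in enumerate(columns[name]) if compare(x, value)]


columns = {"b": list(reversed(range(20)))}
print(tiny_query(columns, "b == 3"))                           # [16]
print(tiny_query(columns, "b == @val", local_dict={"val": 3}))  # [16]
print(tiny_query(columns, "b >= 18"))                          # [0, 1]
```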

## MultiIndex

cuDF supports hierarchical indexing of DataFrames using MultiIndex. Grouping hierarchically (see `Grouping` below) automatically produces a DataFrame with a MultiIndex.

In [30]:
zip??

[0;31mInit signature:[0m [0mzip[0m[0;34m([0m[0mself[0m[0;34m,[0m [0;34m/[0m[0;34m,[0m [0;34m*[0m[0margs[0m[0;34m,[0m [0;34m**[0m[0mkwargs[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m     
zip(iter1 [,iter2 [...]]) --> zip object

Return a zip object whose .__next__() method returns a tuple where
the i-th element comes from the i-th iterable argument.  The .__next__()
method continues until the shortest iterable in the argument sequence
is exhausted and then it raises StopIteration.
[0;31mType:[0m           type
[0;31mSubclasses:[0m     


In [32]:
list(zip([1,2,3], [4,5,6], [7,8,9]))

[(1, 4, 7), (2, 5, 8), (3, 6, 9)]

In [33]:
arrays = [['a', 'a', 'b', 'b'], [1, 2, 3, 4]]
tuples = list(zip(*arrays))
idx = cudf.MultiIndex.from_tuples(tuples)
idx

MultiIndex(levels=[array(['a', 'b'], dtype=object) array([1, 2, 3, 4])],
codes=   0  1
0  0  0
1  0  1
2  1  2
3  1  3)
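The `MultiIndex` stores each level's distinct values once (`levels`) plus an integer table (`codes`) recording which level entry each row uses: column 0 of the codes table indexes the first level and column 1 the second. Decoding them in plain Python recovers the original tuples. The lists below are copied from the printed output; the variable names are illustrative.

```python
levels = [["a", "b"], [1, 2, 3, 4]]
# One list of codes per level, read column-wise from the printed codes table.
codes = [[0, 0, 1, 1], [0, 1, 2, 3]]

tuples = [
    tuple(level[code] for level, code in zip(levels, row_codes))
    for row_codes in zip(*codes)
]
print(tuples)  # [('a', 1), ('a', 2), ('b', 3), ('b', 4)]
```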

This index can back either axis of a DataFrame.

In [35]:
gdf1 = cudf.DataFrame({'first': np.random.rand(4), 'second': np.random.rand(4)})
gdf1.index = idx
print(gdf1.to_pandas())

        first    second
a 1  0.956949  0.944225
  2  0.137209  0.852736
b 3  0.283828  0.002259
  4  0.606083  0.521226


In [36]:
gdf2 = cudf.DataFrame({'first': np.random.rand(4), 'second': np.random.rand(4)}).T
gdf2.columns = idx
print(gdf2.to_pandas())

               a                   b          
               1         2         3         4
first   0.552038  0.485377  0.768134  0.160717
second  0.764560  0.020810  0.135210  0.116273


Accessing values of a DataFrame with a MultiIndex. Note that slicing is not yet supported.

In [37]:
print(gdf1.loc[('b', 3)].to_pandas())

        first    second
b 3  0.283828  0.002259


Missing Data
------------

Missing data can be replaced by using the `fillna` method.

In [26]:
print(s.fillna(999))

0      1
1      2
2      3
3    999
4      4
dtype: int64


In [31]:
print(ds.fillna(999).compute(), type(ds.fillna(999).compute()))

0      1
1      2
2      3
3    999
4      4
dtype: int64 <class 'cudf.dataframe.series.Series'>


Operations
------------

## Stats

Calculating descriptive statistics for a `Series`.

In [32]:
print(s.mean(), s.var())

2.5 1.666666666666666


In [38]:
print(ds.mean().compute(), ds.var().compute())

2.5 1.6666666666666667
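Both results skip the null: the mean is (1 + 2 + 3 + 4) / 4 = 2.5, and the variance divides the sum of squared deviations (5) by n - 1 = 3. For a partitioned series, each partition can contribute a count, a sum, and a sum of squares that are then combined. The sketch below shows one such reduction; it illustrates the idea and is not necessarily the algorithm Dask uses (the naive sum-of-squares formula can lose precision on large values).

```python
def partition_stats(part):
    """Count, sum and sum of squares of one partition, skipping nulls (None)."""
    values = [x for x in part if x is not None]
    return len(values), sum(values), sum(x * x for x in values)


def combine(stats):
    """Merge per-partition stats into a mean and a sample variance (ddof=1)."""
    n = sum(s[0] for s in stats)
    total = sum(s[1] for s in stats)
    total_sq = sum(s[2] for s in stats)
    mean = total / n
    var = (total_sq - total * total / n) / (n - 1)
    return mean, var


partitions = [[1, 2, 3], [None, 4]]  # the series from above, split in two
mean, var = combine([partition_stats(p) for p in partitions])
print(mean, var)  # 2.5 1.6666666666666667
```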


## Applymap

Applying functions to a `Series`. Note that applying user-defined functions directly with Dask-cuDF is not yet implemented. For now, you can use [map_partitions](http://docs.dask.org/en/stable/dataframe-api.html#dask.dataframe.DataFrame.map_partitions) to apply a function to each partition of the distributed dataframe.

In [39]:
def add_ten(num):
    return num + 10

print(df['a'].applymap(add_ten))

0    10
1    11
2    12
3    13
4    14
5    15
6    16
7    17
8    18
9    19
[10 more rows]
Name: a, dtype: int64


In [40]:
print(ddf['a'].map_partitions(add_ten).compute())

0    10
1    11
2    12
3    13
4    14
5    15
6    16
7    17
8    18
9    19
[10 more rows]
dtype: int64
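`map_partitions` hands each whole partition (a `cudf.Series`) to the function rather than one value at a time; `add_ten` works in both cells because `num + 10` is defined for scalars and for Series alike. A plain-Python sketch of the pattern, with lists standing in for partitions; the `map_partitions` function here is a hypothetical stand-in, not Dask's.

```python
def map_partitions(partitions, func):
    """Apply `func` to each partition independently (hypothetical stand-in)."""
    return [func(part) for part in partitions]


def add_ten_to_partition(part):
    # A list has no elementwise `+`, so loop explicitly; a cudf.Series would not need this.
    return [x + 10 for x in part]


partitions = [list(range(0, 10)), list(range(10, 20))]
result = map_partitions(partitions, add_ten_to_partition)
flat = [x for part in result for x in part]
print(flat[:5])  # [10, 11, 12, 13, 14]
```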


## Histogramming

Counting the number of occurrences of each unique value of a variable.

In [36]:
print(df.a.value_counts())

0    1
1    1
2    1
3    1
4    1
5    1
6    1
7    1
8    1
9    1
[10 more rows]
dtype: int64


In [37]:
print(ddf.a.value_counts().compute())

0    1
1    1
2    1
3    1
4    1
5    1
6    1
7    1
8    1
9    1
[10 more rows]
dtype: int64
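A distributed `value_counts` can count within each partition and then add the partial counts together. Here is a sketch with `collections.Counter`, which supports addition; the data is hypothetical, and Dask's own reduction may be organized differently.

```python
from collections import Counter

partitions = [[0, 1, 2, 2], [2, 3, 3]]  # hypothetical data split across two partitions

partial_counts = [Counter(part) for part in partitions]  # count locally
total = sum(partial_counts, Counter())                   # combine partial results
print(dict(sorted(total.items())))  # {0: 1, 1: 1, 2: 3, 3: 2}
```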


## String Methods

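Like pandas, cuDF provides string processing methods in the `str` attribute of `Series`. Full documentation of string methods is a work in progress; please see the cuDF API documentation for more information. The Dask-cuDF cell below raises an `AttributeError` because the version of Dask-cuDF used here did not yet support these string methods.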

In [41]:
s = cudf.Series(['A', 'B', 'C', 'Aaba', 'Baca', None, 'CABA', 'dog', 'cat'])
print(s.str.lower())

0       a
1       b
2       c
3    aaba
4    baca
5    None
6    caba
7     dog
8     cat
dtype: object


In [43]:
ds = dask_cudf.from_cudf(s, npartitions=2)
print(ds.str.lower().compute())

AttributeError: lower

## Concat

Concatenating `Series` and `DataFrames` row-wise.

In [44]:
s = cudf.Series([1, 2, 3, None, 5])
print(cudf.concat([s, s]))

0    1
1    2
2    3
3     
4    5
0    1
1    2
2    3
3     
4    5
dtype: int64


In [45]:
ds2 = dask_cudf.from_cudf(s, npartitions=2)
print(dask_cudf.concat([ds2, ds2]).compute())

0    1
1    2
2    3
3     
4    5
0    1
1    2
2    3
3     
4    5
dtype: int64


## Join

Performing SQL-style merges. Note that the original row order is not maintained, but it may be restored after the merge by sorting by the index.

In [49]:
df_a = cudf.DataFrame()
df_a['key'] = ['a', 'b', 'c', 'd', 'e']
df_a['vals_a'] = [float(i + 10) for i in range(5)]

df_b = cudf.DataFrame()
df_b['key'] = ['a', 'c', 'e']
df_b['vals_b'] = [float(i+100) for i in range(3)]

merged = df_a.merge(df_b, on=['key'], how='left')
print(merged)

   key  vals_a  vals_b
0    a    10.0   100.0
1    c    12.0   101.0
2    e    14.0   102.0
3    b    11.0        
4    d    13.0        


In [47]:
ddf_a = dask_cudf.from_cudf(df_a, npartitions=2)
ddf_b = dask_cudf.from_cudf(df_b, npartitions=3)

merged = ddf_a.merge(ddf_b, on=['key'], how='left').compute()
print(merged)

   key  vals_a  vals_b
0    a    12.0        
0    c    11.0   101.0
1    e    14.0   102.0
2    b    10.0        
3    d    13.0        
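Both merge results place the unmatched keys `b` and `d` after the matched ones, and the Dask-cuDF result also reorders rows across partitions. The plain-Python hash join below is one way to implement a left merge: build a lookup table from the right side, then probe it with every left row. This sequential version happens to keep the left table's order, but parallel GPU and distributed implementations generally make no such promise, hence the advice to sort after merging. The `left_merge` helper is hypothetical.

```python
def left_merge(left, right, key):
    """Left join of two lists of dict rows on `key` (hypothetical helper)."""
    # Build phase: hash the right table by key.
    lookup = {}
    for row in right:
        lookup.setdefault(row[key], []).append(row)
    # Columns that only exist in the right table; filled with None when unmatched.
    right_only = [c for c in right[0] if c != key] if right else []
    # Probe phase: every left row appears at least once.
    out = []
    for row in left:
        matches = lookup.get(row[key])
        if matches:
            for match in matches:
                out.append({**row, **{c: match[c] for c in right_only}})
        else:
            out.append({**row, **{c: None for c in right_only}})
    return out


df_a = [{"key": k, "vals_a": float(i + 10)} for i, k in enumerate("abcde")]
df_b = [{"key": k, "vals_b": float(i + 100)} for i, k in enumerate("ace")]
merged = left_merge(df_a, df_b, "key")
for row in merged:
    print(row)  # e.g. {'key': 'b', 'vals_a': 11.0, 'vals_b': None}
```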


## Append

Appending values from another `Series` or array-like object.

In [None]:
print(s.append(s))

In [None]:
print(ds2.append(ds2).compute())

## Grouping

Like pandas, cuDF and Dask-cuDF support the Split-Apply-Combine groupby paradigm.

In [50]:
df['agg_col1'] = [1 if x % 2 == 0 else 0 for x in range(len(df))]
df['agg_col2'] = [1 if x % 3 == 0 else 0 for x in range(len(df))]

ddf = dask_cudf.from_cudf(df, npartitions=2)

Grouping and then applying the `sum` function to the grouped data.

In [51]:
print(df.groupby('agg_col1').sum())

     a    b    c  agg_col2
agg_col1
0  100   90  100         3
1   90  100   90         4


In [52]:
print(ddf.groupby('agg_col1').sum().compute())

     a    b    c  agg_col2
0  100   90  100         3
1   90  100   90         4


Grouping hierarchically and then applying the `sum` function to the grouped data. We convert the result to a pandas dataframe only for printing purposes.

In [None]:
print(df.groupby(['agg_col1', 'agg_col2']).sum().to_pandas())

In [None]:
ddf.groupby(['agg_col1', 'agg_col2']).sum().compute().to_pandas()

Grouping and applying statistical functions to specific columns, using `agg`.

In [None]:
print(df.groupby('agg_col1').agg({'a':'max', 'b':'mean', 'c':'sum'}))

In [None]:
print(ddf.groupby('agg_col1').agg({'a':'max', 'b':'mean', 'c':'sum'}).compute())
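Aggregations like these can be computed in two phases: each partition reduces its own rows per group, then the partial results are merged. `max` and `sum` merge directly, but `mean` has to travel as a (sum, count) pair until the final step. Below is a plain-Python sketch using the same columns as above; the function names are hypothetical, and Dask-cuDF's actual execution plan may differ. Its `c` sums (100 and 90) agree with the `groupby('agg_col1').sum()` output shown earlier.

```python
def partial_agg(rows):
    """Per-partition pass: group key -> [max of a, sum of b, count, sum of c]."""
    acc = {}
    for key, a, b, c in rows:
        if key not in acc:
            acc[key] = [a, 0, 0, 0]
        s = acc[key]
        s[0] = max(s[0], a)
        s[1] += b
        s[2] += 1
        s[3] += c
    return acc


def merge_aggs(partials):
    """Combine per-partition results, then finish the mean as sum / count."""
    merged = {}
    for acc in partials:
        for key, (a_max, b_sum, n, c_sum) in acc.items():
            if key not in merged:
                merged[key] = [a_max, b_sum, n, c_sum]
            else:
                m = merged[key]
                m[0] = max(m[0], a_max)
                m[1] += b_sum
                m[2] += n
                m[3] += c_sum
    return {k: {"a": m[0], "b": m[1] / m[2], "c": m[3]} for k, m in sorted(merged.items())}


# Rows as (agg_col1, a, b, c), built the same way as the DataFrame above.
rows = [(1 if x % 2 == 0 else 0, x, 19 - x, x) for x in range(20)]
partitions = [rows[:10], rows[10:]]
result = merge_aggs([partial_agg(p) for p in partitions])
print(result)
# {0: {'a': 19, 'b': 9.0, 'c': 100}, 1: {'a': 18, 'b': 10.0, 'c': 90}}
```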

## Transpose

Transposing a dataframe, using either the `transpose` method or the `T` property. Currently, all columns must have the same type. Transposing is not currently implemented in Dask-cuDF.

In [53]:
sample = cudf.DataFrame({'a':[1,2,3], 'b':[4,5,6]})
print(sample)

   a  b
0  1  4
1  2  5
2  3  6


In [54]:
print(sample.transpose())

   0  1  2
a  1  2  3
b  4  5  6
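Transposing turns each row into a column, so every new column mixes one value from each of the original columns. In a columnar library where each column has a single type, that is a natural reason for the same-type requirement. A plain-Python sketch over a dict of columns; the `transpose` helper is hypothetical.

```python
def transpose(columns):
    """Swap rows and columns of a dict-of-lists table (hypothetical helper).

    Returns the new columns (keyed by old row position) and the new index (old column names).
    """
    names = list(columns)
    n_rows = len(columns[names[0]])
    # New column i holds row i of the original table: one value from every old column.
    return {i: [columns[name][i] for name in names] for i in range(n_rows)}, names


sample = {"a": [1, 2, 3], "b": [4, 5, 6]}
transposed, new_index = transpose(sample)
print(new_index)   # ['a', 'b']
print(transposed)  # {0: [1, 4], 1: [2, 5], 2: [3, 6]}
```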


In [55]:
sample.transpose().index

StringIndex(['a' 'b'], dtype='object', name=None)

Time Series
------------


`DataFrames` support `datetime`-typed columns, which allow users to interact with and filter data based on specific timestamps.

In [56]:
import datetime as dt

date_df = cudf.DataFrame()
date_df['date'] = pd.date_range('11/20/2018', periods=72, freq='D')
date_df['value'] = np.random.sample(len(date_df))

search_date = dt.datetime.strptime('2018-11-23', '%Y-%m-%d')
print(date_df.query('date <= @search_date'))

                     date                value
0 2018-11-20T00:00:00.000  0.30989758449002924
1 2018-11-21T00:00:00.000   0.6714526452120027
2 2018-11-22T00:00:00.000   0.4712297782500141
3 2018-11-23T00:00:00.000   0.8161682980460269
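The same filter can be written in plain Python with the standard `datetime` module: 72 daily timestamps starting on 2018-11-20, of which the first four fall on or before 2018-11-23. Filtering on timestamps is ordinary ordering comparison, which is what the query expression relies on. The random `value` column is omitted from this sketch.

```python
import datetime as dt

start = dt.datetime(2018, 11, 20)
# Daily timestamps, analogous to pd.date_range('11/20/2018', periods=72, freq='D').
dates = [start + dt.timedelta(days=i) for i in range(72)]

search_date = dt.datetime.strptime("2018-11-23", "%Y-%m-%d")
matching = [d for d in dates if d <= search_date]
print(len(matching), matching[-1].date())  # 4 2018-11-23
```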


In [None]:
date_ddf = dask_cudf.from_cudf(date_df, npartitions=2)
print(date_ddf.query('date <= @search_date', local_dict={'search_date':search_date}).compute())

Categoricals
------------

`DataFrames` support categorical columns.

In [57]:
pdf = pd.DataFrame({"id":[1,2,3,4,5,6], "grade":['a', 'b', 'b', 'a', 'a', 'e']})
pdf["grade"] = pdf["grade"].astype("category")

gdf = cudf.DataFrame.from_pandas(pdf)
print(gdf)

   id  grade
0   1      a
1   2      b
2   3      b
3   4      a
4   5      a
5   6      e


In [58]:
dgdf = dask_cudf.from_cudf(gdf, npartitions=2)
print(dgdf.compute())

   id  grade
0   1      a
1   2      b
2   3      b
3   4      a
4   5      a
5   6      e


Accessing the categories of a column. Note that this is currently not supported in Dask-cuDF.

In [59]:
gdf.grade.cat.categories

('a', 'b', 'e')

Accessing the underlying code values of each categorical observation.

In [60]:
print(gdf.grade.cat.codes)

0    0
1    1
2    1
3    0
4    0
5    2
dtype: int8
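Each code is the position of the observation's value in the `categories` tuple, which is why `'e'` maps to 2; with only three categories the codes easily fit in `int8`. A plain-Python sketch of that encoding, assuming (as pandas does by default) that the categories are the sorted unique values:

```python
grades = ["a", "b", "b", "a", "a", "e"]

categories = sorted(set(grades))                          # ['a', 'b', 'e']
position = {value: i for i, value in enumerate(categories)}
codes = [position[g] for g in grades]
print(categories, codes)  # ['a', 'b', 'e'] [0, 1, 1, 0, 0, 2]

# Decoding is a lookup back into the categories.
decoded = [categories[c] for c in codes]
```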


In [61]:
print(dgdf.grade.cat.codes.compute())

0    0
1    1
2    1
0    0
1    0
2    2
dtype: int8


Converting Data Representation
--------------------------------

## Pandas

Converting a cuDF or Dask-cuDF `DataFrame` to a pandas `DataFrame`.

In [62]:
print(df.head().to_pandas())

   a   b  c  agg_col1  agg_col2
0  0  19  0         1         1
1  1  18  1         0         0
2  2  17  2         1         0
3  3  16  3         0         1
4  4  15  4         1         0


In [63]:
print(ddf.compute().head().to_pandas())

   a   b  c  agg_col1  agg_col2
0  0  19  0         1         1
1  1  18  1         0         0
2  2  17  2         1         0
3  3  16  3         0         1
4  4  15  4         1         0


## Numpy

Converting a cuDF or Dask-cuDF `DataFrame` to a numpy `ndarray`.

In [40]:
print(df.as_matrix())

[[ 0 19  0]
 [ 1 18  1]
 [ 2 17  2]
 [ 3 16  3]
 [ 4 15  4]
 [ 5 14  5]
 [ 6 13  6]
 [ 7 12  7]
 [ 8 11  8]
 [ 9 10  9]
 [10  9 10]
 [11  8 11]
 [12  7 12]
 [13  6 13]
 [14  5 14]
 [15  4 15]
 [16  3 16]
 [17  2 17]
 [18  1 18]
 [19  0 19]]


In [41]:
print(ddf.compute().as_matrix())

[[ 0 19  0]
 [ 1 18  1]
 [ 2 17  2]
 [ 3 16  3]
 [ 4 15  4]
 [ 5 14  5]
 [ 6 13  6]
 [ 7 12  7]
 [ 8 11  8]
 [ 9 10  9]
 [10  9 10]
 [11  8 11]
 [12  7 12]
 [13  6 13]
 [14  5 14]
 [15  4 15]
 [16  3 16]
 [17  2 17]
 [18  1 18]
 [19  0 19]]


Converting a cuDF or Dask-cuDF `Series` to a numpy `ndarray`.

In [42]:
print(df['a'].to_array())

[ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19]


In [43]:
print(ddf['a'].compute().to_array())

[ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19]


## Arrow

Converting a cuDF or Dask-cuDF `DataFrame` to a PyArrow `Table`.

In [44]:
print(df.to_arrow())

pyarrow.Table
a: int64
b: int64
c: int64
__index_level_0__: int64
metadata
--------
OrderedDict([(b'pandas',
              b'{"index_columns": ["__index_level_0__"], "column_indexes": ['
              b'{"name": null, "field_name": null, "pandas_type": "unicode",'
              b' "numpy_type": "object", "metadata": {"encoding": "UTF-8"}}]'
              b', "columns": [{"name": "a", "field_name": "a", "pandas_type"'
              b': "int64", "numpy_type": "int64", "metadata": null}, {"name"'
              b': "b", "field_name": "b", "pandas_type": "int64", "numpy_typ'
              b'e": "int64", "metadata": null}, {"name": "c", "field_name": '
              b'"c", "pandas_type": "int64", "numpy_type": "int64", "metadat'
              b'a": null}, {"name": null, "field_name": "__index_level_0__",'
              b' "pandas_type": "int64", "numpy_type": "int64", "metadata": '
              b'null}], "pandas_version": "0.23.4"}')])


In [None]:
print(ddf.compute().to_arrow())

Getting Data In/Out
------------------------


## CSV

Writing to a CSV file by first sending data to a pandas `DataFrame` on the host.

In [None]:
os.makedirs('example_output', exist_ok=True)

df.to_pandas().to_csv('example_output/foo.csv', index=False)

In [None]:
ddf.compute().to_pandas().to_csv('example_output/foo_dask.csv', index=False)

Reading from a CSV file.

In [None]:
df = cudf.read_csv('example_output/foo.csv')
print(df)

In [None]:
ddf = dask_cudf.read_csv('example_output/foo_dask.csv')
print(ddf.compute())

Reading all CSV files in a directory into a single `dask_cudf.DataFrame`, using the star wildcard.

In [None]:
ddf = dask_cudf.read_csv('example_output/*.csv')
print(ddf.compute())
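The wildcard is expanded to the list of matching files, and each file typically becomes at least one partition of the resulting `dask_cudf.DataFrame`. The standard-library sketch below imitates that idea with `glob` and `csv`, first writing two small files to a temporary directory (the file contents are hypothetical). It is an illustration, not how `dask_cudf.read_csv` is implemented.

```python
import csv
import glob
import os
import tempfile

with tempfile.TemporaryDirectory() as folder:
    # Write two small CSV files that share a header.
    for name, rows in [("foo.csv", [(0, 19), (1, 18)]), ("foo_dask.csv", [(2, 17)])]:
        with open(os.path.join(folder, name), "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["a", "b"])
            writer.writerows(rows)

    # Expand the wildcard; sort for a deterministic file order.
    paths = sorted(glob.glob(os.path.join(folder, "*.csv")))

    # One "partition" (a list of dict rows) per matching file.
    partitions = []
    for path in paths:
        with open(path, newline="") as f:
            partitions.append(list(csv.DictReader(f)))

print(len(partitions), sum(len(p) for p in partitions))  # 2 3
```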

## Parquet

Writing to Parquet files, using the CPU via PyArrow.

In [None]:
df.to_parquet('example_output/temp_parquet')

Reading Parquet files with a GPU-accelerated Parquet reader.

In [None]:
import glob
# The file name inside temp_parquet is generated at write time, so look it up
df = cudf.read_parquet(glob.glob('example_output/temp_parquet/*.parquet')[0])
print(df.to_pandas())

Writing to Parquet files from a `dask_cudf.DataFrame`, using PyArrow under the hood.

In [None]:
ddf.to_parquet('example_files')  

## ORC

Reading ORC files.

In [None]:
df2 = cudf.read_orc('/cudf/python/cudf/tests/data/orc/TestOrcFile.test1.orc')
df2.to_pandas()