In [1]:
import datetime as dt
import os
import time

import cudf
import dask
import dask.dataframe as dd
import dask_cudf
import numpy as np
import pandas as pd
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster

np.random.seed(42)

In [2]:
s = cudf.Series([1,2,3,None,4])
s

0       1
1       2
2       3
3    null
4       4
dtype: int64

In [3]:
ds = dask_cudf.from_cudf(s, npartitions=2)
ds

<dask_cudf.Series | 2 tasks | 2 npartitions>

In [4]:
ds.compute()

0       1
1       2
2       3
3    null
4       4
dtype: int64

In [6]:
# A small 20-row frame reused by most comparisons below:
# "a" and "c" count up from 0, "b" counts down from 19.
n_rows = 20
df = cudf.DataFrame({
    "a": list(range(n_rows)),
    "b": list(range(n_rows - 1, -1, -1)),
    "c": list(range(n_rows)),
})
df.head()

Unnamed: 0,a,b,c
0,0,19,0
1,1,18,1
2,2,17,2
3,3,16,3
4,4,15,4


In [7]:
ddf = dask_cudf.from_cudf(df, npartitions=2)
ddf.compute().head()

Unnamed: 0,a,b,c
0,0,19,0
1,1,18,1
2,2,17,2
3,3,16,3
4,4,15,4


In [8]:
# pandas source frame; the None in "b" is stored as a missing value and
# shows up as an empty cell after conversion with cudf.from_pandas below.
pdf = pd.DataFrame(
    {
        "a": list(range(4)),
        "b": [0.1, 0.2, None, 0.3],
    }
)
pdf.head()

Unnamed: 0,a,b
0,0,0.1
1,1,0.2
2,2,
3,3,0.3


In [9]:
gdf = cudf.from_pandas(pdf)
gdf

Unnamed: 0,a,b
0,0,0.1
1,1,0.2
2,2,
3,3,0.3


In [11]:
dask_gdf = dask_cudf.from_cudf(gdf, npartitions=2)
dask_gdf.compute()

Unnamed: 0,a,b
0,0,0.1
1,1,0.2
2,2,
3,3,0.3


In [13]:
df.head(2)

Unnamed: 0,a,b,c
0,0,19,0
1,1,18,1


In [14]:
ddf.head(2)

Unnamed: 0,a,b,c
0,0,19,0
1,1,18,1


In [18]:
df.sort_values("b").head()

Unnamed: 0,a,b,c
19,19,0,19
18,18,1,18
17,17,2,17
16,16,3,16
15,15,4,15


In [19]:
ddf.sort_values("b").compute().head()

Unnamed: 0,a,b,c
19,19,0,19
18,18,1,18
17,17,2,17
16,16,3,16
15,15,4,15


In [20]:
df["a"].head()

0    0
1    1
2    2
3    3
4    4
Name: a, dtype: int64

In [23]:
ddf["a"].compute().head()

0    0
1    1
2    2
3    3
4    4
Name: a, dtype: int64

In [24]:
df.loc[2:5, ["b", "c"]]

Unnamed: 0,b,c
2,17,2
3,16,3
4,15,4
5,14,5


In [26]:
ddf.loc[2:5, ["b", "c"]].compute()

Unnamed: 0,b,c
2,17,2
3,16,3
4,15,4
5,14,5


In [30]:
df.iloc[0]

a     0
b    19
c     0
Name: 0, dtype: int64

In [31]:
df.iloc[0:2, 0:3]

Unnamed: 0,a,b,c
0,0,19,0
1,1,18,1


In [39]:
ddf.loc[0:2, ["b", "c"]].compute()

Unnamed: 0,b,c
0,19,0
1,18,1
2,17,2


In [40]:
df[3:5]

Unnamed: 0,a,b,c
3,3,16,3
4,4,15,4


In [42]:
s[3:5]

3    null
4       4
dtype: int64

In [43]:
# Positional row slicing (df[3:5]) is not supported on a dask_cudf DataFrame.
# Catch and show the error instead of leaving a failing cell.
# ddf.loc[3:4] is the label-based equivalent for this default integer index.
try:
    ddf[3:5]
except NotImplementedError as err:
    print(f"{type(err).__name__}: {err}")

NotImplementedError: 'DataFrame.iloc' only supports selecting columns. It must be used like 'df.iloc[:, column_indexer]'.

In [44]:
df[df["b"] > 17]

Unnamed: 0,a,b,c
0,0,19,0
1,1,18,1


In [46]:
ddf[ddf["b"] > 17].compute()

Unnamed: 0,a,b,c
0,0,19,0
1,1,18,1


In [48]:
df.query("b == 3")

Unnamed: 0,a,b,c
16,16,3,16


In [50]:
ddf.query("b == 3").compute()

Unnamed: 0,a,b,c
16,16,3,16


In [51]:
cudf_comparator = 3
df.query("b == @cudf_comparator")

Unnamed: 0,a,b,c
16,16,3,16


In [52]:
cudf_comparator = 3
df.query("b == @value", local_dict={"value": cudf_comparator})

Unnamed: 0,a,b,c
16,16,3,16


In [54]:
dask_cudf_comparator = 3
ddf.query("b == @value", local_dict={"value": dask_cudf_comparator}).compute()

Unnamed: 0,a,b,c
16,16,3,16


In [55]:
df[df["a"].isin([1,2,3])]

Unnamed: 0,a,b,c
1,1,18,1
2,2,17,2
3,3,16,3


## MultiIndex

In [57]:
# Two-level index: the letters form the outer level, the integers the inner level.
arrays = [
    ["a", "a", "b", "b"],
    [1, 2, 3, 4],
]
tuples = [(letter, number) for letter, number in zip(arrays[0], arrays[1])]
idx = cudf.MultiIndex.from_tuples(tuples)
idx

MultiIndex(levels=[0    a
1    b
dtype: object, 0    1
1    2
2    3
3    4
dtype: int64],
codes=   0  1
0  0  0
1  0  1
2  1  2
3  1  3)

In [58]:
# Two columns of uniform random floats, drawn "first" then "second".
# Values are reproducible on a fresh, in-order run thanks to np.random.seed(42).
gdf1 = cudf.DataFrame({col: np.random.rand(4) for col in ("first", "second")})
gdf1

Unnamed: 0,first,second
0,0.37454,0.156019
1,0.950714,0.155995
2,0.731994,0.058084
3,0.598658,0.866176


In [60]:
gdf1.index = idx
gdf1

Unnamed: 0,Unnamed: 1,first,second
a,1,0.37454,0.156019
a,2,0.950714,0.155995
b,3,0.731994,0.058084
b,4,0.598658,0.866176


In [61]:
# Transposed random frame, so the MultiIndex `idx` labels the columns instead of the rows.
gdf2 = cudf.DataFrame({col: np.random.rand(4) for col in ("first", "second")}).T
gdf2.columns = idx
gdf2

Unnamed: 0_level_0,a,a,b,b
Unnamed: 0_level_1,1,2,3,4
first,0.601115,0.708073,0.020584,0.96991
second,0.832443,0.212339,0.181825,0.183405


In [64]:
gdf1.loc[("b", 3)]

Unnamed: 0_level_0,Unnamed: 1_level_0,first,second
0,1,Unnamed: 2_level_1,Unnamed: 3_level_1
b,3,0.731994,0.058084


In [71]:
gdf2.columns

MultiIndex([('a', 1),
            ('a', 2),
            ('b', 3),
            ('b', 4)],
           )

## Missing data

In [74]:
s.fillna(999)

0      1
1      2
2      3
3    999
4      4
dtype: int64

In [76]:
ds.fillna(999).compute()

0      1
1      2
2      3
3    999
4      4
dtype: int64

## Statistics

In [77]:
s.mean(), s.std()

(2.5, 1.2909944487358054)

In [78]:
# dask.compute evaluates both reductions in one call, so the partitions are
# traversed once instead of once per .compute(). Returns a (mean, std) tuple.
dask.compute(ds.mean(), ds.std())

(2.5, 1.2909944487358056)

## Applying functions element-wise

In [82]:
def add_ten(num):
    """Return ``num + 10``.

    Works on plain scalars as well as array-like / Series inputs, which is why
    it can be passed to ``map_partitions`` further down.
    """
    offset = 10
    return num + offset

# Series.apply runs add_ten element-wise (cudf compiles the UDF with Numba);
# it replaces the deprecated Series.applymap.
df["a"].apply(add_ten).head()

0    10
1    11
2    12
3    13
4    14
Name: a, dtype: int64

In [85]:
ddf["a"].map_partitions(add_ten).compute().head()

0    10
1    11
2    12
3    13
4    14
Name: a, dtype: int64

## Histogramming (value counts)

In [87]:
df["a"].value_counts()[:5]

0    1
1    1
2    1
3    1
4    1
Name: a, dtype: int32

In [89]:
ddf["a"].value_counts().compute()[:5]

0    1
1    1
2    1
3    1
4    1
Name: a, dtype: int64

## String methods

In [91]:
s = cudf.Series(['A', 'B', 'C', 'Aaba', 'Baca', None, 'CABA', 'dog', 'cat'])
s.str.lower()

0       a
1       b
2       c
3    aaba
4    baca
5    None
6    caba
7     dog
8     cat
dtype: object

In [92]:
ds = dask_cudf.from_cudf(s, npartitions=2)
ds.str.lower().compute()

0       a
1       b
2       c
3    aaba
4    baca
5    None
6    caba
7     dog
8     cat
dtype: object

## Concatenation

In [108]:
s = cudf.Series([1,2,3,None,5])
cudf.concat([s, s], axis=0), cudf.concat([s, s], axis=1)

(0       1
 1       2
 2       3
 3    null
 4       5
 0       1
 1       2
 2       3
 3    null
 4       5
 dtype: int64,       0     1
 0     1     1
 1     2     2
 2     3     3
 3  null  null
 4     5     5)

## Joins (merge)

In [110]:
# Left side of the join: keys "a".."e" with float values 10.0..14.0.
df_a = cudf.DataFrame({
    "key": list("abcde"),
    "vals_a": [10.0 + i for i in range(5)],
})
df_a

Unnamed: 0,key,vals_a
0,a,10.0
1,b,11.0
2,c,12.0
3,d,13.0
4,e,14.0


In [111]:
# Right side of the join: only keys "a", "c", "e", with float values 100.0..102.0.
df_b = cudf.DataFrame({
    "key": list("ace"),
    "vals_b": [100.0 + i for i in range(3)],
})
df_b

Unnamed: 0,key,vals_b
0,a,100.0
1,c,101.0
2,e,102.0


In [120]:
# Outer join: keys missing from df_b ("b", "d") are kept, with null vals_b.
merged = df_a.merge(df_b, how="outer", on=["key"])
merged

Unnamed: 0,key,vals_a,vals_b
0,a,10.0,100.0
1,c,12.0,101.0
2,e,14.0,102.0
3,b,11.0,
4,d,13.0,


In [122]:
# NOTE: this is an *inner* join, unlike the outer join in the cudf cell above,
# so only keys present in both frames ("a", "c", "e") survive. The computed
# index is not renumbered across partitions (it shows 0, 1, 0).
ddf_a, ddf_b = (dask_cudf.from_cudf(frame, npartitions=2) for frame in (df_a, df_b))

dmerged = ddf_a.merge(ddf_b, on="key", how="inner").compute()
dmerged

Unnamed: 0,key,vals_a,vals_b
0,a,10.0,100.0
1,c,12.0,101.0
0,e,14.0,102.0


## Appending (stacking rows)

In [124]:
# Series.append is deprecated in cudf (pandas removed it in 2.0). cudf.concat
# stacks the series and keeps each one's original index, matching append's output.
cudf.concat([s, s])

0       1
1       2
2       3
3    null
4       5
0       1
1       2
2       3
3    null
4       5
dtype: int64

In [130]:
# Dask's Series.append is deprecated; dd.concat is the supported way to stack
# collections. .compute() returns a single in-memory cudf Series.
result = dd.concat([ds, ds]).compute()
type(result)

cudf.core.series.Series

## Grouping

In [132]:
df.head()

Unnamed: 0,a,b,c
0,0,19,0
1,1,18,1
2,2,17,2
3,3,16,3
4,4,15,4


In [133]:
# 0/1 flag columns used as group keys below:
# agg_col1 marks even row positions, agg_col2 marks every third row position.
positions = range(len(df))
df["agg_col1"] = [int(pos % 2 == 0) for pos in positions]
df["agg_col2"] = [int(pos % 3 == 0) for pos in positions]
df.head()

Unnamed: 0,a,b,c,agg_col1,agg_col2
0,0,19,0,1,1
1,1,18,1,0,0
2,2,17,2,1,0
3,3,16,3,0,1
4,4,15,4,1,0


In [134]:
ddf = dask_cudf.from_cudf(df, npartitions=2)

In [136]:
df.groupby("agg_col1", as_index=False).sum()

Unnamed: 0,agg_col1,a,b,c,agg_col2
0,0,100,90,100,3
1,1,90,100,90,4


In [142]:
ddf.groupby("agg_col2").mean().compute()

Unnamed: 0_level_0,a,b,c,agg_col1
agg_col2,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
0,9.769231,9.230769,9.769231,0.461538
1,9.0,10.0,9.0,0.571429


In [143]:
df.groupby(["agg_col1", "agg_col2"]).sum()

Unnamed: 0_level_0,Unnamed: 1_level_0,a,b,c
agg_col1,agg_col2,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
0,0,73,60,73
0,1,27,30,27
1,0,54,60,54
1,1,36,40,36


In [144]:
ddf.groupby(["agg_col1", "agg_col2"]).sum().compute()

Unnamed: 0_level_0,Unnamed: 1_level_0,a,b,c
agg_col1,agg_col2,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
0,0,73,60,73
0,1,27,30,27
1,0,54,60,54
1,1,36,40,36


In [158]:
df.groupby("agg_col2").agg({"a": "median", "b": "max", "c": "min"})

Unnamed: 0_level_0,a,b,c
agg_col2,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,10.0,18,1
1,9.0,19,0


In [161]:
# NOTE: "a" is aggregated with "mean" here but with "median" in the cudf cell
# above, so the two "a" columns differ (9.769231 vs 10.0 for agg_col2 == 0).
dask_agg_spec = {"a": "mean", "b": "max", "c": "min"}
ddf.groupby("agg_col2").agg(dask_agg_spec).compute()

Unnamed: 0_level_0,a,b,c
agg_col2,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,9.769231,18,1
1,9.0,19,0


## Transposing

In [163]:
df.transpose()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19
a,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19
b,19,18,17,16,15,14,13,12,11,10,9,8,7,6,5,4,3,2,1,0
c,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19
agg_col1,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0
agg_col2,1,0,0,1,0,0,1,0,0,1,0,0,1,0,0,1,0,0,1,0


In [164]:
df.T

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19
a,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19
b,19,18,17,16,15,14,13,12,11,10,9,8,7,6,5,4,3,2,1,0
c,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19
agg_col1,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0
agg_col2,1,0,0,1,0,0,1,0,0,1,0,0,1,0,0,1,0,0,1,0


## Time series

In [165]:
# 72 consecutive days starting 2020-01-01, each paired with a random value.
# The ISO-8601 start date avoids the month/day ambiguity of "01/01/2020".
date_df = cudf.DataFrame()
date_df["date"] = pd.date_range("2020-01-01", periods=72, freq="D")
date_df["value"] = np.random.sample(len(date_df))
date_df.head()

Unnamed: 0,date,value
0,2020-01-01,0.304242
1,2020-01-02,0.524756
2,2020-01-03,0.431945
3,2020-01-04,0.291229
4,2020-01-05,0.611853


In [166]:
# Midnight on 2020-01-31; the next cells select rows strictly before it.
search_date = dt.datetime(2020, 1, 31)
search_date

datetime.datetime(2020, 1, 31, 0, 0)

In [168]:
date_df.loc[date_df["date"] < search_date].head()

Unnamed: 0,date,value
0,2020-01-01,0.304242
1,2020-01-02,0.524756
2,2020-01-03,0.431945
3,2020-01-04,0.291229
4,2020-01-05,0.611853


In [169]:
date_df.query("date < @search_date").head()

Unnamed: 0,date,value
0,2020-01-01,0.304242
1,2020-01-02,0.524756
2,2020-01-03,0.431945
3,2020-01-04,0.291229
4,2020-01-05,0.611853


## Categoricals

In [170]:
# Six rows with letter grades; "grade" is then converted to a categorical column.
gdf = cudf.DataFrame({
    "id": list(range(1, 7)),
    "grade": ["a", "b", "b", "a", "a", "e"],
})
gdf["grade"] = gdf["grade"].astype("category")
gdf

Unnamed: 0,id,grade
0,1,a
1,2,b
2,3,b
3,4,a
4,5,a
5,6,e


In [172]:
dgdf = dask_cudf.from_cudf(gdf, npartitions=2)
dgdf.compute()

Unnamed: 0,id,grade
0,1,a
1,2,b
2,3,b
3,4,a
4,5,a
5,6,e


In [175]:
gdf["grade"].cat.codes

0    0
1    1
2    1
3    0
4    0
5    2
dtype: int32

In [178]:
dgdf["grade"].cat.codes.compute()

0    0
1    1
2    1
0    0
1    0
2    2
dtype: int32

## Converting to pandas

In [182]:
type(df.head().to_pandas())

pandas.core.frame.DataFrame

In [184]:
type(ddf.compute().to_pandas())

pandas.core.frame.DataFrame

## Converting to NumPy

In [187]:
# DataFrame.as_matrix is deprecated/removed in current cudf (as in pandas);
# to_numpy() copies the frame into a host NumPy array.
df.to_numpy()[:5]

array([[ 0, 19,  0,  1,  1],
       [ 1, 18,  1,  0,  0],
       [ 2, 17,  2,  1,  0],
       [ 3, 16,  3,  0,  1],
       [ 4, 15,  4,  1,  0]])

In [188]:
# to_numpy() (replacement for the removed as_matrix) yields a numpy.ndarray on the host.
type(df.to_numpy())

numpy.ndarray

In [190]:
# Materialize the dask_cudf frame, then convert with to_numpy() (as_matrix is removed).
ddf.compute().to_numpy()[:5]

array([[ 0, 19,  0,  1,  1],
       [ 1, 18,  1,  0,  0],
       [ 2, 17,  2,  1,  0],
       [ 3, 16,  3,  0,  1],
       [ 4, 15,  4,  1,  0]])

## Converting to Arrow

In [191]:
df.to_arrow()

pyarrow.Table
a: int64
b: int64
c: int64
agg_col1: int64
agg_col2: int64
metadata
--------
OrderedDict([(b'pandas',
              b'{"index_columns": [{"kind": "range", "name": null, "start": '
              b'0, "stop": 20, "step": 1}], "column_indexes": [{"name": null'
              b', "field_name": null, "pandas_type": "unicode", "numpy_type"'
              b': "object", "metadata": {"encoding": "UTF-8"}}], "columns": '
              b'[{"name": "a", "field_name": "a", "pandas_type": "int64", "n'
              b'umpy_type": "int64", "metadata": null}, {"name": "b", "field'
              b'_name": "b", "pandas_type": "int64", "numpy_type": "int64", '
              b'"metadata": null}, {"name": "c", "field_name": "c", "pandas_'
              b'type": "int64", "numpy_type": "int64", "metadata": null}, {"'
              b'name": "agg_col1", "field_name": "agg_col1", "pandas_type": '
              b'"int64", "numpy_type": "int64", "metadata": null}, {"name": '
              b'"agg_col2"

## I/O

In [192]:
# exist_ok=True creates "data/" if needed without a separate exists() check,
# so the cell is idempotent and free of a check-then-create race.
os.makedirs("data", exist_ok=True)

df.to_csv("data/foo.csv", index=False)

In [194]:
ddf.compute().to_csv("data/foo_dask.csv", index=False)

In [195]:
!ls data/

foo.csv  foo_dask.csv


In [198]:
# Read back the CSV written above. (The former auto_mkdir option is an fsspec
# setting for creating parent directories when writing; it has no effect on reads.)
df = cudf.read_csv("data/foo.csv")
df.head()

Unnamed: 0,a,b,c,agg_col1,agg_col2
0,0,19,0,1,1
1,1,18,1,0,0
2,2,17,2,1,0
3,3,16,3,0,1
4,4,15,4,1,0


In [199]:
ddf = dask_cudf.read_csv("data/foo_dask.csv")
ddf

Unnamed: 0_level_0,a,b,c,agg_col1,agg_col2
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
,int64,int64,int64,int64,int64
,...,...,...,...,...


In [202]:
ddf = dask_cudf.read_csv("data/foo*.csv")
len(ddf)

40

In [207]:
cudf.read_csv("data/foo*.csv")

Unnamed: 0,a,b,c,agg_col1,agg_col2
0,0,19,0,1,1
1,1,18,1,0,0
2,2,17,2,1,0
3,3,16,3,0,1
4,4,15,4,1,0
5,5,14,5,0,0
6,6,13,6,1,1
7,7,12,7,0,0
8,8,11,8,1,0
9,9,10,9,0,1


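## Dask performance tips

`persist()` starts computing a collection in the background and keeps the resulting partitions in worker memory. `wait()` blocks until those futures have finished.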

In [208]:
# Start a local GPU cluster (dask-cuda starts one worker per visible GPU by
# default) and connect a client to it. The imports used here now live in the
# first cell with all other imports.
cluster = LocalCUDACluster()
client = Client(cluster)
client

0,1
Client  Scheduler: tcp://127.0.0.1:38057  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 1  Cores: 1  Memory: 8.21 GB


In [209]:
nrows = 1000000

# 1M-row frame split into 5 partitions; column "c" is defined lazily,
# so the repr below only shows the partition layout and dtypes.
base = np.arange(nrows)
df2 = cudf.DataFrame({"a": base, "b": base})
ddf2 = dask_cudf.from_cudf(df2, npartitions=5)
ddf2 = ddf2.assign(c=ddf2["a"] + 5)
ddf2

Unnamed: 0_level_0,a,b,c
npartitions=5,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,int64,int64,int64
200000,...,...,...
...,...,...,...
800000,...,...,...
999999,...,...,...


In [213]:
ddf2 = ddf2.persist()
ddf2

Unnamed: 0_level_0,a,b,c
npartitions=5,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,int64,int64,int64
200000,...,...,...
...,...,...,...
800000,...,...,...
999999,...,...,...


In [214]:
nrows = 1000000

# NOTE(review): ddf1 (100 partitions) is not used by the visible cells below;
# the map_partitions demo runs on the 5-partition ddf2 instead. Confirm intent.
shared = np.arange(nrows)
df1 = cudf.DataFrame({"a": shared, "b": shared})
ddf1 = dask_cudf.from_cudf(df1, npartitions=100)

def func(df):
    """Simulate a slow task on one partition, then apply an arithmetic transform.

    Sleeps a random whole number of seconds in [1, 59] (the upper bound of
    ``np.random.randint`` is exclusive), then returns ``(df + 5) * 3 - 11``.
    """
    delay_s = np.random.randint(1, 60)
    time.sleep(delay_s)
    shifted = df + 5
    return shifted * 3 - 11

In [215]:
results_ddf = ddf2.map_partitions(func)
results_ddf

Unnamed: 0_level_0,a,b,c
npartitions=5,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,int64,int64,int64
200000,...,...,...
...,...,...,...
800000,...,...,...
999999,...,...,...


In [216]:
results_ddf = results_ddf.persist()

In [217]:
results_ddf

Unnamed: 0_level_0,a,b,c
npartitions=5,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,int64,int64,int64
200000,...,...,...
...,...,...,...
800000,...,...,...
999999,...,...,...


In [219]:
wait(results_ddf)

DoneAndNotDoneFutures(done={<Future: finished, type: cudf.DataFrame, key: ('func-050b0477e10682d0a8f7866542d54abd', 0)>, <Future: finished, type: cudf.DataFrame, key: ('func-050b0477e10682d0a8f7866542d54abd', 1)>, <Future: finished, type: cudf.DataFrame, key: ('func-050b0477e10682d0a8f7866542d54abd', 4)>, <Future: finished, type: cudf.DataFrame, key: ('func-050b0477e10682d0a8f7866542d54abd', 2)>, <Future: finished, type: cudf.DataFrame, key: ('func-050b0477e10682d0a8f7866542d54abd', 3)>}, not_done=set())