In [None]:
# The MIT License (MIT)

# Copyright (c) 2020, NVIDIA CORPORATION.

# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE

# Tutorial: Feature Engineering for Recommender Systems

# 6. Scaling to Production Systems

## 6.1. Introduction to dask and dask_cudf

## Theory

**Acknowledgement**: Much of the introductory material included here is borrowed from other Dask documentation and tutorials.
- ["Dask Video Tutorial"](https://github.com/jacobtomlinson/dask-video-tutorial-2020) 
- [YouTube link](https://www.youtube.com/watch?v=_u0OQm9qf_A)
- [Introduction To Dask by Richard (Rick) Zamora](https://github.com/rjzamora/notebooks/tree/master/nvtabular_dask_demo)

Other useful Dask resources:
- [Dask.org](https://dask.org/)
    - [Tutorial pages](https://tutorial.dask.org/00_overview.html)
- [GitHub Tutorial](https://github.com/dask/dask-tutorial)

## What is Dask

**Very Short Answer**: Dask is an open-source library designed to natively scale Python code.

**Slightly-Longer Short Answer**: Dask is a task-based library for parallel scheduling and execution. Although it is certainly possible to use the task-scheduling machinery directly to implement customized parallel workflows (we do it in NVTabular), most users only interact with Dask through a *Dask Collection API*.  The most popular "collection" API's include:

- [Dask DataFrame](https://docs.dask.org/en/latest/dataframe.html): Dask-based version of the [Pandas](https://pandas.pydata.org/) DataFrame/Series API.  Note that `dask_cudf` is just a wrapper around this collection module (`dask.dataframe`).
- [Dask Array](https://docs.dask.org/en/latest/array.html): Dask-based version of the [NumPy]() array API
- [Dask Bag](https://docs.dask.org/en/latest/bag.html): *Similar to* a Dask-based version of PyToolz or a Pythonic version of PySpark RDD


For example, Dask DataFrame provides a convenient API for decomposing large pandas (or cuDF) DataFrame/Series objects into a collection of DataFrame *partitions*.  This tutorial will focus mostly on this particular Dask collection (since it is the basis for `dask_cudf`).  However, instead of relying only on the established `dask.dataframe` API, we will also see how it is possible (perhaps easy) to implement a custom task graph to operate on Dask-DataFrame objects when necessary.

<img src="./images/dask-dataframe.svg" width="350px">

### Dask Uses DAGs Internally

Before we start writing any code, it is useful to understand (on a basic level) how Dask actually works. When an application or library uses a Dask collection API (like Dask DataFrame), they are typically using that API to construct a directed acyclic graph (DAG) of tasks.  Once a DAG is constructed, the **core** Dask API can be used (either directly or implicitly through the collection API) to schedule and execute the DAG on one or more threads/processes.

In other words, Dask provides various APIs to:

1. Construct a DAG of "tasks"
2. Schedule/execute those DAGs
3. (Optionally) Spin up a dedicated worker and scheduler processes to enable distributed execution

<img src='./images/dask_dag_cartoon.png' width=500>

### Important Components of the "Dask Ecosystem"

The components of the Dask ecosystem that are most critical for NVTabular (and will be discussed in this tutorial) are:

- `dask` (core Dask library): [[GitHub Repo](https://github.com/dask/dask)]  This is the core Dask library.  It also contains the Dask Dataframe API (`dask.dataframe`)
- `dask_cudf`: [[GitHub Repo](https://github.com/rapidsai/cudf/tree/branch-0.15/python/dask_cudf)] This is effectively a wrapper around the `dask.dataframe` module defined in the core Dask library.  Note that a `dask_cudf.DataFrame` object should be thought of as a `dask.dataframe.DataFrame` object, but with the underlying partitions being `cudf.DataFrame`'s (rather than `pandas.DataFrame`)
- `distributed`: [[GitHub Repo](https://github.com/dask/distributed)] Distributed version of the Dask execution model (includes the necessary code for scheduling, execution and communication between distributed processes).  This library does not deal with the construction of DAGs, just with the scheduling and execution of DAGs on distributed *workers*.
- `dask_cuda`: [[GitHub Repo](https://github.com/rapidsai/dask-cuda)] Provides various utilities to improve deployment and management of distributed Dask *workers* on CUDA-enabled systems.

## HandsOn

Before we get started, it is convenient to create a simple `dask.distributed` client. If we work with a small dataset, then it is not necessary to initialize a `dask.distributed` client. The code should run in the same way.

In [1]:
import dask
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd

In [2]:
client = Client(n_workers=8, 
                threads_per_worker=1,
                memory_limit='50GB',
                ip='127.0.0.1')
client

0,1
Client  Scheduler: tcp://127.0.0.1:40757  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 8  Cores: 8  Memory: 132.13 GB


In [3]:
%%time

ddf_train = dd.read_parquet('./data/train.parquet', blocksize=12e3)
ddf_valid = dd.read_parquet('./data/valid.parquet', blocksize=12e3)

CPU times: user 28 ms, sys: 0 ns, total: 28 ms
Wall time: 27.4 ms


In [4]:
ddf_train

Unnamed: 0_level_0,event_time,event_type,product_id,brand,price,user_id,user_session,target,cat_0,cat_1,cat_2,cat_3,timestamp,ts_hour,ts_minute,ts_weekday,ts_day,ts_month,ts_year
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1
,object,object,int64,object,float64,int64,object,int64,object,object,object,object,object,int64,int64,int64,int64,int64,int64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


Here we have created a `dask.dataframe.DataFrame` object called `ddf_train` and `ddf_valid`.  Both are essentially a (**lazy**) collection of pandas dataframes. Dask loaded the metadata (DataFrame schema) but did not load any data in-memory. Each pandas dataframe in this collection is called a **partition**.  We can access this property (the total number of partitions) using the `DataFrame.npartitions` attribute.

**It is absolutely critical to recognize that `ddf_train` and `ddf_valid` are *not* actually backed by *in-memory* pandas data, but instead by a DAG of tasks**.  This DAG (accessible via `ddf.dask`) specifies the exact network of operations needed to produce the underlying partitions.

In [5]:
ddf_train._meta

Unnamed: 0,event_time,event_type,product_id,brand,price,user_id,user_session,target,cat_0,cat_1,cat_2,cat_3,timestamp,ts_hour,ts_minute,ts_weekday,ts_day,ts_month,ts_year


Let's work on some examples: Simplified Target Encoding
1. We combine two columns cat_2 and brand 
2. We TargetEncode the new column cat_2_brand 
3. We merge the counts back to the train and validation dataset
4. We overwrite counts with less than 20 for on cat_2_brand with global_mean

We can see that the execution time is 117ms - meaning that dask has registered the operations but hasn't executed them.

In [6]:
%%time

ddf_train['cat_2_brand'] = ddf_train['cat_2'].astype(str) + '_' + ddf_train['brand'].astype(str)
ddf_valid['cat_2_brand'] = ddf_valid['cat_2'].astype(str) + '_' + ddf_valid['brand'].astype(str)

ddf_train_group = ddf_train[['cat_2_brand', 'target']].groupby(['cat_2_brand']).agg(['count', 'mean'])
ddf_train_group = ddf_train_group.reset_index()
ddf_train_group.columns = ['cat_2_brand', 'TE_count', 'TE_mean']
ddf_train = ddf_train.merge(ddf_train_group, how='left', on='cat_2_brand')
ddf_valid = ddf_valid.merge(ddf_train_group, how='left', on='cat_2_brand')
global_mean = ddf_train['target'].mean()
ddf_train['TE_mean'] = ddf_train.TE_mean.where(ddf_train['TE_count']>20, global_mean)
ddf_valid['TE_mean'] = ddf_valid.TE_mean.where(ddf_valid['TE_count']>20, global_mean)

CPU times: user 60 ms, sys: 0 ns, total: 60 ms
Wall time: 56.7 ms


We can compute the task graph by calling `.compute()` or `.persist()`

In [7]:
%%time

ddf_train.compute()
ddf_valid.compute()

CPU times: user 15.1 s, sys: 9.68 s, total: 24.8 s
Wall time: 3min 14s


Unnamed: 0,event_time,event_type,product_id,brand,price,user_id,user_session,target,cat_0,cat_1,...,timestamp,ts_hour,ts_minute,ts_weekday,ts_day,ts_month,ts_year,cat_2_brand,TE_count,TE_mean
0,2020-03-01 00:00:59 UTC,cart,6902464,zlatek,49.91,531574188,48714293-b3f9-4946-8135-eb1ea05ead74,0,electronics,telephone,...,2020-03-01 00:00:59,0,0,6,1,3,2020,nan_zlatek,607.0,0.258649
1,2020-03-01 00:01:20 UTC,cart,1002544,apple,397.10,622090790,fb5b918c-f1f6-48d9-bcf4-7eb46e83fc6b,0,construction,tools,...,2020-03-01 00:01:20,0,1,6,1,3,2020,light_apple,1013391.0,0.469441
2,2020-03-01 00:01:52 UTC,cart,1003316,apple,823.70,622090543,b821ee79-96fe-4979-be9d-21ee2e6777c3,0,construction,tools,...,2020-03-01 00:01:52,0,1,6,1,3,2020,light_apple,1013391.0,0.469441
3,2020-03-01 00:02:14 UTC,cart,16600067,rivertoys,422.15,616437533,aad023bc-c858-47ab-a3a7-ff4654f11b9a,0,sport,trainer,...,2020-03-01 00:02:14,0,2,6,1,3,2020,nan_rivertoys,10564.0,0.104411
4,2020-03-01 00:02:15 UTC,cart,3701428,arnica,69.24,516454226,ee22b80c-ed3e-3c83-d397-fb69a44d4864,0,appliances,environment,...,2020-03-01 00:02:15,0,2,6,1,3,2020,vacuum_arnica,4450.0,0.325393
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2461714,2020-03-31 23:57:47 UTC,purchase,24100293,cocochoco,2.65,513094047,d27f822c-f707-4956-a6c3-4ad8fec00cc7,1,appliances,personal,...,2020-03-31 23:57:47,23,57,1,31,3,2020,massager_cocochoco,82.0,0.146341
2461715,2020-03-31 23:58:19 UTC,purchase,100049773,,234.96,620580925,c33fde42-a5de-4a1f-9e1c-2ac7518a7d41,1,,,...,2020-03-31 23:58:19,23,58,1,31,3,2020,nan_nan,521515.0,0.277106
2461716,2020-03-31 23:58:20 UTC,purchase,3700689,samsung,223.92,514905289,e40783c5-7b21-429f-99af-539d2842e6d3,1,appliances,environment,...,2020-03-31 23:58:20,23,58,1,31,3,2020,vacuum_samsung,68239.0,0.392459
2461717,2020-03-31 23:59:19 UTC,purchase,100077607,vitek,100.36,633281427,667a8535-221c-4169-aab4-a1972610f102,1,appliances,environment,...,2020-03-31 23:59:19,23,59,1,31,3,2020,vacuum_vitek,11950.0,0.340000


In [8]:
client.close()

**About `compute`**: The `compute` method is [defined for all Dask collections](https://github.com/dask/dask/blob/51d3f1120fc55f21b5ce1ac137201ea01d9cf496/dask/base.py#L143). For Dask DataFrame, this method will (1) trigger the execution of the graph and (2) convert the Dask DataFrame into a **single** Pandas DataFrame. *This means that you should be sure the pandas equivalent of `ddf` will fit in memory before you use `compute`!*

__Using `persist`__

Since the `compute` method will convert your Dask DataFrame to a Pandas DataFrame, it is typically a **bad** idea to use compute on larger-than-memory (LTM) datasets.  In NVTabular, we do use a `compute` method, but never on a full Dask/dask_cudf DataFrame object.  Instead, we use `compute` to trigger the collection/reduction of an aggregated statistics dictionary, and/or to write out a processed dataset.

In order to execute the `ddf` DAG **without** converting it to a single pandas DataFrame, you need to use the [`persist` method](https://github.com/dask/dask/blob/51d3f1120fc55f21b5ce1ac137201ea01d9cf496/dask/base.py#L101). This function is particularly useful when using distributed systems, because the results will be kept in distributed memory, rather than returned to the local process as with compute. It will also allow the distributed cluster to clean up data that the scheduler no longer deems necessary.  For the single-machine case, the method is used less often.

**Let's move on to the GPU accelerated version with dask_cudf.**

We can use `nvidia-smi` command to check the usage of our GPU.

In [9]:
!nvidia-smi

Mon Sep 21 14:10:27 2020       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 440.64.00    Driver Version: 440.64.00    CUDA Version: 10.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|   0  Tesla T4            Off  | 00000000:00:1E.0 Off |                    0 |
| N/A   31C    P8     9W /  70W |      0MiB / 15109MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID   Type   Process name                             Usage      |
|  No ru

In [10]:
import dask as dask, dask_cudf
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

In [11]:
cluster = LocalCUDACluster(ip='127.0.0.1',
                           rmm_pool_size="16GB")
client = Client(cluster)
client

0,1
Client  Scheduler: tcp://127.0.0.1:34691  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 1  Cores: 1  Memory: 16.52 GB


We reserve 14GB per GPU via `rmm_pool_size`.

In [12]:
!nvidia-smi

Mon Sep 21 14:10:33 2020       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 440.64.00    Driver Version: 440.64.00    CUDA Version: 10.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|   0  Tesla T4            Off  | 00000000:00:1E.0 Off |                    0 |
| N/A   33C    P0    26W /  70W |    605MiB / 15109MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID   Type   Process name                             Usage      |
+-------

We use `dask_cudf` to read and load the data. The remaining code is exactly the same as the dask pandas version.

In [13]:
%%time

ddf_train = dask_cudf.read_parquet('./data/train.parquet')
ddf_valid = dask_cudf.read_parquet('./data/valid.parquet')

CPU times: user 996 ms, sys: 416 ms, total: 1.41 s
Wall time: 1.41 s


In [14]:
%%time

ddf_train['cat_2_brand'] = ddf_train['cat_2'].astype(str) + '_' + ddf_train['brand'].astype(str)
ddf_valid['cat_2_brand'] = ddf_valid['cat_2'].astype(str) + '_' + ddf_valid['brand'].astype(str)

ddf_train_group = ddf_train[['cat_2_brand', 'target']].groupby(['cat_2_brand']).agg(['count', 'mean'])
ddf_train_group = ddf_train_group.reset_index()
ddf_train_group.columns = ['cat_2_brand', 'TE_count', 'TE_mean']
ddf_train = ddf_train.merge(ddf_train_group, how='left', on='cat_2_brand')
ddf_valid = ddf_valid.merge(ddf_train_group, how='left', on='cat_2_brand')
global_mean = ddf_train['target'].mean()
ddf_train['TE_mean'] = ddf_train.TE_mean.where(ddf_train['TE_count']>20, global_mean)
ddf_valid['TE_mean'] = ddf_valid.TE_mean.where(ddf_valid['TE_count']>20, global_mean)

CPU times: user 544 ms, sys: 0 ns, total: 544 ms
Wall time: 652 ms


In [15]:
%%time

ddf_train.compute()
ddf_valid.compute()

CPU times: user 2.75 s, sys: 5.77 s, total: 8.52 s
Wall time: 14.3 s


Unnamed: 0,event_time,event_type,product_id,brand,price,user_id,user_session,target,cat_0,cat_1,...,timestamp,ts_hour,ts_minute,ts_weekday,ts_day,ts_month,ts_year,cat_2_brand,TE_count,TE_mean
0,2020-03-01 08:16:04 UTC,cart,1005135,apple,1516.10,620967403,2f69a6e0-3a9e-4b7c-b717-ce5b8ad85ce3,0,construction,tools,...,2020-03-01 08:16:04,8,16,6,1,3,2020,light_apple,1013391,0.469441
1,2020-03-01 08:16:08 UTC,cart,1005135,apple,1516.10,620967403,2f69a6e0-3a9e-4b7c-b717-ce5b8ad85ce3,0,construction,tools,...,2020-03-01 08:16:08,8,16,6,1,3,2020,light_apple,1013391,0.469441
2,2020-03-01 08:16:09 UTC,cart,1004996,doogee,96.89,607174356,80d6850c-7f95-4978-ba1a-dedbe802e012,0,construction,tools,...,2020-03-01 08:16:09,8,16,6,1,3,2020,light_doogee,769,0.405722
3,2020-03-01 08:16:09 UTC,cart,1005135,apple,1516.10,620967403,2f69a6e0-3a9e-4b7c-b717-ce5b8ad85ce3,0,construction,tools,...,2020-03-01 08:16:09,8,16,6,1,3,2020,light_apple,1013391,0.469441
4,2020-03-01 08:16:13 UTC,cart,1005256,xiaomi,141.29,571788375,da050faa-118a-405a-b9c8-63f9d730328e,0,construction,tools,...,2020-03-01 08:16:13,8,16,6,1,3,2020,light_xiaomi,510657,0.396346
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2461714,2020-03-31 19:25:14 UTC,purchase,18301044,,11.04,572119027,172b36e9-9259-423c-bc43-5d555ff94ce4,1,sport,ski,...,2020-03-31 19:25:14,19,25,1,31,3,2020,,,0.366924
2461715,2020-03-31 19:25:17 UTC,purchase,100058915,iqos,43.76,620477097,47786b4a-f2c3-48fa-b714-9d05556d5b98,1,apparel,trousers,...,2020-03-31 19:25:17,19,25,1,31,3,2020,,,0.366924
2461716,2020-03-31 19:25:36 UTC,purchase,32401283,,22.97,635102002,d82b8bf0-dea5-4e53-84f8-e61332eb17f1,1,apparel,underwear,...,2020-03-31 19:25:36,19,25,1,31,3,2020,,,0.366924
2461717,2020-03-31 19:26:18 UTC,purchase,4800282,samsung,38.59,622434648,4894c1b9-d00d-4418-b6c9-e8cd2f842b33,1,sport,bicycle,...,2020-03-31 19:26:18,19,26,1,31,3,2020,,,0.366924


In [16]:
!nvidia-smi

Mon Sep 21 14:10:50 2020       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 440.64.00    Driver Version: 440.64.00    CUDA Version: 10.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|   0  Tesla T4            Off  | 00000000:00:1E.0 Off |                    0 |
| N/A   36C    P0    33W /  70W |   1629MiB / 15109MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID   Type   Process name                             Usage      |
+-------

In [17]:
client.close()