# Introduction to Dask: Hello World!

While the world’s data doubles each year, CPU computing has hit a brick wall with the end of Moore’s law. For the same reasons that scientific computing and deep learning have turned to NVIDIA GPU acceleration, data analytics and machine learning are ideal candidates for GPU acceleration.

NVIDIA created RAPIDS, an open-source data analytics and machine learning acceleration platform that leverages GPUs to accelerate computations. RAPIDS is based on Python, has pandas-like and scikit-learn-like interfaces, is built on the Apache Arrow in-memory data format, and can scale from one GPU to multiple GPUs to multiple nodes. RAPIDS integrates easily into the world’s most popular Python-based data science workflows. RAPIDS accelerates data science end-to-end: from data prep, to machine learning, to deep learning. And through Arrow, Spark users can easily move data into the RAPIDS platform for acceleration.

In this notebook, we will show how to use the RAPIDS container to quickly set up Dask and run a "Hello World" example.

**Table of Contents**

* Setup
* Load Libraries
* Setup Dask
* Hello World!
* Sleeping in Parallel
* Conclusion

## Setup

Before we begin, let's check out our hardware setup by running the `nvidia-smi` command.

In [None]:
!nvidia-smi

Next, let's see what CUDA version we have:

In [None]:
!nvcc --version

## Load Libraries

Next, let's load some libraries.

In [None]:
import dask; print('Dask Version:', dask.__version__)
from dask.delayed import delayed
from dask.distributed import Client
import os
import subprocess
import time

## Setup Dask

Dask is a library that allows for parallel computing. Written in Python, it can schedule tasks dynamically and handle large data structures, similar to those found in NumPy and pandas. In the subsequent tutorials, we'll show how to use Dask with pandas and cuDF, and how we can use both to accelerate common ETL tasks as well as build ML models like XGBoost.

To learn more about Dask, check out the documentation here: http://docs.dask.org/en/latest/

The steps below set up Dask; we'll walk through them one at a time.

In [None]:
# worker settings
n_workers = 4

Dask operates using a concept of a "client" and "workers". The client submits tasks to a scheduler, which tells the workers what tasks to perform and when to perform them. Typically, we set the number of workers equal to the number of computing resources available to us. For example, we might set `n_workers = 8` if our machine has 8 CPU cores that can each operate in parallel. This lets us take advantage of all of our computing resources and get the most benefit from parallelization.

As we'll see later, Dask is a first-class citizen in the world of general-purpose GPU computing, and the RAPIDS ecosystem makes it very easy to use Dask with cuDF and XGBoost. As a best practice, we set the number of workers equal to the number of GPUs available to us.

Above, we set the number of workers to 4, which should work for many workstations and servers. Depending on how many GPUs the `nvidia-smi` output in the first cell of this notebook reports, you may need to raise or lower this value.
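If you'd rather not count GPUs in the `nvidia-smi` table by eye, the sketch below derives a worker count from the output of `nvidia-smi -L` (short for `--list-gpus`), which prints one line per GPU starting with `GPU <index>:`. The helper names `count_gpus` and `detect_n_workers` are hypothetical, and falling back to one worker per CPU core when `nvidia-smi` is missing is an assumption meant only for CPU-only experimentation. The sample listing is made up for illustration.

```python
import os
import shutil
import subprocess


def count_gpus(listing: str) -> int:
    """Count the lines of `nvidia-smi -L` output that describe a GPU."""
    return sum(1 for line in listing.splitlines() if line.startswith("GPU "))


def detect_n_workers() -> int:
    """Hypothetical helper: one worker per GPU, else one per CPU core."""
    if shutil.which("nvidia-smi"):
        out = subprocess.run(["nvidia-smi", "-L"], capture_output=True, text=True).stdout
        n = count_gpus(out)
        if n > 0:
            return n
    # Assumption: no usable GPU listing, so fall back to the CPU core count.
    return os.cpu_count() or 1


# A made-up two-GPU listing in the `nvidia-smi -L` format.
sample = (
    "GPU 0: Tesla V100-SXM2-16GB (UUID: GPU-aaaa)\n"
    "GPU 1: Tesla V100-SXM2-16GB (UUID: GPU-bbbb)\n"
)
print(count_gpus(sample))  # 2
```

On a GPU machine, `n_workers = detect_n_workers()` would replace the hard-coded value of 4.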

Once we've determined how many workers we want to employ, we need to determine the IP address and port for the scheduler. Since the scheduler runs on this machine, its IP address is the host's IP address, which we can find with `hostname --all-ip-addresses`. In a notebook, `!hostname --all-ip-addresses` returns the command's output as a list of lines, so `scheduler_ip[0].split()[0]` takes the first address on the first line.

We are running this notebook in Single Node, Multiple GPU fashion - we will see in subsequent tutorials how these values may change when running in Multiple Node, Multiple GPU fashion.

In [None]:
scheduler_ip = !hostname --all-ip-addresses
scheduler_ip = scheduler_ip[0].split()[0]
scheduler_port = '8786'
scheduler_uri = scheduler_ip + ':' +  scheduler_port
print(scheduler_uri)
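The `!` shell syntax above only works in IPython and Jupyter. In a plain Python script, the same parsing can be sketched with `subprocess`, as below. The helper names are hypothetical, 8786 is Dask's default scheduler port, and the sample address string is invented for illustration. `hostname --all-ip-addresses` is a Linux (GNU inetutils/net-tools) option and may not exist on other systems.

```python
import subprocess


def first_ip(hostname_output: str) -> str:
    """Return the first address printed by `hostname --all-ip-addresses`."""
    return hostname_output.split()[0]


def scheduler_uri_for(ip: str, port: int = 8786) -> str:
    """Join an IP address and port into the `host:port` string Client expects."""
    return f"{ip}:{port}"


def local_scheduler_uri(port: int = 8786) -> str:
    """Hypothetical helper: query this Linux host's addresses and build the URI."""
    out = subprocess.run(
        ["hostname", "--all-ip-addresses"], capture_output=True, text=True, check=True
    ).stdout
    return scheduler_uri_for(first_ip(out), port)


# Made-up output from a host with two network interfaces.
print(scheduler_uri_for(first_ip("10.0.0.5 172.17.0.1 \n")))  # 10.0.0.5:8786
```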

Our next step is to configure the Dask environment settings. We do the following:

* Create a copy of `os.environ`, a Python dictionary containing our environment variables
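* Disable NCCL peer-to-peer (P2P) transport by setting `NCCL_P2P_DISABLE` to 1.
  * P2P transport lets CUDA access memory directly between GPUs over NVLink or PCIe. Setting this variable to 1 disables direct GPU-to-GPU communication.
* Next, we set `DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING` to False, which stops the scheduler from moving queued tasks from busy workers to idle ones.
* Finally, we set `DASK_DISTRIBUTED__SCHEDULER__BANDWIDTH` to 1. This is the scheduler's estimate of the network bandwidth between workers, in bytes per second; a tiny value makes moving data between workers look very expensive when the scheduler decides where to run each task.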


In [None]:
# dask environment settings
dask_env = os.environ.copy()
dask_env['NCCL_P2P_DISABLE'] = '1'
dask_env['DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING'] = 'False'
dask_env['DASK_DISTRIBUTED__SCHEDULER__BANDWIDTH'] = '1'
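Dask reads environment variables that start with `DASK_` as configuration overrides: roughly, it drops the prefix, lowercases the rest, turns each double underscore into a dot, and parses the value as a Python literal where it can. The sketch below mimics that convention so you can see which settings the cell above changes. `env_to_config` is a hypothetical helper and a simplification of Dask's own behavior, not its actual code. Dask treats `_` and `-` in keys as equivalent, so `work_stealing` here corresponds to the `distributed.scheduler.work-stealing` setting.

```python
import ast


def env_to_config(env: dict) -> dict:
    """Approximate how Dask maps DASK_* environment variables to config keys."""
    config = {}
    for name, raw in env.items():
        if not name.startswith("DASK_"):
            continue  # e.g. NCCL_P2P_DISABLE is read by NCCL, not by Dask
        key = name[len("DASK_"):].lower().replace("__", ".")
        try:
            value = ast.literal_eval(raw)  # 'False' -> False, '1' -> 1
        except (ValueError, SyntaxError):
            value = raw  # keep non-literal strings such as '5GB' as-is
        config[key] = value
    return config


settings = {
    "NCCL_P2P_DISABLE": "1",
    "DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING": "False",
    "DASK_DISTRIBUTED__SCHEDULER__BANDWIDTH": "1",
}
print(env_to_config(settings))
# {'distributed.scheduler.work_stealing': False, 'distributed.scheduler.bandwidth': 1}
```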

With our environment defined, let's start the Dask Scheduler. We pass the `dask_env` dictionary as the environment into `subprocess.Popen` using the `env` keyword.

In [None]:
# start the Dask Scheduler using an environment with the Dask settings set above
subprocess.Popen('dask-scheduler', env = dask_env)
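`subprocess.Popen` returns as soon as the process is launched, not when the scheduler is ready to accept connections. `Client(...)` has its own connection timeout, but if you want to fail fast with a clear message, a minimal sketch of a hypothetical `wait_for_port` helper that polls the scheduler's TCP port might look like this (it only checks that *something* is listening on the port, not that it is a Dask scheduler):

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0) -> bool:
    """Poll until a TCP server accepts connections on host:port, or give up."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=1.0):
                return True  # a server, e.g. the Dask scheduler, is listening
        except OSError:
            time.sleep(0.1)  # not listening yet; retry shortly
    return False
```

In this notebook it could be called as `wait_for_port(scheduler_ip, int(scheduler_port))` before instantiating the client.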

Next, we instantiate our Dask Client using the Scheduler URI we previously defined. As a best practice, we retire any workers that were previously running.

In [None]:
# instantiate Client
client = Client(scheduler_uri)

# retire existing Dask workers
client.retire_workers()

Entering `client` into the cell below shows the current status of the Dask client. There should be no workers, since we retired them in the previous step.

In [None]:
# show current Dask status
client

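With our Dask client instantiated, the next step is to define and start each of the Dask workers. To do so, we'll use the `dask-worker` command with the following command-line arguments: each worker runs as a single process (`--nprocs=1`) with a single thread (`--nthreads=1`), without a nanny process to monitor it (`--no-nanny`), with no memory limit (`--memory-limit=0`), and on the scheduler's IP address (`--host`).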

In [None]:
# create list of arguments to pass to dask-worker
argument_list = ['--no-nanny', '--nprocs=1', '--nthreads=1', '--memory-limit=0', '--host=' + scheduler_ip]

Next, we loop over the worker IDs and start one Dask worker per GPU. Setting `CUDA_VISIBLE_DEVICES` to the worker's ID makes exactly one GPU visible to each worker process.

In [None]:
for worker_id in range(n_workers):
    dask_env['CUDA_VISIBLE_DEVICES'] = str(worker_id)
    subprocess.Popen(['dask-worker', scheduler_uri] + argument_list, env=dask_env)
time.sleep(3)  # this will give Dask time to setup each worker
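The loop above reuses the same `dask_env` dictionary, which works because `Popen` reads the environment at launch time. If you prefer to build and inspect every command before launching anything, each worker needs its own copy of the environment. The sketch below does that; `worker_launch_specs` is a hypothetical helper, and the IP address in the usage example is made up. Recent releases of `distributed` deprecate `--nprocs` in favor of `--nworkers`, so check `dask-worker --help` for the version in your container.

```python
import os


def worker_launch_specs(n_workers, scheduler_uri, scheduler_ip, base_env=None):
    """Build one (command, environment) pair per GPU worker without starting anything."""
    base_env = dict(os.environ if base_env is None else base_env)
    args = ['--no-nanny', '--nprocs=1', '--nthreads=1', '--memory-limit=0',
            '--host=' + scheduler_ip]
    specs = []
    for worker_id in range(n_workers):
        env = dict(base_env)  # fresh copy, so each worker keeps its own GPU id
        env['CUDA_VISIBLE_DEVICES'] = str(worker_id)  # pin this worker to one GPU
        specs.append((['dask-worker', scheduler_uri] + args, env))
    return specs


# Dry run with a made-up address: print each command instead of launching it.
for cmd, env in worker_launch_specs(2, '10.0.0.5:8786', '10.0.0.5', base_env={}):
    print(env['CUDA_VISIBLE_DEVICES'], ' '.join(cmd))
```

To actually start the workers, pass each pair to `subprocess.Popen(cmd, env=env)`.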

Now, let's show our current Dask status. We should see the IP address of our scheduler as well as the number of workers in our cluster (hopefully `n_workers`!).

In [None]:
# show current Dask status
client

You can also see the cluster's status and more information on the Dask dashboard, which by default is served on port 8787 of the scheduler host: `http://<scheduler_ip>:8787/status`. You can ignore this for now; we'll dive into it in subsequent tutorials.

## Hello World

Our Dask client and workers are set up, so it's time to execute our first program in parallel. We'll define a function that takes some value `x` and adds 5 to it.

In [None]:
def add_x_to_5(x):
    return x + 5

Next, we'll create an execution graph with one task per worker: task `i` passes its index `i` to the function `add_x_to_5`, so, for example, task 2 returns 7. The scheduler decides which worker runs each task.

An important thing to note is that the Dask workers aren't executing anything yet: we're only defining the execution graph for the client to compute later. The `delayed` wrapper ensures that each call is in fact "delayed" rather than executed on the spot.

In [None]:
results_delayed = [delayed(add_x_to_5)(i) for i in range(n_workers)]

In [None]:
results_delayed
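To make the idea of "delayed" execution concrete, here is a toy, stdlib-only stand-in for `delayed`: calling a wrapped function records the function and its arguments, and nothing runs until `compute()` is called. The names `Lazy` and `lazy` are hypothetical. Real Dask builds a task graph with unique keys and hands it to the scheduler; this sketch does not deduplicate shared inputs or run anything in parallel.

```python
class Lazy:
    """Toy stand-in for a delayed call: store it now, run it on compute()."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def compute(self):
        # Evaluate any Lazy arguments first, then call the wrapped function.
        values = [a.compute() if isinstance(a, Lazy) else a for a in self.args]
        return self.func(*values)


def lazy(func):
    """Wrap func so that calling it returns a Lazy node instead of a result."""
    return lambda *args: Lazy(func, *args)


calls = []  # records every real invocation of add_x_to_5


def add_x_to_5(x):
    calls.append(x)
    return x + 5


graph = [lazy(add_x_to_5)(i) for i in range(4)]
print(calls)                                # [] (nothing has run yet)
print([node.compute() for node in graph])   # [5, 6, 7, 8]
```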

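We'll use the Dask client to compute the results. `client.compute` submits the graph to the scheduler and returns a list of futures right away; `optimize_graph=False` tells Dask to skip its graph optimizations.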

In [None]:
results = client.compute(results_delayed, optimize_graph=False, fifo_timeout="0ms")
time.sleep(1)  # give the workers time to run each task

In [None]:
results

Note that `results` holds futures, not the actual results of adding 5 to each of `[0, 1, 2, 3]`. To collect the values, we call each future's `result()` method, which blocks until that task has finished.

In [None]:
print([result.result() for result in results])
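The futures returned by `client.compute` follow the same pattern as Python's standard `concurrent.futures`: submitting work returns immediately, and `result()` blocks until the value is ready. The stdlib sketch below uses a thread pool as a stand-in for the Dask workers; it is an analogy, not Dask itself. With Dask, `client.gather(results)` collects a whole list of futures in one call.

```python
from concurrent.futures import ThreadPoolExecutor


def add_x_to_5(x):
    return x + 5


with ThreadPoolExecutor(max_workers=4) as pool:
    # submit() returns Future objects immediately, much like client.compute
    futures = [pool.submit(add_x_to_5, i) for i in range(4)]
    # result() blocks until each value is available
    values = [f.result() for f in futures]

print(values)  # [5, 6, 7, 8]
```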

## Sleeping in Parallel

To see that Dask is truly executing in parallel, we'll define a function that sleeps for 1 second and returns the string "Success!". Calling it once per worker in serial takes about `n_workers` seconds, or 4 seconds with our setting.

In [None]:
def sleep_1():
    time.sleep(1)
    return 'Success!'

In [None]:
%%time

for _ in range(n_workers):
    sleep_1()

Using Dask, the whole process takes a little over a second, because each worker is executing in parallel!

In [None]:
%%time

# define delayed execution graph
results_delayed = [delayed(sleep_1)() for _ in range(n_workers)]

# use client to perform computations using execution graph
results = client.compute(results_delayed, optimize_graph=False, fifo_timeout="0ms")

# collect and print results
print([result.result() for result in results])
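You can reproduce the same serial-versus-parallel comparison without a cluster using only the standard library, as in the sketch below (the helper names are hypothetical). Threads work here because `time.sleep` releases Python's GIL; CPU-bound pure-Python work would not speed up this way in threads, whereas the Dask workers in this notebook are separate processes, each started with `--nthreads=1`.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def sleep_for(seconds):
    time.sleep(seconds)
    return 'Success!'


def timed(fn):
    """Run fn() and return (its result, elapsed wall-clock seconds)."""
    start = time.perf_counter()
    out = fn()
    return out, time.perf_counter() - start


def serial(n, seconds):
    return [sleep_for(seconds) for _ in range(n)]


def parallel(n, seconds):
    # One thread per task, so all n sleeps overlap.
    with ThreadPoolExecutor(max_workers=n) as pool:
        return list(pool.map(sleep_for, [seconds] * n))
```

For example, `timed(lambda: serial(4, 1.0))` takes about 4 seconds, while `timed(lambda: parallel(4, 1.0))` takes a little over 1 second.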

## Conclusion

To learn more about RAPIDS, be sure to check out: 

* [Open Source Website](http://rapids.ai)
* [GitHub](https://github.com/rapidsai/)
* [Press Release](https://nvidianews.nvidia.com/news/nvidia-introduces-rapids-open-source-gpu-acceleration-platform-for-large-scale-data-analytics-and-machine-learning)
* [NVIDIA Blog](https://blogs.nvidia.com/blog/2018/10/10/rapids-data-science-open-source-community/)
* [Developer Blog](https://devblogs.nvidia.com/gpu-accelerated-analytics-rapids/)
* [NVIDIA Data Science Webpage](https://www.nvidia.com/en-us/deep-learning-ai/solutions/data-science/)
