# `print('Hello Dask!')`
## James Bourbeau
#### WIPAC X-meeting
October 9, 2018

## Outline

- [What is / Why Dask?](#What-is-Dask?)

- [High Level Collections](#High-Level-Collections)

    - [Dask Arrays](#Dask-Arrays)
    
    - [Dask DataFrames](#Dask-DataFrames)

- [Low Level Interface](#Low-Level-Interface)

    - [Dask Delayed](#Dask-Delayed)


- [Schedulers](#Schedulers)

    - [Single Machine Schedulers](#Single-Machine-Schedulers)
    
    - [Distributed Scheduler](#Distributed-Scheduler)
    
- [Building on Dask: Dask-ML](#Building-on-Dask:-Dask-ML)

- [Resources](#Resources)

## What is Dask?

- Dask is a flexible, open source library for parallel computing in Python

    - GitHub: https://github.com/dask/dask
    
    - Documentation: https://docs.dask.org
    
- Two main components of Dask:

    - High- and low-level interfaces for creating task graphs to perform a computation
    
    - Task schedulers to execute the task graph
    
<img src="images/collections-schedulers.png">
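Under the hood, a task graph can be as simple as a Python dict that maps keys to either plain values or tasks, where a task is a tuple whose first element is a function and whose remaining elements are its arguments (arguments that name other keys are replaced by those keys' results). The sketch below, assuming Dask's raw graph format and the threaded scheduler's `dask.threaded.get` entry point, builds a three-node graph by hand and hands it to a scheduler. `inc` is a toy function defined only for this illustration.

```python
from operator import add

from dask.threaded import get  # the threaded scheduler's entry point


def inc(x):
    """Toy function used only for this illustration."""
    return x + 1


# A task graph: keys map to literal values or to tasks (function, *args).
# String arguments that match other keys are replaced by those keys' results.
dsk = {
    "x": 1,
    "y": (inc, "x"),       # y = inc(x)
    "z": (add, "y", 10),   # z = add(y, 10)
}

# The scheduler walks the graph and computes only what "z" depends on
result = get(dsk, "z")
print(result)  # 12
```

The high-level collections below generate dicts like this one for you; the schedulers only ever see the graph.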

## Why Dask?

- Integrates well with the scientific Python ecosystem

- Mirrors the NumPy, Pandas, and scikit-learn APIs you're already familiar with

- Allows you to scale existing workflows with minimal rewriting

- Dask works on your laptop, but also scales out to clusters

- Offers great built-in diagnostic tools

## Constructing task graphs

#### High level interfaces:

- [Bags](http://docs.dask.org/en/latest/bag.html): Parallel Python lists

- [Arrays](http://docs.dask.org/en/latest/array.html): Parallel NumPy

- [DataFrames](http://docs.dask.org/en/latest/dataframe.html): Parallel Pandas


#### Low level interfaces:

- [Delayed](http://docs.dask.org/en/latest/delayed.html): Parallel function evaluation

- [Futures](http://docs.dask.org/en/latest/futures.html): Real-time parallel function evaluation
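As a small sketch of the "parallel Python lists" interface: a `dask.bag` built from a sequence supports `map`, `filter`, and reductions such as `sum`, all evaluated lazily until `compute()` is called. The data and partition count are arbitrary, and the `'synchronous'` scheduler is used here only to keep the example simple.

```python
import dask.bag as db

# A bag of the integers 0..9, split into 2 partitions
b = db.from_sequence(range(10), npartitions=2)

# Build the computation lazily: square, keep the even squares, add them up
even_square_sum = b.map(lambda n: n ** 2).filter(lambda n: n % 2 == 0).sum()

# Nothing has run yet; compute() executes the task graph
bag_result = even_square_sum.compute(scheduler="synchronous")
print(bag_result)  # 0 + 4 + 16 + 36 + 64 = 120
```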

<hr>

# High Level Collections

[ [Back to top](#Outline) ]

## Dask Arrays

- A Dask array is a grid of many smaller NumPy arrays (chunks)

- Dask arrays implement a subset of the NumPy interface using blocked algorithms

- For many purposes Dask arrays can serve as drop-in replacements for NumPy arrays

<img src="images/dask-array-black-text.svg">

In [None]:
import numpy as np
import dask.array as da

In [None]:
a_np = np.arange(1, 50, 3)
a_np

In [None]:
a_da = da.arange(1, 50, 3, chunks=5)
a_da

Dask arrays are _lazily_ evaluated; the actual data in the array is not loaded until you ask for it.

Similar to NumPy arrays, Dask arrays have a `dtype` and `shape`.

In [None]:
print(a_da.dtype)
print(a_da.shape)

Dask arrays also have some additional attributes:

- `npartitions`: the number of NumPy arrays (chunks) that make up the Dask array

- `chunks`: the chunk sizes along each dimension of the array, given as one tuple per dimension

In [None]:
print(a_da.npartitions)
print(a_da.chunks)
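The array above has only one dimension, so `chunks` contains a single tuple. For a multi-dimensional array, `chunks` holds one tuple of block sizes per axis, and `npartitions` is the total number of blocks. An illustrative example with an arbitrary shape and chunk size; note that the last block along an axis is smaller when the chunk size does not divide the shape evenly:

```python
import dask.array as da

# 10 x 7 array of ones, cut into blocks of at most 4 x 3
x2 = da.ones((10, 7), chunks=(4, 3))

print(x2.chunks)       # ((4, 4, 2), (3, 3, 1)): block sizes along axis 0 and axis 1
print(x2.numblocks)    # (3, 3): number of blocks along each axis
print(x2.npartitions)  # 9: total number of NumPy blocks
```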

You can even visualize the Dask graph using the `visualize()` method!

In [None]:
a_da.visualize()

To compute the Dask graph for this array and load the data into memory, use the `compute()` method.

In [None]:
a_da.compute()

Dask arrays support most of the NumPy interface, including:

- Arithmetic and scalar mathematics: `+`, `*`, `exp`, `log`, ...

- Reductions along axes: `sum()`, `mean()`, `std()`, `sum(axis=0)`, ...

- Tensor contractions / dot products / matrix multiply: `tensordot`

- Axis reordering / transpose: `transpose`

- Slicing: `x[:100, 500:100:-2]`

- Fancy indexing along single axes with lists or NumPy arrays: `x[:, [10, 1, 5]]`

- Array protocols like `__array__` and `__array_ufunc__`

- Some linear algebra: `svd`, `qr`, `solve`, `solve_triangular`, `lstsq`, ...

- ...

See the [Dask array API docs](http://docs.dask.org/en/latest/array-api.html) for full details about what portion of the NumPy API is implemented for Dask arrays.
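A quick way to convince yourself that these operations behave like their NumPy counterparts is to evaluate the same expression on a NumPy array and on a chunked Dask copy of it, then compare. The sketch below does that for a few items from the list above; the array shape, chunk sizes, and random seed are arbitrary choices.

```python
import numpy as np
import dask.array as da

rng = np.random.default_rng(0)
x_np = rng.random((100, 80))
x_da = da.from_array(x_np, chunks=(25, 20))  # 4 x 4 grid of NumPy blocks

# Pairs of (lazy Dask expression, eager NumPy expression)
cases = {
    "arithmetic": (da.exp(x_da) + 2 * x_da, np.exp(x_np) + 2 * x_np),
    "reduction": (x_da.std(axis=0), x_np.std(axis=0)),
    "matmul": (x_da.T @ x_da, x_np.T @ x_np),
    "slicing": (x_da[:50, 70:10:-2], x_np[:50, 70:10:-2]),
    "fancy indexing": (x_da[:, [10, 1, 5]], x_np[:, [10, 1, 5]]),
}

for name, (lazy, expected) in cases.items():
    # Results agree up to floating-point round-off from the blocked evaluation
    np.testing.assert_allclose(lazy.compute(), expected)
    print(f"{name}: OK")
```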

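As stated before, Dask uses _blocked algorithms_: a computation on the full array is split into many small computations on the individual chunks, whose results are then combined. This exposes parallelism (chunks can be processed at the same time) and reduces memory use (only a few chunks need to be in RAM at once).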

In [None]:
a_sum = a_da.sum()
a_sum

In [None]:
a_sum.visualize()

In [None]:
a_sum.compute()
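The `a_sum` graph above is a blocked reduction: each chunk is summed on its own, then the partial sums are combined. The sketch below mimics that by hand with plain NumPy, using the same data and a chunk size of 5. It is a simplified illustration: Dask may combine partial results over several levels (a tree reduction) rather than in one final step.

```python
import numpy as np
import dask.array as da

values = np.arange(1, 50, 3)  # same data as a_np above (17 elements)

# Split into blocks of 5 elements, like chunks=5: sizes 5, 5, 5, 2
blocks = [values[i:i + 5] for i in range(0, len(values), 5)]

partial_sums = [block.sum() for block in blocks]  # one small task per chunk
blocked_total = sum(partial_sums)                 # combine step

# Dask builds and runs the equivalent graph for us
dask_total = da.arange(1, 50, 3, chunks=5).sum().compute()
print(blocked_total, dask_total)  # 425 425
```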

Because so much of the NumPy API is available, you can build up more complex computations from the familiar NumPy operations you're used to.

In [None]:
x = da.ones((15, 15), chunks=(5, 5))
y = (x + x.T).sum()
y

In [None]:
y.visualize()

In [None]:
y.compute()

## Dask DataFrames

- Dask DataFrames are a collection of Pandas DataFrames

- Dask DataFrames implement a subset of the Pandas API

- For many purposes Dask DataFrames can serve as drop-in replacements for Pandas DataFrames

<img src="images/dask-dataframe.svg" width="400px">

In [None]:
import pandas as pd
import dask.dataframe as dd

Dask DataFrames support many of the same data I/O methods as Pandas. For example:

- `read_hdf` / `to_hdf`
- `read_csv` / `to_csv`
- `read_json` / `to_json`
- `read_parquet` / `to_parquet`

In [None]:
ddf = dd.read_hdf('example_data.hdf', key='dataframe', chunksize=25)
ddf

Like Dask arrays, Dask DataFrames are lazily evaluated. 

In [None]:
ddf.compute()

Dask DataFrames cover a well-used portion of the Pandas API. The following classes of computations work well:

- Elementwise operations: `df.x + df.y`, `df * df`

- Row-wise selections: `df[df.x > 0]`

- Loc: `df.loc[4.0:10.5]`

- Common aggregations: `df.x.max()`, `df.max()`

- Is in: `df[df.x.isin([1, 2, 3])]`

- Datetime/string accessors: `df.timestamp.dt.month`

- Groupby-aggregate (with common aggregations): `df.groupby(df.x).y.max()`, `df.groupby('x').max()`

- ...

See the [Dask DataFrame API docs](http://docs.dask.org/en/latest/dataframe-api.html) for full details about what portion of the Pandas API is implemented for Dask DataFrames.

In [None]:
a_sum = ddf['col_1'].sum()
a_sum

In [None]:
a_sum.visualize()

In [None]:
a_sum.compute()

<hr>

# Low Level Interface

[ [Back to top](#Outline) ]

Sometimes problems don’t fit into one of the high-level collections like Dask arrays or Dask DataFrames. In these cases, you can parallelize custom algorithms using the simpler Dask delayed interface, which lets you build task graphs directly by lightly annotating normal Python code.

## Dask Delayed

Let's construct an example computation:

In [None]:
def inc(x):
    return x + 1

def double(x):
    return x * 2

def add(x, y):
    return x + y

In [None]:
data = [1, 2, 3, 4, 5]

output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = sum(output)
total

Dask `delayed` wraps function calls. Wrapping a function in `delayed` delays its execution: calling it returns a `Delayed` object that records a graph of all the operations needed to produce the result. You can then call `compute()` on a `Delayed` object to execute the task graph.

In [None]:
from dask import delayed

In [None]:
simple_add = delayed(add)(1, 1)
simple_add

In [None]:
simple_add.visualize()

We can use `delayed` to make our previous example computation lazy by wrapping all the function calls with `delayed`:

In [None]:
@delayed
def inc(x):
    return x + 1

@delayed
def double(x):
    return x * 2

@delayed
def add(x, y):
    return x + y

In [None]:
output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = delayed(sum)(output)
total

In [None]:
total.visualize()

In [None]:
total.compute()
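When you need several results that share intermediate work, `dask.compute` evaluates multiple lazy objects in one call on a merged graph, so shared tasks run only once. A sketch re-using the toy functions from this section (with `double` multiplying by 2); `pieces`, `lazy_total`, and `lazy_max` are names chosen just for this example.

```python
import dask
from dask import delayed


@delayed
def inc(x):
    return x + 1


@delayed
def double(x):
    return x * 2


@delayed
def add(x, y):
    return x + y


data = [1, 2, 3, 4, 5]
pieces = [add(inc(x), double(x)) for x in data]  # each element is (x + 1) + 2x

lazy_total = delayed(sum)(pieces)
lazy_max = delayed(max)(pieces)

# One call, one merged graph: the tasks in `pieces` are shared by both results
total_value, max_value = dask.compute(lazy_total, lazy_max)
print(total_value, max_value)  # 50 16
```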

<hr>

# Schedulers

[ [Back to top](#Outline) ]

After Dask generates these task graphs it needs to execute them on parallel hardware. This is the job of a task scheduler. Different task schedulers exist. Each will consume a task graph and compute the same result, but with different performance characteristics. Dask has two families of task schedulers:

- Single machine scheduler: This scheduler provides basic features on a local process or thread pool. It was made first and is the default. It is simple and cheap to use, but it can only run on a single machine and does not scale beyond it.

- Distributed scheduler: This scheduler is more sophisticated, offers more features, but also requires a bit more effort to set up. It can run locally or distributed across a cluster.


## Single Machine Schedulers

- `'synchronous'`: The single-threaded synchronous scheduler executes all computations in the local thread, with no parallelism at all. This is particularly valuable for debugging and profiling, which are more difficult when using threads or processes.

- `'threads'`: The threaded scheduler executes computations with a local `multiprocessing.pool.ThreadPool`. The threaded scheduler is the default choice for Dask arrays, Dask DataFrames, and Dask delayed. 

- `'processes'`: The multiprocessing scheduler executes computations with a local `multiprocessing.Pool`.

- Distributed: the advanced distributed scheduler (despite having "distributed" in its name, the distributed scheduler works well on both a single machine and a cluster)

You can configure which scheduler is used in a few different ways. You can set the scheduler globally using the `dask.config.set(scheduler=...)` function:

In [None]:
import dask

In [None]:
dask.config.set(scheduler='threads')
total.compute(); # Will use the multi-threading scheduler

or use it as a context manager to set the scheduler for a block of code:

In [None]:
with dask.config.set(scheduler='processes'):
    total.compute()  # Will use the multi-processing scheduler

or even choose the scheduler for a single `compute` call:

In [None]:
total.compute(scheduler='threads');  # Will use the multi-threading scheduler
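Since every scheduler consumes the same task graph, switching schedulers should change only how the work is executed, not the answer. A minimal check of that claim on an arbitrary small array; it sticks to the two in-process schedulers so it runs anywhere. `'processes'` could be added to the list the same way, though in a standalone script it typically needs an `if __name__ == "__main__":` guard.

```python
import dask.array as da

arr = da.ones((100, 100), chunks=(25, 25))
expr = (arr + arr.T).sum()  # every element of arr + arr.T equals 2

# Same graph, different executors
results = {
    scheduler: expr.compute(scheduler=scheduler)
    for scheduler in ["synchronous", "threads"]
}
print(results)  # both entries are 20000.0
```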

## Distributed Scheduler

In [None]:
from dask.distributed import Client

In [None]:
client = Client(processes=False, threads_per_worker=4,
                n_workers=1, memory_limit='2GB')
client

In [None]:
x = da.ones((20_000, 20_000), chunks=(2_000, 2_000))
y = (x + x.T).sum()
y

In [None]:
y.compute()
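Besides running the familiar collections, the distributed `Client` also provides the Futures interface mentioned earlier: `submit` and `map` launch tasks immediately and return futures, while `result` and `gather` wait for the values. A self-contained sketch using a separate, temporary in-process cluster; `square` and `futures_client` are illustrative names.

```python
from dask.distributed import Client


def square(x):
    return x ** 2


# Temporary in-process cluster; it is shut down when the block exits
with Client(processes=False, n_workers=1, threads_per_worker=2) as futures_client:
    one = futures_client.submit(square, 12)      # starts running right away
    many = futures_client.map(square, range(5))  # one future per input
    one_result = one.result()                    # blocks until done: 144
    many_results = futures_client.gather(many)   # [0, 1, 4, 9, 16]

print(one_result, many_results)
```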

<hr>

# Building on Dask: Dask-ML

[ [Back to top](#Outline) ]

[Dask-ML](http://ml.dask.org/) is a library for scalable machine learning in Python.

Dask-ML takes three different approaches to scaling modern machine learning algorithms:

- Parallelize scikit-learn directly

- Reimplement scalable algorithms with Dask arrays

- Partner with other distributed libraries (like XGBoost and TensorFlow)
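For the first approach, scikit-learn estimators that parallelize internally through joblib can hand that work to a Dask cluster via joblib's `'dask'` backend. A sketch, assuming scikit-learn and joblib are installed (both are Dask-ML dependencies); the estimator, dataset size, and worker settings are arbitrary, and on a real cluster the `Client` would connect to a scheduler address instead of starting a local in-process one.

```python
import joblib
from dask.distributed import Client
from sklearn.datasets import make_classification as sk_make_classification
from sklearn.ensemble import RandomForestClassifier

# Small in-memory dataset; sizes are arbitrary
X_small, y_small = sk_make_classification(n_samples=200, n_features=4, random_state=0)

with Client(processes=False, n_workers=1, threads_per_worker=2) as ml_client:
    forest = RandomForestClassifier(n_estimators=20, n_jobs=-1, random_state=0)
    # Inside this block, joblib's parallel work (fitting the trees) runs on Dask workers
    with joblib.parallel_backend("dask"):
        forest.fit(X_small, y_small)
    train_accuracy = forest.score(X_small, y_small)

print(train_accuracy)
```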

In [None]:
from dask_ml.datasets import make_classification
from dask_ml.model_selection import train_test_split
from dask_ml.metrics import accuracy_score
from dask_ml.linear_model import LogisticRegression

In [None]:
X, y = make_classification(n_samples=1000,
                           n_features=2,
                           n_classes=2,
                           random_state=2,
                           chunks=10)

In [None]:
X

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X, y,
                                                    test_size=0.3,
                                                    random_state=2)

In [None]:
clf = LogisticRegression(max_iter=5)
clf.fit(X_train, y_train)

In [None]:
y_pred = clf.predict(X_test)
accuracy_score(y_test, y_pred)

<hr>

# Resources

[ [Back to top](#Outline) ]

- Dask documentation: http://docs.dask.org

- Dask examples repository: https://github.com/dask/dask-examples

- There are lots of great Dask tutorial recordings from various Python conferences on YouTube. For example:

    - SciPy 2018 (Dask): https://www.youtube.com/watch?v=mqdglv9GnM8
    
    - SciPy 2018 (Dask-ML): https://www.youtube.com/watch?v=ccfsbuqsjgI