# intro to `dask`

`dask` is a Python library developed by Anaconda that we use extensively "under the hood." You don't need to be a `dask` expert to use x, but if you want to optimize performance or contribute to the library, you'll need to understand how it works.

There are two basic problems that we'll use `dask` to solve:

 * **Memory:** on some problems, our rasters are too large to hold in memory all at once, so we need a system that moves data in and out of memory as needed.
 * **Parallelization:** parts of our models can be computed concurrently, which speeds them up, but parallelism is tricky in Python (largely because of the global interpreter lock).

These two issues are at odds with each other: running more things in parallel means holding more data in memory at once. So optimizing x often comes down to figuring out how much you can cram into memory without breaking something.

## so what is `dask` then?

When you look at `dask` code, it should look **just like regular Python code.** `dask` has an array interface that looks just like `numpy` and a `DataFrame` interface that looks just like `pandas`. So it should be familiar.

**The difference:** when you build out your code with `dask`, no actual computation happens. Instead, it builds a "task graph": essentially a set of IOUs for your computation. Nothing is evaluated until you explicitly ask for it (for example, by calling `.compute()`).
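To see the IOU behavior directly, here's a minimal sketch using `dask.delayed`. The functions `square` and `add` and the `calls` list are hypothetical, there only to record when work actually runs.

```python
import dask

calls = []  # records which functions have actually executed

def square(x):
    calls.append("square")
    return x * x

def add(a, b):
    calls.append("add")
    return a + b

# Building the graph: each call returns a Delayed object (an IOU), not a number.
a = dask.delayed(square)(3)
b = dask.delayed(square)(4)
c = dask.delayed(add)(a, b)

print(calls)          # [] : nothing has run yet
print(c.compute())    # 25 : now the whole graph is evaluated
print(sorted(calls))  # ['add', 'square', 'square']
```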

## why would I do that?

Since the entire computation is defined in the abstract, a **scheduler** can decide the best order to process everything, including when to load things into and out of memory and when to parallelize across CPU cores (the two problems above). `dask` comes with a few different schedulers to choose between.
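A minimal sketch of picking a scheduler, assuming a reasonably recent `dask` (older releases used a `get=` keyword instead of `scheduler=`). The same graph is run under different schedulers and gives the same answer. `scheduler="processes"` also exists but is left out here, since a process pool generally needs an `if __name__ == "__main__":` guard when run as a script.

```python
import dask
import dask.array as da

x = da.arange(10, chunks=5).sum()  # one graph, nothing computed yet

threaded = x.compute(scheduler="threads")    # thread pool (the dask.array default)
serial = x.compute(scheduler="synchronous")  # single thread; handy for debugging

# A scheduler can also be set for a whole block of code.
with dask.config.set(scheduler="synchronous"):
    configured = x.compute()

print(threaded, serial, configured)  # 45 45 45
```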

## that sounds like a `pyspark` knockoff

Yeah, but with a lot less setup overhead.

## why `dask` instead of writing it all in `multiprocessing`?

Because shut up, that's why. Also because the shared-memory section of Python's `multiprocessing` documentation is a throne of lies.

## what about `distributed`?

`distributed` is a sister package to `dask` that adds one more scheduler, but this one can parallelize across a cluster of machines. We use it to run large inference jobs on puma.
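A minimal local sketch, assuming the `distributed` package is installed. `processes=False` keeps the workers as threads inside this process, so it runs without a real cluster; against a real cluster you would instead pass the scheduler's address to `Client` (the address is deployment-specific and not shown here).

```python
import dask.array as da
from dask.distributed import Client

# Start a small in-process cluster; the Client shuts it down on exit.
with Client(processes=False, n_workers=1, threads_per_worker=2) as client:
    x = da.arange(10, chunks=5).sum()
    total = x.compute()  # a live Client becomes the default scheduler
    print(total)         # 45
```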

# Goals for this notebook

* get the basic idea of `dask`
* quick intro to `dask.array`
* quick intro to `dask.dataframe`
* quick intro to `dask.delayed`

```python
import numpy as np
import pandas as pd
import dask
import dask.array
import dask.dataframe
```

## `dask.array`

The array interface looks (almost) just like `numpy`. The difference is that whenever we build an array, we pass a `chunks` parameter that specifies the size of the pieces the array is carved into for computation.

Smaller chunks mean more independent tasks: this makes parallelization easier, but adds scheduling overhead. Larger chunks mean more data has to be loaded into memory at once.
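A quick sketch of that tradeoff: the same 1000 x 1000 array carved two ways. The shapes and chunk sizes are arbitrary, picked only for illustration.

```python
import numpy as np
import dask.array as da

small = da.ones((1000, 1000), chunks=(250, 250))  # 4 x 4 = 16 blocks
large = da.ones((1000, 1000), chunks=(500, 500))  # 2 x 2 = 4 blocks

print(small.numblocks, large.numblocks)  # (4, 4) (2, 2)

# Bytes in one chunk (float64 = 8 bytes per element).
small_bytes = int(np.prod(small.chunksize)) * small.dtype.itemsize
large_bytes = int(np.prod(large.chunksize)) * large.dtype.itemsize
print(small_bytes, large_bytes)  # 500000 2000000

# Same answer either way; only the task breakdown differs.
print(small.sum().compute(), large.sum().compute())  # 1000000.0 1000000.0
```

More blocks means more tasks for the scheduler to track; bigger blocks means each task needs more memory while it runs.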

```python
# numpy array
foo = np.arange(10)
foo
```

```
array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
```

```python
# dask array
bar = dask.array.arange(10, chunks=5)
bar
```

```
dask.array<arange, shape=(10,), dtype=int64, chunksize=(5,)>
```

```python
bar.compute()
```

```
array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
```

```python
foo.sum()
```

```
45
```

```python
bar.sum()
```

```
dask.array<sum-aggregate, shape=(), dtype=int64, chunksize=()>
```

```python
bar.sum().compute()
```

```
45
```

Every `dask` collection has a `.dask` attribute you can inspect to see the underlying task graph (a DAG):

```python
dict(bar.sum().dask)
```

```
{('arange-148142c4b92299810fc0c0f4549563ac',
  0): (<function dask.array.chunk.arange>, 0, 5, 1, 5, dtype('int64')),
 ('arange-148142c4b92299810fc0c0f4549563ac',
  1): (<function dask.array.chunk.arange>, 5, 10, 1, 5, dtype('int64')),
 ('sum-aggregate-26349468436826e12254b9617fc9444a',): (<toolz.functoolz.Compose at 0x7f7932d75c50>,
  [('sum-daa3748e0f4588a32f3553887f222ce4', 0),
   ('sum-daa3748e0f4588a32f3553887f222ce4', 1)]),
 ('sum-daa3748e0f4588a32f3553887f222ce4',
  0): (<function dask.compatibility.apply>, functools.partial(<function sum at 0x7f79541cf1e0>, dtype=dtype('int64')), [('arange-148142c4b92299810fc0c0f4549563ac',
    0)], {'axis': (0,), 'keepdims': True}),
 ('sum-daa3748e0f4588a32f3553887f222ce4',
  1): (<function dask.compatibility.apply>, functools.partial(<function sum at 0x7f79541cf1e0>, dtype=dtype('int64')), [('arange-148142c4b92299810fc0c0f4549563ac',
    1)], {'axis': (0,), 'keepdims': True})}
```
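That graph is just a dict mapping keys to tasks, where a task is a tuple of a function followed by its arguments. As a sketch, here's a tiny hand-built graph in the same format (adapted from the graph-spec examples in the `dask` docs; `inc` and `add` are toy functions), run directly by the threaded scheduler's `get` function:

```python
from dask.threaded import get

def inc(i):
    return i + 1

def add(a, b):
    return a + b

# Keys map to plain values or task tuples (function, *arguments).
# Arguments that are themselves keys ("x", "y") are replaced by their results.
dsk = {
    "x": 1,
    "y": (inc, "x"),
    "z": (add, "y", 10),
}

result = get(dsk, "z")  # the scheduler walks the DAG: x -> y -> z
print(result)  # 12
```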

Usually the scheduler does its job without much manual intervention, but this is definitely an "it works fine until it doesn't" kind of library.

```python
foo[1:4]
```

```
array([1, 2, 3])
```

```python
bar[1:4].compute()
```

```
array([1, 2, 3])
```

```python
foo + 1
```

```
array([ 1,  2,  3,  4,  5,  6,  7,  8,  9, 10])
```

```python
(bar + 1).compute()
```

```
array([ 1,  2,  3,  4,  5,  6,  7,  8,  9, 10])
```

```python
np.exp(foo)
```

```
array([  1.00000000e+00,   2.71828183e+00,   7.38905610e+00,
         2.00855369e+01,   5.45981500e+01,   1.48413159e+02,
         4.03428793e+02,   1.09663316e+03,   2.98095799e+03,
         8.10308393e+03])
```

```python
dask.array.exp(bar).compute()
```

```
array([  1.00000000e+00,   2.71828183e+00,   7.38905610e+00,
         2.00855369e+01,   5.45981500e+01,   1.48413159e+02,
         4.03428793e+02,   1.09663316e+03,   2.98095799e+03,
         8.10308393e+03])
```
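Slices, arithmetic, and reductions all just add tasks to the same graph, so they chain freely. A small sketch: passing several results to one `dask.compute` call merges their graphs, so the shared intermediate `shifted` is evaluated once rather than once per result.

```python
import dask
import dask.array as da

bar = da.arange(10, chunks=5)
shifted = bar + 1  # shared intermediate, still lazy

# One call, one merged graph: `shifted` is computed once and reused.
total, biggest = dask.compute(shifted.sum(), shifted.max())
print(total, biggest)  # 55 10
```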

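What you **can't** always do with `dask.array` is `numpy`'s fancy indexing tricks. Only a subset is supported, and with the `dask` version used for this notebook, indexing one dask array with another fails: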

```python
foo[foo[1:6]]
```

```
array([1, 2, 3, 4, 5])
```

```python
bar[bar[1:6]].compute()
```

```
ValueError: operands could not be broadcast together with shapes (10,) (5,)
```
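Two workarounds, as a sketch. If the index is small, compute it first and index with the resulting numpy array. Boolean masks also work, though the result's chunk sizes are unknown until computed. (More recent `dask` releases also accept a one-dimensional dask array of integers as an index, so the failing cell above may work on a newer install.)

```python
import dask.array as da

bar = da.arange(10, chunks=5)

# Workaround 1: materialize the (small) index, then index with a numpy array.
idx = bar[1:6].compute()   # numpy array [1 2 3 4 5]
picked = bar[idx].compute()
print(picked)              # [1 2 3 4 5]

# Workaround 2: a boolean mask; chunk sizes stay unknown until computed.
masked = bar[bar > 4]
print(masked.compute())    # [5 6 7 8 9]
```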

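Because nothing is materialized all at once, `dask` can sum a billion-element array (about 8 GB as `int64`) one million-element chunk at a time:

```python
dask.array.arange(1000000000, chunks=1000000).sum().compute()
```

```
499999999500000000
```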
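A rough sketch of the memory arithmetic behind that cell, assuming the default integer dtype is `int64` (it is platform-dependent on some older numpy/Windows setups). The closed form n(n-1)/2 gives a check on the computed sum without running it again.

```python
import dask.array as da

n = 1_000_000_000
x = da.arange(n, chunks=1_000_000)  # lazy; nothing is allocated here

print(x.numblocks)  # (1000,): one chunk per million elements

chunk_mb = x.chunksize[0] * x.dtype.itemsize / 1e6
total_gb = x.nbytes / 1e9
print(chunk_mb, total_gb)  # 8.0 8.0 with int64: 8 MB per chunk vs 8 GB total

# Closed form for 0 + 1 + ... + (n - 1).
expected = n * (n - 1) // 2
print(expected)  # 499999999500000000
```

Peak memory scales with chunk size times the number of chunks in flight at once, not with the full 8 GB.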