# Advanced Collections: _Custom Operations_

In the overview notebook we discussed some of the many algorithms that are pre-defined for different types of Dask collections
(such as Arrays, DataFrames and Bags). These include operations like `mean`, `max`, `value_counts` and many other standard operations.

Sometimes for more niche use-cases, these predefined
algorithms are not sufficient. 

You can wrap arbitrary functions in ``dask.delayed`` to parallelize them,
but when you are operating on a Dask collection or several Dask collections,
``dask.delayed`` won't understand the organization of your blocks. 

In these cases there are several different ways in which you can set up
custom functions.

-   All collections have a ``map_partitions`` or ``map_blocks`` function, that
    applies a user provided function across every Pandas dataframe or NumPy array
    in the collection.  Because Dask collections are made up of normal Python
    objects, it's often straightforward to map custom functions across partitions of a
    dataset without much modification.

-   More complex ``map_*`` functions.  Sometimes your custom behavior isn't
    embarrassingly parallel, but requires more advanced communication.  For
    example maybe you need to communicate a little bit of information from one
    partition to the next, or maybe you want to build a custom aggregation.

    Dask collections include methods for these as well.

-   For even more complex workloads you can convert your collections into
    individual blocks, and arrange those blocks as you like using Dask Delayed.
    There is usually a ``to_delayed`` method on every collection. Note that this
    is often the slowest approach.

**Related Documentation**

  - [Array Tutorial](https://tutorial.dask.org/03_array.html)
  - [Best Practices](https://docs.dask.org/en/latest/best-practices.html#learn-techniques-for-customization)

## Block Computations
Block computations operate on a per-block basis. So each block gets the function applied to it, and the output has the same chunk pattern as the input.

![map_blocks](images/custom_operations_map_blocks.png)


They are simple to think about, but can be tricky to get right.
We'll explore some best practices for setting them up.

**Related Documentation**

   - [`dask.array.map_blocks`](https://docs.dask.org/en/latest/array-api.html?highlight=map_blocks#dask.array.Array.map_blocks)
   - [`dask.dataframe.map_partitions`](http://dask.pydata.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.map_partitions)

### Array

Here is a straightforward case of `map_blocks` on a Dask array.

In [None]:
import dask.array as da

a = da.arange(0, 6, chunks=3)

result = a.map_blocks(lambda x: x ** 2)

When we compute the result, we see that every item in ``a`` is squared.

In [None]:
result.compute()

This is equivalent to:

In [None]:
same = a ** 2
same.compute()

We can use ``.visualize`` to convince ourselves that the task graph for both these
operations has the same structure. Two entirely independent "towers":

In [None]:
result.visualize()

In [None]:
same.visualize()

This function was easy to understand and apply to each block. When the operation is more convoluted it can help to start by writing a function that works as expected on _one block_ before passing it to ``map_blocks``.

Let's write a function that takes the first element of the block and subtracts it from all the following items.

In [None]:
def func(block):
    return block - block[0]

Test the function out on any block of ``a``

In [None]:
func(a.blocks[1]).compute()

> NOTE: Since `a.blocks[1]` is actually a dask.array, you have to call compute to get the real result.

Now pass that function to ``map_blocks``. Notice that each block has the same output: `array([0, 1, 2])` and ``map_blocks`` stacks the outputs so that the chunks are in the same pattern as the input chunks.

In [None]:
da.map_blocks(func, a).compute()

Since our funciton depends on the value of the first item in the any particular chunk, if you rechunk the array and call `map_blocks` on that new array, you'll get a different result:

In [None]:
da.map_blocks(func, a.rechunk(2)).compute()

#### Multiple arrays

Map_blocks can be used to combine several Dask arrays. When multiple arrays are passed, ``map_blocks``
aligns blocks by block positions without regard to shape.

In the following example we have two arrays with the same number of blocks
but with different shape and chunk sizes.

In [None]:
import numpy as np
import dask.array as da

x = da.arange(1000, chunks=(100,))
y = da.arange(100, chunks=(10,))

Let's take a look at these arrays:

In [None]:
x

In [None]:
y

We can pass these arrays into ``map_blocks`` using a funciton that takes two inputs, calculates the max of each, than then returns a numpy array of the ou the puts. 

In [None]:
def func(a, b):
    return np.array([a.max(), b.max()])

result = da.map_blocks(func, x, y, chunks=(2,))
result.compute()

#### Understanding `chunks` argument

In the example above we explicitly declare what the size of the output chunks will be ``chunks=(2,)`` this
is short-hand for ``chunks=((2, 2, 2, 2, 2, 2, 2, 2, 2, 2,),)`` meaning 10 blocks each with shape ``(2,)``.
You can see the shape and chunk information in the array representation:

In [None]:
result

> Specifying the output chunks is very useful when doing more involved operations with ``map_blocks``. If you don't specify ``chunks``, the output will not have the right expectation of the final shape.

#### Special arguments

There are special arguments (``block_info`` and ``block_id``) that you can use within ``map_blocks`` functions. ``block_id`` gives the index of the block within the chunks, so for a 1D array it will be something like `(i,)`. ``block_info`` is a dictionary where there is an integer key for each input dask array and a `None` key for the output array.

Let's use the example above and print ``block_info`` for the first block so that we can get a sense of what information is contained in it:

In [None]:
from pprint import pprint

def func(a, b, block_id=None, block_info=None):
    if block_id == (0,):
        pprint(block_info)
    return np.array([a.max(), b.max()])

da.map_blocks(func, x, y, chunks=(2,)).compute()

One of the use cases for the ``block_info`` and ``block_id`` arguments is to create an array from scratch. In the following example we first create an empty array with the desired number of chunks: 10x10. Then we
fill each chunk with data based on the starting positions of that block.

In [None]:
import numpy as np
import dask.array as da

x = da.empty(100, shape=(10, 10), chunks=(1, 1))

def generate_data(a, block_id=None):
    ii, jj = block_id
    return np.arange(ii*200, (ii+1)*200).reshape((10, 20))

da.map_blocks(generate_data, x, chunks=(10, 20), dtype=int)

This example might feel contrived, but it can be useful when creating custom IO operations
especially in a distributed context.

**Exercise**

Say you have a set of images that each represent a particular portion of a scene. How can you use the
technique we just learned to patch them together? 

Let's look at what is in the puzzle directory:

In [None]:
!ls puzzle

The following cell displays the completed puzzle.

In [None]:
from imageio import imread
import matplotlib.pyplot as plt

image = imread("puzzle/bicycle.png")
plt.imshow(image)

Now use ``map_blocks`` to read in the puzzle pieces from "bicycle_i_j.png". Note that each image piece has 3 dimensions: x, y, and RGBA.

In [None]:
# first set up an empty array with the right shape and one chunk per puzzle piece
a = da.empty(shape=(2, 2, 4), chunks=((1, 1), (1, 1), (4,)))

# define a function that reads one file
def reader(block, block_id=None):
    ... = block_id  # unpack block_id to get chunk index values
    filename = ...  # use chunk index values to get the correct file
    return imread(filename)

# map that function to every block and set the expected dtype of the output
result = da.map_blocks(reader, a, dtype=int)
plt.imshow(result)

In [None]:
# solution
# first set up an empty array with the right shape and one chunk per puzzle piece
a = da.empty(shape=(2, 2, 4), chunks=((1, 1), (1, 1), (4,)))

# define a function that reads one file
def reader(block, block_id=None):
    ii, jj, _ = block_id                        # unpack block_id to get chunk index values
    filename = f"puzzle/bicycle_{ii}_{jj}.png"  # use chunk index values to get the correct file
    return imread(filename)

# map that function to every block and set the expected dtype of the output
result = da.map_blocks(reader, a, dtype=int)
plt.imshow(result)

### DataFrame

In Dask dataframe there is a similar method to ``map_blocks`` but it is called ``map_partitions``.

Here is an example of using it to check if the sum of two columns is greater than some
arbitrary threshold.

In [None]:
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({'x': [1, 2, 3, 4, 5],
                   'y': [6.4, 7.7, 8.3, 9.1, 10.2]})
ddf = dd.from_pandas(df, npartitions=2)

result = ddf.map_partitions(lambda df, threshold: (df.x + df.y) > threshold, threshold=9)
result.compute()

#### Internal uses
In practice ``map_partitions`` is used to implement many of the helper dataframe methods
that let Dask dataframe mimic Pandas. Here is the implementation of `ddf.index` for instance:

```python
@property
def index(self):
    """Return dask Index instance"""
    return self.map_partitions(
        getattr,
        "index",
        token=self._name + "-index",
        meta=self._meta.index,
        enforce_metadata=False,
    )
```

[source](https://github.com/dask/dask/blob/09862ed99a02bf3a617ac53b116f9ecf81eea338/dask/dataframe/core.py#L458-L467)


### Understanding `meta`

Dask dataframes and dask arrays have a special attribute called `_meta` that allows them to know metadata about the type of dataframe/array that they represent. This metadata includes:
 - dtype (int, float)
 - column names and order
 - name
 - type (pandas dataframe, cudf dataframe)
 
**Related documentation**

- [Dataframe metadata](https://docs.dask.org/en/latest/dataframe-design.html#metadata)

This information is stored in an empty object of the proper type.

In [None]:
print(ddf._meta)

That's how dask knows what to render when you display a dask object:

In [None]:
print(ddf)

When you add an item to the task graph, Dask tries to run the function on the meta before you call compute. 

This approach has several benefits:

- it gives Dask a sense of what the output will look like. 
- if there are fundamental issues, Dask will fail fast


In [None]:
ddf.sum()

Sometimes running the function on a miniature version of the data doesn't produce a result that is similar enough to your expected output. 

In those cases you can provide a `meta` manually - this will save you the step of running the function on graph creation, so it will be slightly faster.

In [None]:
%%timeit
result = ddf.map_partitions(lambda df, threshold: (df.x + df.y) > threshold, threshold=9, meta=bool)
result.compute()

In [None]:
%%timeit
result = ddf.map_partitions(lambda df, threshold: (df.x + df.y) > threshold, threshold=9)
result.compute()

### When not to use ``map_blocks`` or ``map_partitions``

Both ``map_blocks`` and ``map_partitions`` operate on each block in isolation. This
makes them ill-suited for operations that depend on outcomes in other chunks.
It also means that there will always be at least one result per block.

When you need the edges of one block in the next block you can use overlapping computations.

## Overlapping Computations
Sometimes you want to operate on a per-block basis, but you need some information from neighboring blocks. Example operations include the following:

- Convolve a filter across an image
- Rolling sum/mean/max, …
- Search for image motifs like a Gaussian blob that might span the border of a block
- Evaluate a partial derivative

Dask Array supports these operations by creating a new array where each block is slightly expanded by the borders of its neighbors. 

![](https://docs.dask.org/en/latest/_images/overlapping-neighbors.png)

This costs an excess copy and the communication of many small chunks, but allows localized functions to evaluate in an embarrassingly parallel manner.

**Related Documentation**
   - [Array Overlap](https://docs.dask.org/en/latest/array-overlap.html)

The main API for these computations is the ``map_overlap`` method. ``map_overlap`` is very similar to ``map_blocks`` but has the additional arguments: ``depth``, ``boundary``, and ``trim``.

Here is an example of calculating the derivative:

In [None]:
import numpy as np
import dask.array as da

x = np.array([1, 1, 2, 3, 3, 3, 2, 1, 1])
x = da.from_array(x, chunks=5)

def derivative(x):
    return x - np.roll(x, 1)

y = x.map_overlap(derivative, depth=1, boundary=0)
y.compute()

In this case each block shares 1 value from its neighboring block: ``depth``. And since we set ``boundary=0``on the outer edges of the array, the first and last block are padded with the integer 0. Since we haven't specified ``trim`` it is true by default meaning that the overlap is removed before returning the results.

If you inspect the task graph you'll see two mostly independent towers of tasks, with just some value sharing at the edges.

In [None]:
y.visualize(optimize_graph=True)

**Exercise**

Lets apply a gaussian filter to an image following the example from the [scipy docs](https://docs.scipy.org/doc/scipy/reference/generated/scipy.ndimage.gaussian_filter.html).

First create a dask array from the numpy array:

In [None]:
from scipy import misc
import dask.array as da

a = da.from_array(misc.ascent(), chunks=(128, 128))
a

Now use ``map_overlap`` to apply ``gausian_filter`` to each block.

In [None]:
from scipy.ndimage import gaussian_filter

b = a.map_overlap(gaussian_filter, sigma=5, ...)

In [None]:
# solution
from scipy.ndimage import gaussian_filter

b = a.map_overlap(gaussian_filter, sigma=5, depth=10, boundary="periodic")

Check what you've come up with by plotting the results:

In [None]:
import matplotlib.pyplot as plt

fig, (ax1, ax2) = plt.subplots(1, 2)
ax1.imshow(a)
ax2.imshow(b)
plt.show()

> Notice that if you set the depth to a smaller value, you can see the edges of the blocks in the output image.

## Blockwise Computations

Blockwise computations provide the infrastructure for implementing ``map_blocks`` and many
of the elementwise methods that make up the Array API.

They present a really powerful way of implementing matrix operations.

**Related Documentation**

   - [API Documentation](https://docs.dask.org/en/latest/array-api.html#dask.array.blockwise)
   

Let's look at an example of summing two arrays blockwise. 

In [None]:
import operator
import numpy as np
import dask.array as da

x = da.from_array([[1, 2],
                   [3, 4]], chunks=(1, 2))
y = da.from_array([[10, 20],
                   [0, 0]])
z = da.blockwise(operator.add, 'ij', x, 'ij', y, 'ij')
z.compute()

Try switching the block pattern of the output:

In [None]:
z = da.blockwise(operator.add, 'ji', x, 'ij', y, 'ij')
z.compute()

In each of these case the outcome for each block is the same. But in the first case
the blocks are stacked vertically (`shape=(2, 2)`) and in the second they are placed 
side-by-side (`shape=(1, 4)`) 


![Clockwise custom operations](images/custom_operations_blockwise.png)

### Internal uses

This is the internal definition of transpose on dask.Array. In it you can see that there is a
regular ``np.transpose`` applied within each block and then the blocks are themselves transposed.

```python
def transpose(a, axes=None):
    if axes:
        if len(axes) != a.ndim:
            raise ValueError("axes don't match array")
    else:
        axes = tuple(range(a.ndim))[::-1]
    axes = tuple(d + a.ndim if d < 0 else d for d in axes)
    return blockwise(
        np.transpose, axes, a, tuple(range(a.ndim)), dtype=a.dtype, axes=axes
    )
```

[source](https://github.com/dask/dask/blob/4569b150db36af0aa9d9a8d318b4239a78e2eaec/dask/array/routines.py#L161:L170)

## Reduction
All of the methods that we have covered so far assume that the output array will be similar to the input array. But where dask really shines is with aggregations. Each dask collection has a `reduction` method that is the generalized method that supports operations that reduce the dimensionality of the inputs.

![Custom operations: reduction](images/custom_operations_reduction.png)

**Related Documentation**
   - [`dask.array.reduction`](http://dask.pydata.org/en/latest/array-api.html#dask.dataframe.Array.reduction)
   - [`dask.dataframe.reduction`](http://dask.pydata.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.reduction)

### Internal uses

This is the internal definition of sum on dask.Array. In it you can see that there is a
regular ``np.sum`` applied across each block and then tree-reduced with ``np.sum`` again.

```python
def sum(a, axis=None, dtype=None, keepdims=False, split_every=None, out=None):
    if dtype is None:
        dtype = getattr(np.zeros(1, dtype=a.dtype).sum(), "dtype", object)
    result = reduction(
        a,
        chunk.sum,  # this is just `np.sum`
        chunk.sum,  # this is just `np.sum`
        axis=axis,
        keepdims=keepdims,
        dtype=dtype,
        split_every=split_every,
        out=out,
    )
    return result
```

In this case the  ``chunk``, ``combine`` and ``aggregate`` functions are all the same.

In [None]:
import numpy as np
import dask.array as da

a = da.from_array(np.arange(1000000).reshape(1000, 1000), chunks=(500, 200))
b = a.sum()

By visualizing `b` we can see how the tree reduction works. First the ``chunk`` function is applied to each block, then every 4 chunks are combined using the ``combine`` function. This keeps going until there are only 2 chunks left, then the ``aggregate`` function is used to finish up.

In [None]:
b.visualize()

**Exercise**

See how the graph changes when you set the chunks to `(500, 500)` or `(500, 300)`

### Controlling reduction with kwargs

There are a few handy keyword arguments that you can use to control the shape of the task graph: ``keepdims``, ``out``, ``split_every``...

The most useful of these is ``split_every`` which controls the number of chunk outputs that are used as input to each ``combine`` call. Try setting `split_every` on `a.sum()` like ``a.sum(split_every={0: 2, 1: 5})`` and visualizing the task graph to see the impact.

> **Side note**
>
> You can use reductions to calculate aggregations per-block reduction even if you don't want to combine and aggregate the results of those blocks:
>
> ```python
> da.reduction(a, np.sum, lambda x, **kwargs: x, dtype=int).compute()
> ```

## Groupby Aggregation

There are many standard reductions supported by default on dataframe groupbys. These include methods like `mean, max, min, sum, nunique`. These are easily scaled and parallelized.

**Related Documentation**

   - [DataFrame Groupby](https://docs.dask.org/en/latest/dataframe-groupby.html#aggregate)
   - [Examples](https://examples.dask.org/dataframes/02-groupby.html)

If you are trying to run a custom function on the groups in a groupby it can be tempting to use `.apply` but this is often a poor choice because it requires that the data be shuffled. Instead you should try writing a custom aggregation.

When you use a custom aggregation, each partition is grouped (without being repartitioned) and then the results from each group on each partition are aggregated.

![groupby aggregation](images/custom_operations_groupby_aggregation.png)

In order to do that you need to write three functions:

- `chunk`: operates on the series groupby on each individual partition (`ddf.partitions[0].groupby("name")["x"]`)
- `agg`: operates on the concatenated output from calling chunk on every partition
- `finalize`: operates on the output from calling aggregate - returns one column. This one is actually optional.

Here's an example of a custom aggregation for calculating the mean.

In [None]:
import dask
import dask.dataframe as dd

ddf = dask.datasets.timeseries()

custom_mean = dd.Aggregation(
    'custom_mean',
    lambda s: (s.count(), s.sum()),
    lambda count, sum: (count.sum(), sum.sum()),
    lambda count, sum: sum / count,
)
custom_result = ddf.groupby('name').agg(custom_mean)
custom_result.head()

Here is how it works:

- for every partition (one per day) group by ``name``
- on each of those pandas series groupby objects calculate the `count` and the `sum`
- concatenate every 8 (this is configurable) outputs together
- sum each of these
- finally: divide the `sum` by the `count`

This is equivalent to:

In [None]:
simple_result = ddf.groupby('name').mean()
simple_result.head()

**NOTE**: If you look at the task graph you'll see that the structure of the computation is actually pretty different. That's because `.mean` computes the `sum` and the `count` independently and only combines the values at the end.

In [None]:
custom_result.visualize()

In [None]:
simple_result.visualize()

Similarly you could use apply (**DON'T DO THIS**)

```python
ddf.groupby("name").apply(lambda x: x.mean())
```

This will shuffle the data so that all the data for a particular name is in the same partition. If you call `.compute()` on it you'll notice that it's much slower (about 50x on my computer).

**Exercise**

Write a custom aggregation to calculate the `count` of rows for each `name`.

In [None]:
custom_count = dd.Aggregation(
    'custom_count',
    chunk=...,  # write a function that operates on one chunk and gives the count
    agg=...,    # write a function that operates on the concattenated results from each chunk
)
ddf.groupby('name').agg(custom_count).compute()

In [None]:
# solution
custom_count = dd.Aggregation(
    'custom_ount',
    chunk=lambda s: s.count(),
    agg=lambda counts: counts.sum(),
)
ddf.groupby('name').agg(custom_count).compute()

## When to use which method

In this notebook we've covered several different mechanisms for applying arbitrary functions to the blocks in arrays or dataframes. Here's a brief summary of when you should use these various methods

- `map_block`, `map_partition` - chunk pattern of the input matches the chunk pattern of the output and the function is fully parrallelizable. 
- `map_overlap` - chunk paterns match, but the function is not fully parallelizable (requires input from neighboring chunks).
- `blockwise` - input and output chunks are similar but may be in different orientations.
- `reduction` - dimensionality of output does not necessarily match that of input and function is fully parallelizable.
- `groupby().agg` - when data needs to be aggregated per group (the index of the output will be the group keys).