# Introduction to Pipeline

Pipeline is a Python package to build complex analysis pipelines with [dask](https://dask.org).

Inspired by Spotify's [Luigi](https://github.com/spotify/luigi), Pipeline builds on top of `dask.delayed`, adding save and load instructions to the dask graph.

In contrast to Luigi, Pipeline lets you experiment with the analysis pipeline interactively, thanks to dask, for instance within a Jupyter notebook.

At the same time, and in contrast to plain dask, it can speed up experimentation by storing expensive intermediate computations to disk.

In [1]:
from pipeline import Task, Storage, dataclass, dependency, task

## The task decorator

A Task can be created from a function using `task` as a decorator.

If you're familiar with `dask.delayed`, `task` behaves like a wrapper around `dask.delayed(pure=True)`.

In [2]:
@task
def double(x: float) -> float:
    return 2 * x

Calling the function doesn't perform the computation immediately, but returns a `dask.delayed` object:

In [3]:
double(21)

Delayed('double/54abf7066a03bddb650fbb1616e27bc5.cloudpickle.zstandard')

To obtain the result, you must call the `.compute` method.

In [4]:
double(21).compute()

42

## Storing results to disk

So far, a task behaves as a `dask.delayed`. But, additionally, we can easily save and load data to avoid recomputing expensive tasks.

To do that, we first need to create a `Storage` object, which uses any object implementing a `MutableMapping[str, bytes]` interface.

The constructor also accepts a `str`, in which case it builds an `fsspec.FSMap`. Here, we create an in-memory storage, backed by a dict through `fsspec.FSMap`:

In [5]:
storage = Storage("memory://")
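
To illustrate what a `MutableMapping[str, bytes]` backend needs to provide, here is a minimal sketch that keeps each value in a file under a folder, using only the standard library. `DirectoryMap` is a hypothetical name, not part of Pipeline, and passing it to `Storage` is assumed to work only because it follows the `MutableMapping` contract described above.

```python
import tempfile
from collections.abc import MutableMapping
from pathlib import Path
from typing import Iterator


class DirectoryMap(MutableMapping):
    """Hypothetical MutableMapping[str, bytes] that stores each value as a file."""

    def __init__(self, root: str) -> None:
        self.root = Path(root)
        self.root.mkdir(parents=True, exist_ok=True)

    def __getitem__(self, key: str) -> bytes:
        path = self.root / key
        if not path.is_file():
            raise KeyError(key)
        return path.read_bytes()

    def __setitem__(self, key: str, value: bytes) -> None:
        path = self.root / key
        path.parent.mkdir(parents=True, exist_ok=True)  # keys may contain "/"
        path.write_bytes(value)

    def __delitem__(self, key: str) -> None:
        path = self.root / key
        if not path.is_file():
            raise KeyError(key)
        path.unlink()

    def __iter__(self) -> Iterator[str]:
        # Yield keys relative to the root folder, skipping directories.
        for path in sorted(self.root.rglob("*")):
            if path.is_file():
                yield path.relative_to(self.root).as_posix()

    def __len__(self) -> int:
        return sum(1 for _ in self)


mapping = DirectoryMap(tempfile.mkdtemp())
mapping["double/abc.bin"] = b"42"  # illustrative key, not a real Pipeline key
```

Only the five abstract methods are implemented; `MutableMapping` derives the rest (`in`, `items`, `update`, and so on) from them.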

To save (and later load) a Task's result, we must explicitly create the task with `save=True`, since we probably don't want to save tasks that are cheap to compute but expensive to store.

In [6]:
@task(save=True)
def double_with_print(x: float) -> float:
    out = 2 * x
    print(f"Calculating: 2 * {x} = {out}")
    return out

To observe when the function is actually called, we added a `print` statement inside.

In [7]:
my_task = double_with_print(21)

my_task

Delayed('double_with_print/54abf7066a03bddb650fbb1616e27bc5.cloudpickle.zstandard')

Calling the function creates a `dask.delayed` object, as before.

If we call `.compute()`, we see the print statement and it returns the result.

In [8]:
my_task.compute()

Calculating: 2 * 21 = 42


42

Now, if we call `.compute()` inside a storage context manager, the result will be saved, and loaded in future computations.

In [9]:
with storage(save=True):
    result = my_task.compute()

result

Calculating: 2 * 21 = 42


42

When we recompute the task inside the context manager, we obtain the same result but see no print statement: the function wasn't called, and the result was retrieved from storage instead.

In [10]:
with storage(save=True):
    result = my_task.compute()

result

42

In contrast, recomputing outside the context manager calls the function again.

In [11]:
my_task.compute()

Calculating: 2 * 21 = 42


42

**Important:** functions are expected to be **pure**, that is, their output depends only on their input parameters, and they have no side effects. A function will not be called again when its output is already stored.

Examples of non-pure functions:

- **Mutating input**: using in-place operations.
- **Non-deterministic output**: drawing random numbers, or relying on global variables.
- **Side effects:** updating global variables.

We've seen an example of a side effect: the `print` function in the previous example was not called when recomputing the task (inside the storage context manager).
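
To make the consequence of non-deterministic output concrete, here is a small sketch of a function that depends on a global variable, combined with a cache keyed only by the function name and its argument. The cache is a plain dict standing in for storage; `cached_call` is a hypothetical helper.

```python
offset = 0  # global state that the function silently depends on


def shifted(x):
    return x + offset  # not pure: the output changes when `offset` changes


cache = {}


def cached_call(func, x):
    """Return the cached result for (function name, argument), computing it once."""
    key = (func.__name__, x)  # the key knows nothing about `offset`
    if key not in cache:
        cache[key] = func(x)
    return cache[key]


before = cached_call(shifted, 1)  # computed with offset == 0
offset = 10
after = cached_call(shifted, 1)   # stale: still the value computed with offset == 0
fresh = shifted(1)                # calling directly reflects the new offset
```

Here `after` and `fresh` disagree, which is exactly the kind of silent inconsistency that pure functions avoid.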

### Combined Storages

There are two ways to combine storages:

#### Chaining storages

To join multiple storages, we can use `Storage.from_chain` to create a joint `MutableMapping` as follows:

```python
storage_local = Storage("/local_folder")
storage_remote = Storage("ssh://user@server/home/user/remote_folder")
storage_combined = Storage.from_chain(storage_local, storage_remote)

with storage_combined(save=True):
    ...
```

In this case, `storage_combined` will try to load in order, first from `/local_folder` and then from `remote_folder`. If it needs to save a task, it will be saved to the first one (`storage_local`).

*Note: underneath it's just using a `collections.ChainMap` to join them.*
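
The load order and save target follow directly from how `collections.ChainMap` behaves: lookups search the mappings in order, while writes always go to the first one. A short sketch with plain dicts standing in for the local and remote storages:

```python
from collections import ChainMap

local = {}
remote = {"task/a": b"from remote"}
combined = ChainMap(local, remote)

value = combined["task/a"]    # not in local, so it is found in remote
combined["task/b"] = b"new"   # writes go to the first mapping only

local["task/a"] = b"from local"
shadowed = combined["task/a"]  # local now takes precedence over remote
```

The keys `"task/a"` and `"task/b"` are placeholders for illustration.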

#### Nested storages

Alternatively, we can simply nest the context managers:

```python
with storage_remote(save=True):
    # Tasks computed here load from and save to remote only
    task.compute()

    with storage_local(save=True):
        # Tasks computed here load from local or remote (in that order)
        # and save to local.
        task.compute()

    with storage_local(save=True, nested=False):
        # Tasks computed here ignore remote
        task.compute()
```

## Dependencies

With the `task` decorator, we can set default values as default arguments of the decorated function. Another feature of `pipeline` is the possibility of declaring default parameters that are computed from the task's input parameters by other tasks.

For that, we have to define our task as a subclass of `Task`.

In [12]:
import numpy as np


@dataclass  # optional, but useful for type hints
class NormalizedData(Task):
    data: np.ndarray
    center: dependency[float]
    scale: dependency[float] = 1

    @staticmethod  # optional, it is automatically converted to a staticmethod.
    def run(data, center, scale) -> np.ndarray:
        return (data - center) / scale

    @dependency
    def center(self) -> float:
        return compute_center(self.data)


@task(save=True)
def compute_center(x: np.ndarray) -> float:
    print("Computing center.")
    return np.median(x)

`Task` subclasses use the same syntax as dataclasses: we specify their parameters as class annotations. For instance, `data: np.ndarray`.

*Note: in fact, the `Task.__init__` method is built with `dataclass`, and we can decorate the class with `@dataclass` to obtain type hints.*

We can set default values as class attributes, as is the case with `scale = 1`.

We can define *dependencies* as methods without parameters (besides `self`), decorated with `@dependency`, as is the case with `center`.

Finally, we need to define a `run` staticmethod, which will be what the task computes.

*Note: the names of `run`'s parameters must be defined at least as a class annotation, class attribute or dependency method.*

Let's use this task with a sample array:

In [13]:
data = np.array([5, 6, 7, 8])

We can call `NormalizedData` with an explicit value for `center`:

In [14]:
NormalizedData(data, center=0).compute()

array([5., 6., 7., 8.])

or we can leave it unspecified, and it will be computed from the `compute_center` task for the input data:

In [15]:
NormalizedData(data).compute()

Computing center.


array([-1.5, -0.5,  0.5,  1.5])

As the `compute_center` task was created with `save=True`, if we compute `NormalizedData` inside a storage context manager, the `compute_center` intermediate result will be saved.

In [16]:
with storage(save=True):
    result = NormalizedData(data).compute()

result

Computing center.


array([-1.5, -0.5,  0.5,  1.5])

When we recompute `NormalizedData`, we see no `Computing center.` printed.

In [17]:
with storage(save=True):
    result = NormalizedData(data).compute()

result

array([-1.5, -0.5,  0.5,  1.5])

Note that this also holds when we interactively change the `scale` parameter, since it is the `compute_center` task that was stored, and it depends only on the input data.

In [18]:
with storage(save=True):
    result = NormalizedData(data, scale=0.5).compute()

result

array([-3., -1.,  1.,  3.])
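
As a rough mental model of the dependency mechanism used in this section, the idea of "a default that is derived from other inputs unless given explicitly" can be sketched with a plain dataclass. This is only an analogy under assumed names (`Normalized`, `resolved_center`): it is eager, and it has none of the lazy evaluation or storage that `Task` and `@dependency` provide.

```python
from dataclasses import dataclass
from statistics import median
from typing import Optional, Sequence


@dataclass
class Normalized:
    data: Sequence[float]
    center: Optional[float] = None  # None means "derive it from data"
    scale: float = 1.0

    def resolved_center(self) -> float:
        # An explicit value wins; otherwise fall back to the derived default.
        if self.center is not None:
            return self.center
        return median(self.data)

    def run(self) -> list:
        center = self.resolved_center()
        return [(x - center) / self.scale for x in self.data]


explicit = Normalized([5, 6, 7, 8], center=0).run()
derived = Normalized([5, 6, 7, 8]).run()
rescaled = Normalized([5, 6, 7, 8], scale=0.5).run()
```

The three results mirror the three calls to `NormalizedData` above, as lists instead of arrays.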

## Encoding and decoding

To store data, results need to be encoded into a bytes representation. By default, Task encodes data by serializing with `cloudpickle` and compressing with `zstandard`.

However, `cloudpickle` is not appropriate for long-term storage, as it depends on the Python version used. We can easily customize the encoding by passing another serializer that implements the `dumps` and `loads` functions. To demonstrate this, we will use `json` and `yaml` from the [serialize](https://github.com/hgrecco/serialize) project.

In [19]:
from serialize import json, yaml


First, we create an in-memory `Storage`, and a function which outputs a `dict`.

In [20]:
storage = Storage("memory://")
storage.fs.clear()  # it reuses the previous in-memory storage, so we clear it.


def point_as_dict(x, y):
    return dict(x=x, y=y)

Then, we create 3 Task variants from that function, with different serializers, and no compression:

In [21]:
# we set compressor=None to see that the bytes correspond to json/yaml serialized data.
as_default = task(point_as_dict, save=True, compressor=None)
as_json = task(point_as_dict, save=True, serializer=json, compressor=None)
as_yaml = task(point_as_dict, save=True, serializer=yaml, compressor=None)

with storage(save=True):
    as_default(x=1, y=2).compute()
    as_json(x=1, y=2).compute()
    as_yaml(x=1, y=2).compute()

Then, we print the underlying "files" to check that the results were properly serialized.

In [22]:
for k, v in storage.fs.items():
    print(f"{k:<60} {v}")

point_as_dict/85985a7bd912508f89c96f0ff61e15a6.cloudpickle   b'\x80\x05\x95\x11\x00\x00\x00\x00\x00\x00\x00}\x94(\x8c\x01x\x94K\x01\x8c\x01y\x94K\x02u.'
point_as_dict/85985a7bd912508f89c96f0ff61e15a6.json          b'{"x": 1, "y": 2}'
point_as_dict/85985a7bd912508f89c96f0ff61e15a6.yaml          b'x: 1\ny: 2\n'


### Custom encoding

To customize the encoding, the `Task` class has the following attributes:

```python
class Task:
    encoders: Optional[tuple[Encoder]] = None
    serializer: Optional[Serializer] = cloudpickle
    compressor: Optional[Compressor] = zstandard
    encrypter: Optional[Encrypter] = None
```

where

| type       | implements           |
|------------|----------------------|
| Encoder    | encode, decode       |
| Serializer | dumps, loads         |
| Compressor | compress, decompress |
| Encrypter  | encrypt, decrypt     |

and implements the default encoding of the output of `Task.run` as the following transformations:

`encoders[0] -> ... -> encoders[-1] -> serializer -> compressor -> encrypter`

Most serializers, compressors and encrypters already implement this interface, so you can simply import a module/class/object and use it.
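
The chain of transformations can be sketched with standard-library modules that already expose these method names: `pickle` (`dumps`/`loads`) as the serializer and `zlib` (`compress`/`decompress`) as the compressor. The functions `encode` and `decode` and the `PointToTuple` encoder below are hypothetical illustrations of the ordering, not the code in `Task`; the serializer is assumed to be always set.

```python
import pickle
import zlib


class PointToTuple:
    """Hypothetical encoder: turns a point dict into a tuple and back."""

    @staticmethod
    def encode(value):
        return (value["x"], value["y"])

    @staticmethod
    def decode(value):
        return {"x": value[0], "y": value[1]}


def encode(value, encoders=(), serializer=pickle, compressor=zlib, encrypter=None):
    """Apply encoders in order, then serialize, compress and encrypt."""
    for encoder in encoders:
        value = encoder.encode(value)
    data = serializer.dumps(value)
    if compressor is not None:
        data = compressor.compress(data)
    if encrypter is not None:
        data = encrypter.encrypt(data)
    return data


def decode(data, encoders=(), serializer=pickle, compressor=zlib, encrypter=None):
    """Undo `encode` by applying the inverse steps in reverse order."""
    if encrypter is not None:
        data = encrypter.decrypt(data)
    if compressor is not None:
        data = compressor.decompress(data)
    value = serializer.loads(data)
    for encoder in reversed(encoders):
        value = encoder.decode(value)
    return value


point = {"x": 1, "y": 2}
blob = encode(point, encoders=(PointToTuple,))
restored = decode(blob, encoders=(PointToTuple,))
stored_form = pickle.loads(zlib.decompress(blob))  # what was actually serialized
```

Decoding must apply the inverse of each step in the opposite order, which is why the encoders are reversed in `decode`.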

### Even more custom encoding

Before serializing, it might be useful to apply certain transformations to the result.

For instance, `encoders` can be a tuple of [`numcodecs`](https://numcodecs.readthedocs.io/en/stable/) filters.

But if no pre-built encoder exists, we can also customize the encoding further by overriding the `Task.encode` and `Task.decode` methods:

In [23]:
class point_as_dict_2(Task):
    x: float
    y: float

    run = point_as_dict
    save = True
    serializer = json
    compressor = None

    def encode(self, x):
        """Transform the dict to a tuple before calling encode."""
        x = (x["x"], x["y"])
        return super().encode(x)

    def decode(self, x):
        """Transform the tuple back to a dict."""
        x = super().decode(x)
        return {"x": x[0], "y": x[1]}


with storage(save=True):
    point_as_dict_2(x=1, y=2).compute()

for k, v in storage.fs.items():
    print(f"{k:<60} {v}")

point_as_dict/85985a7bd912508f89c96f0ff61e15a6.cloudpickle   b'\x80\x05\x95\x11\x00\x00\x00\x00\x00\x00\x00}\x94(\x8c\x01x\x94K\x01\x8c\x01y\x94K\x02u.'
point_as_dict/85985a7bd912508f89c96f0ff61e15a6.json          b'{"x": 1, "y": 2}'
point_as_dict/85985a7bd912508f89c96f0ff61e15a6.yaml          b'x: 1\ny: 2\n'
point_as_dict_2/85985a7bd912508f89c96f0ff61e15a6.json        b'[1, 2]'


where we called `super().encode` to continue with the default encoding after transforming the dict to a tuple.

Note: we had to change the Task name, adding a `_2` at the end. Otherwise, it would have collided with the previous `point_as_dict` serialized as JSON, as they had the same input parameters to `run`, producing the same hash `859...5a6`, and the same extension `.json`. We could also have assigned different names to the previous tasks using the `name` parameter of the `task` function: `task(point_as_dict, name="foo")`.
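
The collision described in this note follows from the layout of the keys seen in the output: a task name, a hash of the arguments, and an extension. The sketch below reproduces that layout with a hypothetical `storage_key` function; the hashing shown (SHA-256 over pickled, sorted arguments) is an assumption for illustration and will not produce the same hashes as Pipeline.

```python
import hashlib
import pickle


def storage_key(name, kwargs, extension):
    """Hypothetical key layout: <task name>/<hash of arguments>.<extension>."""
    payload = pickle.dumps(sorted(kwargs.items()))  # sorted: argument order is irrelevant
    digest = hashlib.sha256(payload).hexdigest()[:32]
    return f"{name}/{digest}.{extension}"


args = {"x": 1, "y": 2}
key_json = storage_key("point_as_dict", args, "json")
key_yaml = storage_key("point_as_dict", args, "yaml")
key_renamed = storage_key("point_as_dict_2", args, "json")


def digest_of(key):
    """Extract the hash part from a key of the form name/hash.extension."""
    return key.split("/")[1].split(".")[0]
```

With identical arguments the hash part is identical, so only the name or the extension can keep two keys apart.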

## Common pitfalls

The result of a Task is saved according to a hash of its run arguments. Although two computations might produce the same result, they might be constructed differently and not loaded from storage. Let's see an example:

In [24]:
storage = Storage("memory://")
storage.fs.clear()  # it reuses the previous in-memory storage, so we clear it.

We will create a task to load an "image" and then another to compute its minimum.

In [25]:
@task
def load_image(file: str) -> np.ndarray:
    return np.array([1, 2, 3])


@task(save=True, serializer=json, compressor=None)
def compute_min(image: np.ndarray) -> int:
    print("Computing min for", image)
    return int(image.min())


with storage(save=True):
    # We create a task to load an image (but didn't load it yet)
    # and pass it to the compute_min task, and compute it
    image1 = load_image("image")
    min_task1 = compute_min(image1)
    min_task1.compute()

    # Now, we actually load the image before passing it to
    # compute_min. The input is now a np.ndarray instead of
    # dask.delayed
    image2 = image1.compute()
    min_task2 = compute_min(image2)
    min_task2.compute()

    # Here, even though it will load the same array, it has
    # a different argument, and represents a different image.
    # Storage doesn't know a priori that it will result in
    # the same argument.
    image3 = load_image("image2")
    min_task3 = compute_min(image3)
    min_task3.compute()

    # Now, this was already computed before, and will be
    # retrieved from storage
    image4 = image3.compute()
    min_task4 = compute_min(image4)
    min_task4.compute()

Computing min for [1 2 3]
Computing min for [1 2 3]
Computing min for [1 2 3]


As we can see, it was computed 3 times, even though the actual computation received the same argument each time.

To understand it, it can be useful to see the inputs that each `compute_min` received:

In [26]:
print('image1 = load_image("image")  -> ', image1)
print("image2 = image1.compute()     -> ", image2)
print('image3 = load_image("image2") -> ', image3)
print("image4 = image3.compute()     -> ", image4)

image1 = load_image("image")  ->  Delayed('load_image/7fa16d227d7f86a4251029b028e07189.cloudpickle.zstandard')
image2 = image1.compute()     ->  [1 2 3]
image3 = load_image("image2") ->  Delayed('load_image/623caa47f757c5946eb49415780e5565.cloudpickle.zstandard')
image4 = image3.compute()     ->  [1 2 3]


Even though `image1` and `image3` will compute to the same array, before computing, the two tasks look different because they have a different (and, in this example, irrelevant) argument.

Hence, when passed to `compute_min`, they couldn't be recognized as the same task.

In [27]:
print("min_task1 = compute_min(image1)  -> ", min_task1)
print("min_task2 = compute_min(image2)  -> ", min_task2)
print("min_task3 = compute_min(image3)  -> ", min_task3)
print("min_task4 = compute_min(image4)  -> ", min_task4)

min_task1 = compute_min(image1)  ->  Delayed('compute_min/9d19fc947e66b340c9d5d676ca71544d.json')
min_task2 = compute_min(image2)  ->  Delayed('compute_min/6df7d63420af1411f533b4050768e4a1.json')
min_task3 = compute_min(image3)  ->  Delayed('compute_min/97f86c151b43416b51ca653094a8b6da.json')
min_task4 = compute_min(image4)  ->  Delayed('compute_min/6df7d63420af1411f533b4050768e4a1.json')


In contrast, `min_task2` and `min_task4` received the same input, so they ended up with the same hash, and `min_task4` was loaded from storage.

We can check the storage to see that only 3 tasks were saved:

In [28]:
for k, v in storage.fs.items():
    print(f"{k:<60} {v}")

compute_min/6df7d63420af1411f533b4050768e4a1.json            b'1'
compute_min/97f86c151b43416b51ca653094a8b6da.json            b'1'
compute_min/9d19fc947e66b340c9d5d676ca71544d.json            b'1'
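
Why three keys and not four can be sketched by hashing each argument the way it is presented to `compute_min`: a not-yet-computed task is identified by its own key, while a concrete value is identified by its content. `Lazy` and `token` are hypothetical stand-ins, and the hashing scheme is an assumption rather than the one Pipeline or dask uses.

```python
import hashlib
from dataclasses import dataclass


@dataclass(frozen=True)
class Lazy:
    """Hypothetical stand-in for a not-yet-computed task, identified by its key."""

    key: str


def token(argument):
    """Hash a lazy argument by its key and a concrete argument by its value."""
    if isinstance(argument, Lazy):
        text = "lazy:" + argument.key
    else:
        text = "value:" + repr(argument)
    return hashlib.sha256(text.encode()).hexdigest()[:8]


image1 = Lazy("load_image/image")   # like load_image("image")
image2 = [1, 2, 3]                  # like image1.compute()
image3 = Lazy("load_image/image2")  # like load_image("image2")
image4 = [1, 2, 3]                  # like image3.compute()

tokens = [token(a) for a in (image1, image2, image3, image4)]
```

Only the two concrete values share a token, matching the single repeated hash in the output above.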


**Important**: this example might suggest that it is wise to call `.compute` on task arguments beforehand, but it isn't.

When trying to recompute:

```python
image = load_image("image")
compute_min(image).compute()
```

in the future, we won't have to actually load the image, because the stored result can be found from the task definition alone. In contrast, we would have to load the image if we did:

```python
image = load_image("image").compute()
compute_min(image).compute()
```
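
The difference between the two snippets can be sketched with a counter on the loading step. In the lazy variant the key is known without loading anything, so a stored result short-circuits the whole chain; in the eager variant the image must be loaded before the key can even be built. All names and key formats here are hypothetical, and the pre-filled `store` stands in for a result saved in an earlier session.

```python
loads = []  # records each time the "image" is actually loaded


def load_image(file):
    loads.append(file)
    return [1, 2, 3]


# Result saved in an earlier session under the lazy key (hypothetical format).
store = {"compute_min/lazy:load_image/image": 1}


def compute_min_lazy(file):
    key = f"compute_min/lazy:load_image/{file}"  # known without loading anything
    if key in store:
        return store[key]
    result = min(load_image(file))
    store[key] = result
    return result


def compute_min_eager(file):
    image = load_image(file)  # must run first: the key depends on the loaded data
    key = f"compute_min/value:{image!r}"
    if key in store:
        return store[key]
    result = min(image)
    store[key] = result
    return result


lazy_result = compute_min_lazy("image")
loads_after_lazy = list(loads)    # still empty: nothing was loaded
eager_result = compute_min_eager("image")
loads_after_eager = list(loads)   # the image had to be loaded once
```

Both variants return the same minimum, but only the eager one pays for loading the image.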

## Read more at dask.org

As `Task` works on top of `dask.delayed`, it is useful to check out dask's documentation:

- Delayed: https://docs.dask.org/en/stable/delayed.html
- Delayed Collections: https://docs.dask.org/en/stable/delayed-collections.html
- Delayed Best Practices: https://docs.dask.org/en/stable/delayed-best-practices.html
- General Best Practices: https://docs.dask.org/en/stable/best-practices.html

Many tips discussed there apply to `Task` too. In particular, these points, which were discussed above:

- Don’t mutate inputs
- Avoid global state
- Don’t rely on side effects