# Tutorial for Pipefunc Package

The `pipefunc` package is a Python library that allows you to define functions as pipelines, with each function providing a single step in the pipeline. In this tutorial, we will explain how to use the package, based on an example notebook.


For the latest documentation, check out [the official documentation](https://pipefunc.readthedocs.io/en/latest/#what-is-this).

## High level overview

1. The `pipefunc` package lets you create reusable, callable pipelines.
1. A `Pipeline` contains a list of `PipeFunc` objects.
1. At their core, these `PipeFunc` objects contain only a function and an output name.
1. You can create a `PipeFunc` object directly or using the `@pipefunc` decorator.
1. The `Pipeline` will automatically connect all functions based on the output names and function inputs.
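
To make the last point concrete, here is a minimal pure-Python sketch of wiring functions together by name. It is not how `pipefunc` is implemented; the `steps` dictionary and the `resolve` helper are hypothetical names used only for illustration.

```python
import inspect


def f_c(a, b):
    return a + b


def f_d(b, c):
    return b * c


# Hypothetical registry: output name -> function that produces it.
steps = {"c": f_c, "d": f_d}


def resolve(output_name, values):
    """Compute `output_name`, recursively computing missing inputs by name."""
    if output_name in values:  # already provided or computed
        return values[output_name]
    func = steps[output_name]
    # The parameter names tell us which values this step depends on.
    args = {name: resolve(name, values) for name in inspect.signature(func).parameters}
    values[output_name] = func(**args)
    return values[output_name]


result = resolve("d", {"a": 1, "b": 2})
print(result)  # c = 1 + 2 = 3, d = 2 * 3 = 6
```

Because `resolve` looks up values by name first, supplying an intermediate value such as `c` directly skips the step that would otherwise compute it.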



---

## Building a Simple Pipeline

Let's start by importing `pipefunc` and `Pipeline` from the `pipefunc` module.


In [None]:
from pipefunc import Pipeline, pipefunc

We then define some functions using the `@pipefunc` decorator. The `@pipefunc` decorator turns these functions into pipeline steps. For each function, we specify an `output_name` which will be used to refer to the output of that function in the pipeline.


In [None]:
@pipefunc(output_name="c")
def f_c(a, b):
    return a + b


@pipefunc(output_name="d")
def f_d(b, c, x=1):  # "c" is the output of f_c
    return b * c


@pipefunc(output_name="e")
def f_e(c, d, x=1):  # "d" is the output of f_d
    return c * d * x

We now have three functions `f_c`, `f_d`, and `f_e`, which we can use to build a pipeline. Let's create a `Pipeline` object from a list of these functions; the pipeline works out the execution order from the output names and function inputs, so the list order does not need to match it. We can also enable debugging, profiling, and caching for the entire pipeline:


In [None]:
pipeline = Pipeline(
    [f_c, f_d, f_e],
    debug=True,  # optionally print debug information
    profile=True,  # optionally profile the pipeline
    cache_type="hybrid",  # optionally cache the pipeline
)

Now, we have a pipeline that adds `a` and `b` (function `f_c`), multiplies `b` by the resulting `c` (function `f_d`), and multiplies `c`, `d`, and `x` (function `f_e`).

---

## Visualizing the Pipeline

You can visualize your pipeline using the `visualize()` method, and print the nodes in the graph using the `graph.nodes` attribute.


In [None]:
pipeline.visualize()
print("Graph nodes:", pipeline.graph.nodes)

---

## Using the Pipeline

There are several ways in which you can use the pipeline.

You can:
1. Call the pipeline as a function (***sequentially***) and get a specific output:
  - `pipeline.run(output_name, kwargs)`
  - `pipeline(output_name, **kwargs)`
  - `f = pipeline.func(output_name); f(**kwargs)`
2. Evaluate the entire pipeline (***parallel***) including map-reduce operations:
  - `pipeline.map(kwargs)`

We will first demonstrate how to use the pipeline by calling it as a function and later introduce the more powerful `pipeline.map`.

See [this FAQ section](project:faq.md#run-vs-map) for more information on the difference between `run` and `map`.

### 1. Directly calling the `pipeline` object

If the pipeline has a unique leaf node (single final output), then we can directly call the pipeline object with the input arguments.


In [None]:
pipeline(a=1, b=2)

In [None]:
# The above returns the output for:
pipeline.unique_leaf_node

We can also specify the desired output as the first argument of the pipeline function:


In [None]:
print("`e` is:", pipeline("e", a=1, b=2))
print("`d` is:", pipeline("d", a=1, b=2))

### 2. Using `pipeline.run`

Similar to calling the `pipeline` object, we can use the `run` method to execute the pipeline.

> Note: The `Pipeline.__call__` method is just a wrapper around the `run` method.


In [None]:
print(pipeline.run(output_name="e", kwargs={"a": 1, "b": 2}))

Or get *all* function outputs and inputs by specifying `full_output=True`:


In [None]:
print(pipeline.run(output_name="e", kwargs={"a": 1, "b": 2}, full_output=True))

### 3. Get a function handle for a specific output (`pipeline.func`)

We can get a handle for each function using the `func` method on the pipeline, passing the output name of the function we want.


In [None]:
pf_d = pipeline.func("d")
pf_e = pipeline.func("e")

We can now use these handles as if they were the original functions. The pipeline will automatically ensure that the functions are called in the correct order, passing the output of one function as the input to the next.


In [None]:
c = f_c(a=2, b=3)  # call the wrapped function directly
assert c == 5

In [None]:
assert (
    f_d(b=3, c=5)
    == pf_d(a=2, b=3)  # We can call pf_d with different arguments
    == pf_d(b=3, c=5)
    == 15
)
assert pf_e(c=c, d=15, x=1) == pf_e(a=2, b=3, x=1) == pf_e(a=2, b=3, d=15, x=1) == 75

The functions returned by `pipeline.func` have several additional methods.

#### Using the call_full_output Method

The `call_full_output()` method can be used to call the function and get all the outputs from the pipeline as a dictionary.


In [None]:
pf_e = pipeline.func("e")
pf_e.call_full_output(a=2, b=3, x=1)

#### Direct Calling with Root Arguments (as positional arguments)

You can directly call the functions in the pipeline with the root arguments using the `call_with_root_args()` method. It automatically executes all the dependencies of the function in the pipeline with the given root arguments.


In [None]:
pf_e = pipeline.func("e")
pf_e.call_with_root_args(1, 2, 1)  # note these are now positional args

This executes the function `f_e` with the root arguments `a=1, b=2, x=1`.

For more information about this method, you can use the Python built-in `help` function or the `?` command.


In [None]:
help(pf_e.call_with_root_args)

This shows the signature and the docstring of the `call_with_root_args` method.


---

## Function Argument Combinations

As seen above, we can call the functions in the pipeline by either providing the root inputs or by providing the output of the previous function ourselves.

To see all the possible combinations of arguments that can be passed to each function, you can use the `all_arg_combinations` property. This will return a dictionary, with function output names as keys and sets of argument tuples as values.


In [None]:
all_args = pipeline.all_arg_combinations
assert all_args == {
    # means we can call `pipeline("c", a=1, b=2)`
    "c": {("a", "b")},
    # means we can call `pipeline("d", a=1, b=2, x=3)` or `pipeline("d", b=2, c=3, x=4)`
    "d": {("a", "b", "x"), ("b", "c", "x")},
    # means we can call `pipeline("e", a=1, b=2, x=3)` or `pipeline("e", b=2, d=3, x=4)`, etc.
    "e": {("a", "b", "x"), ("a", "b", "d", "x"), ("b", "c", "x"), ("c", "d", "x")},
}
# We can get root arguments for a specific function
assert pipeline.root_args("e") == ("a", "b", "x")
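
As a rough illustration of where root arguments come from, the sketch below walks a dependency table until it reaches names that no function produces. The `deps` table and this standalone `root_args` function are hypothetical and only mirror `f_c`, `f_d`, and `f_e`; they are not the library's implementation.

```python
# Hypothetical dependency table mirroring f_c, f_d, f_e from this tutorial:
# output name -> parameter names of the function producing it.
deps = {"c": ("a", "b"), "d": ("b", "c", "x"), "e": ("c", "d", "x")}


def root_args(output_name):
    """Return the sorted names of inputs that no function in `deps` produces."""
    roots = set()
    stack = [output_name]
    while stack:
        name = stack.pop()
        if name in deps:
            stack.extend(deps[name])  # an intermediate result: expand its inputs
        else:
            roots.add(name)  # a root input that the caller must supply
    return tuple(sorted(roots))


print(root_args("e"))  # ('a', 'b', 'x')
```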

---

## Handling Multiple Outputs

Functions can return multiple results at once. The `output_name` argument allows you to specify multiple outputs by providing a tuple of strings. By default, this assumes the function returns a tuple with one element per output name. If the function returns something else (for example, a dictionary or an object with attributes), use the `output_picker` argument to specify how each named output is extracted from the return value.


In [None]:
from pipefunc import Pipeline, pipefunc


# Define a function add_ab with multiple outputs, 'c' and 'const'.
@pipefunc(output_name=("c", "const"))
def add_ab(a, b):
    return (a + b, 1)


# Define a function mul_bc with multiple outputs, 'd' and 'e',
# where output_picker is used to select the output.
@pipefunc(
    output_name=("d", "e"),
    output_picker=dict.__getitem__,
)
def mul_bc(b, c, x=1):
    return {"d": b * c, "e": x}


# Define a function calc_cde with multiple outputs, 'g' and 'h',
# where output_picker is used to select the output.
@pipefunc(
    output_name=("g", "h"),
    output_picker=getattr,
)
def calc_cde(c, d, e, x):
    from types import SimpleNamespace

    return SimpleNamespace(g=c * d * x, h=c + e)


# Define a function add_gh with a single output 'i'.
@pipefunc(output_name="i")
def add_gh(h, g):
    return h + g


# Create a pipeline with the defined functions and visualize it.
pipeline_multiple = Pipeline([add_ab, mul_bc, calc_cde, add_gh])
pipeline_multiple.visualize()
final_func = pipeline_multiple.func("i")
final_func(a=1, b=2, x=3)
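
Judging from the two pickers used above (`dict.__getitem__` and `getattr`), an `output_picker` is called with the function's return value and one output name. The sketch below shows that calling convention in plain Python; `pick_outputs` is a hypothetical helper, not part of `pipefunc`, and the values correspond to `a=1, b=2, x=3`.

```python
from types import SimpleNamespace


def pick_outputs(result, output_names, output_picker):
    """Hypothetical helper: extract each named output from one return value."""
    return {name: output_picker(result, name) for name in output_names}


# A dict return value is unpacked with `dict.__getitem__(result, name)`.
from_dict = pick_outputs({"d": 6, "e": 3}, ("d", "e"), dict.__getitem__)

# An object return value is unpacked with `getattr(result, name)`.
from_obj = pick_outputs(SimpleNamespace(g=54, h=6), ("g", "h"), getattr)

print(from_dict)  # {'d': 6, 'e': 3}
print(from_obj)  # {'g': 54, 'h': 6}
```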

**(Sneak peek of a later section: simplifying the pipeline)**

The pipeline can be simplified by combining `calc_cde` and `add_gh` into a single function.


In [None]:
simplified_pipeline_multiple = pipeline_multiple.simplified_pipeline("i")
simplified_pipeline_multiple.visualize()

Note that, in the simplified pipeline, the full output of `calc_cde` (i.e., `g, h`) is not available.


In [None]:
# If the full output of calc_cde (g, h) is needed, we can't use the simplified pipeline.
out_full = pipeline_multiple.func("i").call_full_output(a=1, b=2, x=3)
out_full_red = simplified_pipeline_multiple.func("i").call_full_output(a=1, b=2, x=3)
print(f"Full output of `i`:\n{out_full}")
print(f"Full output of `i` after simplification:\n{out_full_red}")

---

## Using the `renames` Feature

The `renames` attribute in `pipefunc` allows you to rename the inputs and outputs of a function before passing them to the next step in the pipeline.
This can be particularly useful when the same function is used multiple times in a pipeline, or when you want to provide more meaningful names to the inputs and outputs.

### Example: Renaming Parameters

In this example, we demonstrate how to use the `renames` attribute to rename the inputs of a function before they are passed to the next step in the pipeline.

> ⚠️ Instead of using the `@pipefunc` decorator (which creates `pipefunc.PipeFunc` object), we will create `PipeFunc` objects directly and specify the `renames` attribute.


In [None]:
from pipefunc import PipeFunc, Pipeline


def prod(a, b):
    return a * b


def subtract(a, b):
    return a - b


# We're going to use these functions multiple times in the pipeline
pipeline_renames = Pipeline(
    [
        PipeFunc(prod, output_name="prod1"),
        PipeFunc(prod, output_name="prod2", renames={"a": "x", "b": "y"}),
        PipeFunc(subtract, output_name="delta1", renames={"a": "prod1", "b": "prod2"}),
        PipeFunc(subtract, output_name="delta2", renames={"a": "prod2", "b": "prod1"}),
        PipeFunc(prod, output_name="result", renames={"a": "delta1", "b": "delta2"}),
    ],
)

inputs = {"a": 1, "b": 2, "x": 3, "y": 4}
results = pipeline_renames("result", **inputs)

# Output the results
print("Results:", results)

pipeline_renames.visualize()

#### Explanation

1. **Function Definitions**:

   - `prod(a, b)`: Multiplies two numbers and returns the result.
   - `subtract(a, b)`: Subtracts `b` from `a` and returns the result.

2. **Pipeline Construction**:

   - **Step 1**: `PipeFunc(prod, output_name="prod1")`
     - Multiplies inputs `a` and `b` and stores the result as `prod1`.
   - **Step 2**: `PipeFunc(prod, output_name="prod2", renames={"a": "x", "b": "y"})`
     - Multiplies inputs `x` and `y` and stores the result as `prod2`.
   - **Step 3**: `PipeFunc(subtract, output_name="delta1", renames={"a": "prod1", "b": "prod2"})`
     - Subtracts `prod2` from `prod1` and stores the result as `delta1`.
   - **Step 4**: `PipeFunc(subtract, output_name="delta2", renames={"a": "prod2", "b": "prod1"})`
     - Subtracts `prod1` from `prod2` and stores the result as `delta2`.
   - **Step 5**: `PipeFunc(prod, output_name="result", renames={"a": "delta1", "b": "delta2"})`
     - Multiplies `delta1` and `delta2` and stores the result as `result`.

3. **Inputs and Execution**:

   - The inputs `{"a": 1, "b": 2, "x": 3, "y": 4}` are provided to the pipeline.
   - The pipeline executes in the defined order, renaming inputs and outputs as specified.
   - The final result is computed and returned as `result`.

4. **Output**:
   - The final output of the pipeline is printed, showing the computed `result`.

By using the `renames` attribute, this example demonstrates how to manage and distinguish between different uses of the same function within a pipeline, ensuring the correct values are processed at each step.
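
The renaming idea can be sketched without the library: a wrapper accepts the new keyword names and maps them back to the function's own parameters. The `with_renames` helper below is hypothetical and only traces the arithmetic of the pipeline above for the inputs `a=1, b=2, x=3, y=4`.

```python
def prod(a, b):
    return a * b


def subtract(a, b):
    return a - b


def with_renames(func, renames):
    """Hypothetical wrapper: accept renamed keyword arguments and map them back."""
    reverse = {new: old for old, new in renames.items()}

    def wrapper(**kwargs):
        original = {reverse.get(name, name): value for name, value in kwargs.items()}
        return func(**original)

    return wrapper


prod2 = with_renames(prod, {"a": "x", "b": "y"})
delta1 = with_renames(subtract, {"a": "prod1", "b": "prod2"})

p1 = prod(a=1, b=2)  # 2
p2 = prod2(x=3, y=4)  # 12
d1 = delta1(prod1=p1, prod2=p2)  # 2 - 12 = -10
d2 = subtract(a=p2, b=p1)  # 12 - 2 = 10
print(prod(a=d1, b=d2))  # -100
```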


One can also apply the renames afterwards using the `update_renames` method, either on a single `PipeFunc` or on the entire pipeline:

In [None]:
pipeline_renames2 = pipeline_renames.copy()
pipeline_renames2.update_renames(
    {
        "a": "aa",
        "b": "bb",
        "x": "xx",
        "y": "yy",
        "result": "final_result",  # Rename the `output_name` of the last function
    },
    update_from="current",  # update from the current renames, not the original
)
pipeline_renames2(aa=1, bb=2, xx=3, yy=4)

Also check out these `Pipeline` methods:

- `Pipeline.update_defaults`
- `Pipeline.update_bound`

and these `PipeFunc` methods:

- `PipeFunc.update_renames`
- `PipeFunc.update_defaults`
- `PipeFunc.update_bound`

---

## Combine pipelines

Different pipelines can be combined into a single pipeline using the `Pipeline.combine` method or the `|` operator.

In case the output names and arguments do not match up, we can rename the parameters of an entire pipeline using the `update_renames` method.

In [None]:
from pipefunc import Pipeline, pipefunc


@pipefunc(output_name="c")
def f(a, b):
    return a + b


@pipefunc(output_name="d")
def g(b, c, x=1):
    return b + c + x


pl1 = Pipeline([f, g])


@pipefunc(output_name="e")
def h(cc, dd, xx=2):
    return cc + dd + xx


pl2 = Pipeline([h])

# We now have two pipelines, `pl1` and `pl2`, that we want to combine
# into a single pipeline. However, they have different inputs and defaults.
# Let's update the renames and defaults of `pl2` to match `pl1`.
pl2_ = pl2.copy()
pl2_.update_renames({"cc": "c", "dd": "d", "xx": "x"})
pl2_.update_defaults({"x": 1})
combined_pipeline = pl1 | pl2_  # or use `pl1.combine(pl2_)`

combined_pipeline.visualize()

# The combined pipeline can now be used as a single pipeline
result = combined_pipeline(a=2, b=3, x=2)
print(result)  # Output: 17

Just to see another quick example of combining pipelines (even though it makes no sense to combine these pipelines):

In [None]:
pipeline_silly = pipeline | pipeline_multiple | pipeline_renames | combined_pipeline
pipeline_silly.visualize()
# e.g., if we want to get the output of `g` in the `pipeline` (not the leaf node!):
pipeline_silly("g", a=1, b=2, x=3)


---

## Simplifying Pipelines

Consider the following pipeline (look at the `visualize()` output to see the structure of the pipeline):


In [None]:
from pipefunc import Pipeline


def f1(a, b, c, d):
    return a + b + c + d


def f2(a, b, e):
    return a + b + e


def f3(a, b, f1):
    return a + b + f1


def f4(f1, f3):
    return f1 + f3


def f5(f1, f4):
    return f1 + f4


def f6(b, f5):
    return b + f5


def f7(a, f2, f6):
    return a + f2 + f6


# If the functions are not decorated with @pipefunc,
# they will be wrapped and the output_name will be the function name
pipeline_complex = Pipeline([f1, f2, f3, f4, f5, f6, f7])
pipeline_complex("f7", a=1, b=2, c=3, d=4, e=5)
pipeline_complex.visualize(color_combinable=True)  # combinable functions have the same color

In the example code above, the complex pipeline composed of multiple functions (`f1`, `f2`, `f3`, `f4`, `f5`, `f6`, `f7`) can be simplified by merging the nodes `f1`, `f3`, `f4`, `f5`, `f6` into a single node.
This merging process simplifies the pipeline and reduces the number of functions that need to be cached/saved.

The `simplified_pipeline` method of the `Pipeline` class is used to generate this simplified version of the pipeline.


In [None]:
simplified_pipeline_complex = pipeline_complex.simplified_pipeline("f7")
simplified_pipeline_complex.visualize()  # A `NestedPipeFunc` will have a red edge

However, simplifying a pipeline comes with a trade-off. The simplification process removes intermediate nodes that may be necessary for debugging or inspection.

For instance, if a developer wants to monitor the output of `f3` while processing the pipeline, they would not be able to do so in the simplified pipeline as `f3` has been merged into a single node. Hence, while a simplified pipeline can speed up the computation, it may limit the ability to examine intermediate computations.
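
The trade-off can be seen in a plain-Python sketch of what merging `f1`, `f3`, `f4`, `f5`, and `f6` amounts to. The `nested_f6` function is a hypothetical stand-in for the merged node, not the object `pipefunc` creates: the intermediate values exist only as local variables and never leave the function.

```python
def f1(a, b, c, d):
    return a + b + c + d


def f2(a, b, e):
    return a + b + e


def f3(a, b, f1):
    return a + b + f1


def f4(f1, f3):
    return f1 + f3


def f5(f1, f4):
    return f1 + f4


def f6(b, f5):
    return b + f5


def f7(a, f2, f6):
    return a + f2 + f6


def nested_f6(a, b, c, d):
    """Hypothetical merged node: only the final value `f6` leaves this function."""
    v1 = f1(a, b, c, d)
    v3 = f3(a, b, v1)  # not visible to the caller, so it cannot be inspected
    v4 = f4(v1, v3)
    v5 = f5(v1, v4)
    return f6(b, v5)


merged = nested_f6(a=1, b=2, c=3, d=4)
print(merged)  # 35
print(f7(a=1, f2=f2(a=1, b=2, e=5), f6=merged))  # 44
```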


The simplified pipeline now contains a `NestedPipeFunc` object, which is a subclass of `PipeFunc` but contains an internal pipeline.

In [None]:
simplified_pipeline_complex.functions

In [None]:
nested_func = simplified_pipeline_complex.functions[-1]
print(f"{nested_func.parameters=}, {nested_func.output_name=}, {nested_func(a=1, b=2, c=3, d=4)=}")
nested_func.pipeline.visualize()

### Another graph simplification example


In [None]:
from pipefunc import Pipeline, pipefunc


@pipefunc(output_name=("d", "e"))
def calc_de(b, g, x=1):
    pass


@pipefunc(output_name=("g", "h"))
def calc_gh(a, x=1):
    pass


@pipefunc(output_name="gg")
def calc_gg(g):
    pass


@pipefunc(output_name="i")
def calc_i(gg, b, e):
    pass


# Create a pipeline with the defined functions and visualize it
pipe3 = Pipeline([calc_de, calc_gh, calc_i, calc_gg])
pipe3.visualize(color_combinable=True)
pipe3.simplified_pipeline("i").visualize()

We can also manually nest functions in the pipeline by either creating a `NestedPipeFunc` object or using the `pipeline.nest_funcs` method.

In [None]:
pipe3_manual = pipe3.copy()
# combine the nodes manually
nested_func = pipe3_manual.nest_funcs({"gg", "i", "d"})  # returns the new `NestedPipeFunc`
pipe3_manual.visualize()

In [None]:
# The returned `NestedPipeFunc` object contains an internal pipeline
nested_func.pipeline.visualize()

---

## Working with Resources Report

The `print_profiling_stats()` method of the `pipeline` provides useful information on the performance of the functions in the pipeline such as CPU usage, memory usage, average time, and the number of times each function was called. This feature is only available if `profile=True` when creating the pipeline.


In [None]:
# This will print the number of times each function was called
# CPU, memory, and time usage is also reported
pipeline.print_profiling_stats()

This report can be beneficial in performance tuning and identifying bottlenecks in your pipeline. You can identify which functions are consuming the most resources and adjust your pipeline accordingly.
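
For intuition about what such a report is built from, here is a small standard-library sketch that records the call count and wall-clock time per function. It is only an illustration under simplified assumptions (no CPU or memory tracking); the `profiled` decorator and the `durations` registry are hypothetical names and not `pipefunc`'s profiler.

```python
import time
from collections import defaultdict

# Hypothetical registry: function name -> list of wall-clock durations in seconds.
durations = defaultdict(list)


def profiled(func):
    """Record how long each call to `func` takes."""

    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            durations[func.__name__].append(time.perf_counter() - start)

    return wrapper


@profiled
def add(a, b):
    return a + b


add(1, 2)
add(3, 4)
for name, times in durations.items():
    print(f"{name}: calls={len(times)}, average={sum(times) / len(times):.2e} s")
```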

You can also look at all the stats directly with:


In [None]:
pipeline.profiling_stats

---

## Running Map-Reduce Pipelines using `MapSpec`

This section shows how `pipefunc` uses `mapspec` to automate data distribution across Map-Reduce tasks.

> **Note:** ⚠️ When `parallel=True` (the default), `pipeline.map` runs the mapping computation in parallel using `concurrent.futures.ProcessPoolExecutor`.

### Example: Simple reduction pipeline

The script below demonstrates a two-step pipeline: doubling each integer in an input list, followed by summing all the doubled values.


In [None]:
import numpy as np

from pipefunc import Pipeline, pipefunc
from pipefunc.map import load_outputs, load_xarray_dataset
from pipefunc.typing import Array


@pipefunc(output_name="y", mapspec="x[i] -> y[i]")
def double_it(x: int) -> int:
    assert isinstance(x, int)
    return 2 * x


@pipefunc(output_name="sum")
def take_sum(y: Array[int]) -> int:
    assert isinstance(y, np.ndarray)
    return sum(y)


pipeline_map = Pipeline([double_it, take_sum])

inputs = {"x": [0, 1, 2, 3]}
run_folder = "my_run_folder"
results = pipeline_map.map(inputs, run_folder=run_folder)

# Check the results in the resulting dict
assert results["y"].output.tolist() == [0, 2, 4, 6]
assert results["sum"].output == 12

# Or load the outputs from disk
assert load_outputs("y", run_folder=run_folder).tolist() == [0, 2, 4, 6]
assert load_outputs("sum", run_folder=run_folder) == 12

# Or also load from disk but as an `xarray.Dataset`:
load_xarray_dataset(run_folder=run_folder)

#### Explanation

1. **Using `mapspec` for Data Distribution**:

   - The `mapspec` attribute in the `@pipefunc` decorator defines how data is distributed across computations. In `double_it`, `mapspec="x[i] -> y[i]"` specifies that each element `i` of the input array `x` is independently processed to produce the corresponding element `i` in the output array `y`. Because `take_sum` does not have a `mapspec`, it receives the entire array `y` for aggregation.

2. **Function Definitions**:

   - `double_it`: Doubles each integer, demonstrating a simple stateless operation that can be easily parallelized.
   - `take_sum`: Aggregates all elements of the resulting array into a single sum, serving as the reduce step in this Map-Reduce example.

3. **Pipeline Execution**:

   - The `Pipeline.map` method executes the pipeline with specific inputs and a directory for temporary run files, showcasing how PipeFunc manages data flow and execution state across the pipeline. Alternatively, use the `pipefunc.map.run` function.

4. **Results Verification**:

   - Assertions check that the final sum of doubled numbers is correct, ensuring both the integrity and correctness of the pipeline's execution.

5. **Output Retrieval**:
   - The `load_outputs` function demonstrates how to retrieve and verify results post-computation, confirming the output is stored and accessible as expected.
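
Stripped of the library, the map and reduce steps described above correspond to the following plain-Python sketch. It runs sequentially and stores nothing on disk; it only shows which function sees single elements and which sees the whole collection.

```python
def double_it(x):
    return 2 * x


def take_sum(y):
    return sum(y)


x = [0, 1, 2, 3]
# "x[i] -> y[i]": call double_it once per element; the calls are independent.
y = [double_it(x_i) for x_i in x]
# No mapspec: take_sum receives the whole collection at once.
total = take_sum(y)
print(y, total)  # [0, 2, 4, 6] 12
```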

### Example: Cross-product of inputs and multidiagonal aggregation

This example shows how to compute the outer product of two input vectors (`x` and `y`) and then aggregate the resulting matrix along rows, and finally reduce the computation to a single `float` by taking the `norm` of the resulting `aggregated` vector.


In [None]:
from pipefunc import Pipeline, pipefunc


@pipefunc(output_name="z", mapspec="x[i], y[j] -> z[i, j]")
def multiply_elements(x: int, y: int) -> int:
    """Multiply two integers."""
    return x * y


@pipefunc(output_name="aggregated", mapspec="z[i, :] -> aggregated[i]")
def aggregate_rows(z: np.ndarray) -> np.ndarray:
    """Sum the elements of each row in matrix z."""
    return np.sum(z)


@pipefunc(output_name="norm")
def compute_norm(aggregated: np.ndarray) -> float:
    """Compute the Euclidean norm of the vector aggregated."""
    return np.linalg.norm(aggregated)


pipeline_norm = Pipeline([multiply_elements, aggregate_rows, compute_norm])
inputs = {"x": [1, 2, 3], "y": [4, 5, 6]}
results = pipeline_norm.map(inputs, run_folder="my_run_folder")
print("Norm of the aggregated sums:", results["norm"].output)

#### Explanation

1. **Matrix Creation (`multiply_elements`)**:

   - Each combination of elements from arrays `x` and `y` is multiplied to form the matrix `z`. The `mapspec` `"x[i], y[j] -> z[i, j]"` ensures that every pair of elements is processed to generate a 2D matrix.

2. **Row Aggregation (`aggregate_rows`)**:

   - The matrix `z` is then processed row by row to sum the values, creating an aggregated result for each row. The `mapspec` `"z[i, :] -> aggregated[i]"` directs the pipeline to apply the summation across each row, transforming a 2D array into a 1D array of row sums.

3. **Vector Norm Calculation (`compute_norm`)**:
   - Finally, the norm of the aggregated vector is computed, providing a single scalar value that quantifies the magnitude of the vector formed from row sums. This step does not require a `mapspec` as it takes the entire output from the previous step and produces a single output.
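
The three steps above can be traced by hand with nested lists. This is a sequential sketch using only the standard library (no NumPy, no `pipefunc`), with the same inputs as the example.

```python
import math

x = [1, 2, 3]
y = [4, 5, 6]

# "x[i], y[j] -> z[i, j]": one call per (i, j) pair.
z = [[x_i * y_j for y_j in y] for x_i in x]
# "z[i, :] -> aggregated[i]": one call per row i, each receiving the full row.
aggregated = [sum(row) for row in z]
# No mapspec: a single call on the whole vector.
norm = math.sqrt(sum(v * v for v in aggregated))

print(aggregated)  # [15, 30, 45]
print(norm)
```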

### Example: Handling Dynamic Output Shapes

It is not always possible to predict output shapes directly from the input parameters. For instance, when a function generates a list whose length depends on the runtime value of an input, PipeFunc cannot automatically determine the output dimensions. To manage such cases, developers must manually specify these output shapes.

We also show an alternative way of specifying the `mapspec` when creating the `Pipeline` object by passing `tuple`s to the `Pipeline` constructor.


In [None]:
from pipefunc import Pipeline, pipefunc
from pipefunc.typing import Array


@pipefunc(output_name="x")
def generate_ints(n: int) -> list[int]:
    """Generate a list of integers from 0 to n-1."""
    return list(range(n))


@pipefunc(output_name="y")
def double_it(x: int) -> int:
    """Double the input integer."""
    return 2 * x


@pipefunc(output_name="sum")
def take_sum(y: Array[int]) -> int:
    """Sum a list of integers."""
    return sum(y)


# One is able to specify the mapspec when creating the Pipeline object
# by passing a tuple of the function and the mapspec string.
pipeline_sum = Pipeline(
    [
        generate_ints,
        (double_it, "x[i] -> y[i]"),  # Apply doubling to each element in the list
        take_sum,
    ],
)


inputs = {"n": 4}
internal_shapes = {"x": (4,)}  # Explicitly specify the shape of 'x' output from generate_ints

# Run the pipeline
results = pipeline_sum.map(inputs, internal_shapes=internal_shapes, run_folder="my_run_folder")
print("Sum of doubled integers:", results["sum"].output)

This example demonstrates explicitly specifying output shapes via `internal_shapes` in scenarios where the output size is dynamically determined by input values.

### Example: Zipped Inputs and Pairwise Computation

This pipeline processes zipped inputs `x` and `y` with independent `z` to compute a function across all combinations, producing a 2D matrix `r`.


In [None]:
from pipefunc import Pipeline, pipefunc


@pipefunc(output_name="r")
def process_elements(x: int, y: int, z: int) -> float:
    return x * y + z


pipeline_proc = Pipeline([(process_elements, "x[a], y[a], z[b] -> r[a, b]")])

inputs = {"x": [1, 2, 3], "y": [4, 5, 6], "z": [7, 8]}

results = pipeline_proc.map(inputs, run_folder="my_run_folder")
output_matrix = results["r"].output
print("Output Matrix:\n", output_matrix)

#### Explanation

- **Function `process_elements`**:

  - Takes three inputs: `x`, `y`, and `z`. For each pair `(x[a], y[a])`, the function is applied with each `z[b]`.

- **Pipeline Definition**:

  - The `mapspec` `"x[a], y[a], z[b] -> r[a, b]"` specifies how elements from the inputs are to be combined. It states that each element from the paired inputs `x` and `y` (indexed by `a`) should be processed with each element from `z` (indexed by `b`), resulting in a 2D output array `r`.

- **Outputs**:
  - The output `r` is a 2-dimensional matrix where the dimensions are determined by the lengths of `x`/`y` and `z`. Each element of this matrix represents the computation result for a specific combination of inputs.
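
In plain Python, the shared index `a` corresponds to `zip` and the independent index `b` to an inner loop. The sketch below reproduces the expected matrix for the inputs above without using `pipefunc`.

```python
x = [1, 2, 3]
y = [4, 5, 6]
z = [7, 8]

# "x[a], y[a], z[b] -> r[a, b]": x and y share index `a` (zipped),
# while `b` runs independently over z.
r = [[x_a * y_a + z_b for z_b in z] for x_a, y_a in zip(x, y)]
print(r)  # [[11, 12], [17, 18], [25, 26]]
```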


### Example: Physics based example

This example demonstrates using `pipefunc` for a physics-based simulation. The goal is to create a pipeline for geometry creation, meshing, material assignment, and electrostatics calculations, culminating in computing the average charge.

> Note: this example is based on the [`aiida-dynamic-workflows` tutorial](https://github.com/microsoft/aiida-dynamic-workflows/blob/4d452ed3be4192dc5b2c8f40690f82c3afcaa7a8/examples/02-workflows.md).

We start with defining a few `dataclasses` to represent the geometry, mesh, and material properties. We then define functions to create the geometry, mesh it, assign materials, and calculate the electrostatics.


In [None]:
from dataclasses import dataclass


@dataclass(frozen=True)
class Geometry:
    x: float
    y: float


@dataclass(frozen=True)
class Mesh:
    geometry: Geometry
    mesh_size: float


@dataclass(frozen=True)
class Materials:
    geometry: Geometry
    materials: list[str]


@dataclass(frozen=True)
class Electrostatics:
    mesh: Mesh
    materials: Materials
    voltages: list[float]

In [None]:
import numpy as np

from pipefunc import Pipeline, pipefunc
from pipefunc.map import load_outputs


@pipefunc(output_name="geo")
def make_geometry(x: float, y: float) -> Geometry:
    return Geometry(x, y)


@pipefunc(output_name=("mesh", "coarse_mesh"))
def make_mesh(
    geo: Geometry,
    mesh_size: float,
    coarse_mesh_size: float,
) -> tuple[Mesh, Mesh]:
    return Mesh(geo, mesh_size), Mesh(geo, coarse_mesh_size)


@pipefunc(output_name="materials")
def make_materials(geo: Geometry) -> Materials:
    return Materials(geo, ["i", "j", "c"])


@pipefunc(output_name="electrostatics")
def run_electrostatics(
    mesh: Mesh,
    materials: Materials,
    V_left: float,  # noqa: N803
    V_right: float,  # noqa: N803
) -> Electrostatics:
    return Electrostatics(mesh, materials, [V_left, V_right])


@pipefunc(output_name="charge")
def get_charge(electrostatics: Electrostatics) -> float:
    # obviously not actually the charge; but we should return _some_ number that
    # is "derived" from the electrostatics.
    return sum(electrostatics.voltages)


@pipefunc(output_name="average_charge")
def average_charge(charge: np.ndarray) -> float:
    return np.mean(charge)


pipeline_charge = Pipeline(
    [
        make_geometry,
        make_mesh,
        make_materials,
        (run_electrostatics, "V_left[i], V_right[j] -> electrostatics[i, j]"),
        (get_charge, "electrostatics[i, j] -> charge[i, j]"),
        average_charge,  # this function receives the full 2D array of charges!
    ],
)
pipeline_charge.visualize(figsize=(15, 15))

Let's run the map for some inputs:


In [None]:
inputs = {
    "V_left": np.linspace(0, 2, 3),
    "V_right": np.linspace(-0.5, 0.5, 2),
    "x": 0.1,
    "y": 0.2,
    "mesh_size": 0.01,
    "coarse_mesh_size": 0.05,
}

run_folder = "my_run_folder"
results = pipeline_charge.map(inputs, run_folder=run_folder, parallel=False)
assert results["average_charge"].output == 1.0
assert results["average_charge"].output_name == "average_charge"
assert load_outputs("average_charge", run_folder=run_folder) == 1.0

This example highlighted how to run a simulation that included a map-reduce operation.
Often we want to sweep this over multiple parameters.
You could add all the `mapspec`s required to map an additional parameter.
Alternatively, you can use the `pipeline.add_mapspec_axis` method to add an axis to parameters of the pipeline.

See the example below, where we extend the `mapspec`s.


In [None]:
# Add a cross-product of x and y
pipeline_charge.add_mapspec_axis("x", axis="a")
pipeline_charge.add_mapspec_axis("y", axis="b")

# And also a cross-product of the zipped mesh_size and coarse_mesh_size
pipeline_charge.add_mapspec_axis("mesh_size", axis="c")
pipeline_charge.add_mapspec_axis("coarse_mesh_size", axis="c")

# The resulting mapspecs show that `average_charge` is now a 3D array:
pipeline_charge.mapspecs_as_strings

In [None]:
pipeline_charge.visualize(figsize=(20, 20))

Let's run it on a 2x2x2 grid of inputs:


In [None]:
inputs = {
    "V_left": np.linspace(0, 2, 3),
    "V_right": np.linspace(-0.5, 0.5, 2),
    "x": np.linspace(0.1, 0.2, 2),
    "y": np.linspace(0.2, 0.3, 2),
    "mesh_size": [0.01, 0.02],
    "coarse_mesh_size": [0.05, 0.06],
}
results = pipeline_charge.map(inputs, run_folder=run_folder, parallel=False)
output = results["average_charge"].output
print(f"Output shape: {output.shape}")
print(f"Output:\n{output}")
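
To see why the sweep yields a 2x2x2 grid, the sketch below enumerates the parameter combinations with the standard library only. It assumes, as in the axes added above, that `x` and `y` vary independently while `mesh_size` and `coarse_mesh_size` share one axis and are therefore zipped.

```python
from itertools import product

x = [0.1, 0.2]
y = [0.2, 0.3]
mesh_size = [0.01, 0.02]
coarse_mesh_size = [0.05, 0.06]

# Axes "a" and "b" are independent; axis "c" is shared, so the two mesh
# parameters are zipped rather than crossed.
grid = list(product(x, y, zip(mesh_size, coarse_mesh_size)))
print(len(grid))  # 2 * 2 * 2 = 8
print(grid[0])  # (0.1, 0.2, (0.01, 0.05))
```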

We can also load all data as `xarray.Dataset`:


In [None]:
from pipefunc.map import load_xarray_dataset

ds = load_xarray_dataset(run_folder=run_folder)
ds

Or specify the `output_name` to load only specific outputs:


In [None]:
avg_charge = load_xarray_dataset("average_charge", run_folder=run_folder)
avg_charge

Now imagine that the electrostatics object is a very large object that we cannot afford to save and load from disk.
For this purpose, there is the `pipefunc.NestedPipeFunc` class, which combines multiple functions into a single function. We can then tell it not to return the output of the intermediate functions by specifying which outputs to return.

In [None]:
pipeline_charge2 = pipeline_charge.copy()
nested_func = pipeline_charge2.nest_funcs(
    {"electrostatics", "charge"},
    new_output_name="charge",
    # We can also specify `("charge", "electrostatics")` to get both outputs
)

This `nested_func` contains an internal pipeline:

In [None]:
nested_func.pipeline.visualize()

When visualizing the pipeline, you can see that the `NestedPipeFunc` is shown as a single node.

In [None]:
pipeline_charge2.visualize(figsize=(15, 15))

### Custom parallelism

By default, when `pipeline.map(..., parallel=True)` is used, the pipeline runs in parallel using a `concurrent.futures.ProcessPoolExecutor`. You can also pass a custom executor to control how the work is parallelized.

Any executor that implements the `concurrent.futures.Executor` interface works, for example:

- `concurrent.futures.ProcessPoolExecutor`
- `concurrent.futures.ThreadPoolExecutor`
- `ipyparallel.Client().executor()`
- `dask.distributed.Client().get_executor()`
- `mpi4py.futures.MPIPoolExecutor()`
- `loky.get_reusable_executor()`
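To show what "having the `concurrent.futures.Executor` interface" means, here is a minimal sketch of a custom executor that runs every task immediately in the calling thread. `SequentialExecutor` is a hypothetical class written for this illustration; whether `pipefunc` accepts this particular executor is an assumption that has not been verified here.

```python
from concurrent.futures import Executor, Future


class SequentialExecutor(Executor):
    """Hypothetical executor that runs each submitted task right away."""

    def submit(self, fn, /, *args, **kwargs):
        future = Future()
        try:
            future.set_result(fn(*args, **kwargs))
        except Exception as exc:  # store the error on the future instead of raising
            future.set_exception(exc)
        return future


# `map` and the context manager are inherited from `Executor` and rely on `submit`
with SequentialExecutor() as executor:
    squares = list(executor.map(lambda v: v * v, range(4)))
print(squares)
```

Only `submit` needs to be implemented here; the base class provides `map` and `shutdown`.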

To change only the number of cores while keeping the default executor type, pass a `ProcessPoolExecutor` with the desired `max_workers`:

In [None]:
import datetime
import time
from concurrent.futures import ProcessPoolExecutor

import numpy as np

from pipefunc import Pipeline, pipefunc


@pipefunc(output_name="double", mapspec="x[i] -> double[i]")
def double_it(x: int) -> int:
    print(f"{datetime.datetime.now()} - Running double_it for x={x}")
    time.sleep(1)
    return 2 * x


@pipefunc(output_name="half", mapspec="x[i] -> half[i]")
def half_it(x: int) -> int:
    print(f"{datetime.datetime.now()} - Running half_it for x={x}")
    time.sleep(1)
    return x // 2


@pipefunc(output_name="sum")
def take_sum(half: np.ndarray, double: np.ndarray) -> int:
    print(f"{datetime.datetime.now()} - Running take_sum")
    return sum(half + double)


pipeline_parallel = Pipeline([double_it, half_it, take_sum])
inputs = {"x": [0, 1, 2, 3]}
run_folder = "my_run_folder"
executor = ProcessPoolExecutor(max_workers=8)  # use 8 processes
results = pipeline_parallel.map(
    inputs,
    run_folder=run_folder,
    parallel=True,
    executor=executor,
    storage="shared_memory_dict",
)
print(results["sum"].output)

> ⚠️ In this pipeline, `double_it` and `half_it` are parallel in two ways: each map over `x` runs in parallel, and the two functions run at the same time because neither depends on the other. Compare the timestamps with the `sleep()` calls to see this.
> See the `visualize()` output for the structure of the pipeline.

In [None]:
pipeline_parallel.visualize()
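The two levels of parallelism can be sketched with the standard library alone. This is an illustration of the scheduling idea under simplified assumptions (threads instead of processes, no `sleep()` calls), not `pipefunc`'s actual scheduler.

```python
from concurrent.futures import ThreadPoolExecutor


def double_it(x):
    return 2 * x


def half_it(x):
    return x // 2


xs = [0, 1, 2, 3]
with ThreadPoolExecutor(max_workers=8) as executor:
    # Both independent functions are submitted for every element before
    # any result is awaited, so all 8 tasks can run concurrently.
    double_futures = [executor.submit(double_it, x) for x in xs]
    half_futures = [executor.submit(half_it, x) for x in xs]
    double = [f.result() for f in double_futures]
    half = [f.result() for f in half_futures]

# The reduction needs both complete arrays, so it runs only afterwards
total = sum(h + d for h, d in zip(half, double))
print(total)
```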

### Advanced: [Adaptive Scheduler](https://adaptive-scheduler.readthedocs.io/) integration

PipeFunc can also be used with the `adaptive_scheduler` package to run the pipeline on a cluster (e.g., with SLURM) without having to worry about the details of submitting jobs and managing resources.

In [None]:
from concurrent.futures import ProcessPoolExecutor

import numpy as np

from pipefunc import Pipeline, pipefunc
from pipefunc.map.adaptive import create_learners
from pipefunc.resources import Resources


# Pass in a `Resources` object that specifies the resources needed for each function
@pipefunc(output_name="double", mapspec="x[i] -> double[i]", resources=Resources(cpus=5))
def double_it(x: int) -> int:
    return 2 * x


# Or specify the resources as a dictionary
@pipefunc(output_name="half", mapspec="x[i] -> half[i]", resources={"memory": "8GB"})
def half_it(x: int) -> int:
    return x // 2


# Specify delayed resources that are used inside the function; "internal" parallelization
@pipefunc(
    output_name="sum",
    resources=lambda kw: {"cpus": len(kw["half"]), "parallelization_mode": "internal"},
    resources_variable="resources",
)
def take_sum(half: np.ndarray, double: np.ndarray, resources: Resources) -> int:
    with ProcessPoolExecutor(resources.cpus) as executor:
        # Do some printing in parallel (not smart, but just to show the parallelization)
        list(executor.map(print, range(resources.cpus)))
    return sum(half + double)


pipeline_adapt = Pipeline([double_it, half_it, take_sum])

inputs = {"x": [0, 1, 2, 3]}
run_folder = "my_run_folder"
learners_dict = create_learners(
    pipeline_adapt,
    inputs,
    run_folder=run_folder,
    split_independent_axes=True,  # Split up into as many independent jobs as possible
)
kwargs = learners_dict.to_slurm_run(
    returns="kwargs",  # or "run_manager" to return an `adaptive_scheduler.RunManager` object
    default_resources={"cpus": 2, "memory": "8GB"},
)
kwargs

---

## Parallel Execution and Caching

To enable parallel execution, you can use Python's built-in `concurrent.futures.ProcessPoolExecutor`. To enable caching, simply set the `cache` attribute to `True` for each function. This can be useful to avoid recomputing results when calling the same function with the same arguments multiple times.


In [None]:
from concurrent.futures import ProcessPoolExecutor

for f in pipeline.functions:
    # Enable caching for all functions
    # See next section to only cache based on a certain parameter sweep
    f.cache = True

pf_e = pipeline.func("e")
sequence = 10 * [{"a": 2, "b": 3, "x": 1}]
with ProcessPoolExecutor(max_workers=1) as executor:
    results = executor.map(pf_e.call_with_dict, sequence)
    print(list(results))

The cache is populated _**even when using parallel execution**_. To see the cache, you can use the `cache` attribute on the pipeline.

The keys of the cache are always in terms of the root arguments.


In [None]:
print(f"Cache object: {pipeline.cache}")
pipeline.cache.cache
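What "keys in terms of the root arguments" means can be sketched with a plain dictionary. The key layout below (output name plus sorted root arguments) is an assumption made for illustration and may differ from the keys `pipefunc` actually stores; `cached` is a hypothetical helper.

```python
cache = {}


def f_c(a, b):
    return a + b


def f_d(b, c):
    return b * c


def cached(output_name, func, root_args, **inputs):
    # The key uses the pipeline's root arguments (a, b), not the function's
    # direct inputs, so "d" is keyed on (a, b) even though it takes (b, c).
    key = (output_name, tuple(sorted(root_args.items())))
    if key not in cache:
        cache[key] = func(**inputs)
    return cache[key]


root = {"a": 2, "b": 3}
c = cached("c", f_c, root, a=root["a"], b=root["b"])
d = cached("d", f_d, root, b=root["b"], c=c)
print(cache)
```

Keying every node on the root arguments means a repeated call with the same `a` and `b` can be answered for any output without recomputing the intermediate values first.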

---

## Parameter Sweeps

Parameter sweeps are a technique used in computational simulations to explore the parameter space of a model or system.

In the example below, the `Sweep` class generates combinations of the input parameters `a`, `b`, `c`, `d`, and `e` for the function `f7`.
`Sweep` takes a dictionary that maps each parameter to a list of values, and `sweep.list()` returns a list of dictionaries, where each dictionary is one combination of parameters.


In [None]:
from pipefunc.sweep import Sweep

combos = {
    "a": [0, 1, 2],
    "b": [0, 1, 2],
    "c": [0, 1, 2],
    "d": [0, 1, 2],
    "e": [0, 1, 2],
}
# This means a Cartesian product of all the values in the lists
# while zipping ("a", "b").
sweep = Sweep(combos, dims=[("a", "b"), "c", "d", "e"])
sweep.list()[:10]  # show the first 10 combinations
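The combinations described by `dims` can be reproduced in plain Python: `a` and `b` are zipped, and the result is combined with `c`, `d`, and `e` in a Cartesian product. This sketch only mirrors the semantics stated in the comments above; the ordering of the combinations is that of `itertools.product` and is not claimed to match `sweep.list()`.

```python
from itertools import product

combos = {
    "a": [0, 1, 2],
    "b": [0, 1, 2],
    "c": [0, 1, 2],
    "d": [0, 1, 2],
    "e": [0, 1, 2],
}

# Zip a and b, then take the Cartesian product with c, d, and e
zipped_ab = list(zip(combos["a"], combos["b"]))
manual_sweep = [
    {"a": a, "b": b, "c": c, "d": d, "e": e}
    for (a, b), c, d, e in product(zipped_ab, combos["c"], combos["d"], combos["e"])
]
print(len(manual_sweep))  # 3 * 3 * 3 * 3 combinations
```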

The function `set_cache_for_sweep` then enables caching for nodes in the pipeline that are expected to be executed two or more times during the parameter sweep. Caching improves the efficiency of the sweep by storing and reusing results of repeated computations, rather than performing the same computation multiple times.


In [None]:
from pipefunc.sweep import set_cache_for_sweep

set_cache_for_sweep("f7", simplified_pipeline_complex, sweep, min_executions=2, verbose=True)
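The counting idea behind this can be sketched as follows: for a node that depends on only some of the swept parameters, count how often each distinct set of its inputs occurs in the sweep. The node and its dependencies (`node_args`) are hypothetical, and this is a sketch of the concept rather than the library's implementation.

```python
from collections import Counter
from itertools import product

values = [0, 1, 2]
# Same sweep as above: a and b zipped, crossed with c, d, and e
sweep_combos = [
    {"a": ab, "b": ab, "c": c, "d": d, "e": e}
    for ab, c, d, e in product(values, values, values, values)
]

node_args = ("a", "b")  # hypothetical node that depends only on a and b
counts = Counter(tuple(combo[k] for k in node_args) for combo in sweep_combos)

min_executions = 2
worth_caching = any(n >= min_executions for n in counts.values())
print(counts, worth_caching)
```

Here the node sees only 3 distinct inputs across 81 runs, so each result would be reused 27 times if it is cached.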

We can now run the sweep, for example:

In [None]:
results = [
    simplified_pipeline_complex.run("f7", kwargs=combo, full_output=True) for combo in sweep.list()
]

In [None]:
import pandas as pd

df = pd.DataFrame(results)
df