---
badges: true
categories:
- python
- kedro
- databricks
description: Kedro Pipeline provides options that allow you to slice and compose pipelines effortlessly
toc: true
hide: false
date: '2024-03-06'
---

# Kedro Pipeline (1) - Slicing Pipeline Effortlessly 🍕

In [10]:
from kedro.pipeline import pipeline, node
from kedro.pipeline.node import Node
def foo():
   return "bar"

# Kedro Node and Pipelines

Kedro introduces the concepts of Nodes and Pipelines. A basic understanding of these concepts is assumed. However, if you're unfamiliar, you can refer to the Nodes and Pipelines documentation for more details.

In essence, a Kedro Node acts as a thin wrapper around a Python function, specifying its inputs and outputs. On the other hand, a Pipeline is essentially a collection of Nodes that are strung together. When a pipeline is executed, Kedro resolves the dependencies between nodes to determine the correct order of execution.
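To make the dependency resolution less abstract, here is a minimal sketch that uses only the standard library. It is not how Kedro implements it; the toy node tuples and the `execution_order` helper are made up for illustration. The idea is the same, though: a node can only run after the nodes that produce its inputs.

```python
from graphlib import TopologicalSorter

# Each toy "node" is (name, inputs, outputs). All names are hypothetical.
toy_nodes = [
    ("train_model", ["features"], ["model"]),
    ("load_data", [], ["raw"]),
    ("build_features", ["raw"], ["features"]),
]


def execution_order(nodes):
    """Order nodes so that every dataset is produced before it is consumed."""
    # Which node produces each dataset?
    producer = {out: name for name, _, outputs in nodes for out in outputs}
    # Map each node to the set of nodes that produce its inputs.
    graph = {
        name: {producer[i] for i in inputs if i in producer}
        for name, inputs, _ in nodes
    }
    return list(TopologicalSorter(graph).static_order())


order = execution_order(toy_nodes)
print(order)  # ['load_data', 'build_features', 'train_model']
```

Note that the nodes were declared in a scrambled order; the execution order comes from the inputs and outputs alone.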

While Kedro is primarily designed for data and machine learning applications, it can be used to execute any set of dependent tasks, sequentially or in parallel.

In [23]:
node_a = node(func=foo, inputs=None, outputs="output_a")
first_pipeline = pipeline([])
node_a, first_pipeline

(Node(foo, None, 'output_a', None), Pipeline([]))

`pipeline` is a factory function that expects a list of `Node` objects and produces a `Pipeline` object. In this example, we have an empty `Pipeline`. Below is another valid example:

In [26]:
pipeline([node_a])

Pipeline([
Node(foo, None, 'output_a', None)
])

## Node Uniqueness

The pipeline in Kedro automatically validates Node instances. Specifically, nodes cannot produce the same output (though they can share the same input), and there cannot be duplicate nodes within the pipeline. This validation is crucial to ensure that the pipeline forms an executable Directed Acyclic Graph (DAG), allowing for proper execution and preventing any cyclic dependencies.
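The output rule can be sketched in a few lines of plain Python. This is a toy check, not Kedro's actual validation code, and `find_duplicate_outputs` is a hypothetical helper:

```python
from collections import Counter


def find_duplicate_outputs(nodes):
    """Return dataset names that more than one node claims to produce."""
    counts = Counter(out for _, outputs in nodes for out in outputs)
    return sorted(name for name, count in counts.items() if count > 1)


# (node name, outputs) pairs; "clean" and "clean_again" both write "clean_data".
toy_nodes = [
    ("load", ["raw_data"]),
    ("clean", ["clean_data"]),
    ("clean_again", ["clean_data"]),
]

duplicates = find_duplicate_outputs(toy_nodes)
print(duplicates)  # ['clean_data']
```

If two nodes wrote the same dataset, there would be no single answer to which node a downstream consumer depends on, so the graph would be ambiguous.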

In [27]:
pipeline([node_a, node_a])

ValueError: Pipeline nodes must have unique names. The following node names appear more than once:

Free nodes:
  - foo(None) -> [output_a]

You can name your nodes using the last argument of 'node()'.

On the other hand, two `Node` objects are considered equal if they have the same `inputs`, `outputs` and `function` (and the same node name, which is an optional argument).

In [33]:
node_b = node(foo, inputs=None, outputs="output_a")

In [34]:
node_b == node_a

True

Internally, it is comparing the `name` attribute, which is a combination of namespace, function name, inputs and outputs. This detail is not important to most Kedro users; it is only used by Kedro internally.

In [42]:
node_a.name



'foo(None) -> [output_a]'

In [43]:
Node.__str__??

Signature: Node.__str__(self)
Docstring: Return str(self).
Source:   
    def __str__(self):
        def _set_to_str(xset):
            return f"[{','.join(xset)}]"

        out_str = _set_to_str(self.outputs) if self._outputs else "None"
        in_str = _set_to_str(self.inputs) if self._inputs else "None"


## Pipeline Arithmetic

The closest analogy to `Pipeline` is the Python `set`. They share similar characteristics:
- The elements cannot be repeated.
- Pipelines can be added or subtracted.

In [53]:
pipeline([node_a]) + pipeline([node_a])

Pipeline([
Node(foo, None, 'output_a', None)
])

In [59]:
a = node(foo, None, "a")
b = node(foo, None, "b")
c = node(foo, None, "c")
d = node(foo, None, "d")

original_set = set(["a","b","c"])
original_pipeline = pipeline([a,b,c])

In [67]:
pipeline([a]) + pipeline([b])

Pipeline([
Node(foo, None, 'a', None),
Node(foo, None, 'b', None)
])

In [69]:
pipeline([a, b]) - pipeline([b])

Pipeline([
Node(foo, None, 'a', None)
])

In [72]:
pipeline([a, b]) - pipeline([a])

Pipeline([
Node(foo, None, 'b', None)
])

In [60]:
original_set| set(["b","c","d"])

{'a', 'b', 'c', 'd'}

In [61]:
pipeline([a,b,c]) | pipeline([b,c,d]) # nodes that exist in either pipeline

Pipeline([
Node(foo, None, 'a', None),
Node(foo, None, 'b', None),
Node(foo, None, 'c', None),
Node(foo, None, 'd', None)
])

In [57]:
original_set & set(["b","c","d"])

{'b', 'c'}

In [62]:
pipeline([a,b,c]) & pipeline([b,c,d]) # only nodes that exist in both pipelines

Pipeline([
Node(foo, None, 'b', None),
Node(foo, None, 'c', None)
])

Pipeline arithmetic is most useful for pipeline registration, i.e. in `pipeline_registry.py`. For example, you can combine your development pipeline and inference pipeline in different ways.

In [11]:
def fake_node(name):
    return node(foo, inputs=None, outputs=name, name=name)

# For simplicity, let's assume each pipeline is just one single node.
spark_pipeline = pipeline([fake_node("spark")])
feature_engineering = pipeline([fake_node("feature_engineering")])
model_training = pipeline([fake_node("model_pipeline")])
inference = pipeline([fake_node("inference")])


With 4 base pipelines, you can combine them in different ways. For example, you may want an e2e pipeline that adds all of them together.

In [78]:
e2e = spark_pipeline + feature_engineering + model_training + inference

You can also have a `local` pipeline that skips only the `spark` pipeline.

In [80]:
local = e2e - spark_pipeline
local

Pipeline([
Node(foo, None, 'feature_engineering', 'feature_engineering'),
Node(foo, None, 'inference', 'inference'),
Node(foo, None, 'model_pipeline', 'model_pipeline')
])

## Advanced Pipeline Slicing

Kedro provides an [interactive visualisation](https://demo.kedro.org/) that you can play around with. For this post I am going to stick with the demo project to explain concepts about `Pipeline` and how you can slice and compose pipelines.

In [None]:
#hide
%load_ext kedro.ipython
%cd /Users/Nok_Lam_Chan/dev/kedro-viz/demo-project

In [38]:
%reload_kedro /Users/Nok_Lam_Chan/dev/kedro-viz/demo-project

By using `%reload_kedro` inside a notebook, you can access the project's `pipelines` object. Let's say I want to select the [highlighted pipeline](https://demo.kedro.org/?pipeline_id=__default__&selected_id=04ba733a) like this (click on "Create Derived Features"):
![Select a node on kedro-viz](kedro-viz-selection.png)


To filter this with the `Pipeline` API, you need two options: `from_nodes` (downstream) and `to_nodes` (upstream).

In [39]:
pipelines.keys()

dict_keys(['__default__', 'Data ingestion', 'Modelling stage', 'Feature engineering', 'Reporting stage', 'Pre-modelling'])

In [40]:
full_pipeline

Pipeline([
Node(apply_types_to_companies, 'companies', 'ingestion.int_typed_companies', 'apply_types_to_companies'),
Node(apply_types_to_reviews, ['reviews', 'params:ingestion.typing.reviews.columns_as_floats'], 'ingestion.int_typed_reviews', 'apply_types_to_reviews'),
Node(apply_types_to_shuttles, 'shuttles', 'ingestion.int_typed_shuttles@pandas1', 'apply_types_to_shuttles'),
Node(aggregate_company_data, 'ingestion.int_typed_companies', 'ingestion.prm_agg_companies', 'company_agg'),
Node(combine_shuttle_level_information, {'shuttles': 'ingestion.int_typed_shuttles@pandas2', 'reviews': 'ingestion.int_typed_reviews', 'companies': 'ingestion.prm_ag

In [42]:
node_name = "feature_engineering.create_derived_features" # the node selected in kedro-viz
full_pipeline.filter(from_nodes=[node_name], to_nodes=[node_name])

Pipeline([
Node(create_derived_features, ['prm_spine_table', 'prm_shuttle_company_reviews', 'params:feature_engineering.feature.derived'], 'feature_engineering.feat_derived_features', 'create_derived_features')
])

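This only selects one node because, by default, the `filter` method combines its arguments with an `and` condition: a node must be both downstream of `from_nodes` and upstream of `to_nodes`. So we need to apply the `filter` method separately for each direction and take the union of the results.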

In [45]:
full_pipeline.filter(from_nodes=[node_name]) | full_pipeline.filter(to_nodes=[node_name])

Pipeline([
Node(apply_types_to_companies, 'companies', 'ingestion.int_typed_companies', 'apply_types_to_companies'),
Node(apply_types_to_reviews, ['reviews', 'params:ingestion.typing.reviews.columns_as_floats'], 'ingestion.int_typed_reviews', 'apply_types_to_reviews'),
Node(apply_types_to_shuttles, 'shuttles', 'ingestion.int_typed_shuttles@pandas1', 'apply_types_to_shuttles'),
Node(aggregate_company_data, 'ingestion.int_typed_companies', 'ingestion.prm_agg_companies', 'company_agg'),
Node(combine_shuttle_level_information, {'shuttles': 'ingestion.int_typed_shuttles@pandas2', 'reviews': 'ingestion.int_typed_reviews', 'companies': 'ingestion.prm_ag

Now we get the correct filtered pipeline as expected.
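If the difference between the two calls is not obvious, a toy model with plain sets may help. This is only a sketch of the semantics, not Kedro code: the four-node DAG and the `reachable` helper are made up, and the sets stand in for the pipelines returned by `filter`.

```python
# Toy DAG: each key runs before the nodes listed in its value. Names are hypothetical.
downstream = {
    "ingest": ["clean"],
    "clean": ["features"],
    "features": ["train"],
    "train": [],
}


def reachable(start, edges):
    """Return `start` plus every node reachable by following `edges`."""
    seen, stack = set(), [start]
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(edges[current])
    return seen


# Reverse the edges so we can also walk upstream.
upstream = {name: [] for name in downstream}
for parent, children in downstream.items():
    for child in children:
        upstream[child].append(parent)

from_nodes = reachable("features", downstream)  # the node and everything after it
to_nodes = reachable("features", upstream)  # the node and everything before it

print(sorted(from_nodes & to_nodes))  # ['features']
print(sorted(from_nodes | to_nodes))  # ['clean', 'features', 'ingest', 'train']
```

The intersection mirrors passing both arguments to a single `filter` call, while the union mirrors combining two separate `filter` calls with `|`.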

## More notes
### Supporting an `or` operator in `Pipeline.filter`
While the current `filter` supports many options, there may be some value in wrapping the Pipeline API to support things like `or`. Today this is only possible if you use the Python API directly (as in the example above), not the CLI. It could be something similar to the [Graph Operators in dbt](https://docs.getdbt.com/reference/node-selection/graph-operators).

e.g. (hypothetical syntax borrowed from dbt, not an existing `kedro run` option):
```
kedro run --select "my_model+"         # select my_model and all children
kedro run --select "+my_model"         # select my_model and all parents
kedro run --select "+my_model+"        # select my_model, and all of its parents and children
```


### Selecting or slicing multiple pipelines with `kedro run`
Since the Pipeline API supports arithmetic, it would be quite straightforward to support things like `kedro run --pipeline a+b` or `kedro run --pipeline a-b`. Let's have a look at what options are available for the CLI.

In [8]:
!kedro run --help

Usage: kedro run [OPTIONS]

  Run the pipeline.

Options:
  --from-inputs TEXT         A list of dataset names which should be used as a
                             starting point.
  --to-outputs TEXT          A list of dataset names which should be used as
                             an end point.
  --from-nodes TEXT          A list of node names which should be used as a
                             starting point.
  --to-nodes TEXT            A list of node names which should be used as an
                             end point.
  -n, --nodes TEXT           Run only nodes with specified names.
  -r, --runner TEXT          Specify a runner that you want to run the
                             pipeline with. Available runners:
                             'SequentialRunner', 'ParallelRunner' and
                             'ThreadRunner'.
  --async                    Load and save node inputs and outputs
                             asynchronously with threads. If not specified,
  

This is what happens when you run `kedro run -p training -t model_a`. It is a two-step filtering:
1. Apply the `-p` pipeline name to select a key from the pipeline dictionary. This is just `pipelines[pipeline_name]`, which means you can only select ONE pipeline at a time.
2. The pipeline is then further filtered with `Pipeline.filter`.
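The two steps can be mimicked with a plain dictionary. This is a toy sketch rather than the real `kedro run` code path: `toy_pipelines`, `select_nodes` and all node names are hypothetical, and each "pipeline" is just a mapping from node name to its tags.

```python
# Toy registry: pipeline name -> {node name: tags}. All names are hypothetical.
toy_pipelines = {
    "feature": {"make_features": {"model_a", "model_b"}},
    "training": {"train_a": {"model_a"}, "train_b": {"model_b"}},
}


def select_nodes(pipeline_name, tag):
    # Step 1: pick exactly one registered pipeline by key.
    selected = toy_pipelines[pipeline_name]
    # Step 2: filter the nodes of that pipeline only.
    return sorted(name for name, tags in selected.items() if tag in tags)


print(select_nodes("training", "model_a"))  # ['train_a']
```

`make_features` is also tagged `model_a`, but it is never considered because it lives in a different registered pipeline.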

In [16]:
from kedro.pipeline.pipeline import Pipeline
Pipeline.filter??

Signature:
Pipeline.filter(
    self,
    tags: 'Iterable[str] | None' = None,
    from_nodes: 'Iterable[str] | None' = None,
    to_nodes: 'Iterable[str] | None' = None,
    node_names: 'Iterable[str] | None' = None,
    from_inputs: 'Iterable[str] | None' = None,
    to_outputs: 'Iterable[str] | None' = None,
    node_namespace: 'str | None' 

This means that if you have tags applied across multiple pipelines, you cannot filter by tag unless you apply the filter to the largest pipeline that contains all the nodes. What if we could support things like:
`kedro run -p feature+training -t model_a`?
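As a rough idea of how little is needed, here is a sketch of resolving such an expression against a registry. To be clear, this is not an existing Kedro feature: `resolve_pipeline_expression` is a hypothetical helper, and plain sets stand in for `Pipeline` objects so the example runs without a project. `+` is mapped to `|` because sets have no `+`; `Pipeline` supports both `|` and `-`, as shown earlier.

```python
import operator
import re

# "+" means union and "-" means difference.
OPERATORS = {"+": operator.or_, "-": operator.sub}


def resolve_pipeline_expression(expression, registry):
    """Evaluate e.g. "feature+training" left to right against a registry.

    Assumes pipeline names themselves contain no "+" or "-".
    """
    tokens = [token.strip() for token in re.split(r"([+-])", expression)]
    result = registry[tokens[0]]
    # Tokens alternate: name, operator, name, operator, name, ...
    for symbol, name in zip(tokens[1::2], tokens[2::2]):
        result = OPERATORS[symbol](result, registry[name])
    return result


# Sets of node names stand in for Pipeline objects. All names are hypothetical.
registry = {
    "feature": {"create_features"},
    "training": {"train_model", "evaluate_model"},
    "spark": {"spark_etl"},
    "e2e": {"spark_etl", "create_features", "train_model", "evaluate_model"},
}

print(sorted(resolve_pipeline_expression("feature+training", registry)))
# ['create_features', 'evaluate_model', 'train_model']
print(sorted(resolve_pipeline_expression("e2e - spark", registry)))
# ['create_features', 'evaluate_model', 'train_model']
```

The combined result could then be passed through the usual tag filter, which is the part the current CLI cannot express in one command.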