# [Hands-on] Modular and Pipeline Building for Data Management with Kedro

* Online documentation: [link](https://kedro.readthedocs.io/en/stable/index.html)
* GitHub repository: [link](https://github.com/kedro-org/kedro)

In [1]:
from kedro.io import DataCatalog, MemoryDataSet
from kedro.pipeline import node, pipeline
from kedro.runner import SequentialRunner


## 1. Node

A `node` is a Kedro concept. It is a wrapper for a Python function that names the inputs and outputs of that function. It is the building block of a pipeline. Nodes can be linked when the output of one node is the input of another.

In [2]:
"""Create a node in the pipeline by providing a function to be called
along with variable names for inputs and/or outputs.
    
Args:
    func: A function that corresponds to the node logic. The function
        should have at least one input or output.
    inputs: The name or the list of the names of variables used as inputs
        to the function. The number of names should match the number of
        arguments in the definition of the provided function. When
        Dict[str, str] is provided, variable names will be mapped to
        function argument names.
    outputs: The name or the list of the names of variables used as outputs
        to the function. The number of names should match the number of
        outputs returned by the provided function. When Dict[str, str]
        is provided, variable names will be mapped to the named outputs the
        function returns.
    name: Optional node name to be used when displaying the node in logs or
        any other visualisations.
    tags: Optional set of tags to be applied to the node.
    confirms: Optional name or the list of the names of the datasets
        that should be confirmed. This will result in calling ``confirm()``
        method of the corresponding data set instance. Specified dataset
        names do not necessarily need to be present in the node ``inputs``
        or ``outputs``.
    namespace: Optional node namespace.

Returns:
    A Node object with mapped inputs, outputs and function.

Example:
::
    >>> import pandas as pd
    >>> import numpy as np
    >>>
    >>> def clean_data(cars: pd.DataFrame,
    >>>                boats: pd.DataFrame) -> Dict[str, pd.DataFrame]:
    >>>     return dict(cars_df=cars.dropna(), boats_df=boats.dropna())
    >>>
    >>> def halve_dataframe(data: pd.DataFrame) -> List[pd.DataFrame]:
    >>>     return np.array_split(data, 2)
    >>>
    >>> nodes = [
    >>>     node(clean_data,
    >>>          inputs=['cars2017', 'boats2017'],
    >>>          outputs=dict(cars_df='clean_cars2017',
    >>>                       boats_df='clean_boats2017')),
    >>>     node(halve_dataframe,
    >>>          'clean_cars2017',
    >>>          ['train_cars2017', 'test_cars2017']),
    >>>     node(halve_dataframe,
    >>>          dict(data='clean_boats2017'),
    >>>          ['train_boats2017', 'test_boats2017'])
    >>> ]
"""

# Prepare first node
def greeting_func():
    return "Hello"

greeting_node = node(
    func=greeting_func, inputs=None, outputs="greeting_words"
)

# Prepare second node
def introducing_func(greeting_words):
    return f"{greeting_words}, AIMedic!"

introducing_node = node(
    func=introducing_func, inputs="greeting_words", outputs="message"
)
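
To make the wrapper idea concrete, here is a minimal plain-Python sketch of what a node does conceptually. `SimpleNode` is a hypothetical class written for illustration only, not Kedro's `Node` implementation. It stores a function together with the names of its inputs and outputs, and its `run` method looks up the inputs by name and returns the outputs keyed by name.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class SimpleNode:
    """Hypothetical stand-in for a node: a function plus named inputs/outputs."""

    func: Callable[..., Any]
    inputs: List[str] = field(default_factory=list)
    outputs: List[str] = field(default_factory=list)

    def run(self, data: Dict[str, Any]) -> Dict[str, Any]:
        # Look up each named input and pass it positionally to the function.
        args = [data[name] for name in self.inputs]
        result = self.func(*args)
        if not self.outputs:
            return {}
        # One output name wraps the single return value;
        # several names are paired with the returned sequence.
        if len(self.outputs) == 1:
            return {self.outputs[0]: result}
        return dict(zip(self.outputs, result))


def greeting_func():
    return "Hello"


def introducing_func(greeting_words):
    return f"{greeting_words}, AIMedic!"


first = SimpleNode(greeting_func, outputs=["greeting_words"])
second = SimpleNode(introducing_func, inputs=["greeting_words"], outputs=["message"])

# The output name of the first node matches the input name of the second,
# which is what links the two nodes.
intermediate = first.run({})
final = second.run(intermediate)
print(final)
```

The point of the sketch is that the function itself never refers to a dataset name; the names live in the wrapper, so the same function can be reused with different inputs and outputs.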

## 2. Pipeline
A pipeline organises the dependencies and execution order of a collection of nodes, and connects inputs and outputs while keeping your code modular. The pipeline determines the node execution order by resolving dependencies and does not necessarily run the nodes in the order in which they are passed in.

In [3]:
"""Create a ``Pipeline`` from a collection of nodes and/or ``Pipeline``s.
Args:
    pipe: The nodes the ``Pipeline`` will be made of. If you
        provide pipelines among the list of nodes, those pipelines will
        be expanded and all their nodes will become part of this
        new pipeline.
    inputs: A name or collection of input names to be exposed as connection points
        to other pipelines upstream. This is optional; if not provided, the
        pipeline inputs are automatically inferred from the pipeline structure.
        When str or Set[str] is provided, the listed input names will stay
        the same as they are named in the provided pipeline.
        When Dict[str, str] is provided, current input names will be
        mapped to new names.
        Must only refer to the pipeline's free inputs.
    outputs: A name or collection of names to be exposed as connection points
        to other pipelines downstream. This is optional; if not provided, the
        pipeline outputs are automatically inferred from the pipeline structure.
        When str or Set[str] is provided, the listed output names will stay
        the same as they are named in the provided pipeline.
        When Dict[str, str] is provided, current output names will be
        mapped to new names.
        Can refer to both the pipeline's free outputs, as well as
        intermediate results that need to be exposed.
    parameters: A name or collection of parameters to namespace.
        When str or Set[str] are provided, the listed parameter names will stay
        the same as they are named in the provided pipeline.
        When Dict[str, str] is provided, current parameter names will be
        mapped to new names.
        The parameters can be specified without the `params:` prefix.
    tags: Optional set of tags to be applied to all the pipeline nodes.
    namespace: A prefix to give to all dataset names,
        except those explicitly named with the `inputs`/`outputs`
        arguments, and parameter references (`params:` and `parameters`).

Raises:
    ModularPipelineError: When inputs, outputs or parameters are incorrectly
        specified, or they do not exist on the original pipeline.
    ValueError: When underlying pipeline nodes inputs/outputs are not
        any of the expected types (str, dict, list, or None).

Returns:
    A new ``Pipeline`` object.
"""

# Assemble nodes to a pipeline
my_pipeline = pipeline(
    pipe=[greeting_node, introducing_node]
)
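
The dependency resolution described in this section can be sketched with the standard library alone. The example below is an illustration under stated assumptions, not Kedro's internal code: each node is reduced to a hypothetical `(inputs, outputs)` description, and `graphlib.TopologicalSorter` (Python 3.9+) derives an execution order from the dataset names. The nodes are deliberately listed in the "wrong" order to show that the listing order does not decide the run order.

```python
from graphlib import TopologicalSorter

# Hypothetical node descriptions: node name -> (input names, output names).
node_specs = {
    "introducing_node": (["greeting_words"], ["message"]),
    "greeting_node": ([], ["greeting_words"]),
}

# Map every dataset name to the node that produces it.
producer = {
    out: name
    for name, (_, outs) in node_specs.items()
    for out in outs
}

# A node depends on the producers of its inputs.
# Inputs with no producer are free inputs and add no dependency.
dependencies = {
    name: {producer[i] for i in ins if i in producer}
    for name, (ins, _) in node_specs.items()
}

execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)  # ['greeting_node', 'introducing_node']
```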

## 3. DataCatalog
A `DataCatalog` is a Kedro concept. It is the registry of all data sources that the project can use. It maps the names of node inputs and outputs to `DataSet` instances; `DataSet` is a Kedro class that can be specialised for different types of data storage. Kedro uses a `MemoryDataSet` for data that is simply held in memory.

In [4]:
"""``DataCatalog`` stores instances of ``AbstractDataSet``
implementations to provide ``load`` and ``save`` capabilities from
anywhere in the program. To use a ``DataCatalog``, you need to
instantiate it with a dictionary of data sets. Then it will act as a
single point of reference for your calls, relaying load and save
functions to the underlying data sets.

Args:
    data_sets: A dictionary of data set names and data set instances.
    feed_dict: A feed dict with data to be added in memory.
    layers: A dictionary of data set layers. It maps a layer name
        to a set of data set names, according to the
        data engineering convention. For more details, see
        https://kedro.readthedocs.io/en/stable/faq/faq.html#what-is-data-engineering-convention
        
Example:
::
    >>> from kedro.extras.datasets.pandas import CSVDataSet
    >>>
    >>> cars = CSVDataSet(filepath="cars.csv",
    >>>                   load_args=None,
    >>>                   save_args={"index": False})
    >>> io = DataCatalog(data_sets={'cars': cars})
"""

# Prepare a data catalog
INPUT_DATA_HASH = {
    "greeting_words": MemoryDataSet()
}

data_catalog = DataCatalog(
    data_sets=INPUT_DATA_HASH
)
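
The registry idea can be illustrated without Kedro. In the sketch below, `InMemoryStore` and `SimpleCatalog` are hypothetical names invented for this example. They only mimic the pattern described above, in which a catalog maps names to dataset objects and relays `load` and `save` calls to them. The sketch makes no claim about how Kedro implements its classes.

```python
from typing import Any, Dict


class InMemoryStore:
    """Hypothetical dataset that keeps its data in a Python attribute."""

    def __init__(self) -> None:
        self._data: Any = None
        self._has_data = False

    def save(self, data: Any) -> None:
        self._data = data
        self._has_data = True

    def load(self) -> Any:
        if not self._has_data:
            raise ValueError("Nothing has been saved to this store yet.")
        return self._data


class SimpleCatalog:
    """Hypothetical registry that relays load/save to named datasets."""

    def __init__(self, datasets: Dict[str, InMemoryStore]) -> None:
        self._datasets = dict(datasets)

    def save(self, name: str, data: Any) -> None:
        self._datasets[name].save(data)

    def load(self, name: str) -> Any:
        return self._datasets[name].load()


catalog = SimpleCatalog({"greeting_words": InMemoryStore()})
catalog.save("greeting_words", "Hello")
loaded = catalog.load("greeting_words")
print(loaded)  # Hello
```

Because callers only ever use a name, the storage behind that name could be swapped (for example, to a file-backed class) without touching the code that loads or saves.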

## 4. Runner
The `Runner` is an object that runs the pipeline. Kedro resolves the order in which the nodes are executed:

1. Kedro first executes `greeting_node`. This runs `greeting_func`, which takes no input but outputs the string “Hello”.

2. The output string is stored in the `MemoryDataSet` named `greeting_words`.

3. Kedro then executes the second node, `introducing_node`. This loads the `greeting_words` dataset and injects it into the `introducing_func` function.

4. The function joins the input greeting with “AIMedic!” to form the output string “Hello, AIMedic!”.

5. The output of the pipeline is returned in a dictionary with key `message`.
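
The five steps above can be traced in a few lines of plain Python. This is a simplified sketch, not the `SequentialRunner` implementation: the `steps` list and the `run_sequentially` helper are hypothetical, the steps are assumed to be already in dependency order, and a plain dictionary stands in for the in-memory datasets.

```python
def greeting_func():
    return "Hello"


def introducing_func(greeting_words):
    return f"{greeting_words}, AIMedic!"


# Hypothetical step descriptions: (function, input names, output name),
# assumed to be listed in dependency order already.
steps = [
    (greeting_func, [], "greeting_words"),
    (introducing_func, ["greeting_words"], "message"),
]


def run_sequentially(steps):
    store = {}  # stands in for the in-memory datasets
    for func, input_names, output_name in steps:
        # Load the named inputs, call the function, save the named output.
        args = [store[name] for name in input_names]
        store[output_name] = func(*args)
    # Return only the outputs that no step consumed.
    consumed = {name for _, input_names, _ in steps for name in input_names}
    return {name: value for name, value in store.items() if name not in consumed}


result = run_sequentially(steps)
print(result)  # {'message': 'Hello, AIMedic!'}
```

`greeting_words` is consumed by the second step, so only `message` appears in the returned dictionary.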

In [5]:
"""The method implementing sequential pipeline running.
        
Args:
    pipeline: The ``Pipeline`` to run.
    catalog: The ``DataCatalog`` from which to fetch data.
    hook_manager: The ``PluginManager`` to activate hooks.
    session_id: The id of the session.

Raises:
    Exception: in case of any downstream node failure.
"""

# Create a runner to run the pipeline
runner = SequentialRunner()

# Run the pipeline
output_data_dict = runner.run(my_pipeline, data_catalog)

In [6]:
# Print outputs of the pipeline
print(output_data_dict)

{'message': 'Hello, AIMedic!'}
