
Add the ability to dynamically create modular pipelines #1993

Closed
jstammers opened this issue Oct 31, 2022 · 3 comments
Labels

- Community: Issue/PR opened by the open-source community
- Issue: Feature Request: New feature or improvement to existing feature
- Stage: Technical Design 🎨: Ticket needs to undergo technical design before implementation

Comments

@jstammers
Contributor

Description

I have a pipeline that is broadly composed of the following steps:

  1. Load and preprocess some data
  2. Apply a function to transform it
  3. Store the transformed data and some additional metadata

An example of the pipeline is presented below:

from typing import List

import pandas as pd
from kedro.pipeline import node, pipeline


def create_data():
    df = pd.DataFrame({"x": [1, 2, 3]})
    return df


def process_data(df: pd.DataFrame, x: int) -> pd.DataFrame:
    df["x"] *= x
    return df


def combine_data(dfs: List[pd.DataFrame]) -> pd.DataFrame:
    return pd.concat(dfs)


def create_pipeline(*args, **kwargs):
    input_pipeline = pipeline(
        [node(create_data, inputs=None, outputs="input_data")]
    )

    process_pipeline = pipeline(
        [
            node(
                process_data,
                inputs=["input_data", "param:x"],
                outputs="processed_data",
            )
        ]
    )

    combine_pipeline = pipeline(
        [node(combine_data, inputs=["processed_data"], outputs="combined_data")]
    )
    
    return input_pipeline + process_pipeline + combine_pipeline
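
For reference, with a single parameter value (say `x = 2`) the three nodes above reduce to the following plain pandas sequence; this is a sketch of the data flow only, not Kedro API:

```python
import pandas as pd

# create_data: produce the input frame
df = pd.DataFrame({"x": [1, 2, 3]})

# process_data with params:x = 2
processed = df.copy()
processed["x"] *= 2

# combine_data over the (single) processed frame
combined = pd.concat([processed])
```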

As an extension to the current functionality, I would like to iterate over multiple parameters for process_pipeline and combine the results together at the final stage. Additionally, these loop parameters would be determined as an output of the initial pipeline. Using modular pipelines, if it were possible to load the result of a node's output at pipeline-construction time, I would expect to be able to compose the pipeline as follows:

def create_data():
    loop = [1, 2, 3, 4, 5]
    df = pd.DataFrame({"x": [1, 2, 3]})
    return df, loop


def create_pipeline(*args, **kwargs):
    input_pipeline = pipeline(
        [node(create_data, inputs=None, outputs=["input_data", "loops"])]
    )

    process_pipeline = pipeline(
        [
            node(
                process_data,
                inputs=["input_data", "params:loop"],
                outputs="processed_data",
            )
        ]
    )

    p = input_pipeline
    process_outputs = []

    # `_load` is the hypothetical API being requested here: it would load the
    # "loops" output from the catalog at pipeline-construction time
    for loop in _load("loops"):
        processor = pipeline(
            process_pipeline,
            parameters={"params:loop": loop},
            namespace=str(loop),
        )
        # assumes namespacing adds '<loop>.processed_data' to the catalog
        process_outputs.append(f"{loop}.processed_data")
        p += processor

    combine_pipeline = pipeline(
        [node(combine_data, inputs=process_outputs, outputs="combined_data")]
    )
    p += combine_pipeline

    return p
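
Stripped of Kedro, the requested shape is an ordinary fan-out/fan-in where the fan width comes from upstream data; a minimal pure-Python sketch (list-based stand-ins for the DataFrames):

```python
def create_data():
    # The loop values are produced by the first step, not known up front
    loops = [1, 2, 3]
    data = [1, 2, 3]
    return data, loops


def process_data(data, x):
    return [v * x for v in data]


def combine_data(frames):
    combined = []
    for frame in frames:
        combined.extend(frame)
    return combined


data, loops = create_data()
processed = [process_data(data, x) for x in loops]  # fan-out over runtime values
combined = combine_data(processed)                  # fan-in
```

In plain Python the fan width can depend on `create_data`'s output, but a Kedro pipeline's node set is fixed before the run starts, which is the gap this issue asks to close.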

Context

This change would be useful because it would allow me to extend a current production pipeline without requiring additional modifications to the code used to execute it. To make use of this dynamic parameter, I currently have to create a separate runner script for this specific pipeline, which inevitably makes it less portable.

Possible Alternatives

To achieve the desired functionality, I have implemented something similar to the following:

from kedro.io import DataCatalog, MemoryDataSet
from kedro.runner import SequentialRunner

catalog = DataCatalog()

pipeline = create_pipeline()
runner = SequentialRunner()

init_pipeline = pipeline.to_outputs("input_data", "loops")

init_run = runner.run(init_pipeline, catalog)

loops = init_run["loops"]
processed_data = []

for l in loops:
    catalog = DataCatalog(
        {
            "input_data": MemoryDataSet(init_run["input_data"]),
            "params:loop": MemoryDataSet(l),
        }
    )
    loop_run = runner.run(
        pipeline.from_inputs("input_data", "params:loop").to_outputs("processed_data"),
        catalog,
    )
    processed_data.append(loop_run["processed_data"])

output_run = runner.run(
    pipeline.from_inputs("processed_data"),
    DataCatalog({"processed_data": MemoryDataSet(processed_data)}),
)

but this does not give me the full functionality of Kedro. For example, I can't use this to load catalogs from different environments. This script is also very tightly coupled to the pipeline, such that if I change the pipeline, I would need to change this script as well.
Another option I have come across is to use a custom Runner, as described in #1853, but I haven't yet tried to implement this for my pipeline.

@jstammers jstammers added the Issue: Feature Request New feature or improvement to existing feature label Oct 31, 2022
@datajoely
Contributor

Related discussion by @noklam here #1963

@merelcht merelcht added the Community Issue/PR opened by the open-source community label Jan 26, 2023
@AhdraMeraliQB AhdraMeraliQB added the Stage: Technical Design 🎨 Ticket needs to undergo technical design before implementation label Mar 21, 2023
@noklam noklam mentioned this issue Jun 1, 2023
1 task
@astrojuanlu
Member

It's not clear to me what the ask is here. Defining modular pipelines dynamically is already possible with Python code. See for instance https://getindata.com/blog/kedro-dynamic-pipelines/

    pipes = []
    for namespace in settings.DYNAMIC_PIPELINES_MAPPING.keys():
        pipes.append(
            pipeline(
                data_processing,
                inputs={
                    "companies": "companies",
                    "shuttles": "shuttles",
                    "reviews": "reviews",
                },
                namespace=namespace,
                tags=settings.DYNAMIC_PIPELINES_MAPPING[namespace],
            )
        )
    return sum(pipes)

What am I missing?
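
The mechanics of that pattern, independent of Kedro, are just a dictionary-driven fan-out over configuration; a minimal stand-alone sketch (the mapping contents and the `make_pipeline` helper are placeholders, not Kedro API):

```python
# Placeholder for settings.DYNAMIC_PIPELINES_MAPPING from the linked blog post
DYNAMIC_PIPELINES_MAPPING = {
    "base": ["base"],
    "candidate_1": ["candidate"],
    "candidate_2": ["candidate"],
}


def make_pipeline(namespace, tags):
    # Stand-in for kedro.pipeline.pipeline(data_processing, namespace=..., tags=...)
    return {"namespace": namespace, "tags": list(tags)}


pipes = [
    make_pipeline(namespace, DYNAMIC_PIPELINES_MAPPING[namespace])
    for namespace in DYNAMIC_PIPELINES_MAPPING
]
```

Note that the loop runs at pipeline-registration time over static configuration, so the set of namespaces must be known before any node executes; the original request is for the loop values to come from a node's output instead.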

Also, this issue hasn't had any activity in a year, so I'm voting to close it unless we can clarify the problem.

@merelcht
Member

I agree with @astrojuanlu, closing this issue now due to inactivity and it not being entirely clear what the ask is. If you come across this issue and have a similar need, feel free to comment and we can consider re-opening it for further discussion.

@merelcht merelcht closed this as not planned Mar 12, 2024