Skip to content

Make Pipelines great again #28

@MarcSkovMadsen

Description

@MarcSkovMadsen

Panel Pipelines: Visual Data Flow with Parameterized Nodes

A next-generation replacement for panel.pipeline.Pipeline, built on ReactFlow.

Why Replace Panel Pipeline?

Panel Pipeline
was a pioneering attempt at visual workflows, but its limitations are well-known:
linear-first design, no interactive canvas (no drag/zoom/pan), navigation-centric
rather than data-flow-centric, no embedded previews inside nodes, and
long-standing unresolved bugs.

Tools like Daggr,
ComfyUI,
LangFlow, and
n8n have proven that visual node editors
are the natural interface for composable workflows. Panel deserves the same.


Core Concept: A Parameterized Class IS the Node

A pipeline stage is any param.Parameterized subclass. No special base class needed.

Parameterized Concept Pipeline Role
param.Parameter declarations Inputs (data the node consumes)
@param.output decorated methods Outputs (data the node produces)
@param.depends view methods Display (live preview inside the node)
Parameter types & constraints Auto-wiring and auto-input widgets
import param

class TransformNode(param.Parameterized):
    # Inputs
    text = param.String(default="hello world")
    mode = param.Selector(default="upper", objects=["upper", "lower", "title", "swapcase"])

    # Output: Should we continue to use param.output. Or just param.depends methods?
    @param.output(param.String)
    @param.depends("text", "mode")
    def result(self):
        if not self.text:
            return ""
        return getattr(self.text, self.mode)()

    # Display (view method — rendered inside the node)
    # Shold param.depends methods be recognized as outputs?
    @param.depends("text", "mode")
    def preview(self):
        result = getattr(self.text, self.mode)()
        return pn.pane.Markdown(f"**{result}**")

The Pipeline class introspects this and automatically:

  1. Creates auto-input widget nodes for text and mode (unconnected params)
  2. Wires the result output to any downstream stage with a matching result parameter
  3. Renders the preview() view method inside the node body
  4. Sets up reactive updates so changes propagate through the graph

Pipeline API

from panel_reactflow import Pipeline

Pipeline(
    stages=[("Name", ClassOrInstance), ...],
    graph=None,           # None = auto-infer edges; dict = explicit topology
    layout_spacing=(350, 150),  # (horizontal, vertical) pixels between nodes
    auto_inputs=True,     # auto-generate widget nodes for unconnected params
    kwargs={},            # extra kwargs forwarded to ReactFlow
).servable()

Question: Should this be a Pipeline class or just a def create_pipeline(...)->ReactFlow function?

Parameters

  • stages — List of (name, class_or_instance) tuples. Classes are instantiated automatically.
  • graph — Explicit topology: {source_name: target_name | (t1, t2, ...)}. When None, edges are inferred by matching @param.output names to downstream parameter names.
  • layout_spacing(horizontal, vertical) spacing in pixels.
  • auto_inputs — When True, unconnected parameters get auto-generated widget nodes with an "INPUT" pill badge.
  • kwargs — Extra keyword arguments forwarded to ReactFlow (e.g., min_height, show_minimap).

View resolution

Pipeline resolves what to display inside each stage node:

  1. View methods (preferred) — Public @param.depends methods that are not @param.output methods. If a stage has preview() or view() decorated with @param.depends, it's rendered in the node.
  2. Output fallback — If no view methods exist, @param.output methods are rendered. A single output is shown directly; multiple outputs are displayed in a pn.Accordion with named sections (all expanded).

Node styling

Pipeline nodes are visually distinguished:

  • Auto-input nodes — Indigo border + gradient + "INPUT" pill badge (CSS class rf-auto-input)
  • Stage nodes — Emerald border + gradient + "OUTPUT" pill badge (CSS class rf-stage)

Question: What would the right styling here be?


Examples

1. Hello World — Single Stage

tmp/hello_world.py — The simplest pipeline: one stage, auto-input widgets.

import panel as pn
import param

from panel_reactflow import Pipeline

pn.extension("jsoneditor")


class StrTransformNode(param.Parameterized):
    text = param.String("Hello World")
    mode = param.Selector(default="upper", objects=["upper", "lower", "title", "swapcase"])

    @param.output(param.String)
    @param.depends("text", "mode")
    def result(self):
        if not self.text:
            return ""
        return getattr(self.text, self.mode)()


Pipeline(stages=[("Transform", StrTransformNode)]).servable()

text and mode have no upstream connection, so Pipeline auto-generates input widget nodes for them. The result() output is rendered as the node's fallback view.

2. Text Pipeline — Two Stages with View Methods

tmp/text_pipeline.py — Auto-inferred edges, view methods in both stages.

import panel as pn
import param

from panel_reactflow import Pipeline

pn.extension("jsoneditor")


class TransformNode(param.Parameterized):
    text = param.String(default="hello world")
    mode = param.Selector(default="upper", objects=["upper", "lower", "title", "swapcase"])

    @param.output(param.String)
    @param.depends("text", "mode")
    def result(self):
        if not self.text:
            return ""
        return getattr(self.text, self.mode)()

    @param.depends("text", "mode")
    def preview(self):
        if not self.text:
            return pn.pane.Markdown("*No input yet*")
        result = getattr(self.text, self.mode)()
        return pn.pane.Markdown(f"**{result}**")


class DisplayNode(param.Parameterized):
    result = param.String()

    @param.depends("result")
    def view(self):
        return pn.pane.Alert(self.result or "Waiting...", alert_type="success")


Pipeline(
    stages=[("Transform", TransformNode), ("Display", DisplayNode)],
).servable()

Pipeline auto-infers the edge Transform.result -> Display.result by name matching. Transform shows preview(), Display shows view(). Auto-input widgets are created for text and mode.

3. Data Explorer — DataFrames with Dynamic Selectors

tmp/data_explorer.py — DataFrames flowing between nodes, dynamic column selectors, hvPlot chart.

import hvplot.pandas  # noqa: F401
import numpy as np
import pandas as pd
import panel as pn
import param

from panel_reactflow import Pipeline

pn.extension("jsoneditor")

DATASETS = {
    "iris": pd.DataFrame({
        "sepal_length": np.random.normal(5.8, 0.8, 150),
        "sepal_width": np.random.normal(3.0, 0.4, 150),
        "petal_length": np.random.normal(3.7, 1.8, 150),
        "petal_width": np.random.normal(1.2, 0.8, 150),
    }),
    "random": pd.DataFrame({
        "x": np.random.randn(200),
        "y": np.random.randn(200),
        "size": np.random.uniform(1, 10, 200),
        "value": np.random.uniform(0, 100, 200),
    }),
}


class DataLoaderNode(param.Parameterized):
    dataset = param.Selector(default="iris", objects=list(DATASETS.keys()))

    @param.output(param.DataFrame)
    @param.depends("dataset")
    def data(self):
        return DATASETS[self.dataset]

    @param.depends("dataset")
    def table(self):
        return pn.pane.DataFrame(DATASETS[self.dataset], max_height=300)


class ChartNode(param.Parameterized):
    data = param.DataFrame()
    x_col = param.Selector(default="", objects=[""])
    y_col = param.Selector(default="", objects=[""])

    def __init__(self, **params):
        super().__init__(**params)
        self._update_col_options()

    @param.depends("data", watch=True)
    def _update_col_options(self):
        if self.data is not None and len(self.data.columns):
            cols = list(self.data.columns)
            self.param.x_col.objects = cols
            self.param.y_col.objects = cols
            if self.x_col not in cols:
                self.x_col = cols[0]
            if self.y_col not in cols:
                self.y_col = cols[1] if len(cols) > 1 else cols[0]
        else:
            self.param.x_col.objects = [""]
            self.param.y_col.objects = [""]
            self.x_col = ""
            self.y_col = ""

    @param.output()
    @param.depends("data", "x_col", "y_col")
    def plot(self):
        if self.data is None or not self.x_col or not self.y_col:
            return pn.pane.Markdown("*Waiting for data...*")
        return self.data.hvplot.scatter(x=self.x_col, y=self.y_col, height=500, width=500)


Pipeline(
    stages=[("Data", DataLoaderNode), ("Chart", ChartNode)],
).servable()

DataLoaderNode.data auto-connects to ChartNode.data. ChartNode dynamically updates its x_col/y_col selector options when data arrives. table() is DataLoader's view method; plot() is Chart's output fallback.

4. Stock Analysis DAG — Diamond Topology

tmp/stock_dag.py — Fan-out and fan-in with explicit graph.

import hvplot.pandas  # noqa: F401
import numpy as np
import pandas as pd
import panel as pn
import param

from panel_reactflow import Pipeline

pn.extension("jsoneditor")


class StockData(param.Parameterized):
    symbol = param.String(default="AAPL")
    days = param.Integer(default=252, bounds=(30, 1000))

    @param.output(param.DataFrame)
    @param.depends("symbol", "days")
    def prices(self):
        np.random.seed(hash(self.symbol) % 2**32)
        dates = pd.date_range(end=pd.Timestamp.now(), periods=self.days)
        price = 100 + np.cumsum(np.random.randn(self.days) * 1.5)
        return pd.DataFrame({"date": dates, "price": price}).set_index("date")


class MANode(param.Parameterized):
    prices = param.DataFrame()
    window = param.Integer(default=20, bounds=(5, 100))

    @param.output(param.DataFrame)
    @param.depends("prices", "window")
    def ma_data(self):
        if self.prices is None:
            return None
        df = self.prices.copy()
        df["ma"] = df["price"].rolling(self.window).mean()
        return df


class RSINode(param.Parameterized):
    prices = param.DataFrame()
    period = param.Integer(default=14, bounds=(2, 50))

    @param.output(param.DataFrame)
    @param.depends("prices", "period")
    def rsi_data(self):
        if self.prices is None:
            return None
        delta = self.prices["price"].diff()
        gain = delta.clip(lower=0).rolling(self.period).mean()
        loss = (-delta.clip(upper=0)).rolling(self.period).mean()
        rs = gain / loss
        df = self.prices.copy()
        df["rsi"] = 100 - (100 / (1 + rs))
        return df


class ChartNode(param.Parameterized):
    ma_data = param.DataFrame()
    rsi_data = param.DataFrame()

    @param.output()
    @param.depends("ma_data", "rsi_data")
    def plot(self):
        if self.ma_data is None or self.rsi_data is None:
            return pn.pane.Markdown("*Waiting for data from both branches...*")
        price_plot = self.ma_data.hvplot.line(
            y=["price", "ma"], ylabel="Price", title="Price & Moving Average",
            legend="top_left", height=200,
        )
        rsi_plot = self.rsi_data.hvplot.line(
            y="rsi", ylabel="RSI", title="RSI", color="orange", height=150,
        )
        return pn.Column(price_plot, rsi_plot, sizing_mode="stretch_width")


Pipeline(
    stages=[
        ("Stock Data", StockData),
        ("MA", MANode),
        ("RSI", RSINode),
        ("Chart", ChartNode),
    ],
    graph={"Stock Data": ("MA", "RSI"), "MA": "Chart", "RSI": "Chart"},
    kwargs={"min_height": 600},
).servable()

The explicit graph defines the diamond: Stock Data fans out to MA and RSI, which fan in to Chart. Auto-inputs are created for symbol, days, window, and period.

5. Multi-Output — One Method, Three Outputs

tmp/multi_output.py — A single @param.output method producing three named outputs, each flowing to its own downstream node.

import panel as pn
import param

from panel_reactflow import Pipeline

pn.extension("jsoneditor")


class Source(param.Parameterized):
    text = param.String(default="Hello World")

    @param.output(upper=param.String(), lower=param.String(), length=param.Integer())
    @param.depends("text")
    def split(self):
        return self.text.upper(), self.text.lower(), len(self.text)


class UpperDisplay(param.Parameterized):
    upper = param.String()

    @param.depends("upper")
    def view(self):
        return pn.pane.Markdown(f"**{self.upper}**")


class LowerDisplay(param.Parameterized):
    lower = param.String()

    @param.depends("lower")
    def view(self):
        return pn.pane.Markdown(f"*{self.lower}*")


class LengthDisplay(param.Parameterized):
    length = param.Integer()

    @param.depends("length")
    def view(self):
        return pn.pane.Alert(f"Length: {self.length}", alert_type="info")


Pipeline(
    stages=[
        ("Source", Source),
        ("Upper", UpperDisplay),
        ("Lower", LowerDisplay),
        ("Length", LengthDisplay),
    ],
).servable()

Source.split() returns a tuple of 3 values. Pipeline auto-infers edges: upper -> Upper.upper, lower -> Lower.lower, length -> Length.length. The Source node displays all 3 outputs in an Accordion (expanded). Each downstream node shows its view method.


Implemented Features

  • Pipeline class (src/panel_reactflow/pipeline.py)
  • Auto-inferred edges by matching @param.output names to downstream parameter names
  • Explicit graph dict for non-linear topologies (fan-out, fan-in, diamond)
  • Auto-input widget nodes for unconnected parameters
  • Reactive wiring via param.watch (upstream output changes propagate downstream)
  • View method resolution (public @param.depends methods rendered in nodes)
  • Output fallback rendering (single output direct, multi-output in Accordion)
  • Multi-output support (tuple-indexed extraction from @param.output(a=..., b=..., c=...))
  • Topological layout algorithm (BFS depth assignment, vertical stacking for fan-out)
  • Configurable layout spacing
  • Visual pill badges: "INPUT" (indigo) on auto-input nodes, "OUTPUT" (emerald) on stage nodes
  • Forward kwargs to ReactFlow (min_height, show_minimap, etc.)
  • Accepts both classes and instances as stages
  • Unit tests (tests/test_pipeline.py — 34 tests)

Open Issues

Requirements

Customization

  • Custom input widgets — Developers must be able to customize auto-input widgets in the same way pn.Param allows (e.g., specifying widget types, formatting, bounds overrides per parameter).
  • Custom output views — Developers must be able to customize how individual outputs are rendered in stage nodes, by providing panes, custom functions, or Viewer subclasses (analogous to pn.Param's widgets dict).
  • Customizable node styling — The default input/output pill badges and colors should look polished out of the box, but developers must be able to override them (custom CSS classes, colors, or disable badges entirely).

Execution control

  • Output caching — Expensive output computations should be cacheable. The recommended pattern is @pn.cache on the output method; this should be documented with examples.
  • Manual vs. automatic execution — Developers must be able to choose between automatic reactive updates (current default: outputs recompute on any input change) and manual trigger mode (outputs recompute only on button click).
  • Startup computation — Developers must be able to control whether outputs are computed on initialization (similar to on_init=True in param.depends).
  • Background execution — Long-running output computations should run in the background (e.g., via pn.state.execute or threading) to keep the UI responsive. When possible, independent branches should compute in parallel.
  • Generators: Please also support output generators. I.e. sync and async functions that yield one element at the time. There is no reason to wait if one element is done - but the 9 others are not.

Visual feedback

  • Stale/invalidated state — When an input changes but the downstream output has not yet recomputed, the affected nodes should be visually marked as stale (e.g., dimmed border, "stale" badge).
  • Computing indicator — While an output is being recomputed, the node should show a spinner or loading overlay so the user knows work is in progress.

Documentation and examples

  • Convert legacy Panel Pipeline examples — All examples from the Panel Pipeline How-To guides should be ported to this API and tested.
  • Convert Gradio Daggr examples — All examples from Daggr should be ported to this API and tested.
  • Convert langgraph examples: All examples from https://docs.langflow.org/ tutorials.
  • ML and GenAI examples — Create showcase examples for machine learning workflows (training pipelines, inference chains) and generative AI (LLM chains, RAG pipelines).
  • Function-to-node guide — Document how to wrap an existing function as a Pipeline stage with minimal boilerplate (input params from function signature, output from return value).
  • Testing guide — Document how to unit-test individual stages in isolation and how to integration-test a full pipeline.

API design

  • Helper node classes — Evaluate whether to provide ready-made base classes like FnNode (wraps a plain function), PanelNode (wraps a Panel viewable), and InferenceNode (wraps an ML model), similar to Daggr's node types.
  • LLM-friendly error messages — All Pipeline errors (missing outputs, unresolved edges, type mismatches) should produce clear, actionable messages that an LLM coding assistant can interpret and fix without ambiguity.
  • Resolve open issues above — Content-aware layout, Viewer resolution, node overflow, and type-based wiring should all be addressed.

Questions

  • Should we display all input parameters in one node instead of individual nodes. Langgraph https://docs.langflow.org/ does the first, hugging face does the latter.

References

Panel Pipeline

param.output

Prior Art

  • Daggr — Visual DAG builder for Gradio apps
  • ComfyUI — Node-based Stable Diffusion workflow editor
  • LangFlow — Visual LLM agent workflow builder
  • Flowise — Drag-and-drop LLM flow builder
  • n8n — Workflow automation platform

panel-reactflow

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions