Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-33027: add PipelineGraph class #347

Merged
merged 16 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/changes/DM-33027.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a PipelineGraph class that represents a Pipeline with all configuration overrides applied as a graph.
5 changes: 5 additions & 0 deletions doc/lsst.pipe.base/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ Developing Pipelines

creating-a-pipeline.rst
testing-pipelines-with-mocks.rst
working-with-pipeline-graphs.rst

.. _lsst.pipe.base-contributing:

Expand All @@ -77,6 +78,10 @@ Python API reference
:no-main-docstr:
:skip: BuildId
:skip: DatasetTypeName
:skip: PipelineGraph

.. automodapi:: lsst.pipe.base.pipeline_graph
:no-main-docstr:

.. automodapi:: lsst.pipe.base.testUtils
:no-main-docstr:
Expand Down
88 changes: 88 additions & 0 deletions doc/lsst.pipe.base/working-with-pipeline-graphs.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
.. _pipe_base_pipeline_graphs:

.. py:currentmodule:: lsst.pipe.base.pipeline_graph

############################
Working with Pipeline Graphs
############################

Pipeline objects are written as YAML documents, but once they are fully configured, they are conceptually directed acyclic graphs (DAGs).
In code, this graph version of a pipeline is represented by the `PipelineGraph` class.
`PipelineGraph` objects are usually constructed by calling the `.Pipeline.to_graph` method::

from lsst.daf.butler import Butler
from lsst.pipe.base import Pipeline

butler = Butler("/some/repo")
pipeline = Pipeline.from_uri("my_pipeline.yaml")
graph = pipeline.to_graph(registry=butler.registry)

The ``registry`` argument here is optional, but without it the graph will be incomplete ("unresolved") and the pipeline will not be checked for correctness until the `~PipelineGraph.resolve` method is called.
Resolving a graph compares all of the task connections (which are edges in the graph) that reference each dataset type to each other and to the registry's definition of that dataset to determine a common graph-wide definition.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why cant you compare the dataset type to each other w/o a registry?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first problem comes from allowing the use of "skypix" as a dimension placeholder - that makes it impossible to always construct a DatasetType from just a connection without a registry that has at least those dataset types already registered.

The other problem was that I was just wary of making it possible to resolve graphs in a way that was inconsistent with the registry.

I could imagine it being useful to work around the first problem (e.g. just pick the common skypix system from the universe) and we could just choose to ignore the second, but I'd like to have a good reason for it, and my thinking was that in practice a registry should always be available.

A definition in the registry always takes precedence, followed by the output connection that produces the dataset type (if there is one).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are conflicts silently dropped?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the beauty of having a bipartite graph - the conflicts are resolved in the dataset type nodes, but the differences are remembered by having the original storage classes in the edges. And they are only resolved when the storage classes are convertible in the right direction - otherwise you get an error.

When a pipeline graph is used to register dataset types in a data repository, it is this common definition in the dataset type node that is registered.
Edge dataset type descriptions represent storage class overrides for a task, or specify that the task only wants a component.

Simple Graph Inspection
-----------------------

The basic structure of the graph can be explored via the `~PipelineGraph.tasks` and `~PipelineGraph.dataset_types` mapping attributes.
These are keyed by task label and *parent* (never component) dataset type name, and they have `TaskNode` and `DatasetTypeNode` objects as values, respectively.
A resolved pipeline graph is always sorted, which means iterations over these mappings will be in topological order.
`TaskNode` objects have an `~TaskNode.init` attribute that holds a `TaskInitNode` instance - these resemble `TaskNode` objects and have edges to dataset types as well, but these edges represent the "init input" and "init output" connections of those tasks.

Task and dataset type node objects have attributes holding all of their edges, but to get neighboring nodes, you have to go back to the graph object::

task_node = graph.tasks["task_a"]
for edge in task.inputs.values():
dataset_type_node = graph.dataset_types[edge.parent_dataset_type_name]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in this access pattern one already needs to know the names of all the dataset types an edge contains? I know one can do this with lists, loops, getatter etc, its just that this little example makes it seem simpler than it is.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each edge is associated with only one dataset type; the graph structure permits multiple edges between the same nodes (this is what networkx calls a MultiDiGraph instead of just a DiGraph). Does that answer your question? I'm not sure I understood it otherwise.

print(f"{task_node.label} takes {dataset_type_node.name} as an input.")

There are also convenience methods on `PipelineGraph` to get the edges or neighbors of a node:

- `~PipelineGraph.producing_edge_of`: an alternative to `DatasetTypeNode.producing_edge`
- `~PipelineGraph.consuming_edges_of`: an alternative to `DatasetTypeNode.consuming_edges`
- `~PipelineGraph.producer_of`: a shortcut for getting the task that write a dataset type
- `~PipelineGraph.consumers_of`: a shortcut for getting the tasks that read a dataset type
- `~PipelineGraph.inputs_of`: a shortcut for getting the dataset types that a task reads
- `~PipelineGraph.outputs_of`: a shortcut for getting the dataset types that a task writes

Pipeline graphs also hold the `~PipelineGraph.description` and `~PipelineGraph.data_id` (usually just an instrument value) of the pipeline used to construct them, as well as the same mapping of labeled task subsets (`~PipelineGraph.task_subsets`).

Modifying PipelineGraphs
------------------------

Usually the tasks in a pipeline are set before a `PipelineGraph` is ever constructed.
In some cases it may be more convenient to add tasks to an existing `PipelineGraph`, either because a related graph is being created from an existing one, or because a (rare) task needs to be configured in a way that depends on the content or structure of the rest of the graph.
`PipelineGraph` provides a number of mutation methods:

- `~PipelineGraph.add_task` adds a brand new task from a `.PipelineTask` type object and its configuration;
- `~PipelineGraph.add_task_nodes` adds one or more tasks from a different `PipelineGraph` instance;
- `~PipelineGraph.reconfigure_tasks` replaces the configuration of an existing task with new configuration (note that this is typically less convenient than adding config *overrides* to a `Pipeline` object, because all configuration in a `PipelineGraph` must be validated and frozen);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this potentially change the whole graph structure as well?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it certainly can.

- `~PipelineGraph.remove_task_nodes` removes existing tasks;
- `~PipelineGraph.add_task_subset` and `~PipelineGraph.remove_task_subset` modify the mapping of labeled task subsets (which can also be modified in-place).

**The most important thing to remember when modifying `PipelineGraph` objects is that modifications typically reset some or all of the graph to an unresolved state.**

The reference documentation for these methods describes exactly what guarantees they make about existing resolutions in detail, and what operations are still supported on unresolved or partially-resolved graphs, but it is easiest to just ensure `resolve` is called after any modifications are complete.

`PipelineGraph` mutator methods provide strong exception safety (the graph is left unchanged when an exception is raised and caught by calling code) unless the exception type raised is `PipelineGraphExceptionSafetyError`.

Exporting to NetworkX
---------------------

NetworkX is a powerful Python library for graph manipulation, and in addition to being used in the implementation, `PipelineGraph` provides methods to create various native NetworkX graph objects.
The node attributes of these graphs provide much of the same information as the `TaskNode` and `DatasetTypeNode` objects (see the documentation for those objects for details).

The export methods include:

- `~PipelineGraph.make_xgraph` exports all nodes, including task nodes, dataset type nodes, and task init nodes, and the edges between them.
This is a `networkx.MultiDiGraph` because there can be (albeit) rarely multiple edges (representing different connections) between a dataset type and a task.
The edges of this graph have attributes as well as the nodes.
- `~PipelineGraph.make_bipartite_graph` exports just task nodes and dataset type nodes and the edges between them (or, if ``init=True``, just task init nodes and the dataset type nodes and edges between them).
A "bipartite" graph is one in which there are two kinds of nodes and edges only connect one type to the other.
This is also a `networkx.MultiDiGraph`, and its edges also have attributes.
- `~PipelineGraph.make_task_graph` exports just task (or task init) nodes; it is one "bipartite projection" of the full graph.
This is a `networkx.DiGraph`, because all dataset types that connect a pair of tasks are rolled into one edge, and edges have no state.
- `~PipelineGraph.make_dataset_type_graph` exports just dataset type nodes; it is one "bipartite projection" of the full graph.
This is a `networkx.DiGraph`, because all tasks that connect a pair of dataset types are rolled into one edge, and edges have no state.
6 changes: 5 additions & 1 deletion python/lsst/pipe/base/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from . import automatic_connection_constants, connectionTypes, pipelineIR
from . import automatic_connection_constants, connectionTypes, pipeline_graph, pipelineIR
from ._dataset_handle import *
from ._instrument import *
from ._observation_dimension_packer import *
Expand All @@ -11,6 +11,10 @@
from .graph import *
from .graphBuilder import *
from .pipeline import *

# We import the main PipelineGraph type and the module (above), but we don't
# lift all symbols to package scope.
from .pipeline_graph import PipelineGraph
from .pipelineTask import *
from .struct import *
from .task import *
Expand Down
175 changes: 40 additions & 135 deletions python/lsst/pipe/base/pipeTools.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,51 +27,24 @@
# No one should do import * from this module
__all__ = ["isPipelineOrdered", "orderPipeline"]

# -------------------------------
# Imports of standard modules --
# -------------------------------
import itertools
from collections.abc import Iterable
from typing import TYPE_CHECKING

# -----------------------------
# Imports for other modules --
# -----------------------------
from .connections import iterConnections
from .pipeline import Pipeline, TaskDef

# Exceptions re-exported here for backwards compatibility.
from .pipeline_graph import DuplicateOutputError, PipelineDataCycleError, PipelineGraph # noqa: F401

if TYPE_CHECKING:
from .pipeline import Pipeline, TaskDef
from .taskFactory import TaskFactory

# ----------------------------------
# Local non-exported definitions --
# ----------------------------------

# ------------------------
# Exported definitions --
# ------------------------


class MissingTaskFactoryError(Exception):
"""Exception raised when client fails to provide TaskFactory instance."""

pass


class DuplicateOutputError(Exception):
"""Exception raised when Pipeline has more than one task for the same
output.
"""

pass


class PipelineDataCycleError(Exception):
"""Exception raised when Pipeline has data dependency cycle."""

pass


def isPipelineOrdered(pipeline: Pipeline | Iterable[TaskDef], taskFactory: TaskFactory | None = None) -> bool:
"""Check whether tasks in pipeline are correctly ordered.

Expand All @@ -80,134 +53,66 @@

Parameters
----------
pipeline : `pipe.base.Pipeline`
pipeline : `Pipeline` or `collections.abc.Iterable` [ `TaskDef` ]
Pipeline description.
taskFactory: `pipe.base.TaskFactory`, optional
Instance of an object which knows how to import task classes. It is
only used if pipeline task definitions do not define task classes.
taskFactory: `TaskFactory`, optional
Ignored; present only for backwards compatibility.

Returns
-------
True for correctly ordered pipeline, False otherwise.
is_ordered : `bool`
True for correctly ordered pipeline, False otherwise.

Raises
------
ImportError
Raised when task class cannot be imported.
DuplicateOutputError
Raised when there is more than one producer for a dataset type.
MissingTaskFactoryError
Raised when TaskFactory is needed but not provided.
"""
# Build a map of DatasetType name to producer's index in a pipeline
producerIndex = {}
for idx, taskDef in enumerate(pipeline):
for attr in iterConnections(taskDef.connections, "outputs"):
if attr.name in producerIndex:
raise DuplicateOutputError(
"DatasetType `{}' appears more than once as output".format(attr.name)
)
producerIndex[attr.name] = idx

# check all inputs that are also someone's outputs
for idx, taskDef in enumerate(pipeline):
# get task input DatasetTypes, this can only be done via class method
inputs = {name: getattr(taskDef.connections, name) for name in taskDef.connections.inputs}
for dsTypeDescr in inputs.values():
# all pre-existing datasets have effective index -1
prodIdx = producerIndex.get(dsTypeDescr.name, -1)
if prodIdx >= idx:
# not good, producer is downstream
return False

if isinstance(pipeline, Pipeline):
graph = pipeline.to_graph()

Check warning on line 74 in python/lsst/pipe/base/pipeTools.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/pipeTools.py#L74

Added line #L74 was not covered by tests
else:
graph = PipelineGraph()
for task_def in pipeline:
graph.add_task(task_def.label, task_def.taskClass, task_def.config, task_def.connections)
# Can't use graph.is_sorted because that requires sorted dataset type names
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if I understand this comment or what you are doing here, or why if you have a graph structure you cant know sorting. If you have a graph, surely the process of creating a graph sorts it no?

More over I am not sure I understand the point of this function any more. If Pipelines or iterables of taskDefs are kind of just the in-progress things, ordering does not matter. If we get to the point where ordering DOSE matter, surely we should be using the new graph structure anyway.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has been proposed for deprecation on RFC-949, so soon it won't matter.

The distinction here is between just topologically sorting a graph (which does not have a unique solution) and sorting it topologically with some tiebreaker to give it a unique solution. PipelineGraph.is_sorted assumes a specific tiebreaker, while this function just checked for any topological ordering. Since I wanted to get rid of this method anyway I figured a small reimplementation to keep the behavior unchanged until it could be removed would be the least disruptive.

# as well as sorted tasks.
tasks_xgraph = graph.make_task_xgraph()
seen: set[str] = set()
for task_label in tasks_xgraph:
successors = set(tasks_xgraph.successors(task_label))
if not successors.isdisjoint(seen):
return False

Check warning on line 86 in python/lsst/pipe/base/pipeTools.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/pipeTools.py#L86

Added line #L86 was not covered by tests
seen.add(task_label)
return True


def orderPipeline(pipeline: list[TaskDef]) -> list[TaskDef]:
def orderPipeline(pipeline: Pipeline | Iterable[TaskDef]) -> list[TaskDef]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, I am not sure I see the point of an orderPipeline function if there is a new graph structure available.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is proposed for deprecation on RFC-949 as well.

"""Re-order tasks in pipeline to satisfy data dependencies.

When possible new ordering keeps original relative order of the tasks.

Parameters
----------
pipeline : `list` of `pipe.base.TaskDef`
pipeline : `Pipeline` or `collections.abc.Iterable` [ `TaskDef` ]
Pipeline description.

Returns
-------
Correctly ordered pipeline (`list` of `pipe.base.TaskDef` objects).
ordered : `list` [ `TaskDef` ]
Correctly ordered pipeline.

Raises
------
`DuplicateOutputError` is raised when there is more than one producer for a
dataset type.
`PipelineDataCycleError` is also raised when pipeline has dependency
cycles. `MissingTaskFactoryError` is raised when `TaskFactory` is needed
but not provided.
DuplicateOutputError
Raised when there is more than one producer for a dataset type.
PipelineDataCycleError
Raised when the pipeline has dependency cycles.
"""
# This is a modified version of Kahn's algorithm that preserves order

# build mapping of the tasks to their inputs and outputs
inputs = {} # maps task index to its input DatasetType names
outputs = {} # maps task index to its output DatasetType names
allInputs = set() # all inputs of all tasks
allOutputs = set() # all outputs of all tasks
dsTypeTaskLabels: dict[str, str] = {} # maps DatasetType name to the label of its parent task
for idx, taskDef in enumerate(pipeline):
# task outputs
dsMap = {name: getattr(taskDef.connections, name) for name in taskDef.connections.outputs}
for dsTypeDescr in dsMap.values():
if dsTypeDescr.name in allOutputs:
raise DuplicateOutputError(
f"DatasetType `{dsTypeDescr.name}' in task `{taskDef.label}' already appears as an "
f"output in task `{dsTypeTaskLabels[dsTypeDescr.name]}'."
)
dsTypeTaskLabels[dsTypeDescr.name] = taskDef.label
outputs[idx] = set(dsTypeDescr.name for dsTypeDescr in dsMap.values())
allOutputs.update(outputs[idx])

# task inputs
connectionInputs = itertools.chain(taskDef.connections.inputs, taskDef.connections.prerequisiteInputs)
inputs[idx] = set(getattr(taskDef.connections, name).name for name in connectionInputs)
allInputs.update(inputs[idx])

# for simplicity add pseudo-node which is a producer for all pre-existing
# inputs, its index is -1
preExisting = allInputs - allOutputs
outputs[-1] = preExisting

# Set of nodes with no incoming edges, initially set to pseudo-node
queue = [-1]
result = []
while queue:
# move to final list, drop -1
idx = queue.pop(0)
if idx >= 0:
result.append(idx)

# remove task outputs from other tasks inputs
thisTaskOutputs = outputs.get(idx, set())
for taskInputs in inputs.values():
taskInputs -= thisTaskOutputs

# find all nodes with no incoming edges and move them to the queue
topNodes = [key for key, value in inputs.items() if not value]
queue += topNodes
for key in topNodes:
del inputs[key]

# keep queue ordered
queue.sort()

# if there is something left it means cycles
if inputs:
# format it in usable way
loops = []
for idx, inputNames in inputs.items():
taskName = pipeline[idx].label
outputNames = outputs[idx]
edge = " {} -> {} -> {}".format(inputNames, taskName, outputNames)
loops.append(edge)
raise PipelineDataCycleError("Pipeline has data cycles:\n" + "\n".join(loops))

return [pipeline[idx] for idx in result]
if isinstance(pipeline, Pipeline):
graph = pipeline.to_graph()

Check warning on line 112 in python/lsst/pipe/base/pipeTools.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/pipeTools.py#L112

Added line #L112 was not covered by tests
else:
graph = PipelineGraph()
for task_def in pipeline:
graph.add_task(task_def.label, task_def.taskClass, task_def.config, task_def.connections)
graph.sort()
return list(graph._iter_task_defs())