DM-33027: add PipelineGraph class #347

Merged
merged 16 commits into main from tickets/DM-33027 on Aug 4, 2023
Conversation

TallJimbo (Member) commented Jun 23, 2023:

Checklist

  • ran Jenkins local lsstsw
  • added a release note for user-visible changes to doc/changes

codecov bot commented Jun 23, 2023:

Codecov Report

Patch coverage: 86.79% and project coverage change: +1.27% 🎉

Comparison is base (eb4b848) 82.72% compared to head (8091ec1) 83.99%.

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #347      +/-   ##
==========================================
+ Coverage   82.72%   83.99%   +1.27%     
==========================================
  Files          66       77      +11     
  Lines        7345     9086    +1741     
  Branches     1443     1741     +298     
==========================================
+ Hits         6076     7632    +1556     
- Misses       1023     1165     +142     
- Partials      246      289      +43     
Files Changed Coverage Δ
python/lsst/pipe/base/tests/pipelineStepTester.py 0.00% <0.00%> (ø)
tests/test_pipeTools.py 100.00% <ø> (ø)
python/lsst/pipe/base/pipeline.py 65.41% <59.09%> (-0.65%) ⬇️
python/lsst/pipe/base/pipeTools.py 80.64% <72.72%> (-13.56%) ⬇️
...ython/lsst/pipe/base/tests/mocks/_pipeline_task.py 52.28% <74.68%> (+17.74%) ⬆️
tests/test_pipeline_graph.py 80.03% <80.03%> (ø)
python/lsst/pipe/base/pipeline_graph/_edges.py 80.62% <80.62%> (ø)
...on/lsst/pipe/base/pipeline_graph/_dataset_types.py 91.02% <91.02%> (ø)
python/lsst/pipe/base/pipeline_graph/_tasks.py 92.51% <92.51%> (ø)
...n/lsst/pipe/base/pipeline_graph/_pipeline_graph.py 93.83% <93.83%> (ø)
... and 7 more

... and 2 files with indirect coverage changes


natelust (Contributor) left a comment:

Posting for now, with more likely to come later

@@ -2,7 +2,7 @@
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# (http://www.lsst.org).XS
natelust (Contributor):

What is up with this?

TallJimbo (Member Author):

This is a sign that the author uses Emacs keybindings and cannot reliably hit ctrl, sometimes hitting shift instead.

natelust (Contributor):

👍 :wq

_LOG.error("Configuration validation failed for task %s (%s)", label, taskName)
raise
config.freeze()
if connections is None:
natelust (Contributor):

Is this the safest assumption for something public? Does it really hurt to check?

TallJimbo (Member Author):

The assumption that the config is validated and frozen if the connection is not None, you mean?

My thinking here is that I want to deprecate and remove TaskDef anyway, so if its current construction sites follow this rule (and they do), it's unlikely to get any new ones before it's gone, and in the meantime it's a nice optimization.

And AFAIK there's no way to ask a config if it's frozen or validated. The former wouldn't be hard to add - there's a Config._frozen attribute that could be made public - but I didn't really want to open up a pex_config branch for this ticket unless I really needed to.

natelust (Contributor):

Oh no, sorry - I meant the assumption that if you don't have connections yet, the config has not been validated; though I guess there is little harm in validating again even if it was.

graph = PipelineGraph()
for task_def in pipeline:
graph.add_task(task_def.label, task_def.taskClass, task_def.config, task_def.connections)
# Can't use graph.is_sorted because that requires sorted dataset type names
natelust (Contributor):

I'm not sure I understand this comment or what you are doing here, or why, if you have a graph structure, you can't know the sorting. If you have a graph, surely the process of creating the graph sorts it, no?

Moreover, I am not sure I understand the point of this function any more. If Pipelines or iterables of TaskDefs are just the in-progress things, ordering does not matter. If we get to the point where ordering DOES matter, surely we should be using the new graph structure anyway.

TallJimbo (Member Author):

This has been proposed for deprecation on RFC-949, so soon it won't matter.

The distinction here is between just topologically sorting a graph (which does not have a unique solution) and sorting it topologically with some tiebreaker to give it a unique solution. PipelineGraph.is_sorted assumes a specific tiebreaker, while this function just checked for any topological ordering. Since I wanted to get rid of this method anyway I figured a small reimplementation to keep the behavior unchanged until it could be removed would be the least disruptive.
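The distinction described above can be sketched with networkx (node labels here are invented for illustration, and the lexicographic tiebreaker is just one example of a deterministic choice):

```python
# Sketch of "any topological order" vs "one specific tiebroken order".
import networkx as nx

g = nx.DiGraph([("a", "c"), ("b", "c")])  # both a,b,c and b,a,c are valid orders

order = ["b", "a", "c"]
position = {node: i for i, node in enumerate(order)}

# The "any topological ordering" check: every edge goes from earlier to later.
is_topological = all(position[u] < position[v] for u, v in g.edges)

# A tiebroken check compares against one unique solution, e.g. the
# lexicographic one, so the equally valid order "b,a,c" fails it.
canonical = list(nx.lexicographical_topological_sort(g))

print(is_topological)      # True
print(order == canonical)  # False: canonical is ["a", "b", "c"]
```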

return True


def orderPipeline(pipeline: list[TaskDef]) -> list[TaskDef]:
def orderPipeline(pipeline: Pipeline | Iterable[TaskDef]) -> list[TaskDef]:
natelust (Contributor):

Same as above, I am not sure I see the point of an orderPipeline function if there is a new graph structure available.

TallJimbo (Member Author):

This is proposed for deprecation on RFC-949 as well.

@@ -53,14 +53,12 @@
from lsst.utils.introspection import get_full_type_name

from . import automatic_connection_constants as acc
from . import pipelineIR, pipeTools
from . import pipeline_graph, pipelineIR
natelust (Contributor):

importing pipeline_graph module here and using it below feels oddly out of place to me, and I can't quite put my finger on why. I think it is because most of the rest of the place we are pulling in directly the objects we use. Certainly is not wrong, it just somehow feels like it does not quite fit. You are free to leave it, just think about this comment.

TallJimbo (Member Author) commented Jul 19, 2023:

This is the downside of a tradeoff that I still think is an overall win: I didn't want to put the package context (i.e. something implying "part of pipeline graph") into the class names of all of the types in the subpackage, like TaskNode and WriteEdge, but that means seeing those names outside the package without being qualified by the package name can make them a little harder to read. My hope is that not many modules will have to import more than just PipelineGraph itself (which is lifted to lsst.pipe.base), so this module is an outlier.

FWIW, I do the same thing with standard library modules: mostly I do from <module> import A, B, but for a few (dataclasses is the most prominent example) I just do import <module>, because their names are really generic when unqualified (e.g. field, replace, asdict in dataclasses).
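A small illustration of that qualified-import style with dataclasses:

```python
# Qualified access keeps generic names like field, replace, and asdict readable.
import dataclasses


@dataclasses.dataclass
class Point:
    x: int = dataclasses.field(default=0)


p = dataclasses.replace(Point(), x=2)
print(dataclasses.asdict(p))  # {'x': 2}
```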

"""Key that identifies this node in internal and exported networkx graphs.
"""

prerequisite_inputs: Mapping[str, ReadEdge]
natelust (Contributor):

I feel like this sort of thing is state for the graph, and not really a property of the task itself. Does it make sense to have methods on the graph that say getPrerequisitesFor(task) or something like that?

I guess it really comes down to what you want to be of central importance, and what access patterns you want.

TallJimbo (Member Author):

(discussed in person, and in another thread)

yield self.log_output

def diff_edges(self, other: TaskNode) -> list[str]:
"""Compare the edges of this task node to those from the same task
natelust (Contributor):

Does this really belong on this object?

TallJimbo (Member Author):

I think so: it's a comparison between two TaskNode objects. It's not a complete comparison, so it's not __eq__, but it's one of several important kinds of partial comparisons I can imagine (and the others are, I think, just straightforward comparisons of attributes, so I don't think they need their own methods).

result += self.metadata_output.diff(other.metadata_output, "metadata output")
return result

def _imported_and_configured(self, rebuild: bool) -> TaskNode:
natelust (Contributor):

Would this be simpler if all tasks were imported and configured prior to being added to a graph?

TallJimbo (Member Author):

We've discussed this live, but for posterity:

  • The only way tasks can be in a state in which they aren't imported and configured is after reading from disk (and the tasks must have originally been imported and configured before being written).
  • If you have a serialized PipelineGraph on disk, it's a massive optimization to avoid importing all of the tasks, especially if you just want to visualize or otherwise ask questions about it, or only want a subset (either labeled or algorithmically defined) of the tasks (that would then be imported).

So the question is really whether we expect to end up with a workflow for pipelines in which PipelineGraphs get serialized once and then read many times (e.g. by SCons in drp_pipe and the like, or at the start of a campaign by CM tools). If so, I think the "not imported" state is useful enough to keep supporting (and how fast it is to read pipeline graphs in that state may be a compelling reason to make serialized pipeline graphs part of our workflow). But as long as people are almost always reading pipeline YAML files from disk instead, all tasks will have to be imported anyway, and this state is just a lot of complexity that isn't carrying its own weight.

Right now my plan is to disable high-level access to this state to avoid making a long-term maintenance promise (which we might regret) until at least DM-33034, and decide then whether to open up access to it, remove it completely, or punt some more.
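The "not imported" optimization discussed above amounts to deferring the import of each task class until something actually needs it. A minimal sketch of that pattern (LazyTaskRef is a hypothetical name for illustration, not pipe_base API):

```python
# Hypothetical sketch: store the task's fully-qualified class name when
# serializing, and only import it on demand.
import importlib


class LazyTaskRef:
    def __init__(self, task_class_name: str):
        self.task_class_name = task_class_name  # e.g. "some.module.SomeTask"
        self._task_class = None

    @property
    def task_class(self):
        # Importing is deferred until someone actually asks for the class.
        if self._task_class is None:
            module_name, _, class_name = self.task_class_name.rpartition(".")
            module = importlib.import_module(module_name)
            self._task_class = getattr(module, class_name)
        return self._task_class


# Reading a serialized graph can build LazyTaskRef objects cheaply;
# visualization code that never touches .task_class pays no import cost.
ref = LazyTaskRef("collections.OrderedDict")  # stdlib class as a stand-in
print(ref.task_class.__name__)  # OrderedDict
```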

"""


def expect_not_none(value: _U | None, msg: str) -> _U:
natelust (Contributor):

Just a random curiosity, MyPy is smart enough to handle when _U is typed as possibly None as well as the None you put in this definition?

TallJimbo (Member Author):

I'm not sure how it'd do in generic contexts, but if you pass it a type that is annotated as some concrete type or None, it is smart enough to recognize that this function guarantees that only that concrete type will be returned, and that's what makes it useful here.
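A self-contained sketch of how such a TypeVar-based helper narrows an optional type for mypy (the message text and example values are invented):

```python
# expect_not_none-style helper: mypy infers that the return type is _U,
# so "Optional[int]" is narrowed to "int" at the call site.
from typing import Optional, TypeVar

_U = TypeVar("_U")


def expect_not_none(value: Optional[_U], msg: str) -> _U:
    if value is None:
        raise ValueError(msg)
    return value


maybe: Optional[int] = 5
narrowed = expect_not_none(maybe, "expected an int")
print(narrowed + 1)  # 6: mypy treats narrowed as plain int, not Optional[int]
```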

TallJimbo (Member Author) left a comment:

I've addressed the minor issues from the review-so-far and responded to other review comments (often just repeating what we discussed earlier). I haven't yet made the larger changes we discussed in our call a few days ago; I'm starting in on that now.

graph = pipeline_graph.PipelineGraph(data_id=data_id)
graph.description = self._pipelineIR.description
for label in self._pipelineIR.tasks:
self._add_task_to_graph(label, graph)
TallJimbo (Member Author):

The implementation needs to get at self._pipelineIR, not just the Pipeline public API, while it only needs to access public methods of PipelineGraph, so this direction didn't require as much "friending" as the alternative.

is_prerequisite: bool | None = None
producer: str | None = None
write_edge: WriteEdge
for _, _, write_edge in xgraph.in_edges(key, data="instance"): # will iterate zero or one time
TallJimbo (Member Author):

I've expanded the comment quite a bit.

Identifier for this node in networkx graphs.
init : `TaskInitNode`
Node representing the initialization of this task.
prerequisite_inputs : `~collections.abc.Mapping` [ `str`, `ReadEdge` ]
TallJimbo (Member Author):

We discussed this live already, but for posterity:

  • I want the edges here primarily so a TaskNode is a self-contained representation of everything you can get from a fully-configured task and its connections. I regard that as a type that's important for constructing and modifying PipelineGraphs, so I see this question as more about whether there should also be a variant of this class that doesn't have the edges that would be the one held by the graph after construction/modification is done, with the graph then responsible for providing access to the edges.
  • Note that this still only has edge objects, and neither the task object nor the edge objects have any way to get to a DatasetTypeNode, which you have to get from the PipelineGraph (using the dataset type name, which the edge objects do have). So it's not that duplicative with what the PipelineGraph has.
  • Rather than remove these, I think it'd be better to add edges to DatasetTypeNode to restore the symmetry. That makes it so the .tasks and .dataset_types mapping-view attributes of PipelineGraph together provide a pretty simple and yet complete interface-subset that really does represent the full graph.

TallJimbo force-pushed the tickets/DM-33027 branch 5 times, most recently from 64a3f8c to 1253e4f on July 24, 2023 at 18:05.
natelust (Contributor) left a comment:

My main concern was the complexity and diversity of classes and states making it hard to approach, even if each piece is itself straightforward. I have a few more comments in places, but this is certainly more than refined enough to go in, and I think any other concerns will shake out over time as we use and adapt this, finding how and what we use.

graph = pipeline.to_graph(registry=butler.registry)

The ``registry`` argument here is optional, but without it the graph will be incomplete ("unresolved") and the pipeline will not be checked for correctness until the `~PipelineGraph.resolve` method is called.
Resolving a graph compares all of the task connections (which are edges in the graph) that reference each dataset type to each other and to the registry's definition of that dataset to determine a common graph-wide definition.
natelust (Contributor):

Why can't you compare the dataset types to each other without a registry?

TallJimbo (Member Author):

The first problem comes from allowing the use of "skypix" as a dimension placeholder - that makes it impossible to always construct a DatasetType from just a connection without a registry that has at least those dataset types already registered.

The other problem was that I was just wary of making it possible to resolve graphs in a way that was inconsistent with the registry.

I could imagine it being useful to work around the first problem (e.g. just pick the common skypix system from the universe) and we could just choose to ignore the second, but I'd like to have a good reason for it, and my thinking was that in practice a registry should always be available.


The ``registry`` argument here is optional, but without it the graph will be incomplete ("unresolved") and the pipeline will not be checked for correctness until the `~PipelineGraph.resolve` method is called.
Resolving a graph compares all of the task connections (which are edges in the graph) that reference each dataset type to each other and to the registry's definition of that dataset to determine a common graph-wide definition.
A definition in the registry always takes precedence, followed by the output connection that produces the dataset type (if there is one).
natelust (Contributor):

Are conflicts silently dropped?

TallJimbo (Member Author):

That's the beauty of having a bipartite graph - the conflicts are resolved in the dataset type nodes, but the differences are remembered by having the original storage classes in the edges. And they are only resolved when the storage classes are convertible in the right direction - otherwise you get an error.


task_node = graph.tasks["task_a"]
for edge in task_node.inputs.values():
dataset_type_node = graph.dataset_types[edge.parent_dataset_type_name]
natelust (Contributor):

In this access pattern, one already needs to know the names of all the dataset types an edge contains? I know one can do this with lists, loops, getattr, etc.; it's just that this little example makes it seem simpler than it is.

TallJimbo (Member Author):

Each edge is associated with only one dataset type; the graph structure permits multiple edges between the same nodes (this is what networkx calls a MultiDiGraph instead of just a DiGraph). Does that answer your question? I'm not sure I understood it otherwise.
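A toy illustration of why parallel edges matter here, and hence MultiDiGraph rather than DiGraph (the node and connection names are invented):

```python
# A task can consume the same dataset type through two different connections,
# which means two parallel edges between the same pair of nodes.
import networkx as nx

g = nx.MultiDiGraph()
g.add_edge("calexp", "task_a", instance="input_1")  # invented connection names
g.add_edge("calexp", "task_a", instance="input_2")  # parallel edge, same node pair

print(g.number_of_edges("calexp", "task_a"))  # 2

# Iterating with data="instance" yields one entry per edge, not per node pair.
for _, _, edge in g.in_edges("task_a", data="instance"):
    print(edge)
```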


- `~PipelineGraph.add_task` adds a brand new task from a `.PipelineTask` type object and its configuration;
- `~PipelineGraph.add_task_nodes` adds one or more tasks from a different `PipelineGraph` instance;
- `~PipelineGraph.reconfigure_tasks` replaces the configuration of an existing task with new configuration (note that this is typically less convenient than adding config *overrides* to a `Pipeline` object, because all configuration in a `PipelineGraph` must be validated and frozen);
natelust (Contributor):

Wouldn't this potentially change the whole graph structure as well?

TallJimbo (Member Author):

Yes, it certainly can.


Notes
-----
The `universe` attribute is set to ``registry.dimensions`` and used to
natelust (Contributor):

Thinking of this more, we made some accommodations for alternate universes in QG (older ones), is that something we should consider in PipelineTask? I think it is a bit different in that we save and restore QG, but I wanted to raise it in case you can think of something with the bigger picture you have of all of this.

TallJimbo (Member Author):

I think this is consistent with what QG does with universes - we always want to make it the same as the registry universe when we resolve the graph, but if we save and read a graph later (whether that's a QG, a pipeline graph, or a future QG with an embedded pipeline graph), we want to remember the universe originally used. But in that case we also don't want to re-resolve.

new_dataset_type_node = DatasetTypeNode._from_edges(
node_key, self._xgraph, registry, previous=dataset_type_node
)
if new_dataset_type_node is not dataset_type_node:
natelust (Contributor):

Are these really singletons such that they can be compared with is?

TallJimbo (Member Author):

They are not singletons, but the usage of is here is intentional; they're immutable so updates in-place are impossible, and hence is provides a check on whether the DatasetTypeNode._from_edges short-circuited by returning its previous argument.

I could have also written this by making _from_edges return None when previous could be used as-is, but then I'd have needed to use typing.overload to make it clear that when the previous argument is not passed it is never None.

Member:

Sounds like that comment should be in the code.
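The idiom under discussion - a rebuild function on immutable objects that returns its `previous` argument unchanged when nothing differs, so callers can detect "no change" with `is` - can be sketched like this (Node and rebuild are illustrative names, not the real classes):

```python
# Short-circuit-by-identity on immutable objects.
from dataclasses import dataclass


@dataclass(frozen=True)
class Node:
    name: str
    dims: frozenset


def rebuild(previous: Node, new_dims: frozenset) -> Node:
    if new_dims == previous.dims:
        return previous  # unchanged: hand back the same immutable object
    return Node(previous.name, new_dims)


old = Node("calexp", frozenset({"visit", "detector"}))
same = rebuild(old, frozenset({"visit", "detector"}))
changed = rebuild(old, frozenset({"tract"}))

print(same is old)     # True: no update needed
print(changed is old)  # False: the graph must store the new node
```

The alternative mentioned above, returning None for "no change", would need a typing.overload to express that the result is never None when `previous` is omitted.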

`import_and_configure` method). If `False`, some `TaskNode` and
`TaskInitNode` attributes will not be available, but reading may be
much faster.
check_edges_unchanged : `bool`, optional
natelust (Contributor):

These two options seem like they should be related but are separate, and the docstring does not really help me understand why they are not, or really what they are doing.

TallJimbo (Member Author):

Yes, they're closely related but in a way that's hard to map into a function signature:

  • check_edges_unchanged and assume_edges_unchanged do nothing if import_and_configure=False;
  • check_edges_unchanged=True and assume_edges_unchanged=True is an error;
  • check_edges_unchanged=False and assume_edges_unchanged=False means "allow the post-import-and-configure edges to override the current ones".

So maybe it'd be better if I made this all into four-element enum or enum-like string argument; how about this?

class ImportBehavior(enum.Enum):
    DO_NOT_IMPORT = enum.auto()
    """Do not import tasks and instantiate their config and connection objects."""

    REQUIRE_CONSISTENT_EDGES = enum.auto()
    """Import tasks and instantiate their config and connection objects, and
    check that the connections still define the same edges.
    """

    ASSUME_CONSISTENT_EDGES = enum.auto()
    """Import tasks and instantiate their config and connection objects, but
    do not check that the connections still define the same edges.

    This is safe only when the caller knows the task definition has not changed
    since the pipeline graph was persisted, such as when it was saved and loaded
    with the same pipeline version.
    """

    OVERRIDE_EDGES = enum.auto()
    """Import tasks and instantiate their config and connection objects, and allow
    the edges defined in those connections to override those in the persisted graph.

    This may cause dataset type nodes to be unresolved, since resolutions consistent
    with the original edges may be invalidated.
    """

?

TallJimbo (Member Author):

I've now pushed a version with this change. It does not use the new enum in reconfigure_tasks, since there are fewer options in play there and the current state seems less confusing.

"""
self._xgraph = xgraph if xgraph is not None else networkx.MultiDiGraph()
self._sorted_keys: Sequence[NodeKey] | None = None
self._task_subsets = task_subsets if task_subsets is not None else {}
natelust (Contributor):

Do you want to take explicit ownership of a dict here, to prevent side effects?

TallJimbo (Member Author):

I definitely would if this was a public interface, but given that it's private and it needs to have been constructed with the exact same xgraph instance that's passed in, I think it's safe to leave this up to the caller.

self._dataset_types = DatasetTypeMappingView(self._xgraph)
self._raw_data_id: dict[str, Any]
if isinstance(data_id, DataCoordinate):
universe = data_id.universe
natelust (Contributor):

Should you check whether there is a conflict between the universe from the supplied data ID and the supplied universe?

TallJimbo (Member Author):

I'm not worried about it in practice but I suppose it doesn't hurt.

Much of the code changed here is actually stuff I want to deprecate in
the future, once PipelineGraph has been integrated with more things.
In the meantime, this addresses much of the duplication caused by adding
PipelineGraph.
This provides some symmetry with TaskNode and a bit of convenience.
We can't tell Sphinx about having lifted the PipelineGraph symbol
to lsst.pipe.base, unfortunately, as it doesn't like duplicates and
can't do aliases.
This is what most uses will want to do anyway, unless they don't have
a data repository.
@TallJimbo TallJimbo merged commit d81147c into main Aug 4, 2023
14 checks passed
@TallJimbo TallJimbo deleted the tickets/DM-33027 branch August 4, 2023 19:19