Skip to content

Commit

Permalink
Support bind values with user query in SimplePipelineExecutor (DM-36487)
Browse files Browse the repository at this point in the history
SimplePipelineExecutor factory methods add a parameter for a bind mapping
and pass it to GraphBuilder.
  • Loading branch information
andy-slac committed Oct 21, 2022
1 parent 14023ac commit 59570d3
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 5 deletions.
1 change: 1 addition & 0 deletions doc/changes/DM-36487.api.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
`SimplePipelineExecutor` factory methods add `bind` parameter for bind values to use with the user expression.
27 changes: 22 additions & 5 deletions python/lsst/ctrl/mpexec/simple_pipeline_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@

__all__ = ("SimplePipelineExecutor",)

from typing import Any, Iterable, Iterator, List, Optional, Type, Union
from collections.abc import Iterable, Iterator, Mapping
from typing import Any, List, Optional, Type, Union

from lsst.daf.butler import Butler, CollectionType, Quantum
from lsst.pex.config import Config
Expand Down Expand Up @@ -114,7 +115,12 @@ def prep_butler(

@classmethod
def from_pipeline_filename(
cls, pipeline_filename: str, *, where: str = "", butler: Butler
cls,
pipeline_filename: str,
*,
where: str = "",
bind: Optional[Mapping[str, Any]] = None,
butler: Butler,
) -> SimplePipelineExecutor:
"""Create an executor by building a QuantumGraph from an on-disk
pipeline YAML file.
Expand All @@ -125,6 +131,9 @@ def from_pipeline_filename(
Name of the YAML file to load the pipeline definition from.
where : `str`, optional
Data ID query expression that constraints the quanta generated.
bind : `Mapping`, optional
Mapping containing literal values that should be injected into the
``where`` expression, keyed by the identifiers they replace.
butler : `Butler`
Butler that manages all I/O. `prep_butler` can be used to create
one.
Expand All @@ -136,7 +145,7 @@ def from_pipeline_filename(
`Butler`, ready for `run` to be called.
"""
pipeline = Pipeline.fromFile(pipeline_filename)
return cls.from_pipeline(pipeline, butler=butler, where=where)
return cls.from_pipeline(pipeline, butler=butler, where=where, bind=bind)

@classmethod
def from_task_class(
Expand All @@ -146,6 +155,7 @@ def from_task_class(
label: Optional[str] = None,
*,
where: str = "",
bind: Optional[Mapping[str, Any]] = None,
butler: Butler,
) -> SimplePipelineExecutor:
"""Create an executor by building a QuantumGraph from a pipeline
Expand All @@ -163,6 +173,9 @@ def from_task_class(
``task_class._DefaultName``.
where : `str`, optional
Data ID query expression that constraints the quanta generated.
bind : `Mapping`, optional
Mapping containing literal values that should be injected into the
``where`` expression, keyed by the identifiers they replace.
butler : `Butler`
Butler that manages all I/O. `prep_butler` can be used to create
one.
Expand All @@ -183,14 +196,15 @@ def from_task_class(
f"got {type(config).__name__}."
)
task_def = TaskDef(taskName=task_class.__name__, config=config, label=label, taskClass=task_class)
return cls.from_pipeline([task_def], butler=butler, where=where)
return cls.from_pipeline([task_def], butler=butler, where=where, bind=bind)

@classmethod
def from_pipeline(
cls,
pipeline: Union[Pipeline, Iterable[TaskDef]],
*,
where: str = "",
bind: Optional[Mapping[str, Any]] = None,
butler: Butler,
**kwargs: Any,
) -> SimplePipelineExecutor:
Expand All @@ -204,6 +218,9 @@ def from_pipeline(
labels and configuration.
where : `str`, optional
Data ID query expression that constraints the quanta generated.
bind : `Mapping`, optional
Mapping containing literal values that should be injected into the
``where`` expression, keyed by the identifiers they replace.
butler : `Butler`
Butler that manages all I/O. `prep_butler` can be used to create
one.
Expand All @@ -220,7 +237,7 @@ def from_pipeline(
pipeline = list(pipeline)
graph_builder = GraphBuilder(butler.registry)
quantum_graph = graph_builder.makeGraph(
pipeline, collections=butler.collections, run=butler.run, userQuery=where
pipeline, collections=butler.collections, run=butler.run, userQuery=where, bind=bind
)
return cls(quantum_graph=quantum_graph, butler=butler)

Expand Down

0 comments on commit 59570d3

Please sign in to comment.