-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DM-28036 Support URI in pipelines #167
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,14 +30,17 @@ | |
# ------------------------------- | ||
from dataclasses import dataclass | ||
from types import MappingProxyType | ||
from typing import Mapping, Set, Union, Generator, TYPE_CHECKING, Optional | ||
from typing import Dict, Mapping, Set, Union, Generator, TYPE_CHECKING, Optional, Tuple | ||
|
||
import copy | ||
import re | ||
import os | ||
import urllib.parse | ||
import warnings | ||
|
||
# ----------------------------- | ||
# Imports for other modules -- | ||
from lsst.daf.butler import DatasetType, NamedValueSet, Registry, SkyPixDimension | ||
from lsst.daf.butler import DatasetType, NamedValueSet, Registry, SkyPixDimension, ButlerURI | ||
from lsst.utils import doImport | ||
from .configOverrides import ConfigOverrides | ||
from .connections import iterConnections | ||
|
@@ -47,7 +50,7 @@ | |
from . import pipeTools | ||
|
||
if TYPE_CHECKING: # Imports needed only for type annotations; may be circular. | ||
from lsst.obs.base.instrument import Instrument | ||
from lsst.obs.base import Instrument | ||
|
||
# ---------------------------------- | ||
# Local non-exported definitions -- | ||
|
@@ -111,13 +114,13 @@ def __init__(self, taskName, config, taskClass=None, label=""): | |
self.connections = config.connections.ConnectionsClass(config=config) | ||
|
||
@property | ||
def configDatasetName(self): | ||
def configDatasetName(self) -> str: | ||
"""Name of a dataset type for configuration of this task (`str`) | ||
""" | ||
return self.label + "_config" | ||
|
||
@property | ||
def metadataDatasetName(self): | ||
def metadataDatasetName(self) -> Optional[str]: | ||
"""Name of a dataset type for metadata of this task, `None` if | ||
metadata is not to be saved (`str`) | ||
""" | ||
|
@@ -168,7 +171,7 @@ def fromFile(cls, filename: str) -> Pipeline: | |
A path that points to a pipeline defined in yaml format. This | ||
filename may also supply additional labels to be used in | ||
subsetting the loaded Pipeline. These labels are separated from | ||
the path by a colon, and may be specified as a comma separated | ||
the path by a \\#, and may be specified as a comma separated | ||
list, or a range denoted as beginning..end. Beginning or end may | ||
be empty, in which case the range will be a half open interval. | ||
Unlike python iteration bounds, end bounds are *INCLUDED*. Note | ||
|
@@ -189,13 +192,49 @@ def fromFile(cls, filename: str) -> Pipeline: | |
string based matching due to the nature of contracts and may prune more | ||
than it should. | ||
""" | ||
# Split up the filename and any labels that were supplied | ||
filename, labelSpecifier = cls._parseFileSpecifier(filename) | ||
pipeline: Pipeline = cls.fromIR(pipelineIR.PipelineIR.from_file(filename)) | ||
return cls.from_uri(filename) | ||
|
||
@classmethod | ||
def from_uri(cls, uri: Union[str, ButlerURI]) -> Pipeline: | ||
"""Load a pipeline defined in a pipeline yaml file at a location | ||
specified by a URI. | ||
|
||
Parameters | ||
---------- | ||
uri: `str` or `ButlerURI` | ||
If a string is supplied this should be a URI path that points to a | ||
pipeline defined in yaml format. This uri may also supply | ||
additional labels to be used in subsetting the loaded Pipeline. | ||
These labels are separated from the path by a \\#, and may be | ||
specified as a comma separated list, or a range denoted as | ||
beginning..end. Beginning or end may be empty, in which case the | ||
range will be a half open interval. Unlike python iteration | ||
bounds, end bounds are *INCLUDED*. Note that range based selection | ||
is not well defined for pipelines that are not linear in nature, | ||
and correct behavior is not guaranteed, or may vary from run to | ||
run. The same specifiers can be used with a ButlerURI object, by | ||
being the sole contents in the fragments attribute. | ||
|
||
Returns | ||
------- | ||
pipeline: `Pipeline` | ||
The pipeline loaded from specified location with appropriate (if | ||
any) subsetting | ||
|
||
Notes | ||
----- | ||
This method attempts to prune any contracts that contain labels which | ||
are not in the declared subset of labels. This pruning is done using a | ||
string based matching due to the nature of contracts and may prune more | ||
than it should. | ||
""" | ||
# Split up the uri and any labels that were supplied | ||
uri, label_specifier = cls._parse_file_specifier(uri) | ||
pipeline: Pipeline = cls.fromIR(pipelineIR.PipelineIR.from_uri(uri)) | ||
|
||
# If there are labels supplied, only keep those | ||
if labelSpecifier is not None: | ||
pipeline = pipeline.subsetFromLabels(labelSpecifier) | ||
if label_specifier is not None: | ||
pipeline = pipeline.subsetFromLabels(label_specifier) | ||
return pipeline | ||
|
||
def subsetFromLabels(self, labelSpecifier: LabelSpecifier) -> Pipeline: | ||
|
@@ -263,42 +302,50 @@ def subsetFromLabels(self, labelSpecifier: LabelSpecifier) -> Pipeline: | |
return Pipeline.fromIR(self._pipelineIR.subset_from_labels(labelSet)) | ||
|
||
@staticmethod | ||
def _parseFileSpecifier(fileSpecifer): | ||
"""Split appart a filename path from label subsets | ||
def _parse_file_specifier(uri: Union[str, ButlerURI] | ||
) -> Tuple[ButlerURI, Optional[LabelSpecifier]]: | ||
"""Split appart a uri and any possible label subsets | ||
""" | ||
split = fileSpecifer.split(':') | ||
# There is only a filename, return just that | ||
if len(split) == 1: | ||
return fileSpecifer, None | ||
# More than one specifier provided, bail out | ||
if len(split) > 2: | ||
raise ValueError("Only one : is allowed when specifying a pipeline to load") | ||
else: | ||
labelSubset: str | ||
filename: str | ||
filename, labelSubset = split[0], split[1] | ||
if isinstance(uri, str): | ||
# This is to support legacy pipelines during transition | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Adding an example of what this matches to the comment would be nice; I've already forgotten. |
||
uri, num_replace = re.subn("[:](?!\\/\\/)", "#", uri) | ||
if num_replace: | ||
warnings.warn(f"The pipeline file {uri} seems to use the legacy : to separate " | ||
"labels, this is deprecated and will be removed after June 2021, please use " | ||
"# instead.", | ||
category=FutureWarning) | ||
if uri.count("#") > 1: | ||
raise ValueError("Only one set of labels is allowed when specifying a pipeline to load") | ||
uri = ButlerURI(uri) | ||
label_subset = uri.fragment or None | ||
|
||
specifier: Optional[LabelSpecifier] | ||
if label_subset is not None: | ||
label_subset = urllib.parse.unquote(label_subset) | ||
args: Dict[str, Union[Set[str], str, None]] | ||
# labels supplied as a list | ||
if ',' in labelSubset: | ||
if '..' in labelSubset: | ||
if ',' in label_subset: | ||
if '..' in label_subset: | ||
raise ValueError("Can only specify a list of labels or a range" | ||
"when loading a Pipline not both") | ||
labels = set(labelSubset.split(",")) | ||
specifier = LabelSpecifier(labels=labels) | ||
args = {"labels": set(label_subset.split(","))} | ||
# labels supplied as a range | ||
elif '..' in labelSubset: | ||
# Try to destructure the labelSubset, this will fail if more | ||
elif '..' in label_subset: | ||
# Try to de-structure the labelSubset, this will fail if more | ||
# than one range is specified | ||
try: | ||
begin, end = labelSubset.split("..") | ||
except ValueError: | ||
begin, end, *rest = label_subset.split("..") | ||
if rest: | ||
raise ValueError("Only one range can be specified when loading a pipeline") | ||
specifier = LabelSpecifier(begin=begin if begin else None, end=end if end else None) | ||
args = {"begin": begin if begin else None, "end": end if end else None} | ||
# Assume anything else is a single label | ||
else: | ||
labels = {labelSubset} | ||
specifier = LabelSpecifier(labels=labels) | ||
args = {"labels": {label_subset}} | ||
|
||
return filename, specifier | ||
specifier = LabelSpecifier(**args) | ||
else: | ||
specifier = None | ||
|
||
return uri, specifier | ||
|
||
@classmethod | ||
def fromString(cls, pipeline_string: str) -> Pipeline: | ||
|
@@ -346,12 +393,12 @@ def fromPipeline(cls, pipeline: pipelineIR.PipelineIR) -> Pipeline: | |
------- | ||
pipeline: `Pipeline` | ||
""" | ||
return cls.fromIR(copy.deep_copy(pipeline._pipelineIR)) | ||
return cls.fromIR(copy.deepcopy(pipeline._pipelineIR)) | ||
|
||
def __str__(self) -> str: | ||
return str(self._pipelineIR) | ||
|
||
def addInstrument(self, instrument: Union[Instrument, str]): | ||
def addInstrument(self, instrument: Union[Instrument, str]) -> None: | ||
"""Add an instrument to the pipeline, or replace an instrument that is | ||
already defined. | ||
|
||
|
@@ -370,7 +417,7 @@ def addInstrument(self, instrument: Union[Instrument, str]): | |
instrument = f"{instrument.__module__}.{instrument.__qualname__}" | ||
self._pipelineIR.instrument = instrument | ||
|
||
def getInstrument(self): | ||
def getInstrument(self) -> Instrument: | ||
"""Get the instrument from the pipeline. | ||
|
||
Returns | ||
|
@@ -382,7 +429,7 @@ def getInstrument(self): | |
""" | ||
return self._pipelineIR.instrument | ||
|
||
def addTask(self, task: Union[PipelineTask, str], label: str): | ||
def addTask(self, task: Union[PipelineTask, str], label: str) -> None: | ||
"""Add a new task to the pipeline, or replace a task that is already | ||
associated with the supplied label. | ||
|
||
|
@@ -410,7 +457,7 @@ def addTask(self, task: Union[PipelineTask, str], label: str): | |
label = task._DefaultName | ||
self._pipelineIR.tasks[label] = pipelineIR.TaskIR(label, taskName) | ||
|
||
def removeTask(self, label: str): | ||
def removeTask(self, label: str) -> None: | ||
"""Remove a task from the pipeline. | ||
|
||
Parameters | ||
|
@@ -426,7 +473,7 @@ def removeTask(self, label: str): | |
""" | ||
self._pipelineIR.tasks.pop(label) | ||
|
||
def addConfigOverride(self, label: str, key: str, value: object): | ||
def addConfigOverride(self, label: str, key: str, value: object) -> None: | ||
"""Apply single config override. | ||
|
||
Parameters | ||
|
@@ -440,7 +487,7 @@ def addConfigOverride(self, label: str, key: str, value: object): | |
""" | ||
self._addConfigImpl(label, pipelineIR.ConfigIR(rest={key: value})) | ||
|
||
def addConfigFile(self, label: str, filename: str): | ||
def addConfigFile(self, label: str, filename: str) -> None: | ||
"""Add overrides from a specified file. | ||
|
||
Parameters | ||
|
@@ -453,7 +500,7 @@ def addConfigFile(self, label: str, filename: str): | |
""" | ||
self._addConfigImpl(label, pipelineIR.ConfigIR(file=[filename])) | ||
|
||
def addConfigPython(self, label: str, pythonString: str): | ||
def addConfigPython(self, label: str, pythonString: str) -> None: | ||
"""Add Overrides by running a snippet of python code against a config. | ||
|
||
Parameters | ||
|
@@ -467,7 +514,7 @@ def addConfigPython(self, label: str, pythonString: str): | |
""" | ||
self._addConfigImpl(label, pipelineIR.ConfigIR(python=pythonString)) | ||
|
||
def _addConfigImpl(self, label: str, newConfig: pipelineIR.ConfigIR): | ||
def _addConfigImpl(self, label: str, newConfig: pipelineIR.ConfigIR) -> None: | ||
if label == "parameters": | ||
if newConfig.rest.keys() - self._pipelineIR.parameters.mapping.keys(): | ||
raise ValueError("Cannot override parameters that are not defined in pipeline") | ||
|
@@ -481,10 +528,13 @@ def _addConfigImpl(self, label: str, newConfig: pipelineIR.ConfigIR): | |
raise LookupError(f"There are no tasks labeled '{label}' in the pipeline") | ||
self._pipelineIR.tasks[label].add_or_update_config(newConfig) | ||
|
||
def toFile(self, filename: str): | ||
def toFile(self, filename: str) -> None: | ||
self._pipelineIR.to_file(filename) | ||
|
||
def toExpandedPipeline(self) -> Generator[TaskDef]: | ||
def write_to_uri(self, uri: Union[str, ButlerURI]) -> None: | ||
self._pipelineIR.write_to_uri(uri) | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Pre-existing line below: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The typing "guidelines" I have read have been "Be liberal in what you accept, be specific with what you return" so I try to adhere to that as much as possible. It does not hurt to have more info about what is being returned, but it is a pain to get more specific later (especially in a case where it suddenly does matter) |
||
def toExpandedPipeline(self) -> Generator[TaskDef, None, None]: | ||
"""Returns a generator of TaskDefs which can be used to create quantum | ||
graphs. | ||
|
||
|
@@ -545,7 +595,7 @@ def toExpandedPipeline(self) -> Generator[TaskDef]: | |
def __len__(self): | ||
return len(self._pipelineIR.tasks) | ||
|
||
def __eq__(self, other: "Pipeline"): | ||
def __eq__(self, other: object): | ||
if not isinstance(other, Pipeline): | ||
return False | ||
return self._pipelineIR == other._pipelineIR | ||
|
@@ -621,7 +671,7 @@ def fromTaskDef(cls, taskDef: TaskDef, *, registry: Registry) -> TaskDatasetType | |
types: `TaskDatasetTypes` | ||
The dataset types used by this task. | ||
""" | ||
def makeDatasetTypesSet(connectionType, freeze=True): | ||
def makeDatasetTypesSet(connectionType: str, freeze: bool = True) -> NamedValueSet[DatasetType]: | ||
"""Constructs a set of true `DatasetType` objects | ||
|
||
Parameters | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
appart -> apart