Skip to content

Commit

Permalink
Merge branch 'tickets/DM-28036'
Browse files Browse the repository at this point in the history
  • Loading branch information
natelust committed Feb 15, 2021
2 parents 3a66313 + 740869e commit 560fb7b
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 71 deletions.
150 changes: 100 additions & 50 deletions python/lsst/pipe/base/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,17 @@
# -------------------------------
from dataclasses import dataclass
from types import MappingProxyType
from typing import Mapping, Set, Union, Generator, TYPE_CHECKING, Optional
from typing import Dict, Mapping, Set, Union, Generator, TYPE_CHECKING, Optional, Tuple

import copy
import re
import os
import urllib.parse
import warnings

# -----------------------------
# Imports for other modules --
from lsst.daf.butler import DatasetType, NamedValueSet, Registry, SkyPixDimension
from lsst.daf.butler import DatasetType, NamedValueSet, Registry, SkyPixDimension, ButlerURI
from lsst.utils import doImport
from .configOverrides import ConfigOverrides
from .connections import iterConnections
Expand All @@ -47,7 +50,7 @@
from . import pipeTools

if TYPE_CHECKING: # Imports needed only for type annotations; may be circular.
from lsst.obs.base.instrument import Instrument
from lsst.obs.base import Instrument

# ----------------------------------
# Local non-exported definitions --
Expand Down Expand Up @@ -111,13 +114,13 @@ def __init__(self, taskName, config, taskClass=None, label=""):
self.connections = config.connections.ConnectionsClass(config=config)

@property
def configDatasetName(self):
def configDatasetName(self) -> str:
"""Name of a dataset type for configuration of this task (`str`)
"""
return self.label + "_config"

@property
def metadataDatasetName(self):
def metadataDatasetName(self) -> Optional[str]:
"""Name of a dataset type for metadata of this task, `None` if
metadata is not to be saved (`str`)
"""
Expand Down Expand Up @@ -168,7 +171,7 @@ def fromFile(cls, filename: str) -> Pipeline:
A path that points to a pipeline defined in yaml format. This
filename may also supply additional labels to be used in
subsetting the loaded Pipeline. These labels are separated from
the path by a colon, and may be specified as a comma separated
the path by a \\#, and may be specified as a comma separated
list, or a range denoted as beginning..end. Beginning or end may
be empty, in which case the range will be a half open interval.
Unlike python iteration bounds, end bounds are *INCLUDED*. Note
Expand All @@ -189,13 +192,49 @@ def fromFile(cls, filename: str) -> Pipeline:
string based matching due to the nature of contracts and may prune more
than it should.
"""
# Split up the filename and any labels that were supplied
filename, labelSpecifier = cls._parseFileSpecifier(filename)
pipeline: Pipeline = cls.fromIR(pipelineIR.PipelineIR.from_file(filename))
return cls.from_uri(filename)

@classmethod
def from_uri(cls, uri: Union[str, ButlerURI]) -> Pipeline:
"""Load a pipeline defined in a pipeline yaml file at a location
specified by a URI.
Parameters
----------
uri: `str` or `ButlerURI`
If a string is supplied this should be a URI path that points to a
pipeline defined in yaml format. This uri may also supply
additional labels to be used in subsetting the loaded Pipeline.
These labels are separated from the path by a \\#, and may be
specified as a comma separated list, or a range denoted as
beginning..end. Beginning or end may be empty, in which case the
range will be a half open interval. Unlike python iteration
bounds, end bounds are *INCLUDED*. Note that range based selection
is not well defined for pipelines that are not linear in nature,
and correct behavior is not guaranteed, or may vary from run to
run. The same specifiers can be used with a ButlerURI object, by
being the sole contents in the fragments attribute.
Returns
-------
pipeline: `Pipeline`
The pipeline loaded from specified location with appropriate (if
any) subsetting
Notes
-----
This method attempts to prune any contracts that contain labels which
are not in the declared subset of labels. This pruning is done using a
string based matching due to the nature of contracts and may prune more
than it should.
"""
# Split up the uri and any labels that were supplied
uri, label_specifier = cls._parse_file_specifier(uri)
pipeline: Pipeline = cls.fromIR(pipelineIR.PipelineIR.from_uri(uri))

# If there are labels supplied, only keep those
if labelSpecifier is not None:
pipeline = pipeline.subsetFromLabels(labelSpecifier)
if label_specifier is not None:
pipeline = pipeline.subsetFromLabels(label_specifier)
return pipeline

def subsetFromLabels(self, labelSpecifier: LabelSpecifier) -> Pipeline:
Expand Down Expand Up @@ -263,42 +302,50 @@ def subsetFromLabels(self, labelSpecifier: LabelSpecifier) -> Pipeline:
return Pipeline.fromIR(self._pipelineIR.subset_from_labels(labelSet))

@staticmethod
def _parseFileSpecifier(fileSpecifer):
"""Split appart a filename path from label subsets
def _parse_file_specifier(uri: Union[str, ButlerURI]
) -> Tuple[ButlerURI, Optional[LabelSpecifier]]:
"""Split appart a uri and any possible label subsets
"""
split = fileSpecifer.split(':')
# There is only a filename, return just that
if len(split) == 1:
return fileSpecifer, None
# More than one specifier provided, bail out
if len(split) > 2:
raise ValueError("Only one : is allowed when specifying a pipeline to load")
else:
labelSubset: str
filename: str
filename, labelSubset = split[0], split[1]
if isinstance(uri, str):
# This is to support legacy pipelines during transition
uri, num_replace = re.subn("[:](?!\\/\\/)", "#", uri)
if num_replace:
warnings.warn(f"The pipeline file {uri} seems to use the legacy : to separate "
"labels, this is deprecated and will be removed after June 2021, please use "
"# instead.",
category=FutureWarning)
if uri.count("#") > 1:
raise ValueError("Only one set of labels is allowed when specifying a pipeline to load")
uri = ButlerURI(uri)
label_subset = uri.fragment or None

specifier: Optional[LabelSpecifier]
if label_subset is not None:
label_subset = urllib.parse.unquote(label_subset)
args: Dict[str, Union[Set[str], str, None]]
# labels supplied as a list
if ',' in labelSubset:
if '..' in labelSubset:
if ',' in label_subset:
if '..' in label_subset:
raise ValueError("Can only specify a list of labels or a range"
"when loading a Pipline not both")
labels = set(labelSubset.split(","))
specifier = LabelSpecifier(labels=labels)
args = {"labels": set(label_subset.split(","))}
# labels supplied as a range
elif '..' in labelSubset:
# Try to destructure the labelSubset, this will fail if more
elif '..' in label_subset:
# Try to de-structure the labelSubset, this will fail if more
# than one range is specified
try:
begin, end = labelSubset.split("..")
except ValueError:
begin, end, *rest = label_subset.split("..")
if rest:
raise ValueError("Only one range can be specified when loading a pipeline")
specifier = LabelSpecifier(begin=begin if begin else None, end=end if end else None)
args = {"begin": begin if begin else None, "end": end if end else None}
# Assume anything else is a single label
else:
labels = {labelSubset}
specifier = LabelSpecifier(labels=labels)
args = {"labels": {label_subset}}

return filename, specifier
specifier = LabelSpecifier(**args)
else:
specifier = None

return uri, specifier

@classmethod
def fromString(cls, pipeline_string: str) -> Pipeline:
Expand Down Expand Up @@ -346,12 +393,12 @@ def fromPipeline(cls, pipeline: pipelineIR.PipelineIR) -> Pipeline:
-------
pipeline: `Pipeline`
"""
return cls.fromIR(copy.deep_copy(pipeline._pipelineIR))
return cls.fromIR(copy.deepcopy(pipeline._pipelineIR))

def __str__(self) -> str:
return str(self._pipelineIR)

def addInstrument(self, instrument: Union[Instrument, str]):
def addInstrument(self, instrument: Union[Instrument, str]) -> None:
"""Add an instrument to the pipeline, or replace an instrument that is
already defined.
Expand All @@ -370,7 +417,7 @@ def addInstrument(self, instrument: Union[Instrument, str]):
instrument = f"{instrument.__module__}.{instrument.__qualname__}"
self._pipelineIR.instrument = instrument

def getInstrument(self):
def getInstrument(self) -> Instrument:
"""Get the instrument from the pipeline.
Returns
Expand All @@ -382,7 +429,7 @@ def getInstrument(self):
"""
return self._pipelineIR.instrument

def addTask(self, task: Union[PipelineTask, str], label: str):
def addTask(self, task: Union[PipelineTask, str], label: str) -> None:
"""Add a new task to the pipeline, or replace a task that is already
associated with the supplied label.
Expand Down Expand Up @@ -410,7 +457,7 @@ def addTask(self, task: Union[PipelineTask, str], label: str):
label = task._DefaultName
self._pipelineIR.tasks[label] = pipelineIR.TaskIR(label, taskName)

def removeTask(self, label: str):
def removeTask(self, label: str) -> None:
"""Remove a task from the pipeline.
Parameters
Expand All @@ -426,7 +473,7 @@ def removeTask(self, label: str):
"""
self._pipelineIR.tasks.pop(label)

def addConfigOverride(self, label: str, key: str, value: object):
def addConfigOverride(self, label: str, key: str, value: object) -> None:
"""Apply single config override.
Parameters
Expand All @@ -440,7 +487,7 @@ def addConfigOverride(self, label: str, key: str, value: object):
"""
self._addConfigImpl(label, pipelineIR.ConfigIR(rest={key: value}))

def addConfigFile(self, label: str, filename: str):
def addConfigFile(self, label: str, filename: str) -> None:
"""Add overrides from a specified file.
Parameters
Expand All @@ -453,7 +500,7 @@ def addConfigFile(self, label: str, filename: str):
"""
self._addConfigImpl(label, pipelineIR.ConfigIR(file=[filename]))

def addConfigPython(self, label: str, pythonString: str):
def addConfigPython(self, label: str, pythonString: str) -> None:
"""Add Overrides by running a snippet of python code against a config.
Parameters
Expand All @@ -467,7 +514,7 @@ def addConfigPython(self, label: str, pythonString: str):
"""
self._addConfigImpl(label, pipelineIR.ConfigIR(python=pythonString))

def _addConfigImpl(self, label: str, newConfig: pipelineIR.ConfigIR):
def _addConfigImpl(self, label: str, newConfig: pipelineIR.ConfigIR) -> None:
if label == "parameters":
if newConfig.rest.keys() - self._pipelineIR.parameters.mapping.keys():
raise ValueError("Cannot override parameters that are not defined in pipeline")
Expand All @@ -481,10 +528,13 @@ def _addConfigImpl(self, label: str, newConfig: pipelineIR.ConfigIR):
raise LookupError(f"There are no tasks labeled '{label}' in the pipeline")
self._pipelineIR.tasks[label].add_or_update_config(newConfig)

def toFile(self, filename: str):
def toFile(self, filename: str) -> None:
self._pipelineIR.to_file(filename)

def toExpandedPipeline(self) -> Generator[TaskDef]:
def write_to_uri(self, uri: Union[str, ButlerURI]) -> None:
self._pipelineIR.write_to_uri(uri)

def toExpandedPipeline(self) -> Generator[TaskDef, None, None]:
"""Returns a generator of TaskDefs which can be used to create quantum
graphs.
Expand Down Expand Up @@ -545,7 +595,7 @@ def toExpandedPipeline(self) -> Generator[TaskDef]:
def __len__(self):
return len(self._pipelineIR.tasks)

def __eq__(self, other: "Pipeline"):
def __eq__(self, other: object):
if not isinstance(other, Pipeline):
return False
return self._pipelineIR == other._pipelineIR
Expand Down Expand Up @@ -621,7 +671,7 @@ def fromTaskDef(cls, taskDef: TaskDef, *, registry: Registry) -> TaskDatasetType
types: `TaskDatasetTypes`
The dataset types used by this task.
"""
def makeDatasetTypesSet(connectionType, freeze=True):
def makeDatasetTypesSet(connectionType: str, freeze: bool = True) -> NamedValueSet[DatasetType]:
"""Constructs a set of true `DatasetType` objects
Parameters
Expand Down

0 comments on commit 560fb7b

Please sign in to comment.