Commit

Use ResourcePath instead of ButlerURI throughout.
Also replaces a few Union[str, ButlerURI] and similar with the new
ResourcePathExpression type alias.
TallJimbo committed Dec 21, 2021
1 parent 347f216 commit b014171
Showing 8 changed files with 86 additions and 83 deletions.
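The `ResourcePathExpression` alias mentioned in the commit message can be pictured roughly as follows. This is a sketch of its shape for illustration only; the real definition lives in `lsst.resources` and may include further types:

```python
from pathlib import Path
from typing import Union, get_args


class ResourcePath:
    """Minimal stand-in for lsst.resources.ResourcePath (assumption)."""

    def __init__(self, uri, forceDirectory=False):
        self.uri = str(uri)


# Anything acceptable where a ResourcePath is expected.
ResourcePathExpression = Union[str, Path, ResourcePath]
```

APIs annotated with `ResourcePathExpression` then normalize their argument by calling `ResourcePath(...)` at the top of the function, as several hunks below do.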
10 changes: 5 additions & 5 deletions doc/lsst.pipe.base/creating-a-pipeline.rst
@@ -27,7 +27,7 @@ s, and discussing common conventions when creating `Pipelines`.
A Basic Pipeline
----------------

`Pipeline` documents are written using yaml syntax. If you are unfamiliar with
yaml, there are many guides across the internet, but the basic idea is that it
is a simple markup language to describe key, value mappings, and lists of
values (which may be further mappings).
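For instance, a minimal pipeline document in this style might look like the following (the labels and task names here are hypothetical, chosen only to illustrate the key/value structure):

```yaml
description: A hypothetical two-task pipeline
tasks:
  isr: lsst.ip.isr.IsrTask
  characterizeImage: lsst.pipe.tasks.characterizeImage.CharacterizeImageTask
```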
@@ -109,12 +109,12 @@ configuration options that alter the way the task executes. Because
description field) some tasks may need specific configurations set to
enable/disable behavior in the context of the specific `Pipeline`.

To configure a task associated with a particular label, the value associated
with the label must be changed from the qualified task name to a new
sub-mapping. This new sub-mapping should have two keys, ``class`` and
``config``.

The ``class`` key should point to the same qualified task name as before. The
value associated with the ``config`` keyword is itself a mapping where
configuration overrides are declared. The example below shows this behavior
in action.
@@ -371,7 +371,7 @@ extend the total `Pipeline`.

If a ``label`` declared in the ``tasks`` section was declared in one of
the imported ``Pipelines``, one of two things happen. If the label is
associated with the same `PipelineTask` that was declared in the imported
pipeline, this definition will be extended. This means that any configs
declared in the imported `Pipeline` will be merged with configs declared in
the current `Pipeline` with the current declaration taking config precedence.
@@ -421,7 +421,7 @@ is loaded.

The simplest form of a `Pipeline` specification is the URI at which the
`Pipeline` can be found. This URI may be any supported by
`lsst.daf.butler.ButlerURI`. In the case that the pipeline resides in a file
`lsst.resources.ResourcePath`. In the case that the pipeline resides in a file
located on a filesystem accessible by the machine that will be processing the
`Pipeline` (i.e. a file URI), there is no need to preface the URI with
``file://``; a bare file path is assumed to be a file-based URI.
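The bare-path convention can be sketched with the standard library alone; `normalize_uri` below is a hypothetical helper written for this explanation, not part of `lsst.resources`:

```python
from pathlib import Path


def normalize_uri(uri: str) -> str:
    """Treat a bare file path as a file-based URI, as described above."""
    if "://" in uri:
        return uri  # already scheme-qualified (s3://, https://, file://, ...)
    return Path(uri).absolute().as_uri()  # bare path -> file:// URI
```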
8 changes: 4 additions & 4 deletions python/lsst/pipe/base/configOverrides.py
@@ -29,7 +29,7 @@
from enum import Enum
from operator import attrgetter

from lsst.daf.butler import ButlerURI
from lsst.resources import ResourcePath
from lsst.utils import doImport

OverrideTypes = Enum("OverrideTypes", "Value File Python Instrument")
@@ -148,11 +148,11 @@ def addFileOverride(self, filename):
Parameters
----------
filename : `str` or `ButlerURI`
filename : convertible to `ResourcePath`
Path or URI to the override file. All URI schemes supported by
`ButlerURI` are supported.
`ResourcePath` are supported.
"""
self._overrides.append((OverrideTypes.File, ButlerURI(filename)))
self._overrides.append((OverrideTypes.File, ResourcePath(filename)))

def addValueOverride(self, field, value):
"""Add override for a specific field.
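The pattern in `addFileOverride` above — record each override as a ``(kind, payload)`` tuple and apply the whole list later, in order — can be sketched with standard-library stand-ins. `ConfigOverridesSketch` is a minimal illustration, with `PurePosixPath` substituting for `ResourcePath`:

```python
from enum import Enum
from pathlib import PurePosixPath

OverrideTypes = Enum("OverrideTypes", "Value File Python Instrument")


class ConfigOverridesSketch:
    """Minimal stand-in illustrating the deferred-override pattern."""

    def __init__(self):
        self._overrides = []

    def addFileOverride(self, filename):
        # The real method wraps filename in ResourcePath; a plain path
        # type stands in for it here.
        self._overrides.append((OverrideTypes.File, PurePosixPath(filename)))

    def addValueOverride(self, field, value):
        self._overrides.append((OverrideTypes.Value, (field, value)))
```

Recording overrides rather than applying them immediately lets the caller build the full list first and apply it against a config object in one deterministic pass.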
13 changes: 7 additions & 6 deletions python/lsst/pipe/base/executionButlerBuilder.py
@@ -27,9 +27,10 @@
from collections import defaultdict
from typing import Callable, DefaultDict, Iterable, List, Mapping, Optional, Set, Tuple, Union

from lsst.daf.butler import Butler, ButlerURI, Config, DataCoordinate, DatasetRef, DatasetType
from lsst.daf.butler import Butler, Config, DataCoordinate, DatasetRef, DatasetType
from lsst.daf.butler.core.repoRelocation import BUTLER_ROOT_TAG
from lsst.daf.butler.transfers import RepoExportContext
from lsst.resources import ResourcePath, ResourcePathExpression
from lsst.utils.introspection import get_class_of

from .graph import QuantumGraph, QuantumNode
@@ -142,7 +143,7 @@ def _export(
return yamlBuffer


def _setupNewButler(butler: Butler, outputLocation: ButlerURI, dirExists: bool) -> Butler:
def _setupNewButler(butler: Butler, outputLocation: ResourcePath, dirExists: bool) -> Butler:
# Set up the new butler object at the specified location
if dirExists:
# Remove the existing table, if the code got this far and this exists
@@ -218,7 +219,7 @@ def _import(
def buildExecutionButler(
butler: Butler,
graph: QuantumGraph,
outputLocation: Union[str, ButlerURI],
outputLocation: ResourcePathExpression,
run: str,
*,
clobber: bool = False,
@@ -242,9 +243,9 @@
graph : `QuantumGraph`
Graph containing nodes that are to be exported into an execution
butler.
outputLocation : `str` or `~lsst.daf.butler.ButlerURI`
outputLocation : convertible to `ResourcePath`
URI location at which the execution butler is to be exported. May be
specified as a string or a ButlerURI instance.
specified as a string or a `ResourcePath` instance.
run : `str`, optional
The run collection that the exported datasets are to be placed in. If
None, the default value in registry.defaults will be used.
@@ -282,7 +283,7 @@
Raised if specified output URI does not correspond to a directory
"""
# We know this must refer to a directory.
outputLocation = ButlerURI(outputLocation, forceDirectory=True)
outputLocation = ResourcePath(outputLocation, forceDirectory=True)

# Do this first to Fail Fast if the output exists
if (dirExists := outputLocation.exists()) and not clobber:
28 changes: 14 additions & 14 deletions python/lsst/pipe/base/graph/graph.py
@@ -54,7 +54,8 @@
)

import networkx as nx
from lsst.daf.butler import ButlerURI, DatasetRef, DimensionRecordsAccumulator, DimensionUniverse, Quantum
from lsst.daf.butler import DatasetRef, DimensionRecordsAccumulator, DimensionUniverse, Quantum
from lsst.resources import ResourcePath, ResourcePathExpression
from networkx.drawing.nx_agraph import write_dot

from ..connections import iterConnections
@@ -717,14 +718,14 @@ def saveUri(self, uri):
Parameters
----------
uri : `ButlerURI` or `str`
uri : convertible to `ResourcePath`
URI to where the graph should be saved.
"""
buffer = self._buildSaveObject()
butlerUri = ButlerURI(uri)
if butlerUri.getExtension() not in (".qgraph"):
path = ResourcePath(uri)
if path.getExtension() not in (".qgraph",):
raise TypeError(f"Can currently only save a graph in qgraph format not {uri}")
butlerUri.write(buffer) # type: ignore # Ignore because bytearray is safe to use in place of bytes
path.write(buffer) # type: ignore # Ignore because bytearray is safe to use in place of bytes
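A subtlety in extension checks like the one above: a single allowed extension must be wrapped in a real one-element tuple (trailing comma), because ``in`` against a merely parenthesized string degrades to substring search:

```python
# Membership against a parenthesized string is substring search:
assert (".q" in (".qgraph")) is True       # surprising: matches a substring
# Membership against a one-element tuple is what was intended:
assert (".q" in (".qgraph",)) is False
assert (".qgraph" in (".qgraph",)) is True
```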

@property
def metadata(self) -> Optional[MappingProxyType[str, Any]]:
@@ -736,7 +737,7 @@ def metadata(self) -> Optional[MappingProxyType[str, Any]]:
@classmethod
def loadUri(
cls,
uri: Union[ButlerURI, str],
uri: ResourcePathExpression,
universe: DimensionUniverse,
nodes: Optional[Iterable[Union[str, uuid.UUID]]] = None,
graphID: Optional[BuildId] = None,
@@ -746,7 +747,7 @@
Parameters
----------
uri : `ButlerURI` or `str`
uri : convertible to `ResourcePath`
URI from where to load the graph.
universe : `~lsst.daf.butler.DimensionUniverse`
DimensionUniverse instance, not used by the method itself but
@@ -789,8 +790,8 @@
initialization. To make sure that DimensionUniverse exists this method
accepts dummy DimensionUniverse argument.
"""
uri = ButlerURI(uri)
# With ButlerURI we have the choice of always using a local file
uri = ResourcePath(uri)
# With ResourcePath we have the choice of always using a local file
# or reading in the bytes directly. Reading in bytes can be more
# efficient for reasonably-sized pickle files when the resource
# is remote. For now use the local file variant. For a local file
@@ -810,16 +811,15 @@
return qgraph

@classmethod
def readHeader(cls, uri: Union[ButlerURI, str], minimumVersion: int = 3) -> Optional[str]:
def readHeader(cls, uri: ResourcePathExpression, minimumVersion: int = 3) -> Optional[str]:
"""Read the header of a `QuantumGraph` pointed to by the uri parameter
and return it as a string.

Parameters
----------
uri : `~lsst.daf.butler.ButlerURI` or `str`
uri : convertible to `ResourcePath`
The location of the `QuantumGraph` to load. If the argument is a
string, it must correspond to a valid `~lsst.daf.butler.ButlerURI`
path.
string, it must correspond to a valid `ResourcePath` path.
minimumVersion : `int`
Minimum version of a save file to load. Set to -1 to load all
versions. Older versions may need to be loaded, and re-saved
@@ -839,7 +839,7 @@ def readHeader(cls, uri: Union[ButlerURI, str], minimumVersion: int = 3) -> Opti
Raised if the extension of the file specified by uri is not a
`QuantumGraph` extension.
"""
uri = ButlerURI(uri)
uri = ResourcePath(uri)
if uri.getExtension() in (".pickle", ".pkl"):
raise ValueError("Reading a header from a pickle save is not supported")
elif uri.getExtension() in (".qgraph",):
65 changes: 33 additions & 32 deletions python/lsst/pipe/base/pipeline.py
@@ -53,7 +53,8 @@

# -----------------------------
# Imports for other modules --
from lsst.daf.butler import ButlerURI, DatasetType, NamedValueSet, Registry, SkyPixDimension
from lsst.daf.butler import DatasetType, NamedValueSet, Registry, SkyPixDimension
from lsst.resources import ResourcePath, ResourcePathExpression
from lsst.utils import doImport
from lsst.utils.introspection import get_full_type_name

@@ -245,27 +246,27 @@ def fromFile(cls, filename: str) -> Pipeline:
return cls.from_uri(filename)

@classmethod
def from_uri(cls, uri: Union[str, ButlerURI]) -> Pipeline:
def from_uri(cls, uri: ResourcePathExpression) -> Pipeline:
"""Load a pipeline defined in a pipeline yaml file at a location
specified by a URI.

Parameters
----------
uri: `str` or `ButlerURI`
If a string is supplied this should be a URI path that points to a
pipeline defined in yaml format, either as a direct path to the yaml
file, or as a directory containing a "pipeline.yaml" file (the form
used by `write_to_uri` with ``expand=True``). This uri may also
supply additional labels to be used in subsetting the loaded
Pipeline. These labels are separated from the path by a \\#, and
may be specified as a comma separated list, or a range denoted as
beginning..end. Beginning or end may be empty, in which case the
range will be a half open interval. Unlike python iteration bounds,
end bounds are *INCLUDED*. Note that range based selection is not
well defined for pipelines that are not linear in nature, and
correct behavior is not guaranteed, or may vary from run to run. The
same specifiers can be used with a ButlerURI object, by being the
sole contents in the fragments attribute.
uri : convertible to `ResourcePath`
If a string is supplied this should be a URI path that points to a
pipeline defined in yaml format, either as a direct path to the
yaml file, or as a directory containing a "pipeline.yaml" file (the
form used by `write_to_uri` with ``expand=True``). This uri may
also supply additional labels to be used in subsetting the loaded
Pipeline. These labels are separated from the path by a \\#, and
may be specified as a comma separated list, or a range denoted as
beginning..end. Beginning or end may be empty, in which case the
range will be a half open interval. Unlike python iteration bounds,
end bounds are *INCLUDED*. Note that range based selection is not
well defined for pipelines that are not linear in nature, and
correct behavior is not guaranteed, or may vary from run to run.
The same specifiers can be used with a `ResourcePath` object, by
being the sole contents in the fragments attribute.

Returns
-------
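The fragment grammar described above (comma-separated label lists, and inclusive ``begin..end`` ranges whose endpoints may be empty) can be sketched as follows. `parse_label_specifier` is a hypothetical parser for illustration, not the real implementation:

```python
def parse_label_specifier(fragment: str) -> dict:
    """Parse a '#'-fragment into a label list or an inclusive range."""
    if ".." in fragment:
        begin, _, end = fragment.partition("..")
        # An empty endpoint makes the range half-open on that side.
        return {"begin": begin or None, "end": end or None}
    return {"labels": fragment.split(",")}
```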
@@ -359,7 +360,7 @@ def subsetFromLabels(self, labelSpecifier: LabelSpecifier) -> Pipeline:
return Pipeline.fromIR(self._pipelineIR.subset_from_labels(labelSet))

@staticmethod
def _parse_file_specifier(uri: Union[str, ButlerURI]) -> Tuple[ButlerURI, Optional[LabelSpecifier]]:
def _parse_file_specifier(uri: ResourcePathExpression) -> Tuple[ResourcePath, Optional[LabelSpecifier]]:
"""Split apart a uri and any possible label subsets"""
if isinstance(uri, str):
# This is to support legacy pipelines during transition
@@ -373,9 +374,9 @@
)
if uri.count("#") > 1:
raise ValueError("Only one set of labels is allowed when specifying a pipeline to load")
uri = ButlerURI(uri)
uri = ResourcePath(uri)
elif isinstance(uri, Path):
uri = ButlerURI(uri)
uri = ResourcePath(uri)
label_subset = uri.fragment or None

specifier: Optional[LabelSpecifier]
@@ -595,19 +596,19 @@ def toFile(self, filename: str) -> None:

def write_to_uri(
self,
uri: Union[str, ButlerURI],
uri: ResourcePathExpression,
expand: bool = False,
task_defs: Optional[Iterable[TaskDef]] = None,
) -> None:
"""Write the pipeline to a file or directory.

Parameters
----------
uri : `str` or `ButlerURI`
URI to write to; may have any scheme with `ButlerURI` write
or no scheme for a local file/directory. Should have a ``.yaml``
extension if ``expand=False`` and a trailing slash (indicating
a directory-like URI) if ``expand=True``.
uri : convertible to `ResourcePath`
URI to write to; may have any scheme with `ResourcePath` write
support or no scheme for a local file/directory. Should have a
``.yaml`` extension if ``expand=False`` and a trailing slash
(indicating a directory-like URI) if ``expand=True``.
expand : `bool`, optional
If `False`, write the pipeline to a single YAML file with
references to configuration files and other config overrides
@@ -627,7 +628,7 @@
raise RuntimeError(
f"Expanded pipelines are written to directories, not YAML files like {uri}."
)
self._write_expanded_dir(ButlerURI(uri, forceDirectory=True), task_defs=task_defs)
self._write_expanded_dir(ResourcePath(uri, forceDirectory=True), task_defs=task_defs)
else:
self._pipelineIR.write_to_uri(uri)

@@ -741,16 +742,16 @@ def description(self) -> str:
"""The string description of the pipeline."""
return self._pipelineIR.description

def _write_expanded_dir(self, uri: ButlerURI, task_defs: Optional[Iterable[TaskDef]] = None) -> None:
def _write_expanded_dir(self, uri: ResourcePath, task_defs: Optional[Iterable[TaskDef]] = None) -> None:
"""Internal implementation of `write_to_uri` with ``expand=True`` and
a directory-like URI.

Parameters
----------
uri : `str` or `ButlerURI`
URI to write to; may have any scheme with `ButlerURI` write or no
scheme for a local file/directory. Should have a trailing slash
(indicating a directory-like URI).
uri : `ResourcePath`
URI to write to; may have any scheme with `ResourcePath` write
support or no scheme for a local file/directory. Should have a
trailing slash (indicating a directory-like URI).
task_defs : `Iterable` [ `TaskDef` ], optional
Output of `toExpandedPipeline`; may be passed to avoid a second
call to that method internally.
