Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-35494 Allow execution butler creation to transfer datasets #261

Merged
merged 1 commit into from
Jul 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/changes/DM-35494.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* Added support for transferring files into execution butler.
51 changes: 46 additions & 5 deletions python/lsst/pipe/base/executionButlerBuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,31 @@ def _export(
return yamlBuffer


def _setupNewButler(butler: Butler, outputLocation: ResourcePath, dirExists: bool) -> Butler:
def _setupNewButler(
butler: Butler,
outputLocation: ResourcePath,
dirExists: bool,
datastoreRoot: Optional[ResourcePath] = None,
) -> Butler:
"""Set up the execution butler

Parameters
----------
butler : `Butler`
The original butler, upon which the execution butler is based.
outputLocation : `ResourcePath`
Location of the execution butler.
dirExists : `bool`
Does the ``outputLocation`` exist, and if so, should it be clobbered?
datastoreRoot : `ResourcePath`, optional
Path for the execution butler datastore. If not specified, then the
original butler's datastore will be used.

Returns
-------
execution_butler : `Butler`
Execution butler.
"""
# Set up the new butler object at the specified location
if dirExists:
# Remove the existing table, if the code got this far and this exists
Expand All @@ -252,7 +276,9 @@ def _setupNewButler(butler: Butler, outputLocation: ResourcePath, dirExists: boo

# record the current root of the datastore if it is specified relative
# to the butler root
if config.get(("datastore", "root")) == BUTLER_ROOT_TAG and butler._config.configDir is not None:
if datastoreRoot is not None:
config["datastore", "root"] = datastoreRoot.geturl()
elif config.get(("datastore", "root")) == BUTLER_ROOT_TAG and butler._config.configDir is not None:
config["datastore", "root"] = butler._config.configDir.geturl()
config["datastore", "trust_get_request"] = True

Expand Down Expand Up @@ -325,6 +351,8 @@ def buildExecutionButler(
clobber: bool = False,
butlerModifier: Optional[Callable[[Butler], Butler]] = None,
collections: Optional[Iterable[str]] = None,
datastoreRoot: Optional[ResourcePathExpression] = None,
transfer: str = "auto",
) -> Butler:
r"""buildExecutionButler is a function that is responsible for exporting
input `QuantumGraphs` into a new minimal `~lsst.daf.butler.Butler` which
Expand All @@ -343,7 +371,7 @@ def buildExecutionButler(
graph : `QuantumGraph`
Graph containing nodes that are to be exported into an execution
butler
outputLocation : convertible to `ResourcePath
outputLocation : convertible to `ResourcePath`
URI Location at which the execution butler is to be exported. May be
specified as a string or a `ResourcePath` instance.
run : `str`, optional
Expand All @@ -368,6 +396,14 @@ def buildExecutionButler(
`~lsst.daf.butler.Butler` when creating the execution butler. If not
supplied the `~lsst.daf.butler.Butler`\ 's `~lsst.daf.butler.Registry`
default collections will be used.
datastoreRoot : convertible to `ResourcePath`, Optional
Root directory for datastore of execution butler. If `None`, then the
original butler's datastore will be used.
transfer : `str`
How (and whether) the input datasets should be added to the execution
butler datastore. This should be a ``transfer`` string recognized by
:func:`lsst.resources.ResourcePath.transfer_from`.
``"auto"`` means to ``"copy"`` if the ``datastoreRoot`` is specified.

Returns
-------
Expand All @@ -384,6 +420,8 @@ def buildExecutionButler(
"""
# We know this must refer to a directory.
outputLocation = ResourcePath(outputLocation, forceDirectory=True)
if datastoreRoot is not None:
datastoreRoot = ResourcePath(datastoreRoot, forceDirectory=True)

# Do this first to Fail Fast if the output exists
if (dirExists := outputLocation.exists()) and not clobber:
Expand All @@ -401,15 +439,18 @@ def buildExecutionButler(
exports, inserts = _accumulate(graph, dataset_types)
yamlBuffer = _export(butler, collections, exports, inserts)

newButler = _setupNewButler(butler, outputLocation, dirExists)
newButler = _setupNewButler(butler, outputLocation, dirExists, datastoreRoot)

newButler = _import(yamlBuffer, newButler, inserts, run, butlerModifier)

if transfer == "auto" and datastoreRoot is not None:
transfer = "copy"

# Transfer the existing datasets directly from the source butler.
newButler.transfer_from(
butler,
exports,
transfer="auto", # No transfers should be happening.
transfer=transfer,
skip_missing=False, # Everything should exist.
register_dataset_types=True,
)
Expand Down