Skip to content

Commit

Permalink
executionButler: add option to transfer files
Browse files Browse the repository at this point in the history
In some instances, it is helpful/important to transfer files
into the execution butler (e.g., when the original butler datastore
is on a slow disk). Added an option to buildExecutionButler to
allow transfer of files into a new datastore.
  • Loading branch information
PaulPrice committed Jul 14, 2022
1 parent 835a13a commit 1b9a457
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 5 deletions.
1 change: 1 addition & 0 deletions doc/changes/DM-35494.feature.rst
@@ -0,0 +1 @@
* Added support for transferring files into the execution butler.
51 changes: 46 additions & 5 deletions python/lsst/pipe/base/executionButlerBuilder.py
Expand Up @@ -227,7 +227,31 @@ def _export(
return yamlBuffer


def _setupNewButler(butler: Butler, outputLocation: ResourcePath, dirExists: bool) -> Butler:
def _setupNewButler(
butler: Butler,
outputLocation: ResourcePath,
dirExists: bool,
datastoreRoot: Optional[ResourcePath] = None,
) -> Butler:
"""Set up the execution butler
Parameters
----------
butler : `Butler`
The original butler, upon which the execution butler is based.
outputLocation : `ResourcePath`
Location of the execution butler.
dirExists : `bool`
Does the ``outputLocation`` exist, and if so, should it be clobbered?
datastoreRoot : `ResourcePath`, optional
Path for the execution butler datastore. If not specified, then the
original butler's datastore will be used.
Returns
-------
execution_butler : `Butler`
Execution butler.
"""
# Set up the new butler object at the specified location
if dirExists:
# Remove the existing table, if the code got this far and this exists
Expand All @@ -252,7 +276,9 @@ def _setupNewButler(butler: Butler, outputLocation: ResourcePath, dirExists: boo

# record the current root of the datastore if it is specified relative
# to the butler root
if config.get(("datastore", "root")) == BUTLER_ROOT_TAG and butler._config.configDir is not None:
if datastoreRoot is not None:
config["datastore", "root"] = datastoreRoot.geturl()
elif config.get(("datastore", "root")) == BUTLER_ROOT_TAG and butler._config.configDir is not None:
config["datastore", "root"] = butler._config.configDir.geturl()
config["datastore", "trust_get_request"] = True

Expand Down Expand Up @@ -325,6 +351,8 @@ def buildExecutionButler(
clobber: bool = False,
butlerModifier: Optional[Callable[[Butler], Butler]] = None,
collections: Optional[Iterable[str]] = None,
datastoreRoot: Optional[ResourcePathExpression] = None,
transfer: str = "auto",
) -> Butler:
r"""buildExecutionButler is a function that is responsible for exporting
input `QuantumGraphs` into a new minimal `~lsst.daf.butler.Butler` which
Expand All @@ -343,7 +371,7 @@ def buildExecutionButler(
graph : `QuantumGraph`
Graph containing nodes that are to be exported into an execution
butler
outputLocation : convertible to `ResourcePath
outputLocation : convertible to `ResourcePath`
URI Location at which the execution butler is to be exported. May be
specified as a string or a `ResourcePath` instance.
run : `str`, optional
Expand All @@ -368,6 +396,14 @@ def buildExecutionButler(
`~lsst.daf.butler.Butler` when creating the execution butler. If not
supplied the `~lsst.daf.butler.Butler`\ 's `~lsst.daf.butler.Registry`
default collections will be used.
datastoreRoot : convertible to `ResourcePath`, Optional
Root directory for datastore of execution butler. If `None`, then the
original butler's datastore will be used.
transfer : `str`
How (and whether) the input datasets should be added to the execution
butler datastore. This should be a ``transfer`` string recognized by
:func:`lsst.resources.ResourcePath.transfer_from`.
``"auto"`` means to ``"copy"`` if the ``datastoreRoot`` is specified.
Returns
-------
Expand All @@ -384,6 +420,8 @@ def buildExecutionButler(
"""
# We know this must refer to a directory.
outputLocation = ResourcePath(outputLocation, forceDirectory=True)
if datastoreRoot is not None:
datastoreRoot = ResourcePath(datastoreRoot, forceDirectory=True)

# Do this first to Fail Fast if the output exists
if (dirExists := outputLocation.exists()) and not clobber:
Expand All @@ -401,15 +439,18 @@ def buildExecutionButler(
exports, inserts = _accumulate(graph, dataset_types)
yamlBuffer = _export(butler, collections, exports, inserts)

newButler = _setupNewButler(butler, outputLocation, dirExists)
newButler = _setupNewButler(butler, outputLocation, dirExists, datastoreRoot)

newButler = _import(yamlBuffer, newButler, inserts, run, butlerModifier)

if transfer == "auto" and datastoreRoot is not None:
transfer = "copy"

# Transfer the existing datasets directly from the source butler.
newButler.transfer_from(
butler,
exports,
transfer="auto", # No transfers should be happening.
transfer=transfer,
skip_missing=False, # Everything should exist.
register_dataset_types=True,
)
Expand Down

0 comments on commit 1b9a457

Please sign in to comment.