Skip to content

Commit

Permalink
Merge branch 'tickets/DM-28649'
Browse files Browse the repository at this point in the history
  • Loading branch information
natelust committed May 18, 2021
2 parents ddd4cb5 + 1caf0e5 commit bccb7aa
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 40 deletions.
2 changes: 1 addition & 1 deletion python/lsst/pipe/base/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@
from .butlerQuantumContext import *
from . import connectionTypes
from . import pipelineIR
from .makeLightWeightButler import *
from .executionButlerBuilder import *
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,17 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations

__all__ = ("buildLightweightButler", )
__all__ = ("buildExecutionButler", )

import io

from collections import defaultdict
from typing import Callable, DefaultDict, Mapping, Optional, Set, Tuple, Iterable, List, Union
import os
import shutil

from lsst.daf.butler import (DatasetRef, DatasetType, Butler, ButlerConfig, Registry, DataCoordinate,
RegistryConfig)
from lsst.daf.butler import (DatasetRef, DatasetType, Butler, DataCoordinate, ButlerURI, Config)
from lsst.daf.butler.core.utils import getClassOf
from lsst.daf.butler.transfers import RepoExportContext
from lsst.daf.butler.core.repoRelocation import BUTLER_ROOT_TAG


from . import QuantumGraph, QuantumNode
Expand All @@ -41,11 +39,11 @@


def _accumulate(graph: QuantumGraph) -> Tuple[Set[DatasetRef], DataSetTypeMap]:
# accumulate the dataIds that will be transferred to the lightweight
# accumulate the dataIds that will be transferred to the execution
# registry

# exports holds all the existing data that will be migrated to the
# lightweight butler
# execution butler
exports: Set[DatasetRef] = set()

# inserts is the mapping of DatasetType to dataIds for what is to be
Expand Down Expand Up @@ -126,26 +124,31 @@ def _export(butler: Butler, collections: Optional[Iterable[str]], exports: Set[D
return yamlBuffer


def _setupNewButler(butler: Butler, outputLocation: str, dirExists: bool) -> Butler:
def _setupNewButler(butler: Butler, outputLocation: ButlerURI, dirExists: bool) -> Butler:
# Set up the new butler object at the specified location
if dirExists:
if os.path.isfile(outputLocation):
os.remove(outputLocation)
else:
shutil.rmtree(outputLocation)
os.mkdir(outputLocation)
# Remove the existing table; if the code got this far and it exists,
# clobber must be true.
executionRegistry = outputLocation.join("gen3.sqlite3")
if executionRegistry.exists():
executionRegistry.remove()
else:
outputLocation.mkdir()

# Copy the existing butler config, modifying the location of the
# registry to the specified location.
# Preserve the root path from the existing butler so things like
# file data stores continue to look at the old location.
config = ButlerConfig(butler._config)
config["registry", "db"] = f"sqlite:///{outputLocation}/gen3.sqlite3"
config["root"] = butler._config.configDir.ospath
config = Config(butler._config)
config["root"] = outputLocation.geturl()
config["registry", "db"] = "sqlite:///<butlerRoot>/gen3.sqlite3"
# record the current root of the datastore if it is specified relative
# to the butler root
if config.get(("datastore", "root")) == BUTLER_ROOT_TAG:
config["datastore", "root"] = butler._config.configDir.geturl()
config["datastore", "trust_get_request"] = True

# Create the new registry which will create and populate the sqlite
# file.
Registry.createFromConfig(RegistryConfig(config))
config = Butler.makeRepo(root=outputLocation, config=config, overwrite=True, forceConfigRoot=False)

# Return a newly created butler
return Butler(config, writeable=True)
Expand Down Expand Up @@ -177,16 +180,16 @@ def _import(yamlBuffer: io.StringIO,
return newButler


def buildLightweightButler(butler: Butler,
graph: QuantumGraph,
outputLocation: str,
run: str,
*,
clobber: bool = False,
butlerModifier: Optional[Callable[[Butler], Butler]] = None,
collections: Optional[Iterable[str]] = None
) -> None:
r"""buildLightweightButler is a function that is responsible for exporting
def buildExecutionButler(butler: Butler,
graph: QuantumGraph,
outputLocation: Union[str, ButlerURI],
run: str,
*,
clobber: bool = False,
butlerModifier: Optional[Callable[[Butler], Butler]] = None,
collections: Optional[Iterable[str]] = None
) -> Butler:
r"""buildExecutionButler is a function that is responsible for exporting
input `QuantumGraphs` into a new minimal `~lsst.daf.butler.Butler` which
only contains datasets specified by the `QuantumGraph`. These datasets are
both those that already exist in the input `~lsst.daf.butler.Butler`, and
Expand All @@ -201,12 +204,14 @@ def buildLightweightButler(butler: Butler,
`~lsst.daf.butler.Butler` which was used to create any `QuantumGraphs`
that will be converted with this object.
graph : `QuantumGraph`
Graph containing nodes that are to be exported into a lightweight
Graph containing nodes that are to be exported into an execution
butler
outputLocation : `str`
Location at which the lightweight butler is to be exported
run : `str`
The run collection that the exported datasets are to be placed in.
outputLocation : `str` or `~lsst.daf.butler.ButlerURI`
URI location at which the execution butler is to be exported. May be
specified as a string or a ButlerURI instance.
run : `str`, optional
The run collection that the exported datasets are to be placed in. If
None, the default value in registry.defaults will be used.
clobber : `bool`, Optional
By default a butler will not be created if a file or directory
already exists at the output location. If this is set to `True`
Expand All @@ -223,24 +228,34 @@ def buildLightweightButler(butler: Butler,
things such as creating collections/runs/ etc.
collections : `~typing.Iterable` of `str`, Optional
An iterable of collection names that will be exported from the input
`~lsst.daf.butler.Butler` when creating the lightweight butler. If not
`~lsst.daf.butler.Butler` when creating the execution butler. If not
supplied the `~lsst.daf.butler.Butler`\ 's `~lsst.daf.butler.Registry`
default collections will be used.
Returns
-------
executionButler : `lsst.daf.butler.Butler`
An instance of the newly created execution butler
Raises
------
FileExistsError
Raise if something exists in the filesystem at the specified output
Raised if something exists in the filesystem at the specified output
location and clobber is `False`
NotADirectoryError
Raised if specified output URI does not correspond to a directory
"""
outputLocation = ButlerURI(outputLocation)

# Do this first to Fail Fast if the output exists
if (dirExists := os.path.exists(outputLocation)) and not clobber:
if (dirExists := outputLocation.exists()) and not clobber:
raise FileExistsError("Cannot create a butler at specified location, location exists")
if not outputLocation.isdir():
raise NotADirectoryError("The specified output URI does not appear to correspond to a directory")

exports, inserts = _accumulate(graph)
yamlBuffer = _export(butler, collections, exports)

newButler = _setupNewButler(butler, outputLocation, dirExists)

newButler = _import(yamlBuffer, newButler, inserts, run, butlerModifier)
newButler._config.dumpToUri(f"{outputLocation}/butler.yaml")
return _import(yamlBuffer, newButler, inserts, run, butlerModifier)

0 comments on commit bccb7aa

Please sign in to comment.