Skip to content

Commit

Permalink
Merge pull request #305 from lsst/tickets/DM-13353
Browse files Browse the repository at this point in the history
DM-13353: Add support for Formatter writeRecipes
  • Loading branch information
timj committed Jun 9, 2020
2 parents d84801d + f722a2b commit dd1ad65
Show file tree
Hide file tree
Showing 14 changed files with 282 additions and 32 deletions.
6 changes: 6 additions & 0 deletions config/datastores/formatters.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# This file gives the mapping between DatasetType and the
# `lsst.daf.butler.Formatter` that handles it.
write_recipes: !include writeRecipes.yaml
default:
lsst.obs.base.fitsExposureFormatter.FitsExposureFormatter:
# default is the default recipe regardless but this demonstrates
# how to specify a default write parameter
recipe: default
TablePersistable: lsst.obs.base.fitsGenericFormatter.FitsGenericFormatter
Wcs: lsst.obs.base.fitsGenericFormatter.FitsGenericFormatter
Psf: lsst.obs.base.fitsGenericFormatter.FitsGenericFormatter
Expand Down
44 changes: 44 additions & 0 deletions config/datastores/writeRecipes.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Write recipes for FITS exposure output, indexed by Formatter class name.
# Each recipe names a complete set of per-plane (image/mask/variance)
# compression and scaling options that a Formatter can look up by label.
# NOTE(review): indentation restored — the scraped copy had lost all leading
# whitespace, which makes the YAML unparseable.
lsst.obs.base.fitsExposureFormatter.FitsExposureFormatter:
  # No compression
  noCompression: &noCompression
    image: &noCompressionOptions
      compression:
        algorithm: NONE
      scaling:
        algorithm: NONE
    mask:
      <<: *noCompressionOptions
    variance:
      <<: *noCompressionOptions

  # Lossless compression
  lossless: &lossless
    image: &losslessOptions
      compression:
        algorithm: GZIP_SHUFFLE
      scaling:
        algorithm: NONE
    mask:
      <<: *losslessOptions
    variance:
      <<: *losslessOptions

  # Basic lossy (quantizing) compression
  lossyBasic: &lossyBasic
    image: &lossyBasicOptions
      compression:
        algorithm: RICE
      scaling:
        algorithm: STDEV_POSITIVE
        maskPlanes: ["NO_DATA"]
        bitpix: 32
        quantizeLevel: 10.0
        quantizePad: 10.0
    # Mask planes are integer bit flags, so the mask keeps lossless options
    # even in the lossy recipe — presumably deliberate; confirm upstream.
    mask:
      <<: *losslessOptions
    variance:
      <<: *lossyBasicOptions

  # Set the default
  default:
    <<: *lossless
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/_butler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1095,7 +1095,7 @@ def pruneDatasets(self, refs: Iterable[DatasetRef], *,
self.datastore.emptyTrash()

@transactional
def ingest(self, *datasets: FileDataset, transfer: Optional[str] = None, run: Optional[str] = None,
def ingest(self, *datasets: FileDataset, transfer: Optional[str] = "auto", run: Optional[str] = None,
tags: Optional[Iterable[str]] = None,):
"""Store and register one or more datasets that already exist on disk.
Expand Down
21 changes: 21 additions & 0 deletions python/lsst/daf/butler/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -987,6 +987,27 @@ def updateParameters(configType, config, full, toUpdate=None, toCopy=None, overw
else:
config.update(localConfig)

def toDict(self):
    """Convert a `Config` to a standalone hierarchical `dict`.

    Returns
    -------
    d : `dict`
        The standalone hierarchical `dict` with any `Config` classes
        in the hierarchy converted to `dict`.

    Notes
    -----
    This can be useful when passing a Config to some code that
    expects native Python types.
    """
    # Deep-copy first so mutating the returned dict can never leak
    # back into this Config's internal state.
    converted = copy.deepcopy(self._data)
    for key in converted:
        if isinstance(converted[key], Config):
            converted[key] = converted[key].toDict()
    return converted


class ConfigSubset(Config):
"""Config representing a subset of a more general configuration.
Expand Down
3 changes: 2 additions & 1 deletion python/lsst/daf/butler/core/configSupport.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ def clone(self, name: Optional[str] = None, dimensions: Optional[DimensionGraph]

def processLookupConfigs(config: Config, *,
allow_hierarchy: bool = False,
universe: Optional[DimensionUniverse] = None) -> Dict[LookupKey, Union[str, Dict]]:
universe: Optional[DimensionUniverse] = None) -> Dict[LookupKey,
Union[str, Dict[str, Any]]]:
"""Process sections of configuration relating to lookups by dataset type
name, storage class name, dimensions, or values of dimensions.
Expand Down
125 changes: 104 additions & 21 deletions python/lsst/daf/butler/core/formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,9 @@ class Formatter(metaclass=ABCMeta):
how a dataset is serialized. `None` indicates that no parameters are
supported."""

extension: Optional[str] = None
"""File extension default provided by this formatter."""

def __init__(self, fileDescriptor: FileDescriptor, dataId: DataCoordinate = None,
writeParameters: Optional[Dict[str, Any]] = None):
writeParameters: Optional[Dict[str, Any]] = None,
writeRecipes: Optional[Dict[str, Any]] = None):
if not isinstance(fileDescriptor, FileDescriptor):
raise TypeError("File descriptor must be a FileDescriptor")
self._fileDescriptor = fileDescriptor
Expand All @@ -101,6 +99,7 @@ def __init__(self, fileDescriptor: FileDescriptor, dataId: DataCoordinate = None
raise ValueError(f"This formatter does not accept parameter{s} {unknownStr}")

self._writeParameters = writeParameters
self._writeRecipes = self.validateWriteRecipes(writeRecipes)

def __str__(self) -> str:
return f"{self.name()}@{self.fileDescriptor.location.path}"
Expand All @@ -120,11 +119,45 @@ def dataId(self) -> Optional[DataCoordinate]:
return self._dataId

@property
def writeParameters(self) -> Mapping[str, Any]:
    """Parameters to use when writing out datasets.

    Returns an empty mapping when no write parameters were supplied.
    """
    # NOTE(review): the scraped diff left the pre-change signature line
    # (`-> Mapping:`) interleaved here; only the parameterized annotation
    # is kept.
    if self._writeParameters is not None:
        return self._writeParameters
    return {}

@property
def writeRecipes(self) -> Mapping[str, Any]:
    """Detailed write recipes indexed by recipe name.

    Returns an empty mapping when no recipes were supplied.
    """
    recipes = self._writeRecipes
    return recipes if recipes is not None else {}

@classmethod
def validateWriteRecipes(cls, recipes: Optional[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]:
    """Validate supplied recipes for this formatter.

    The recipes are supplemented with default values where appropriate.

    Parameters
    ----------
    recipes : `dict`
        Recipes to validate.

    Returns
    -------
    validated : `dict`
        Validated recipes.

    Raises
    ------
    RuntimeError
        Raised if validation fails. The default implementation raises
        if any recipes are given.
    """
    # Base class supports no recipes at all: anything non-empty is an
    # error; `None` or an empty mapping passes straight through.
    if not recipes:
        return recipes
    raise RuntimeError(f"This formatter does not understand these writeRecipes: {recipes}")

@classmethod
def name(cls) -> str:
"""Returns the fully qualified name of the formatter.
Expand Down Expand Up @@ -219,17 +252,28 @@ def makeUpdatedLocation(cls, location: Location) -> Location:
Returns
-------
updated : `Location`
The updated location with a new file extension applied.
A new `Location` with a new file extension applied.
Raises
------
NotImplementedError
Raised if there is no ``extension`` attribute associated with
this formatter.
Notes
-----
This method is available to all Formatters but might not be
implemented by all formatters. It requires that a formatter set
an ``extension`` attribute containing the file extension used when
writing files. If ``extension`` is `None` the supplied file will
not be updated. Not all formatters write files so this is not
defined in the base class.
"""
location = copy.deepcopy(location)
try:
location.updateExtension(cls.extension)
# We are deliberately allowing extension to be undefined by
# default in the base class and mypy complains.
location.updateExtension(cls.extension) # type:ignore
except AttributeError:
raise NotImplementedError("No file extension registered with this formatter") from None
return location
Expand Down Expand Up @@ -316,6 +360,9 @@ class FormatterFactory:
defaultKey = LookupKey("default")
"""Configuration key associated with default write parameter settings."""

writeRecipesKey = LookupKey("write_recipes")
"""Configuration key associated with write recipes."""

def __init__(self) -> None:
self._mappingFactory = MappingFactory(Formatter)

Expand Down Expand Up @@ -376,24 +423,49 @@ def registerFormatters(self, config: Config, *, universe: DimensionUniverse) ->
.. code-block:: yaml
formatters:
default:
lsst.daf.butler.formatters.example.ExampleFormatter:
max: 10
min: 2
comment: Default comment
calexp: lsst.daf.butler.formatters.example.ExampleFormatter
coadd:
formatter: lsst.daf.butler.formatters.example.ExampleFormatter
parameters:
max: 5
formatters:
default:
lsst.daf.butler.formatters.example.ExampleFormatter:
max: 10
min: 2
comment: Default comment
calexp: lsst.daf.butler.formatters.example.ExampleFormatter
coadd:
formatter: lsst.daf.butler.formatters.example.ExampleFormatter
parameters:
max: 5
Any time an ``ExampleFormatter`` is constructed it will use those
parameters. If an explicit entry later in the configuration specifies
a different set of parameters, the two will be merged with the later
entry taking priority. In the example above ``calexp`` will use
the default parameters but ``coadd`` will override the value for
``max``.
Formatter configuration can also include a special section describing
collections of write parameters that can be accessed through a
simple label. This allows common collections of options to be
specified in one place in the configuration and reused later.
The ``write_recipes`` section is indexed by Formatter class name
and each key is the label to associate with the parameters.
.. code-block:: yaml
formatters:
write_recipes:
lsst.obs.base.fitsExposureFormatter.FitsExposureFormatter:
lossless:
...
noCompression:
...
By convention a formatter that uses write recipes will support a
``recipe`` write parameter that will refer to a recipe name in
the ``write_recipes`` component. The `Formatter` will be constructed
in the `FormatterFactory` with all the relevant recipes and
will not attempt to filter by looking at ``writeParameters`` in
advance. See the specific formatter documentation for details on
acceptable recipe options.
"""
allowed_keys = {"formatter", "parameters"}

Expand All @@ -405,10 +477,19 @@ def registerFormatters(self, config: Config, *, universe: DimensionUniverse) ->
raise RuntimeError("Default formatter parameters in config can not be a single string"
f" (got: {type(defaultParameters)})")

# Extract any global write recipes -- these are indexed by
# Formatter class name.
writeRecipes = contents.get(self.writeRecipesKey, {})
if isinstance(writeRecipes, str):
raise RuntimeError(f"The formatters.{self.writeRecipesKey} section must refer to a dict"
f" not '{writeRecipes}'")

for key, f in contents.items():
# default is handled in a special way
if key == self.defaultKey:
continue
if key == self.writeRecipesKey:
continue

# Can be a str or a dict.
specificWriteParameters = {}
Expand All @@ -428,13 +509,15 @@ def registerFormatters(self, config: Config, *, universe: DimensionUniverse) ->
raise ValueError(f"Formatter for key {key} has unexpected value: '{f}'")

# Apply any default parameters for this formatter
writeParameters = defaultParameters.get(formatter, {})
writeParameters = copy.deepcopy(defaultParameters.get(formatter, {}))
writeParameters.update(specificWriteParameters)

kwargs: Dict[str, Any] = {}
if writeParameters:
# Need to coerce Config to dict
kwargs["writeParameters"] = dict(writeParameters)
kwargs["writeParameters"] = writeParameters

if formatter in writeRecipes:
kwargs["writeRecipes"] = writeRecipes[formatter]

self.registerFormatter(key, formatter, **kwargs)

Expand Down
20 changes: 19 additions & 1 deletion python/lsst/daf/butler/core/location.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,10 @@ def updateExtension(self, ext: Optional[str]) -> None:
if ext is None:
return

path, _ = os.path.splitext(self.pathInStore)
if not self._datastoreRootUri.scheme:
path, _ = os.path.splitext(self.pathInStore)
else:
path, _ = posixpath.splitext(self.pathInStore)

# Ensure that we have a leading "." on file extension (and we do not
# try to modify the empty string)
Expand All @@ -550,6 +553,21 @@ def updateExtension(self, ext: Optional[str]) -> None:

self._path = path + ext

def getExtension(self) -> str:
    """Return the file extension associated with this location.

    Returns
    -------
    ext : `str`
        The file extension (including the ``.``). Can be empty string
        if there is no file extension.
    """
    # A schemeless datastore root means a local path, so honor the
    # platform separator rules; any URI scheme implies POSIX paths.
    if self._datastoreRootUri.scheme:
        splitter = posixpath.splitext
    else:
        splitter = os.path.splitext
    return splitter(self.pathInStore)[1]


class LocationFactory:
"""Factory for `Location` instances.
Expand Down
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/core/mappingFactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ def getFromRegistryWithMatch(self, targetClasses: Iterable[Any], *args: Any,
# Simplest to use Config for this
config_kwargs = Config(registry_kwargs)
config_kwargs.update(kwargs)
merged_kwargs = dict(config_kwargs)
merged_kwargs = config_kwargs.toDict()

return key, cls(*args, **merged_kwargs)

Expand Down
3 changes: 3 additions & 0 deletions python/lsst/daf/butler/datastores/fileLikeDatastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,9 @@ def _prepare_for_put(self, inMemoryDataset: Any, ref: DatasetRef) -> Tuple[Locat
except KeyError as e:
raise DatasetTypeNotSupportedError(f"Unable to find formatter for {ref}") from e

# Now that we know the formatter, update the location
location = formatter.makeUpdatedLocation(location)

return location, formatter

@abstractmethod
Expand Down
9 changes: 6 additions & 3 deletions python/lsst/daf/butler/datastores/s3Datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,11 @@ def _read_artifact_into_memory(self, getInfo: DatastoreFileGetInformation,
result = formatter.fromBytes(serializedDataset,
component=getInfo.component if isComponent else None)
except NotImplementedError:
with tempfile.NamedTemporaryFile(suffix=formatter.extension) as tmpFile:
# formatter might not always have an extension so mypy complains
# We can either ignore the complaint or use a temporary location
tmpLoc = Location(".", "temp")
tmpLoc = formatter.makeUpdatedLocation(tmpLoc)
with tempfile.NamedTemporaryFile(suffix=tmpLoc.getExtension()) as tmpFile:
tmpFile.write(serializedDataset)
# Flush the write. Do not close the file because that
# will delete it.
Expand All @@ -196,7 +200,6 @@ def _write_in_memory_to_artifact(self, inMemoryDataset: Any, ref: DatasetRef) ->
# `Keys` instead only look like directories, but are not. We check if
# an *exact* full key already exists before writing instead. The insert
# key operation is equivalent to creating the dir and the file.
location.updateExtension(formatter.extension)
if s3CheckFileExists(location, client=self.client,)[0]:
raise FileExistsError(f"Cannot write file for ref {ref} as "
f"output file {location.uri} exists.")
Expand All @@ -209,7 +212,7 @@ def _write_in_memory_to_artifact(self, inMemoryDataset: Any, ref: DatasetRef) ->
Body=serializedDataset)
log.debug("Wrote file directly to %s", location.uri)
except NotImplementedError:
with tempfile.NamedTemporaryFile(suffix=formatter.extension) as tmpFile:
with tempfile.NamedTemporaryFile(suffix=location.getExtension()) as tmpFile:
formatter._fileDescriptor.location = Location(*os.path.split(tmpFile.name))
formatter.write(inMemoryDataset)
with open(tmpFile.name, 'rb') as f:
Expand Down

0 comments on commit dd1ad65

Please sign in to comment.