
DM-13353: Add support for Formatter writeRecipes #305

Merged 16 commits on Jun 9, 2020
6 changes: 6 additions & 0 deletions config/datastores/formatters.yaml
@@ -1,5 +1,11 @@
# This file gives the mapping between DatasetType and the
# `lsst.daf.butler.Formatter` that handles it.
write_recipes: !include writeRecipes.yaml
default:
  lsst.obs.base.fitsExposureFormatter.FitsExposureFormatter:
    # The "default" recipe is used when no recipe is requested, but this
    # demonstrates how to specify a default write parameter.
    recipe: default
TablePersistable: lsst.obs.base.fitsGenericFormatter.FitsGenericFormatter
Wcs: lsst.obs.base.fitsGenericFormatter.FitsGenericFormatter
Psf: lsst.obs.base.fitsGenericFormatter.FitsGenericFormatter
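A rough sketch of what this configuration amounts to at runtime, assuming the companion ``obs_base`` changes: the factory constructs the formatter with both the default write parameters and the recipes registered for its class (``fileDescriptor`` and ``dataId`` are hypothetical placeholders).

.. code-block:: python

   from lsst.daf.butler import Config
   from lsst.obs.base.fitsExposureFormatter import FitsExposureFormatter

   recipes = Config("config/datastores/writeRecipes.yaml").toDict()
   formatter = FitsExposureFormatter(
       fileDescriptor,  # hypothetical FileDescriptor for the dataset
       dataId,          # hypothetical DataCoordinate
       writeParameters={"recipe": "default"},
       writeRecipes=recipes["lsst.obs.base.fitsExposureFormatter.FitsExposureFormatter"],
   )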
44 changes: 44 additions & 0 deletions config/datastores/writeRecipes.yaml
@@ -0,0 +1,44 @@
lsst.obs.base.fitsExposureFormatter.FitsExposureFormatter:
  # No compression
  noCompression: &noCompression
    image: &noCompressionOptions
      compression:
        algorithm: NONE
      scaling:
        algorithm: NONE
    mask:
      <<: *noCompressionOptions
    variance:
      <<: *noCompressionOptions

  # Lossless compression
  lossless: &lossless
    image: &losslessOptions
      compression:
        algorithm: GZIP_SHUFFLE
      scaling:
        algorithm: NONE
    mask:
      <<: *losslessOptions
    variance:
      <<: *losslessOptions

  # Basic lossy (quantizing) compression
  lossyBasic: &lossyBasic
    image: &lossyBasicOptions
      compression:
        algorithm: RICE
      scaling:
        algorithm: STDEV_POSITIVE
        maskPlanes: ["NO_DATA"]
        bitpix: 32
        quantizeLevel: 10.0
        quantizePad: 10.0
    mask:
      <<: *losslessOptions
    variance:
      <<: *lossyBasicOptions

  # Set the default
  default:
    <<: *lossless
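For anyone unfamiliar with YAML anchors and merge keys, a minimal sketch of how the recipes above expand when loaded, assuming PyYAML is used directly (the Butler ``Config`` class resolves the same syntax internally):

.. code-block:: python

   import yaml

   with open("config/datastores/writeRecipes.yaml") as fh:
       allRecipes = yaml.safe_load(fh)
   recipes = allRecipes["lsst.obs.base.fitsExposureFormatter.FitsExposureFormatter"]

   # "default" is a copy of the lossless recipe via the merge key
   assert recipes["default"] == recipes["lossless"]

   # lossyBasic quantizes the image plane but keeps the mask lossless
   assert recipes["lossyBasic"]["image"]["compression"]["algorithm"] == "RICE"
   assert recipes["lossyBasic"]["mask"]["compression"]["algorithm"] == "GZIP_SHUFFLE"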
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/_butler.py
@@ -1095,7 +1095,7 @@ def pruneDatasets(self, refs: Iterable[DatasetRef], *,
        self.datastore.emptyTrash()

    @transactional
-   def ingest(self, *datasets: FileDataset, transfer: Optional[str] = None, run: Optional[str] = None,
+   def ingest(self, *datasets: FileDataset, transfer: Optional[str] = "auto", run: Optional[str] = None,
               tags: Optional[Iterable[str]] = None,):
        """Store and register one or more datasets that already exist on disk.
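With the new default a caller no longer has to pick a transfer mode explicitly; a minimal sketch (the repository path and the ``ref`` construction are hypothetical and elided):

.. code-block:: python

   from lsst.daf.butler import Butler, FileDataset

   butler = Butler("repo", run="ingest/run")
   # transfer now defaults to "auto", letting the datastore pick an
   # appropriate transfer mechanism for each file
   butler.ingest(FileDataset(path="calexp.fits", refs=ref))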
21 changes: 21 additions & 0 deletions python/lsst/daf/butler/core/config.py
@@ -987,6 +987,27 @@ def updateParameters(configType, config, full, toUpdate=None, toCopy=None, overw
        else:
            config.update(localConfig)

    def toDict(self):
        """Convert a `Config` to a standalone hierarchical `dict`.

        Returns
        -------
        d : `dict`
            The standalone hierarchical `dict` with any `Config` classes
            in the hierarchy converted to `dict`.

        Notes
        -----
        This can be useful when passing a Config to some code that
        expects native Python types.
        """
        output = copy.deepcopy(self._data)
        for k, v in output.items():
            if isinstance(v, Config):
                v = v.toDict()
                output[k] = v
        return output


class ConfigSubset(Config):
    """Config representing a subset of a more general configuration.
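A short usage sketch for the new method, with hypothetical values:

.. code-block:: python

   from lsst.daf.butler import Config

   c = Config({"a": 1, "nested": {"x": 2}})
   d = c.toDict()
   assert type(d) is dict        # a plain dict, not a Config
   d["nested"]["x"] = 99         # the copy is fully standalone...
   assert c["nested"]["x"] == 2  # ...so the original Config is untouched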
3 changes: 2 additions & 1 deletion python/lsst/daf/butler/core/configSupport.py
@@ -216,7 +216,8 @@ def clone(self, name: Optional[str] = None, dimensions: Optional[DimensionGraph]

def processLookupConfigs(config: Config, *,
                         allow_hierarchy: bool = False,
-                        universe: Optional[DimensionUniverse] = None) -> Dict[LookupKey, Union[str, Dict]]:
+                        universe: Optional[DimensionUniverse] = None) -> Dict[LookupKey,
+                                                                              Union[str, Dict[str, Any]]]:
    """Process sections of configuration relating to lookups by dataset type
    name, storage class name, dimensions, or values of dimensions.
125 changes: 104 additions & 21 deletions python/lsst/daf/butler/core/formatter.py
@@ -77,11 +77,9 @@ class Formatter(metaclass=ABCMeta):
    how a dataset is serialized. `None` indicates that no parameters are
    supported."""

-   extension: Optional[str] = None
-   """File extension default provided by this formatter."""

    def __init__(self, fileDescriptor: FileDescriptor, dataId: DataCoordinate = None,
-                writeParameters: Optional[Dict[str, Any]] = None):
+                writeParameters: Optional[Dict[str, Any]] = None,
+                writeRecipes: Optional[Dict[str, Any]] = None):
        if not isinstance(fileDescriptor, FileDescriptor):
            raise TypeError("File descriptor must be a FileDescriptor")
        self._fileDescriptor = fileDescriptor
@@ -101,6 +99,7 @@ def __init__(self, fileDescriptor: FileDescriptor, dataId: DataCoordinate = None
            raise ValueError(f"This formatter does not accept parameter{s} {unknownStr}")

        self._writeParameters = writeParameters
+       self._writeRecipes = self.validateWriteRecipes(writeRecipes)

    def __str__(self) -> str:
        return f"{self.name()}@{self.fileDescriptor.location.path}"
@@ -120,11 +119,45 @@ def dataId(self) -> Optional[DataCoordinate]:
        return self._dataId

    @property
-   def writeParameters(self) -> Mapping:
+   def writeParameters(self) -> Mapping[str, Any]:
        """Parameters to use when writing out datasets."""
        if self._writeParameters is not None:
            return self._writeParameters
        return {}

    @property
    def writeRecipes(self) -> Mapping[str, Any]:
        """Detailed write recipes indexed by recipe name."""
        if self._writeRecipes is not None:
            return self._writeRecipes
        return {}

    @classmethod
    def validateWriteRecipes(cls, recipes: Optional[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]:
        """Validate supplied recipes for this formatter.

        The recipes are supplemented with default values where appropriate.

        Parameters
        ----------
        recipes : `dict`
            Recipes to validate.

        Returns
        -------
        validated : `dict`
            Validated recipes.

        Raises
        ------
        RuntimeError
            Raised if validation fails. The default implementation raises
            if any recipes are given.
        """
        if recipes:
            raise RuntimeError(f"This formatter does not understand these writeRecipes: {recipes}")
        return recipes

    @classmethod
    def name(cls) -> str:
        """Returns the fully qualified name of the formatter.
@@ -219,17 +252,28 @@ def makeUpdatedLocation(cls, location: Location) -> Location:
        Returns
        -------
        updated : `Location`
-           The updated location with a new file extension applied.
+           A new `Location` with a new file extension applied.

        Raises
        ------
        NotImplementedError
            Raised if there is no ``extension`` attribute associated with
            this formatter.

        Notes
        -----
        This method is available to all Formatters but might not be
        implemented by all formatters. It requires that a formatter set
        an ``extension`` attribute containing the file extension used when
        writing files. If ``extension`` is `None` the supplied file will
        not be updated. Not all formatters write files so this is not
        defined in the base class.
        """
        location = copy.deepcopy(location)
        try:
-           location.updateExtension(cls.extension)
+           # We are deliberately allowing extension to be undefined by
+           # default in the base class and mypy complains.
+           location.updateExtension(cls.extension)  # type: ignore
        except AttributeError:
            raise NotImplementedError("No file extension registered with this formatter") from None
        return location
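A minimal sketch of the contract described in the notes above, using a hypothetical subclass that declares an ``extension`` (the abstract read/write methods are omitted):

.. code-block:: python

   from lsst.daf.butler import Formatter, Location

   class YamlFormatter(Formatter):  # hypothetical sketch
       extension = ".yaml"

   loc = Location(".", "datasets/thing.dat")
   loc = YamlFormatter.makeUpdatedLocation(loc)
   assert loc.pathInStore.endswith("thing.yaml")

A formatter class that never sets ``extension`` raises `NotImplementedError` instead, as documented above.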
@@ -316,6 +360,9 @@ class FormatterFactory:
    defaultKey = LookupKey("default")
    """Configuration key associated with default write parameter settings."""

    writeRecipesKey = LookupKey("write_recipes")
    """Configuration key associated with write recipes."""

    def __init__(self) -> None:
        self._mappingFactory = MappingFactory(Formatter)
@@ -376,24 +423,49 @@ def registerFormatters(self, config: Config, *, universe: DimensionUniverse) ->

        .. code-block:: yaml

           formatters:
             default:
               lsst.daf.butler.formatters.example.ExampleFormatter:
                 max: 10
                 min: 2
                 comment: Default comment
             calexp: lsst.daf.butler.formatters.example.ExampleFormatter
             coadd:
               formatter: lsst.daf.butler.formatters.example.ExampleFormatter
               parameters:
                 max: 5

        Any time an ``ExampleFormatter`` is constructed it will use those
        parameters. If an explicit entry later in the configuration specifies
        a different set of parameters, the two will be merged with the later
        entry taking priority. In the example above ``calexp`` will use
        the default parameters but ``coadd`` will override the value for
        ``max``.

        Formatter configuration can also include a special section describing
        collections of write parameters that can be accessed through a
        simple label. This allows common collections of options to be
        specified in one place in the configuration and reused later.
        The ``write_recipes`` section is indexed by Formatter class name
        and each key is the label to associate with the parameters.

        .. code-block:: yaml

           formatters:
             write_recipes:
               lsst.obs.base.fitsExposureFormatter.FitsExposureFormatter:
                 lossless:
                   ...
                 noCompression:
                   ...

        By convention a formatter that uses write recipes will support a
        ``recipe`` write parameter that refers to a recipe name in
        the ``write_recipes`` component. The `Formatter` will be constructed
        in the `FormatterFactory` with all the relevant recipes and
        will not attempt to filter by looking at ``writeParameters`` in
        advance. See the specific formatter documentation for details on
        acceptable recipe options.
        """
        allowed_keys = {"formatter", "parameters"}

@@ -405,10 +477,19 @@ def registerFormatters(self, config: Config, *, universe: DimensionUniverse) ->
            raise RuntimeError("Default formatter parameters in config can not be a single string"
                               f" (got: {type(defaultParameters)})")

        # Extract any global write recipes -- these are indexed by
        # Formatter class name.
        writeRecipes = contents.get(self.writeRecipesKey, {})
        if isinstance(writeRecipes, str):
            raise RuntimeError(f"The formatters.{self.writeRecipesKey} section must refer to a dict"
                               f" not '{writeRecipes}'")

        for key, f in contents.items():
            # default is handled in a special way
            if key == self.defaultKey:
                continue
            if key == self.writeRecipesKey:
                continue

            # Can be a str or a dict.
            specificWriteParameters = {}
# Can be a str or a dict.
specificWriteParameters = {}
Expand All @@ -428,13 +509,15 @@ def registerFormatters(self, config: Config, *, universe: DimensionUniverse) ->
raise ValueError(f"Formatter for key {key} has unexpected value: '{f}'")

# Apply any default parameters for this formatter
writeParameters = defaultParameters.get(formatter, {})
writeParameters = copy.deepcopy(defaultParameters.get(formatter, {}))
writeParameters.update(specificWriteParameters)

kwargs: Dict[str, Any] = {}
if writeParameters:
# Need to coerce Config to dict
kwargs["writeParameters"] = dict(writeParameters)
kwargs["writeParameters"] = writeParameters

if formatter in writeRecipes:
kwargs["writeRecipes"] = writeRecipes[formatter]

self.registerFormatter(key, formatter, **kwargs)

Expand Down
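Putting the pieces together, a hedged sketch of a formatter that participates in write recipes — overriding `validateWriteRecipes` and consuming the ``recipe`` write parameter at write time (the class and its checks are illustrative, not the real ``FitsExposureFormatter`` implementation):

.. code-block:: python

   from typing import Any, Mapping, Optional

   from lsst.daf.butler import Formatter


   class RecipeAwareFormatter(Formatter):  # hypothetical sketch
       @classmethod
       def validateWriteRecipes(cls, recipes: Optional[Mapping[str, Any]]
                                ) -> Optional[Mapping[str, Any]]:
           if not recipes:
               return recipes
           for name, recipe in recipes.items():
               # Insist on the three plane sections used by the recipes above
               for plane in ("image", "mask", "variance"):
                   if plane not in recipe:
                       raise RuntimeError(f"Recipe {name} is missing a {plane} section")
           return recipes

       def write(self, inMemoryDataset):
           # By convention "default" is used when no recipe is requested
           name = self.writeParameters.get("recipe", "default")
           recipe = self.writeRecipes.get(name, {})
           ...  # serialize inMemoryDataset using the recipe's options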
20 changes: 19 additions & 1 deletion python/lsst/daf/butler/core/location.py
@@ -541,7 +541,10 @@ def updateExtension(self, ext: Optional[str]) -> None:
        if ext is None:
            return

-       path, _ = os.path.splitext(self.pathInStore)
+       if not self._datastoreRootUri.scheme:
+           path, _ = os.path.splitext(self.pathInStore)
+       else:
+           path, _ = posixpath.splitext(self.pathInStore)

        # Ensure that we have a leading "." on file extension (and we do not
        # try to modify the empty string)
@@ -550,6 +553,21 @@ def updateExtension(self, ext: Optional[str]) -> None:

        self._path = path + ext

    def getExtension(self) -> str:
        """Return the file extension associated with this location.

        Returns
        -------
        ext : `str`
            The file extension (including the ``.``). Can be an empty
            string if there is no file extension.
        """
        if not self._datastoreRootUri.scheme:
            _, ext = os.path.splitext(self.pathInStore)
        else:
            _, ext = posixpath.splitext(self.pathInStore)
        return ext


class LocationFactory:
    """Factory for `Location` instances.
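A quick sketch of the two extension helpers, with hypothetical paths:

.. code-block:: python

   from lsst.daf.butler import Location

   loc = Location(".", "data/image.dat")
   assert loc.getExtension() == ".dat"

   loc.updateExtension("fits")  # the leading "." is added automatically
   assert loc.pathInStore == "data/image.fits"
   assert loc.getExtension() == ".fits"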
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/core/mappingFactory.py
@@ -195,7 +195,7 @@ def getFromRegistryWithMatch(self, targetClasses: Iterable[Any], *args: Any,
        # Simplest to use Config for this
        config_kwargs = Config(registry_kwargs)
        config_kwargs.update(kwargs)
-       merged_kwargs = dict(config_kwargs)
+       merged_kwargs = config_kwargs.toDict()

        return key, cls(*args, **merged_kwargs)
3 changes: 3 additions & 0 deletions python/lsst/daf/butler/datastores/fileLikeDatastore.py
@@ -631,6 +631,9 @@ def _prepare_for_put(self, inMemoryDataset: Any, ref: DatasetRef) -> Tuple[Locat
        except KeyError as e:
            raise DatasetTypeNotSupportedError(f"Unable to find formatter for {ref}") from e

+       # Now that we know the formatter, update the location
+       location = formatter.makeUpdatedLocation(location)
+
        return location, formatter

    @abstractmethod
9 changes: 6 additions & 3 deletions python/lsst/daf/butler/datastores/s3Datastore.py
@@ -175,7 +175,11 @@ def _read_artifact_into_memory(self, getInfo: DatastoreFileGetInformation,
            result = formatter.fromBytes(serializedDataset,
                                         component=getInfo.component if isComponent else None)
        except NotImplementedError:
-           with tempfile.NamedTemporaryFile(suffix=formatter.extension) as tmpFile:
+           # The formatter might not always have an extension, so mypy
+           # complains. We can either ignore the complaint or use a
+           # temporary location.
+           tmpLoc = Location(".", "temp")
+           tmpLoc = formatter.makeUpdatedLocation(tmpLoc)
+           with tempfile.NamedTemporaryFile(suffix=tmpLoc.getExtension()) as tmpFile:
                tmpFile.write(serializedDataset)
                # Flush the write. Do not close the file because that
                # will delete it.
@@ -196,7 +200,6 @@ def _write_in_memory_to_artifact(self, inMemoryDataset: Any, ref: DatasetRef) ->
        # `Keys` instead only look like directories, but are not. We check if
        # an *exact* full key already exists before writing instead. The insert
        # key operation is equivalent to creating the dir and the file.
-       location.updateExtension(formatter.extension)
        if s3CheckFileExists(location, client=self.client,)[0]:
            raise FileExistsError(f"Cannot write file for ref {ref} as "
                                  f"output file {location.uri} exists.")
@@ -209,7 +212,7 @@ def _write_in_memory_to_artifact(self, inMemoryDataset: Any, ref: DatasetRef) ->
                          Body=serializedDataset)
            log.debug("Wrote file directly to %s", location.uri)
        except NotImplementedError:
-           with tempfile.NamedTemporaryFile(suffix=formatter.extension) as tmpFile:
+           with tempfile.NamedTemporaryFile(suffix=location.getExtension()) as tmpFile:
                formatter._fileDescriptor.location = Location(*os.path.split(tmpFile.name))
                formatter.write(inMemoryDataset)
                with open(tmpFile.name, 'rb') as f: