DM-23024: Allow multiple datasets to be associated with a single file on ingest #218

Merged: 13 commits, Jan 16, 2020
5 changes: 3 additions & 2 deletions python/lsst/daf/butler/_butler.py
@@ -706,8 +706,9 @@ class used. `FileDataset.path` attributes may be modified to put
raise TypeError("Butler is read-only.")
# TODO: once Registry has vectorized API for addDataset, use it here.
for dataset in datasets:
dataset.ref = self.registry.addDataset(dataset.ref.datasetType, dataset.ref.dataId,
run=self.run, recursive=True)
for i, ref in enumerate(dataset.refs):
dataset.refs[i] = self.registry.addDataset(ref.datasetType, ref.dataId,
run=self.run, recursive=True)
self.datastore.ingest(*datasets, transfer=transfer)

@contextlib.contextmanager
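Usage sketch (not part of the diff): with this change a single file can be ingested under several registry entries. The dataset type, data IDs, and path below are hypothetical placeholders.

# Hedged sketch of multi-ref ingest; metricType, the data IDs and the path
# are illustrative only (the shape mirrors the detectors.yaml test data below).
refs = [DatasetRef(metricType, {"instrument": "DummyCam", "visit": 42, "detector": 1}),
        DatasetRef(metricType, {"instrument": "DummyCam", "visit": 42, "detector": 2})]
butler.ingest(FileDataset(path="detectors.yaml", refs=refs), transfer="copy")
# Butler.ingest adds one registry dataset per ref before handing the single
# FileDataset to the datastore.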
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/core/datastore.py
@@ -488,7 +488,7 @@ def ingest(self, *datasets: FileDataset, transfer: Optional[str] = None):
in their class documentation.
"""
prepData = self._prepIngest(*datasets, transfer=transfer)
refs = {dataset.ref.id: dataset.ref for dataset in datasets}
refs = {ref.id: ref for dataset in datasets for ref in dataset.refs}
if refs.keys() != prepData.refs.keys():
unsupported = refs.keys() - prepData.refs.keys()
# Group unsupported refs by DatasetType for an informative
16 changes: 14 additions & 2 deletions python/lsst/daf/butler/core/formatter.py
@@ -19,12 +19,14 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ("Formatter", "FormatterFactory")

from abc import ABCMeta, abstractmethod
import logging
import copy
from typing import ClassVar, Set, FrozenSet, Union, Optional, Dict, Any, Tuple, Type
from typing import ClassVar, Set, FrozenSet, Union, Optional, Dict, Any, Tuple, Type, TYPE_CHECKING

from .configSupport import processLookupConfigs, LookupKey
from .mappingFactory import MappingFactory
@@ -42,6 +44,10 @@
Entity = Union[DatasetType, DatasetRef, StorageClass, str]


if TYPE_CHECKING:
from .dimensions import DataCoordinate


class Formatter(metaclass=ABCMeta):
"""Interface for reading and writing Datasets with a particular
`StorageClass`.
@@ -60,10 +66,11 @@ class Formatter(metaclass=ABCMeta):
are supported (`frozenset`).
"""

def __init__(self, fileDescriptor: FileDescriptor):
def __init__(self, fileDescriptor: FileDescriptor, dataId: DataCoordinate = None):
if not isinstance(fileDescriptor, FileDescriptor):
raise TypeError("File descriptor must be a FileDescriptor")
self._fileDescriptor = fileDescriptor
self._dataId = dataId

def __str__(self):
return f"{self.name()}@{self.fileDescriptor.location.path}"
@@ -77,6 +84,11 @@ def fileDescriptor(self) -> FileDescriptor:
(`FileDescriptor`, read-only)"""
return self._fileDescriptor

@property
def dataId(self) -> DataCoordinate:
"""DataId associated with this formatter (`DataCoordinate`)"""
return self._dataId

@classmethod
def name(cls) -> str:
"""Returns the fully qualified name of the formatter.
28 changes: 16 additions & 12 deletions python/lsst/daf/butler/core/repoTransfers.py
@@ -36,7 +36,7 @@
from lsst.utils import doImport
from .config import ConfigSubset
from .datasets import DatasetType, DatasetRef
from .utils import NamedValueSet
from .utils import NamedValueSet, iterable

if TYPE_CHECKING:
from .dimensions import DimensionElement, DimensionRecord, ExpandedDataCoordinate
@@ -57,10 +57,10 @@ class RepoTransferFormatConfig(ConfigSubset):
class FileDataset:
"""A struct that represents a dataset exported to a file.
"""
__slots__ = ("ref", "path", "formatter")
__slots__ = ("refs", "path", "formatter")

ref: DatasetRef
"""Registry information about the dataset (`DatasetRef`).
refs: List[DatasetRef]
"""Registry information about the dataset. (`list` of `DatasetRef`).
"""

path: str
@@ -77,9 +77,12 @@ class FileDataset:
"""A `Formatter` class or fully-qualified name.
"""

def __init__(self, path: str, ref: DatasetRef, *, formatter: Union[None, str, Type[Formatter]] = None):
def __init__(self, path: str, refs: Union[DatasetRef, List[DatasetRef]], *,
formatter: Union[None, str, Type[Formatter]] = None):
self.path = path
self.ref = ref
if isinstance(refs, DatasetRef):
refs = [refs]
self.refs = refs
self.formatter = formatter
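Usage note (a sketch, not from the diff): the constructor coerces a lone DatasetRef into a one-element list, so existing single-ref callers keep working. refA and refB are placeholder DatasetRef instances.

single = FileDataset("metric.yaml", refA)          # stored as [refA]
multi = FileDataset("metrics.yaml", [refA, refB])  # one file backing two datasets
assert single.refs == [refA]
assert multi.refs == [refA, refB]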


@@ -331,8 +334,8 @@ def saveDatasets(self, datasetType: DatasetType, run: str, *datasets: FileDatase
"run": run,
"records": [
{
"dataset_id": dataset.ref.id,
"data_id": dataset.ref.dataId.byName(),
"dataset_id": [ref.id for ref in dataset.refs],
"data_id": [ref.dataId.byName() for ref in dataset.refs],
"path": dataset.path,
"formatter": dataset.formatter,
# TODO: look up and save other collections
@@ -407,7 +410,8 @@ def __init__(self, stream: IO, registry: Registry):
(
FileDataset(
d["path"],
DatasetRef(datasetType, d["data_id"], run=data["run"], id=d["dataset_id"]),
[DatasetRef(datasetType, dataId, run=data["run"], id=refid)
for dataId, refid in zip(iterable(d["data_id"]), iterable(d["dataset_id"]))],
formatter=doImport(d["formatter"])
),
d.get("collections", [])
@@ -437,13 +441,13 @@ def load(self, datastore: Datastore, *,
# For now, we ignore the dataset_id we pulled from the file
# and just insert without one to get a new autoincrement value.
# Eventually (once we have origin in IDs) we'll preserve them.
fileDataset.ref = self.registry.addDataset(datasetType, fileDataset.ref.dataId, run=run,
recursive=True)
fileDataset.refs = [self.registry.addDataset(datasetType, ref.dataId, run=run,
recursive=True) for ref in fileDataset.refs]
if directory is not None:
fileDataset.path = os.path.join(directory, fileDataset.path)
fileDatasets.append(fileDataset)
for collection in collectionsForDataset:
collections[collection].append(fileDataset.ref)
collections[collection].extend(fileDataset.refs)
datastore.ingest(*fileDatasets, transfer=transfer)
for collection, refs in collections.items():
self.registry.associate(collection, refs)
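To make the round trip concrete, here is a hedged sketch of how the list-valued records written by saveDatasets are read back; the values are invented, but the shapes match both the new (list) and old (scalar) record formats, which is what the iterable() calls in the import code handle.

# Hypothetical export records: one new-style (lists), one old-style (scalars).
new_record = {"dataset_id": [101, 102],
              "data_id": [{"visit": 42, "detector": 1}, {"visit": 42, "detector": 2}]}
old_record = {"dataset_id": 7, "data_id": {"visit": 41}}
for d in (new_record, old_record):
    pairs = list(zip(iterable(d["data_id"]), iterable(d["dataset_id"])))
    # new_record -> two (dataId, id) pairs; old_record -> one pair, because
    # iterable() treats a Mapping (a single data ID) as one element.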
6 changes: 5 additions & 1 deletion python/lsst/daf/butler/core/utils.py
@@ -54,9 +54,10 @@ def iterable(a):

There are three cases, when the input is:

- iterable, but not a `str` -> iterate over elements
- iterable, but not a `str` or Mapping -> iterate over elements
(e.g. ``[i for i in a]``)
- a `str` -> return single element iterable (e.g. ``[a]``)
- a Mapping -> return single element iterable
Review comment (Member): This makes me a little nervous, as I think it's hard to remember what makes a class qualify as a Mapping. Could we at least make this not the default behavior? Or maybe allow general isinstance exceptions via an optional kwarg, i.e. iterable(x, except=collections.abc.Mapping) (though that's not terribly readable either).

Reply (Member, Author): Do you ever want iterable to return the keys of a Mapping? The problem here is dataIds and compatibility with old YAML files.

Reply (Member): That's a fair point; I think you're right that this is the only behavior we'd ever actually want for mappings - my concern is more that it's not the behavior I would have naively expected. But it's not a big deal either way. I do think Python mappings would be much cleaner if they weren't iterable at all and you had to explicitly say keys(), but of course there's nothing we can do about that.
- not iterable -> return single element iterable (e.g. ``[a]``).

Parameters
@@ -72,6 +73,9 @@
if isinstance(a, str):
yield a
return
if isinstance(a, Mapping):
yield a
return
try:
yield from a
except Exception:
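A quick sketch of the four cases in the docstring above, including the Mapping behaviour discussed in the review thread (assuming the truncated except branch yields the object itself):

list(iterable([1, 2, 3]))      # [1, 2, 3]        iterate over elements
list(iterable("abc"))          # ["abc"]          a str is a single element
list(iterable({"visit": 42}))  # [{"visit": 42}]  a Mapping (e.g. a data ID) stays whole
list(iterable(5))              # [5]              non-iterable wrapped in a list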
6 changes: 4 additions & 2 deletions python/lsst/daf/butler/datastores/chainedDatastore.py
@@ -330,8 +330,10 @@ def _prepIngest(self, *datasets: FileDataset, transfer: Optional[str] = None) ->
raise NotImplementedError("ChainedDatastore does not support transfer=None or transfer='move'.")

def isDatasetAcceptable(dataset, *, name, constraints):
if not constraints.isAcceptable(dataset.ref):
log.debug("Datastore %s skipping ingest via configuration for ref %s", name, dataset.ref)
acceptable = [ref for ref in dataset.refs if constraints.isAcceptable(ref)]
if not acceptable:
log.debug("Datastore %s skipping ingest via configuration for refs %s",
name, ", ".join(str(ref) for ref in dataset.refs))
return False
else:
return True
22 changes: 14 additions & 8 deletions python/lsst/daf/butler/datastores/fileLikeDatastore.py
@@ -68,7 +68,7 @@ class _IngestPrepData(Datastore.IngestPrepData):
Files to be ingested by this datastore.
"""
def __init__(self, datasets: List[FileDataset]):
super().__init__(dataset.ref for dataset in datasets)
super().__init__(ref for dataset in datasets for ref in dataset.refs)
self.datasets = datasets


@@ -315,7 +315,8 @@ def _prepare_for_get(self, ref, parameters=None):

formatter = getInstanceOf(storedFileInfo.formatter,
FileDescriptor(location, readStorageClass=readStorageClass,
storageClass=writeStorageClass, parameters=parameters))
storageClass=writeStorageClass, parameters=parameters),
ref.dataId)
formatterParams, assemblerParams = formatter.segregateParameters()

return DatastoreFileGetInformation(location, formatter, storedFileInfo,
@@ -361,7 +362,8 @@ def _prepare_for_put(self, inMemoryDataset, ref):
try:
formatter = self.formatterFactory.getFormatter(ref,
FileDescriptor(location,
storageClass=storageClass))
storageClass=storageClass),
ref.dataId)
except KeyError as e:
raise DatasetTypeNotSupportedError(f"Unable to find formatter for {ref}") from e

@@ -448,10 +450,13 @@ def _prepIngest(self, *datasets: FileDataset, transfer: Optional[str] = None) ->
# Docstring inherited from Datastore._prepIngest.
filtered = []
for dataset in datasets:
if not self.constraints.isAcceptable(dataset.ref):
acceptable = [ref for ref in dataset.refs if self.constraints.isAcceptable(ref)]
if not acceptable:
continue
else:
dataset.refs = acceptable
if dataset.formatter is None:
dataset.formatter = self.formatterFactory.getFormatterClass(dataset.ref)
dataset.formatter = self.formatterFactory.getFormatterClass(dataset.refs[0])
else:
dataset.formatter = getClassOf(dataset.formatter)
dataset.path = self._standardizeIngestPath(dataset.path, transfer=transfer)
@@ -463,9 +468,10 @@ def _finishIngest(self, prepData: Datastore.IngestPrepData, *, transfer: Optiona
# Docstring inherited from Datastore._finishIngest.
refsAndInfos = []
for dataset in prepData.datasets:
info = self._extractIngestInfo(dataset.path, dataset.ref, formatter=dataset.formatter,
transfer=transfer)
refsAndInfos.append((dataset.ref, info))
for ref in dataset.refs:
info = self._extractIngestInfo(dataset.path, ref, formatter=dataset.formatter,
transfer=transfer)
refsAndInfos.append((ref, info))
self._register_datasets(refsAndInfos)

def getUri(self, ref, predict=False):
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/datastores/posixDatastore.py
@@ -340,7 +340,7 @@ def export(self, refs: Iterable[DatasetRef], *,
raise FileNotFoundError(f"Could not retrieve Dataset {ref}.")
if transfer is None:
# TODO: do we also need to return the readStorageClass somehow?
yield FileDataset(ref=ref, path=location.pathInStore, formatter=storedFileInfo.formatter)
yield FileDataset(refs=[ref], path=location.pathInStore, formatter=storedFileInfo.formatter)
else:
# TODO: add support for other transfer modes. If we support
# moving, this method should become transactional.
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/formatters/jsonFormatter.py
@@ -51,7 +51,7 @@ def _readFile(self, path, pytype=None):
"""
try:
with open(path, "rb") as fd:
data = self._fromBytes(fd.read())
data = self._fromBytes(fd.read(), pytype)
except FileNotFoundError:
data = None

2 changes: 1 addition & 1 deletion python/lsst/daf/butler/formatters/pickleFormatter.py
@@ -55,7 +55,7 @@ def _readFile(self, path, pytype=None):
"""
try:
with open(path, "rb") as fd:
data = self._fromBytes(fd.read())
data = self._fromBytes(fd.read(), pytype)
except FileNotFoundError:
data = None

2 changes: 1 addition & 1 deletion python/lsst/daf/butler/formatters/yamlFormatter.py
@@ -57,7 +57,7 @@ def _readFile(self, path, pytype=None):
"""
try:
with open(path, "rb") as fd:
data = self._fromBytes(fd.read())
data = self._fromBytes(fd.read(), pytype)
except FileNotFoundError:
data = None

2 changes: 1 addition & 1 deletion tests/config/basic/posixDatastore.yaml
@@ -4,7 +4,7 @@ datastore:
templates:
default: "{collection}/{datasetType}.{component:?}/{tract:?}/{patch:?}/{physical_filter:?}/{instrument:?}_{visit:?}"
calexp: "{collection}/{datasetType}.{component:?}/{datasetType}_v{visit}_f{physical_filter:?}_{component:?}"
metric: "{collection}/{datasetType}.{component:?}/{instrument:?}_{datasetType}_v{visit:08d}_f{physical_filter}_{component:?}"
metric: "{collection}/{datasetType}.{component:?}/{instrument:?}_{datasetType}_v{visit:08d}_f{physical_filter}_d{detector:?}_{component:?}"
test_metric_comp: "{collection}/{datasetType}.{component:?}/{datasetType}_v{visit:08d}_f{instrument}_{component:?}"
metric2: "{collection}/{datasetType}.{component:?}/{tract:?}/{patch:?}/{physical_filter:?}/{instrument:?}_{visit.name:?}"
metric3: "{collection}/{datasetType}/{instrument}"
2 changes: 1 addition & 1 deletion tests/config/basic/posixDatastore2.yaml
@@ -7,7 +7,7 @@ datastore:
templates:
default: "{run}/{datasetType}.{component:?}/{tract:?}/{patch:?}/{physical_filter:?}/{instrument:?}_{visit:?}"
calexp: "{run}/{datasetType}.{component:?}/{datasetType}_v{visit}_f{physical_filter:?}_{component:?}"
metric: "{run}/{datasetType}.{component:?}/{instrument:?}_{datasetType}_v{visit:08d}_f{physical_filter}_{component:?}"
metric: "{run}/{datasetType}.{component:?}/{instrument:?}_{datasetType}_v{visit:08d}_f{physical_filter}_d{detector:?}{component:?}"
test_metric_comp: "{run}/{datasetType}.{component:?}/{datasetType}_v{visit:08d}_f{instrument}_{component:?}"
formatters:
StructuredDataDictYaml: lsst.daf.butler.formatters.yamlFormatter.YamlFormatter
2 changes: 1 addition & 1 deletion tests/config/basic/s3Datastore.yaml
@@ -4,7 +4,7 @@ datastore:
templates:
default: "{collection}/{datasetType}.{component:?}/{tract:?}/{patch:?}/{physical_filter:?}/{instrument:?}_{visit:?}"
calexp: "{collection}/{datasetType}.{component:?}/{datasetType}_v{visit}_f{physical_filter:?}_{component:?}"
metric: "{collection}/{datasetType}.{component:?}/{instrument:?}_{datasetType}_v{visit:08d}_f{physical_filter}_{component:?}"
metric: "{collection}/{datasetType}.{component:?}/{instrument:?}_{datasetType}_v{visit:08d}_f{physical_filter}_d{detector:?}_{component:?}"
test_metric_comp: "{collection}/{datasetType}.{component:?}/{datasetType}_v{visit:08d}_f{instrument}_{component:?}"
metric3: "{collection}/{datasetType}/{instrument}"
metric4: "{collection}/{component:?}_{instrument}_{physical_filter}_{visit:08d}"
19 changes: 19 additions & 0 deletions tests/data/basic/detector_1.yaml
@@ -0,0 +1,19 @@
summary:
AM1: 5.2
AMS: 30.6
output:
a:
- 1
- 2
- 3
b:
blue: 5
red: green
data:
- 563
- 234
- 456.7
- 752
- 8
- 9
- 27
19 changes: 19 additions & 0 deletions tests/data/basic/detector_2.yaml
@@ -0,0 +1,19 @@
summary:
AM1: 15.2
AMS: 130.6
output:
a:
- -1
- -2
- -3
b:
blue: 10
red: red
data:
- 1563
- 1234
- 1456.7
- 1752
- 18
- 19
- 127
40 changes: 40 additions & 0 deletions tests/data/basic/detectors.yaml
@@ -0,0 +1,40 @@
detector1:
summary:
AM1: 5.2
AMS: 30.6
output:
a:
- 1
- 2
- 3
b:
blue: 5
red: green
data:
- 563
- 234
- 456.7
- 752
- 8
- 9
- 27
detector2:
summary:
AM1: 15.2
AMS: 130.6
output:
a:
- -1
- -2
- -3
b:
blue: 10
red: red
data:
- 1563
- 1234
- 1456.7
- 1752
- 18
- 19
- 127