Add support for ingesting multiple datasets from a single file
FileDataset now uses a list of DatasetRef and ingest iterates
over them.
Formatters can now access the associated dataId so that they
can extract a subset from a file.
timj committed Jan 13, 2020
1 parent f41850c commit 4e8f1c6
Showing 13 changed files with 116 additions and 45 deletions.
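For orientation before the diff, here is a minimal sketch of the calling pattern this commit enables: several registry datasets backed by a single on-disk file. It assumes the usual top-level imports and reuses the `butler`, `datasetType`, and `MultiDetectorFormatter` names and dataIds from the test changes further down; it is an illustration, not code taken from the commit.

```python
from lsst.daf.butler import DatasetRef, FileDataset

# One DatasetRef per detector, all to be backed by the same file on disk.
refs = [DatasetRef(datasetType,
                   {"instrument": "DummyCamComp", "visit": 424, "detector": detector},
                   id=None)
        for detector in (1, 2)]

# FileDataset now carries a list of refs; the formatter is responsible for
# extracting each detector's subset from the shared file at read time.
dataset = FileDataset(path="detectors.yaml", refs=refs,
                      formatter=MultiDetectorFormatter)
butler.ingest(dataset, transfer="copy")
```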
5 changes: 3 additions & 2 deletions python/lsst/daf/butler/_butler.py
@@ -706,8 +706,9 @@ class used. `FileDataset.path` attributes may be modified to put
raise TypeError("Butler is read-only.")
# TODO: once Registry has vectorized API for addDataset, use it here.
for dataset in datasets:
dataset.ref = self.registry.addDataset(dataset.ref.datasetType, dataset.ref.dataId,
run=self.run, recursive=True)
for i, ref in enumerate(dataset.refs):
dataset.refs[i] = self.registry.addDataset(ref.datasetType, ref.dataId,
run=self.run, recursive=True)
self.datastore.ingest(*datasets, transfer=transfer)

@contextlib.contextmanager
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/core/datastore.py
@@ -488,7 +488,7 @@ def ingest(self, *datasets: FileDataset, transfer: Optional[str] = None):
in their class documentation.
"""
prepData = self._prepIngest(*datasets, transfer=transfer)
refs = {dataset.ref.id: dataset.ref for dataset in datasets}
refs = {ref.id: ref for dataset in datasets for ref in dataset.refs}
if refs.keys() != prepData.refs.keys():
unsupported = refs.keys() - prepData.refs.keys()
# Group unsupported refs by DatasetType for an informative
14 changes: 12 additions & 2 deletions python/lsst/daf/butler/core/formatter.py
@@ -24,7 +24,7 @@
from abc import ABCMeta, abstractmethod
import logging
import copy
from typing import ClassVar, Set, FrozenSet, Union, Optional, Dict, Any, Tuple, Type
from typing import ClassVar, Set, FrozenSet, Union, Optional, Dict, Any, Tuple, Type, TYPE_CHECKING

from .configSupport import processLookupConfigs, LookupKey
from .mappingFactory import MappingFactory
@@ -42,6 +42,10 @@
Entity = Union[DatasetType, DatasetRef, StorageClass, str]


if TYPE_CHECKING:
from .dimensions import DataCoordinate


class Formatter(metaclass=ABCMeta):
"""Interface for reading and writing Datasets with a particular
`StorageClass`.
@@ -60,10 +64,11 @@ class Formatter(metaclass=ABCMeta):
are supported (`frozenset`).
"""

def __init__(self, fileDescriptor: FileDescriptor):
def __init__(self, fileDescriptor: FileDescriptor, dataId: "DataCoordinate" = None):
if not isinstance(fileDescriptor, FileDescriptor):
raise TypeError("File descriptor must be a FileDescriptor")
self._fileDescriptor = fileDescriptor
self._dataId = dataId

def __str__(self):
return f"{self.name()}@{self.fileDescriptor.location.path}"
@@ -77,6 +82,11 @@ def fileDescriptor(self) -> FileDescriptor:
(`FileDescriptor`, read-only)"""
return self._fileDescriptor

@property
def dataId(self) -> Dict:
"""DataId associated with this formatter (`dict`)"""
return self._dataId

@classmethod
def name(cls) -> str:
"""Returns the fully qualified name of the formatter.
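To illustrate what the new `dataId` hook is for: a formatter reading a file that holds several datasets can use it to return only the piece matching the dataset being read. A rough sketch along the lines of the `MultiDetectorFormatter` test helper added later in this commit; the `detector<N>` key naming is an assumption about the file layout.

```python
from lsst.daf.butler.formatters.yamlFormatter import YamlFormatter


class PerDetectorYamlFormatter(YamlFormatter):
    """Hypothetical formatter that pulls one detector's entry out of a
    YAML file containing data for several detectors."""

    def _fromBytes(self, serializedDataset, pytype=None):
        # Parse the whole file, then select the entry for this dataId.
        data = super()._fromBytes(serializedDataset)
        if self.dataId is None or "detector" not in self.dataId:
            raise RuntimeError("A dataId with a 'detector' key is required")
        # Assumes the shared file maps keys such as "detector1" to payloads.
        return pytype(data[f"detector{self.dataId['detector']}"])
```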
29 changes: 17 additions & 12 deletions python/lsst/daf/butler/core/repoTransfers.py
@@ -36,7 +36,7 @@
from lsst.utils import doImport
from .config import ConfigSubset
from .datasets import DatasetType, DatasetRef
from .utils import NamedValueSet
from .utils import NamedValueSet, iterable

if TYPE_CHECKING:
from .dimensions import DimensionElement, DimensionRecord, ExpandedDataCoordinate
@@ -57,10 +57,11 @@ class RepoTransferFormatConfig(ConfigSubset):
class FileDataset:
"""A struct that represents a dataset exported to a file.
"""
__slots__ = ("ref", "path", "formatter")
__slots__ = ("refs", "path", "formatter")

ref: DatasetRef
"""Registry information about the dataset (`DatasetRef`).
refs: List[DatasetRef]
"""Registry information about the dataset. (`DatasetRef` or `list` of
`DatasetRef`).
"""

path: str
@@ -77,9 +78,12 @@ class FileDataset:
"""A `Formatter` class or fully-qualified name.
"""

def __init__(self, path: str, ref: DatasetRef, *, formatter: Union[None, str, Type[Formatter]] = None):
def __init__(self, path: str, refs: Union[DatasetRef, List[DatasetRef]], *,
formatter: Union[None, str, Type[Formatter]] = None):
self.path = path
self.ref = ref
if isinstance(refs, DatasetRef):
refs = [refs]
self.refs = refs
self.formatter = formatter


@@ -331,8 +335,8 @@ def saveDatasets(self, datasetType: DatasetType, run: str, *datasets: FileDatase
"run": run,
"records": [
{
"dataset_id": dataset.ref.id,
"data_id": dataset.ref.dataId.byName(),
"dataset_id": [ref.id for ref in dataset.refs],
"data_id": [ref.dataId.byName() for ref in dataset.refs],
"path": dataset.path,
"formatter": dataset.formatter,
# TODO: look up and save other collections
@@ -407,7 +411,8 @@ def __init__(self, stream: IO, registry: Registry):
(
FileDataset(
d["path"],
DatasetRef(datasetType, d["data_id"], run=data["run"], id=d["dataset_id"]),
[DatasetRef(datasetType, dataId, run=data["run"], id=refid)
for dataId, refid in zip(iterable(d["data_id"]), iterable(d["dataset_id"]))],
formatter=doImport(d["formatter"])
),
d.get("collections", [])
@@ -437,13 +442,13 @@ def load(self, datastore: Datastore, *,
# For now, we ignore the dataset_id we pulled from the file
# and just insert without one to get a new autoincrement value.
# Eventually (once we have origin in IDs) we'll preserve them.
fileDataset.ref = self.registry.addDataset(datasetType, fileDataset.ref.dataId, run=run,
recursive=True)
fileDataset.refs = [self.registry.addDataset(datasetType, ref.dataId, run=run,
recursive=True) for ref in fileDataset.refs]
if directory is not None:
fileDataset.path = os.path.join(directory, fileDataset.path)
fileDatasets.append(fileDataset)
for collection in collectionsForDataset:
collections[collection].append(fileDataset.ref)
collections[collection].extend(fileDataset.refs)
datastore.ingest(*fileDatasets, transfer=transfer)
for collection, refs in collections.items():
self.registry.associate(collection, refs)
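Worth noting for existing callers of `FileDataset`: the constructor still accepts a single `DatasetRef` and normalizes it to a one-element list, so only code using the old `ref=` keyword or the `.ref` attribute has to change (as the `posixDatastore.py` and `test_butler.py` hunks below do). A quick sketch, with `refA`/`refB` as placeholder `DatasetRef` instances:

```python
# The single-ref form keeps working and is stored as a one-element list.
single = FileDataset("metric.yaml", refA)
assert single.refs == [refA]

# Several refs can now share one file; the formatter (here the test helper
# MultiDetectorFormatter) is what knows how to split it on read.
multi = FileDataset("detectors.yaml", [refA, refB],
                    formatter=MultiDetectorFormatter)
```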
6 changes: 4 additions & 2 deletions python/lsst/daf/butler/datastores/chainedDatastore.py
@@ -330,8 +330,10 @@ def _prepIngest(self, *datasets: FileDataset, transfer: Optional[str] = None) ->
raise NotImplementedError("ChainedDatastore does not support transfer=None or transfer='move'.")

def isDatasetAcceptable(dataset, *, name, constraints):
if not constraints.isAcceptable(dataset.ref):
log.debug("Datastore %s skipping ingest via configuration for ref %s", name, dataset.ref)
acceptable = [ref for ref in dataset.refs if constraints.isAcceptable(ref)]
if not acceptable:
log.debug("Datastore %s skipping ingest via configuration for refs %s",
name, ", ".join(str(ref) for ref in dataset.refs))
return False
else:
return True
22 changes: 14 additions & 8 deletions python/lsst/daf/butler/datastores/fileLikeDatastore.py
@@ -68,7 +68,7 @@ class _IngestPrepData(Datastore.IngestPrepData):
Files to be ingested by this datastore.
"""
def __init__(self, datasets: List[FileDataset]):
super().__init__(dataset.ref for dataset in datasets)
super().__init__(ref for dataset in datasets for ref in dataset.refs)
self.datasets = datasets


@@ -315,7 +315,8 @@ def _prepare_for_get(self, ref, parameters=None):

formatter = getInstanceOf(storedFileInfo.formatter,
FileDescriptor(location, readStorageClass=readStorageClass,
storageClass=writeStorageClass, parameters=parameters))
storageClass=writeStorageClass, parameters=parameters),
ref.dataId)
formatterParams, assemblerParams = formatter.segregateParameters()

return DatastoreFileGetInformation(location, formatter, storedFileInfo,
@@ -361,7 +362,8 @@ def _prepare_for_put(self, inMemoryDataset, ref):
try:
formatter = self.formatterFactory.getFormatter(ref,
FileDescriptor(location,
storageClass=storageClass))
storageClass=storageClass),
ref.dataId)
except KeyError as e:
raise DatasetTypeNotSupportedError(f"Unable to find formatter for {ref}") from e

@@ -448,10 +450,13 @@ def _prepIngest(self, *datasets: FileDataset, transfer: Optional[str] = None) ->
# Docstring inherited from Datastore._prepIngest.
filtered = []
for dataset in datasets:
if not self.constraints.isAcceptable(dataset.ref):
acceptable = [ref for ref in dataset.refs if self.constraints.isAcceptable(ref)]
if not acceptable:
continue
else:
dataset.refs = acceptable
if dataset.formatter is None:
dataset.formatter = self.formatterFactory.getFormatterClass(dataset.ref)
dataset.formatter = self.formatterFactory.getFormatterClass(dataset.refs[0])
else:
dataset.formatter = getClassOf(dataset.formatter)
dataset.path = self._standardizeIngestPath(dataset.path, transfer=transfer)
@@ -463,9 +468,10 @@ def _finishIngest(self, prepData: Datastore.IngestPrepData, *, transfer: Optiona
# Docstring inherited from Datastore._finishIngest.
refsAndInfos = []
for dataset in prepData.datasets:
info = self._extractIngestInfo(dataset.path, dataset.ref, formatter=dataset.formatter,
transfer=transfer)
refsAndInfos.append((dataset.ref, info))
for ref in dataset.refs:
info = self._extractIngestInfo(dataset.path, ref, formatter=dataset.formatter,
transfer=transfer)
refsAndInfos.append((ref, info))
self._register_datasets(refsAndInfos)

def getUri(self, ref, predict=False):
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/datastores/posixDatastore.py
@@ -340,7 +340,7 @@ def export(self, refs: Iterable[DatasetRef], *,
raise FileNotFoundError(f"Could not retrieve Dataset {ref}.")
if transfer is None:
# TODO: do we also need to return the readStorageClass somehow?
yield FileDataset(ref=ref, path=location.pathInStore, formatter=storedFileInfo.formatter)
yield FileDataset(refs=[ref], path=location.pathInStore, formatter=storedFileInfo.formatter)
else:
# TODO: add support for other transfer modes. If we support
# moving, this method should become transactional.
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/formatters/jsonFormatter.py
@@ -51,7 +51,7 @@ def _readFile(self, path, pytype=None):
"""
try:
with open(path, "rb") as fd:
data = self._fromBytes(fd.read())
data = self._fromBytes(fd.read(), pytype)
except FileNotFoundError:
data = None

2 changes: 1 addition & 1 deletion python/lsst/daf/butler/formatters/pickleFormatter.py
@@ -55,7 +55,7 @@ def _readFile(self, path, pytype=None):
"""
try:
with open(path, "rb") as fd:
data = self._fromBytes(fd.read())
data = self._fromBytes(fd.read(), pytype)
except FileNotFoundError:
data = None

2 changes: 1 addition & 1 deletion python/lsst/daf/butler/formatters/yamlFormatter.py
@@ -57,7 +57,7 @@ def _readFile(self, path, pytype=None):
"""
try:
with open(path, "rb") as fd:
data = self._fromBytes(fd.read())
data = self._fromBytes(fd.read(), pytype)
except FileNotFoundError:
data = None

22 changes: 22 additions & 0 deletions tests/datasetsHelper.py
@@ -19,6 +19,9 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

__all__ = ("FitsCatalogDatasetsHelper", "DatasetTestHelper", "DatastoreTestHelper",
"BadWriteFormatter", "BadNoWriteFormatter", "MultiDetectorFormatter")

import os
from lsst.daf.butler import DatasetType, DatasetRef
from lsst.daf.butler.formatters.yamlFormatter import YamlFormatter
@@ -139,3 +142,22 @@ class BadNoWriteFormatter(BadWriteFormatter):

def _writeFile(self, inMemoryDataset):
raise RuntimeError("Did not writing anything at all")


class MultiDetectorFormatter(YamlFormatter):

def _writeFile(self, inMemoryDataset):
raise NotImplementedError("Can not write")

def _fromBytes(self, serializedDataset, pytype=None):
data = super()._fromBytes(serializedDataset)
if self.dataId is None:
raise RuntimeError("This formatter requires a dataId")
if "detector" not in self.dataId:
raise RuntimeError("This formatter requires detector to be present in dataId")
key = f"detector{self.dataId['detector']}"
print(key)
print(data)
if key in data:
return pytype(data[key])
raise RuntimeError(f"Could not find '{key}' in data file")
27 changes: 26 additions & 1 deletion tests/test_butler.py
@@ -56,6 +56,7 @@ def mock_s3(cls):
from lsst.daf.butler.core.s3utils import (s3CheckFileExists, setAwsEnvCredentials,
unsetAwsEnvCredentials)

from datasetsHelper import MultiDetectorFormatter

TESTDIR = os.path.abspath(os.path.dirname(__file__))

@@ -310,7 +311,7 @@ def testIngest(self):
refIn = DatasetRef(datasetType, dataId, id=None)

datasets.append(FileDataset(path=metricFile,
ref=refIn,
refs=[refIn],
formatter=formatter))

butler.ingest(*datasets, transfer="copy")
@@ -321,6 +322,30 @@
"detector": 2, "visit": 423})
self.assertNotEqual(metrics1, metrics2)

# Now do a multi-dataset but single file ingest
metricFile = os.path.join(dataRoot, "detectors.yaml")
refs = []
for detector in (1, 2):
detector_name = f"detector_{detector}"
dataId = {"instrument": "DummyCamComp", "visit": 424, "detector": detector}
# Create a DatasetRef for ingest
refs.append(DatasetRef(datasetType, dataId, id=None))

datasets = []
datasets.append(FileDataset(path=metricFile,
refs=refs,
formatter=MultiDetectorFormatter))

butler.ingest(*datasets, transfer="copy")

multi1 = butler.get(datasetTypeName, {"instrument": "DummyCamComp",
"detector": 1, "visit": 424})
multi2 = butler.get(datasetTypeName, {"instrument": "DummyCamComp",
"detector": 2, "visit": 424})

self.assertEqual(multi1, metrics1)
self.assertEqual(multi2, metrics2)

def testPickle(self):
"""Test pickle support.
"""
