Skip to content

Commit

Permalink
Use vectorized datastore export and handle multi-ref files.
Browse files Browse the repository at this point in the history
This addresses what seemed to be a TODO comment whose conditions had
been met a while ago.
  • Loading branch information
TallJimbo committed Jun 18, 2021
1 parent 4ce8b27 commit 2cbf299
Showing 1 changed file with 26 additions and 12 deletions.
38 changes: 26 additions & 12 deletions python/lsst/daf/butler/transfers/_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,26 +207,40 @@ def saveDatasets(self, refs: Iterable[DatasetRef], *,
collections. Other collections will be included in the export in the
future (once `Registry` provides a way to look up that information).
"""
dataIds = set()
data_ids = set()
refs_to_export = {}
for ref in sorted(refs):
dataset_id = ref.getCheckedId()
# The query interfaces typically used to generate the refs passed
# here often don't remove duplicates, so do that here for
# convenience.
if ref.id in self._dataset_ids:
if dataset_id in self._dataset_ids or dataset_id in refs_to_export:
continue
# Also convert components to composites.
if ref.isComponent():
ref = ref.makeCompositeRef()
dataIds.add(ref.dataId)
# `exports` is a single-element list here, because we anticipate
# a future where more than just Datastore.export has a vectorized
# API and we can pull this out of the loop.
exports = self._datastore.export([ref], directory=self._directory, transfer=self._transfer)
data_ids.add(ref.dataId)
refs_to_export[dataset_id] = ref
# Do a vectorized datastore export, which might be a lot faster than
# one-by-one.
exports = self._datastore.export(
refs_to_export.values(),
directory=self._directory,
transfer=self._transfer,
)
# Export associated data IDs.
self.saveDataIds(data_ids, elements=elements)
# Rewrite export filenames if desired, and then save them to the
# data structure we'll write in `_finish`.
# If a single exported FileDataset has multiple DatasetRefs, we save
# it with each of them.
for file_dataset in exports:
if rewrite is not None:
exports = [rewrite(export) for export in exports]
self._dataset_ids.add(ref.getCheckedId())
assert ref.run is not None
self._datasets[ref.datasetType][ref.run].extend(exports)
self.saveDataIds(dataIds, elements=elements)
file_dataset = rewrite(file_dataset)
for ref in file_dataset.refs:
assert ref.run is not None
self._datasets[ref.datasetType][ref.run].append(file_dataset)
self._dataset_ids.update(refs_to_export.keys())

def _finish(self) -> None:
"""Delegate to the backend to finish the export process.
Expand Down

0 comments on commit 2cbf299

Please sign in to comment.