DM-34887: Transfer dimension records with butler.transfer_from() #708

Merged: 6 commits, Jul 15, 2022
3 changes: 3 additions & 0 deletions doc/changes/DM-34887.feature.rst
@@ -0,0 +1,3 @@
Now ``Butler.transfer_from()`` can copy dimension records as well as datasets.
This significantly enhances the usability of this method when transferring between disconnected Butlers.
The ``butler transfer-datasets`` command will transfer dimension records by default, but this can be disabled with the ``--no-transfer-dimensions`` option (which can be more efficient if you know that the destination Butler already contains all the records).
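A minimal sketch of the new Python-side call, assuming two existing repositories (the repository paths and the dataset query below are illustrative, not part of this change):

```python
from lsst.daf.butler import Butler

# Hypothetical repository locations; substitute real butler configs.
source_butler = Butler("/repo/source", writeable=False)
dest_butler = Butler("/repo/dest", writeable=True)

# Any iterable of resolved DatasetRefs works; this query is illustrative.
refs = set(source_butler.registry.queryDatasets("raw", collections=...))

transferred = dest_butler.transfer_from(
    source_butler,
    refs,
    transfer="copy",
    register_dataset_types=True,
    # New in this change: also copy the dimension records (instrument,
    # exposure, detector, ...) backing the data IDs of these datasets.
    transfer_dimensions=True,
)
print(f"Transferred {len(transferred)} datasets")
```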
36 changes: 36 additions & 0 deletions python/lsst/daf/butler/_butler.py
@@ -78,6 +78,8 @@
Datastore,
Dimension,
DimensionConfig,
DimensionElement,
DimensionRecord,
DimensionUniverse,
FileDataset,
Progress,
@@ -2115,6 +2117,7 @@ def transfer_from(
id_gen_map: Dict[str, DatasetIdGenEnum] = None,
skip_missing: bool = True,
register_dataset_types: bool = False,
transfer_dimensions: bool = False,
) -> List[DatasetRef]:
"""Transfer datasets to this Butler from a run in another Butler.

@@ -2140,6 +2143,9 @@
register_dataset_types : `bool`
If `True` any missing dataset types are registered. Otherwise
an exception is raised.
transfer_dimensions : `bool`, optional
If `True`, dimension record data associated with the new datasets
will be transferred.

Returns
-------
@@ -2235,6 +2241,27 @@
else:
log.log(VERBOSE, "All required dataset types are known to the target Butler")

dimension_records: Dict[DimensionElement, Dict[DataCoordinate, DimensionRecord]] = defaultdict(dict)
if transfer_dimensions:
# Collect all the dimension records for these refs.
# All dimensions are to be copied but the list of valid dimensions
# comes from this butler's universe.
elements = frozenset(
element
for element in self.registry.dimensions.getStaticElements()
if element.hasTable() and element.viewOf is None
)
dataIds = set(ref.dataId for ref in source_refs)
# This logic comes from saveDataIds.
for dataId in dataIds:
# Should be a no-op if the ref has already been expanded.
dataId = source_butler.registry.expandDataId(dataId)
# If this butler doesn't know about a dimension in the source
# butler, things will break later.
for record in dataId.records.values():
if record is not None and record.definition in elements:
dimension_records[record.definition].setdefault(record.dataId, record)

# The returned refs should be identical for UUIDs.
# For now must also support integers and so need to retain the
# newly-created refs from this registry.
@@ -2246,6 +2273,15 @@

# Do all the importing in a single transaction.
with self.transaction():
if dimension_records:
log.verbose("Ensuring that dimension records exist for transferred datasets.")
for element, r in dimension_records.items():
records = [r[dataId] for dataId in r]
# Assume that if the record is already present we can
# use it without having to check that the record metadata
# is consistent.
self.registry.insertDimensionData(element, *records, skip_existing=True)

for (datasetType, run), refs_to_import in progress.iter_item_chunks(
grouped_refs.items(), desc="Importing to registry by run and dataset type"
):
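Read on its own, the record handling above is a two-step recipe: expand each source data ID so its dimension records are attached, bucket those records by element, then ensure they exist in the destination. A condensed sketch of that flow, assuming `source_butler`, `dest_butler`, and `source_refs` are in scope as in the method body:

```python
from collections import defaultdict

# Step 1: collect records, keyed by element and then by the record's own
# data ID so repeated references to the same record are deduplicated.
elements = frozenset(
    element
    for element in dest_butler.registry.dimensions.getStaticElements()
    if element.hasTable() and element.viewOf is None
)
dimension_records = defaultdict(dict)
for data_id in {ref.dataId for ref in source_refs}:
    # expandDataId attaches the full dimension records to the data ID.
    data_id = source_butler.registry.expandDataId(data_id)
    for record in data_id.records.values():
        if record is not None and record.definition in elements:
            dimension_records[record.definition].setdefault(record.dataId, record)

# Step 2: ensure the records exist in the destination registry.
# skip_existing=True trusts records that are already present rather than
# re-checking their metadata for consistency.
for element, records_by_data_id in dimension_records.items():
    dest_butler.registry.insertDimensionData(
        element, *records_by_data_id.values(), skip_existing=True
    )
```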
11 changes: 11 additions & 0 deletions python/lsst/daf/butler/cli/cmd/commands.py
@@ -653,6 +653,17 @@ def retrieve_artifacts(**kwargs):
@query_datasets_options(showUri=False, useArguments=False, repo=False)
@transfer_option()
@register_dataset_types_option()
@click.option(
"--transfer-dimensions/--no-transfer-dimensions",
is_flag=True,
default=True,
help=unwrap(
"""If true, also copy dimension records along with datasets.
If the dimensions are already present in the destination butler it
can be more efficient to disable this. The default is to transfer
dimensions."""
),
)
@options_file_option()
def transfer_datasets(**kwargs):
"""Transfer datasets from a source butler to a destination butler.
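For example, when the destination registry is already fully populated, the record copy can be skipped from the command line (the repository URIs and collection name here are illustrative): `butler transfer-datasets /repo/src /repo/dst --collections run/test --no-transfer-dimensions`.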
14 changes: 11 additions & 3 deletions python/lsst/daf/butler/registry/dimensions/table.py
@@ -235,7 +235,7 @@ def insert(self, *records: DimensionRecord, replace: bool = False, skip_existing
else:
self._db.insert(self._table, *elementRows)
if self._skyPixOverlap is not None:
self._skyPixOverlap.insert(records, replace=replace)
self._skyPixOverlap.insert(records, replace=replace, skip_existing=skip_existing)

def sync(self, record: DimensionRecord, update: bool = False) -> Union[bool, Dict[str, Any]]:
# Docstring inherited from DimensionRecordStorage.sync.
@@ -629,7 +629,9 @@ def _fill(
)
self._db.insert(self._overlapTable, *overlapRecords)

def insert(self, records: Sequence[DimensionRecord], replace: bool = False) -> None:
def insert(
self, records: Sequence[DimensionRecord], replace: bool = False, skip_existing: bool = False
) -> None:
"""Insert overlaps for a sequence of ``self.element`` records that
have just been inserted.

@@ -646,6 +648,9 @@ def insert(self, records: Sequence[DimensionRecord], replace: bool = False) -> None
If `True` (`False` is default), one or more of the given records may
already exist and are being updated, so we need to delete any
existing overlap records first.
skip_existing : `bool`, optional
If `True` (`False` is default), skip insertion if a record with
the same primary key values already exists.
"""
# Group records by family.governor value.
grouped: Dict[str, List[DimensionRecord]] = defaultdict(list)
@@ -716,7 +721,10 @@ def insert(self, records: Sequence[DimensionRecord], replace: bool = False) -> None
self._governor.element.name,
grouped.keys(),
)
self._db.insert(self._overlapTable, *overlapRecords)
if skip_existing:
self._db.ensure(self._overlapTable, *overlapRecords, primary_key_only=True)
else:
self._db.insert(self._overlapTable, *overlapRecords)

def _compute(
self,
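The branch added above is the whole behavioural change: with `skip_existing`, the overlap rows go through `Database.ensure` matched on the primary key, so rows that already exist are left alone instead of raising a constraint violation. A hedged sketch of the pattern, where `db`, `table`, and `rows` stand in for the members used in the diff:

```python
def store_overlaps(db, table, rows, skip_existing=False):
    # Sketch only: assumes `db` follows the lsst.daf.butler Database
    # interface used above.
    if skip_existing:
        # Roughly INSERT ... ON CONFLICT DO NOTHING, matching on the
        # primary key only; existing rows win and no error is raised.
        db.ensure(table, *rows, primary_key_only=True)
    else:
        # Plain INSERT; a duplicate primary key raises an integrity error.
        db.insert(table, *rows)
```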
11 changes: 10 additions & 1 deletion python/lsst/daf/butler/script/transferDatasets.py
@@ -39,6 +39,7 @@ def transferDatasets(
find_first: bool,
transfer: str,
register_dataset_types: bool,
transfer_dimensions: bool = True,
) -> int:
"""Transfer datasets from run in source to dest.

@@ -61,6 +62,10 @@
Transfer mode to use when placing artifacts in the destination.
register_dataset_types : `bool`
Indicate whether missing dataset types should be registered.
transfer_dimensions : `bool`
Indicate whether dimensions should be transferred along with
datasets. It can be more efficient to disable this if it is known
that all dimensions already exist in the destination.
"""
source_butler = Butler(source, writeable=False)
dest_butler = Butler(dest, writeable=True)
@@ -81,6 +86,10 @@
source_refs_set = set(source_refs)

transferred = dest_butler.transfer_from(
source_butler, source_refs_set, transfer=transfer, register_dataset_types=register_dataset_types
source_butler,
source_refs_set,
transfer=transfer,
register_dataset_types=register_dataset_types,
transfer_dimensions=transfer_dimensions,
)
return len(transferred)
54 changes: 36 additions & 18 deletions tests/test_butler.py
@@ -2077,25 +2077,24 @@ def assertButlerTransfers(self, id_gen_map=None, purge=False, storageClassName=
for run in runs:
self.source_butler.registry.registerCollection(run, CollectionType.RUN)

# Create dimensions in both butlers (transfer will not create them).
# Create dimensions in source butler.
n_exposures = 30
for butler in (self.source_butler, self.target_butler):
butler.registry.insertDimensionData("instrument", {"name": "DummyCamComp"})
butler.registry.insertDimensionData(
"physical_filter", {"instrument": "DummyCamComp", "name": "d-r", "band": "R"}
)
butler.registry.insertDimensionData(
"detector", {"instrument": "DummyCamComp", "id": 1, "full_name": "det1"}
)
self.source_butler.registry.insertDimensionData("instrument", {"name": "DummyCamComp"})
self.source_butler.registry.insertDimensionData(
"physical_filter", {"instrument": "DummyCamComp", "name": "d-r", "band": "R"}
)
self.source_butler.registry.insertDimensionData(
"detector", {"instrument": "DummyCamComp", "id": 1, "full_name": "det1"}
)

for i in range(n_exposures):
butler.registry.insertDimensionData(
"exposure",
{"instrument": "DummyCamComp", "id": i, "obs_id": f"exp{i}", "physical_filter": "d-r"},
)
for i in range(n_exposures):
self.source_butler.registry.insertDimensionData(
"exposure",
{"instrument": "DummyCamComp", "id": i, "obs_id": f"exp{i}", "physical_filter": "d-r"},
)

# Create dataset types in the source butler.
dimensions = butler.registry.dimensions.extract(["instrument", "exposure"])
dimensions = self.source_butler.registry.dimensions.extract(["instrument", "exposure"])
for datasetTypeName in datasetTypeNames:
datasetType = DatasetType(datasetTypeName, dimensions, storageClass)
self.source_butler.registry.registerDatasetType(datasetType)
@@ -2170,8 +2169,10 @@ def assertButlerTransfers(self, id_gen_map=None, purge=False, storageClassName=
for datasetTypeName in datasetTypeNames:
datasetType = DatasetType(datasetTypeName, dimensions, badStorageClass)
self.target_butler.registry.registerDatasetType(datasetType)
with self.assertRaises(ConflictingDefinitionError):
with self.assertRaises(ConflictingDefinitionError) as cm:
self.target_butler.transfer_from(self.source_butler, source_refs, id_gen_map=id_gen_map)
self.assertIn("dataset type differs", str(cm.exception))

# And remove the bad definitions.
for datasetTypeName in datasetTypeNames:
self.target_butler.registry.removeDatasetType(datasetTypeName)
@@ -2180,10 +2181,27 @@ def assertButlerTransfers(self, id_gen_map=None, purge=False, storageClassName=
with self.assertRaises(KeyError):
self.target_butler.transfer_from(self.source_butler, source_refs, id_gen_map=id_gen_map)

# Now transfer them to the second butler
# Transfer without creating dimensions should fail.
with self.assertRaises(ConflictingDefinitionError) as cm:
self.target_butler.transfer_from(
self.source_butler, source_refs, id_gen_map=id_gen_map, register_dataset_types=True
)
self.assertIn("dimension", str(cm.exception))

# The failed transfer above leaves the registry in an inconsistent
# state because the run is created but then rolled back without
# the collection cache being cleared. For now, force a refresh.
# Can remove with DM-35498.
self.target_butler.registry.refresh()

# Now transfer them to the second butler, including dimensions.
with self.assertLogs(level=logging.DEBUG) as cm:
transferred = self.target_butler.transfer_from(
self.source_butler, source_refs, id_gen_map=id_gen_map, register_dataset_types=True
self.source_butler,
source_refs,
id_gen_map=id_gen_map,
register_dataset_types=True,
transfer_dimensions=True,
)
self.assertEqual(len(transferred), n_expected)
log_output = ";".join(cm.output)