DM-38409: Remove support for int dataset IDs in Butler.transfer_from #807

Merged (2 commits) on Mar 23, 2023
doc/changes/DM-38409.removal.rst (2 additions, 0 deletions)
@@ -0,0 +1,2 @@
Removed support for non-UUID dataset IDs in ``Butler.transfer_from()``.
The ``id_gen_map`` parameter has been removed from ``Butler.transfer_from()`` and the ``local_refs`` parameter has been removed from ``Datastore.transfer_from()``.
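
For context, here is a minimal sketch of the post-change caller API. The repository paths and collection name are hypothetical, and the snippet assumes both repositories already exist and use UUID dataset IDs:

from lsst.daf.butler import Butler

# Hypothetical repositories; any existing source/target repos would do.
source_butler = Butler("/path/to/source_repo")
target_butler = Butler("/path/to/target_repo", writeable=True)

# Select the datasets to move; "run2" is a placeholder collection name.
source_refs = source_butler.registry.queryDatasets(..., collections="run2")

# Before this change the call could carry id_gen_map={...}; the keyword is
# now gone because dataset IDs are assumed to be UUIDs.
transferred = target_butler.transfer_from(
    source_butler,
    source_refs,
    transfer="auto",
    register_dataset_types=True,
    transfer_dimensions=True,
)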
python/lsst/daf/butler/_butler.py (12 additions, 56 deletions)
@@ -2179,11 +2179,10 @@ def transfer_from(
source_butler: LimitedButler,
source_refs: Iterable[DatasetRef],
transfer: str = "auto",
id_gen_map: Dict[str, DatasetIdGenEnum] | None = None,
skip_missing: bool = True,
register_dataset_types: bool = False,
transfer_dimensions: bool = False,
) -> List[DatasetRef]:
) -> collections.abc.Collection[DatasetRef]:
"""Transfer datasets to this Butler from a run in another Butler.

Parameters
@@ -2197,11 +2196,6 @@ def transfer_from(
this butler.
transfer : `str`, optional
Transfer mode passed to `~lsst.daf.butler.Datastore.transfer_from`.
id_gen_map : `dict` of [`str`, `DatasetIdGenEnum`], optional
A mapping of dataset type to ID generation mode. Only used if
the source butler is using integer IDs. Should not be used
if this receiving butler uses integer IDs. Without this dataset
import always uses unique.
skip_missing : `bool`
If `True`, datasets with no datastore artifact associated with
them are not transferred. If `False` a registry entry will be
@@ -2221,8 +2215,7 @@

Notes
-----
Requires that any dimension definitions are already present in the
receiving Butler. The datastore artifact has to exist for a transfer
The datastore artifact has to exist for a transfer
to be made but non-existence is not an error.

Datasets that already exist in this run will be skipped.
@@ -2244,9 +2237,6 @@
original_count = len(source_refs)
log.info("Transferring %d datasets into %s", original_count, str(self))

if id_gen_map is None:
id_gen_map = {}

# In some situations the datastore artifact may be missing
# and we do not want that registry entry to be imported.
# Asking datastore is not sufficient, the records may have been
@@ -2272,10 +2262,8 @@
# before doing the import.
source_dataset_types = set()
grouped_refs = defaultdict(list)
grouped_indices = defaultdict(list)
for i, ref in enumerate(source_refs):
for ref in source_refs:
grouped_refs[ref.datasetType, ref.run].append(ref)
grouped_indices[ref.datasetType, ref.run].append(i)
source_dataset_types.add(ref.datasetType)

# Check to see if the dataset type in the source butler has
@@ -2336,13 +2324,6 @@
if record is not None and record.definition in elements:
dimension_records[record.definition].setdefault(record.dataId, record)

# The returned refs should be identical for UUIDs.
# For now must also support integers and so need to retain the
# newly-created refs from this registry.
# Pre-size it so we can assign refs into the correct slots
transferred_refs_tmp: List[Optional[DatasetRef]] = [None] * len(source_refs)
default_id_gen = DatasetIdGenEnum.UNIQUE

handled_collections: Set[str] = set()

# Do all the importing in a single transaction.
@@ -2356,6 +2337,7 @@
# is consistent.
self.registry.insertDimensionData(element, *records, skip_existing=True)

n_imported = 0
for (datasetType, run), refs_to_import in progress.iter_item_chunks(
grouped_refs.items(), desc="Importing to registry by run and dataset type"
):
@@ -2370,12 +2352,6 @@
if registered:
log.log(VERBOSE, "Creating output run %s", run)

id_generation_mode = default_id_gen
if isinstance(refs_to_import[0].id, int):
# ID generation mode might need to be overridden when
# targeting UUID
id_generation_mode = id_gen_map.get(datasetType.name, default_id_gen)

n_refs = len(refs_to_import)
log.verbose(
"Importing %d ref%s of dataset type %s into run %s",
@@ -2385,40 +2361,20 @@
run,
)

# No way to know if this butler's registry uses UUID.
# We have to trust the caller on this. If it fails they will
# have to change their approach. We can't catch the exception
# and retry with unique because that will mess up the
# transaction handling. We aren't allowed to ask the registry
# manager what type of ID it is using.
imported_refs = self.registry._importDatasets(
refs_to_import, idGenerationMode=id_generation_mode, expand=False
)

# Map them into the correct slots to match the initial order
for i, ref in zip(grouped_indices[datasetType, run], imported_refs):
transferred_refs_tmp[i] = ref

# Mypy insists that we might have None in here so we have to make
# that explicit by assigning to a new variable and filtering out
# something that won't be there.
transferred_refs = [ref for ref in transferred_refs_tmp if ref is not None]

# Check consistency
assert len(source_refs) == len(transferred_refs), "Different number of refs imported than given"

log.verbose("Imported %d datasets into destination butler", len(transferred_refs))
# Assume we are using UUIDs and the source refs will match
# those imported.
imported_refs = self.registry._importDatasets(refs_to_import, expand=False)
assert set(imported_refs) == set(refs_to_import)
n_imported += len(imported_refs)

# The transferred refs need to be reordered to match the original
# ordering given by the caller. Without this the datastore transfer
# will be broken.
assert len(source_refs) == n_imported
log.verbose("Imported %d datasets into destination butler", n_imported)

# Ask the datastore to transfer. The datastore has to check that
# the source datastore is compatible with the target datastore.
accepted, rejected = self.datastore.transfer_from(
source_butler.datastore,
source_refs,
local_refs=transferred_refs,
transfer=transfer,
artifact_existence=artifact_existence,
)
@@ -2432,7 +2388,7 @@
run,
)

return transferred_refs
return source_refs

def validateConfiguration(
self,
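The net effect of the _butler.py changes: with UUIDs guaranteed, the registry import no longer needs to map newly created refs back into the caller's ordering, so the method can verify the imported refs against the inputs and simply return source_refs. A self-contained sketch of that pattern, with a stand-in Ref class instead of the real DatasetRef and a no-op in place of registry._importDatasets:

from collections import defaultdict
from typing import NamedTuple


class Ref(NamedTuple):
    """Stand-in for DatasetRef: a UUID-like id plus grouping keys."""
    id: str
    dataset_type: str
    run: str


def import_refs(source_refs: list[Ref]) -> list[Ref]:
    """Group refs by (dataset type, run) and 'import' each group."""
    grouped: dict[tuple[str, str], list[Ref]] = defaultdict(list)
    for ref in source_refs:
        grouped[ref.dataset_type, ref.run].append(ref)

    n_imported = 0
    for refs_to_import in grouped.values():
        # With UUIDs the "imported" refs are identical to the inputs,
        # so a set comparison is enough to detect inconsistencies.
        imported = list(refs_to_import)  # placeholder for registry._importDatasets
        assert set(imported) == set(refs_to_import)
        n_imported += len(imported)

    assert n_imported == len(source_refs)
    return source_refs  # the caller's refs are returned unchanged


refs = [Ref("uuid-1", "calexp", "run2"), Ref("uuid-2", "calexp", "run2")]
print(import_refs(refs))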
python/lsst/daf/butler/core/datastore.py (0 additions, 5 deletions)
@@ -739,7 +739,6 @@ def transfer_from(
self,
source_datastore: Datastore,
refs: Iterable[DatasetRef],
local_refs: Optional[Iterable[DatasetRef]] = None,
transfer: str = "auto",
artifact_existence: Optional[Dict[ResourcePath, bool]] = None,
) -> tuple[set[DatasetRef], set[DatasetRef]]:
@@ -752,10 +751,6 @@
must be compatible with this datastore receiving the artifacts.
refs : iterable of `DatasetRef`
The datasets to transfer from the source datastore.
local_refs : iterable of `DatasetRef`, optional
The dataset refs associated with the registry associated with
this datastore. Can be `None` if the source and target datastore
are using UUIDs.
transfer : `str`, optional
How (and whether) the dataset should be added to the datastore.
Choices include "move", "copy",
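For Datastore subclass authors the contract is now: one iterable of refs in, the (accepted, rejected) sets out. A sketch of the post-change signature on an illustrative stand-in class (not the real lsst.daf.butler.Datastore):

from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import Iterable
from typing import Any


class DatastoreSketch(ABC):
    """Illustrative stand-in for lsst.daf.butler.Datastore."""

    @abstractmethod
    def transfer_from(
        self,
        source_datastore: DatastoreSketch,
        refs: Iterable[Any],
        transfer: str = "auto",
        artifact_existence: dict[Any, bool] | None = None,
    ) -> tuple[set[Any], set[Any]]:
        """Return the (accepted, rejected) refs; local_refs is gone entirely."""
        raise NotImplementedError()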
python/lsst/daf/butler/datastores/chainedDatastore.py (7 additions, 17 deletions)
@@ -1023,7 +1023,6 @@ def transfer_from(
self,
source_datastore: Datastore,
refs: Iterable[DatasetRef],
local_refs: Optional[Iterable[DatasetRef]] = None,
transfer: str = "auto",
artifact_existence: Optional[Dict[ResourcePath, bool]] = None,
) -> tuple[set[DatasetRef], set[DatasetRef]]:
@@ -1038,11 +1037,7 @@
source_datastores = tuple([source_datastore])

# Need to know the set of all possible refs that could be transferred.
if local_refs is None:
remaining_refs = set(refs)
local_refs = refs
else:
remaining_refs = set(local_refs)
remaining_refs = set(refs)

missing_from_source: set[DatasetRef] | None = None
all_accepted = set()
@@ -1078,11 +1073,9 @@
# Filter the initial list based on the datasets we have
# not yet transferred.
these_refs = []
these_local = []
for ref, local_ref in zip(refs, local_refs):
if local_ref in remaining_refs and known_to_source[ref]:
for ref in refs:
if ref in remaining_refs and known_to_source[ref]:
these_refs.append(ref)
these_local.append(local_ref)

if not these_refs:
# Already transferred all datasets known to this datastore.
@@ -1091,19 +1084,16 @@
for datastore, constraints in zip(self.datastores, self.datastoreConstraints):
if constraints is not None:
filtered_refs = []
filtered_local = []
for ref, local_ref in zip(these_refs, these_local):
if constraints.isAcceptable(local_ref):
for ref in these_refs:
if constraints.isAcceptable(ref):
filtered_refs.append(ref)
filtered_local.append(local_ref)
else:
log.debug("Rejecting ref by constraints: %s", local_ref)
log.debug("Rejecting ref by constraints: %s", ref)
else:
filtered_refs = [ref for ref in these_refs]
filtered_local = [ref for ref in these_local]
try:
accepted, _ = datastore.transfer_from(
source_child, filtered_refs, filtered_local, transfer, artifact_existence
source_child, filtered_refs, transfer, artifact_existence
)
except (TypeError, NotImplementedError):
# The datastores were incompatible.
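With local_refs gone, the chained datastore filters one list of refs against each child datastore's constraints. A toy, self-contained sketch of that filtering step (the constraints class below is invented for illustration):

class AcceptEverythingButRaw:
    """Toy stand-in for a datastore Constraints object."""

    def isAcceptable(self, ref: str) -> bool:
        return not ref.startswith("raw")


def filter_refs(these_refs: list[str], constraints: AcceptEverythingButRaw | None) -> list[str]:
    """Single-list filtering, as done per child datastore after the change."""
    if constraints is None:
        return list(these_refs)
    filtered = []
    for ref in these_refs:
        if constraints.isAcceptable(ref):
            filtered.append(ref)
        else:
            print(f"Rejecting ref by constraints: {ref}")
    return filtered


print(filter_refs(["raw/1", "calexp/2"], AcceptEverythingButRaw()))  # ['calexp/2']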
python/lsst/daf/butler/datastores/fileDatastore.py (9 additions, 15 deletions)
@@ -2479,7 +2479,6 @@ def transfer_from(
self,
source_datastore: Datastore,
refs: Iterable[DatasetRef],
local_refs: Optional[Iterable[DatasetRef]] = None,
transfer: str = "auto",
artifact_existence: Optional[Dict[ResourcePath, bool]] = None,
) -> tuple[set[DatasetRef], set[DatasetRef]]:
@@ -2515,11 +2514,6 @@
# generators to lists.
refs = list(refs)

if local_refs is None:
local_refs = refs
else:
local_refs = list(local_refs)

# In order to handle disassembled composites the code works
# at the records level since it can assume that internal APIs
# can be used.
@@ -2612,7 +2606,7 @@
source_records[missing].extend(dataset_records)

# See if we already have these records
target_records = self._get_stored_records_associated_with_refs(local_refs)
target_records = self._get_stored_records_associated_with_refs(refs)

# The artifacts to register
artifacts = []
@@ -2627,21 +2621,21 @@
accepted = set()

# Now can transfer the artifacts
for source_ref, target_ref in zip(refs, local_refs):
if not self.constraints.isAcceptable(target_ref):
for ref in refs:
if not self.constraints.isAcceptable(ref):
# This datastore should not be accepting this dataset.
rejected.add(target_ref)
rejected.add(ref)
continue

accepted.add(target_ref)
accepted.add(ref)

if target_ref.id in target_records:
if ref.id in target_records:
# Already have an artifact for this.
already_present.append(target_ref)
already_present.append(ref)
continue

# mypy needs to know these are always resolved refs
for info in source_records[source_ref.getCheckedId()]:
for info in source_records[ref.getCheckedId()]:
source_location = info.file_location(source_datastore.locationFactory)
target_location = info.file_location(self.locationFactory)
if source_location == target_location:
@@ -2668,7 +2662,7 @@
source_location.uri, transfer=transfer, overwrite=True, transaction=self._transaction
)

artifacts.append((target_ref, info))
artifacts.append((ref, info))

self._register_datasets(artifacts)

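The file datastore loop now keys everything off the ref itself: rejected by constraints, already present in the target records, or queued for artifact transfer. A standalone sketch of that partitioning, with plain strings standing in for DatasetRef and the stored-records lookup:

def partition_refs(
    refs: list[str],
    acceptable: set[str],
    target_records: set[str],
) -> tuple[set[str], set[str], list[str], list[str]]:
    """Mimic the accepted/rejected/already-present bookkeeping."""
    accepted: set[str] = set()
    rejected: set[str] = set()
    already_present: list[str] = []
    to_transfer: list[str] = []

    for ref in refs:
        if ref not in acceptable:
            # This datastore should not be accepting this dataset.
            rejected.add(ref)
            continue
        accepted.add(ref)
        if ref in target_records:
            # Already have an artifact for this ref.
            already_present.append(ref)
            continue
        to_transfer.append(ref)

    return accepted, rejected, already_present, to_transfer


print(partition_refs(["a", "b", "c"], acceptable={"a", "b"}, target_records={"a"}))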
tests/test_butler.py (5 additions, 10 deletions)
@@ -1966,7 +1966,7 @@ def assertButlerTransfers(self, id_gen_map=None, purge=False, storageClassName=
datasetType = DatasetType(datasetTypeName, dimensions, badStorageClass)
self.target_butler.registry.registerDatasetType(datasetType)
with self.assertRaises(ConflictingDefinitionError) as cm:
self.target_butler.transfer_from(self.source_butler, source_refs, id_gen_map=id_gen_map)
self.target_butler.transfer_from(self.source_butler, source_refs)
self.assertIn("dataset type differs", str(cm.exception))

# And remove the bad definitions.
@@ -1975,13 +1975,11 @@ def assertButlerTransfers(self, id_gen_map=None, purge=False, storageClassName=

# Transfer without creating dataset types should fail.
with self.assertRaises(KeyError):
self.target_butler.transfer_from(self.source_butler, source_refs, id_gen_map=id_gen_map)
self.target_butler.transfer_from(self.source_butler, source_refs)

# Transfer without creating dimensions should fail.
with self.assertRaises(ConflictingDefinitionError) as cm:
self.target_butler.transfer_from(
self.source_butler, source_refs, id_gen_map=id_gen_map, register_dataset_types=True
)
self.target_butler.transfer_from(self.source_butler, source_refs, register_dataset_types=True)
self.assertIn("dimension", str(cm.exception))

# The failed transfer above leaves registry in an inconsistent
@@ -1995,7 +1993,6 @@ def assertButlerTransfers(self, id_gen_map=None, purge=False, storageClassName=
transferred = self.target_butler.transfer_from(
self.source_butler,
source_refs,
id_gen_map=id_gen_map,
register_dataset_types=True,
transfer_dimensions=True,
)
@@ -2012,9 +2009,7 @@ def assertButlerTransfers(self, id_gen_map=None, purge=False, storageClassName=
# dataset_id.
if purge:
# This should not need to register dataset types.
transferred = self.target_butler.transfer_from(
self.source_butler, source_refs, id_gen_map=id_gen_map
)
transferred = self.target_butler.transfer_from(self.source_butler, source_refs)
self.assertEqual(len(transferred), n_expected)

# Also do an explicit low-level transfer to trigger some
@@ -2048,7 +2043,7 @@ def assertButlerTransfers(self, id_gen_map=None, purge=False, storageClassName=
# Re-importing the run1 datasets can be problematic if they
# use integer IDs so filter those out.
to_transfer = [ref for ref in source_refs if ref.run == "run2"]
self.target_butler.transfer_from(self.source_butler, to_transfer, id_gen_map=id_gen_map)
self.target_butler.transfer_from(self.source_butler, to_transfer)


class ChainedDatastoreTransfers(PosixDatastoreTransfers):
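In the tests the only substantive change is that every transfer_from call drops the id_gen_map keyword. A minimal, self-contained illustration using mocks in place of the real test fixtures:

from unittest.mock import MagicMock

# Mocks standing in for the butlers and refs built in tests/test_butler.py setUp().
source_butler = MagicMock(name="source_butler")
target_butler = MagicMock(name="target_butler")
source_refs = ["ref-1", "ref-2"]
target_butler.transfer_from.return_value = source_refs

# Post-change call pattern: no id_gen_map anywhere.
transferred = target_butler.transfer_from(
    source_butler,
    source_refs,
    register_dataset_types=True,
    transfer_dimensions=True,
)
assert len(transferred) == len(source_refs)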