Remove support for int dataset IDs in Butler.transfer_from
This significantly cleans up the code since there is no longer
a need to collect the source and target refs separately and
have Datastore.transfer_from keep track of both.
With UUIDs the source ref and the target ref are the same.
timj committed Mar 22, 2023
1 parent 734c93b commit b2177ae
Showing 4 changed files with 27 additions and 92 deletions.
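
For context, a minimal sketch of how Butler.transfer_from is called after this change; the repository paths, collection name, and dataset type name below are hypothetical placeholders, and the query assumes the source repository already contains matching datasets.

# Minimal sketch, not part of the commit: Butler.transfer_from after this
# change.  Repository paths, the collection, and the dataset type name are
# hypothetical placeholders.
from lsst.daf.butler import Butler

source_butler = Butler("/path/to/source_repo")
target_butler = Butler("/path/to/target_repo", writeable=True)

source_refs = list(
    source_butler.registry.queryDatasets("raw", collections="some/run")
)

# No id_gen_map argument any more; with UUIDs the returned refs are the
# same refs that were passed in (possibly filtered by skip_missing).
transferred = target_butler.transfer_from(
    source_butler,
    source_refs,
    transfer="copy",
    skip_missing=True,
    register_dataset_types=True,
)
print(f"Transferred {len(transferred)} of {len(source_refs)} requested datasets")
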
66 changes: 11 additions & 55 deletions python/lsst/daf/butler/_butler.py
@@ -2179,7 +2179,6 @@ def transfer_from(
source_butler: LimitedButler,
source_refs: Iterable[DatasetRef],
transfer: str = "auto",
id_gen_map: Dict[str, DatasetIdGenEnum] | None = None,
skip_missing: bool = True,
register_dataset_types: bool = False,
transfer_dimensions: bool = False,
@@ -2197,11 +2196,6 @@
this butler.
transfer : `str`, optional
Transfer mode passed to `~lsst.daf.butler.Datastore.transfer_from`.
id_gen_map : `dict` of [`str`, `DatasetIdGenEnum`], optional
A mapping of dataset type to ID generation mode. Only used if
the source butler is using integer IDs. Should not be used
if this receiving butler uses integer IDs. Without this dataset
import always uses unique.
skip_missing : `bool`
If `True`, datasets with no datastore artifact associated with
them are not transferred. If `False` a registry entry will be
@@ -2221,8 +2215,7 @@
Notes
-----
Requires that any dimension definitions are already present in the
receiving Butler. The datastore artifact has to exist for a transfer
The datastore artifact has to exist for a transfer
to be made but non-existence is not an error.
Datasets that already exist in this run will be skipped.
@@ -2244,9 +2237,6 @@
original_count = len(source_refs)
log.info("Transferring %d datasets into %s", original_count, str(self))

if id_gen_map is None:
id_gen_map = {}

# In some situations the datastore artifact may be missing
# and we do not want that registry entry to be imported.
# Asking datastore is not sufficient, the records may have been
@@ -2272,10 +2262,8 @@
# before doing the import.
source_dataset_types = set()
grouped_refs = defaultdict(list)
grouped_indices = defaultdict(list)
for i, ref in enumerate(source_refs):
for ref in source_refs:
grouped_refs[ref.datasetType, ref.run].append(ref)
grouped_indices[ref.datasetType, ref.run].append(i)
source_dataset_types.add(ref.datasetType)

# Check to see if the dataset type in the source butler has
@@ -2336,13 +2324,6 @@
if record is not None and record.definition in elements:
dimension_records[record.definition].setdefault(record.dataId, record)

# The returned refs should be identical for UUIDs.
# For now must also support integers and so need to retain the
# newly-created refs from this registry.
# Pre-size it so we can assign refs into the correct slots
transferred_refs_tmp: List[Optional[DatasetRef]] = [None] * len(source_refs)
default_id_gen = DatasetIdGenEnum.UNIQUE

handled_collections: Set[str] = set()

# Do all the importing in a single transaction.
@@ -2356,6 +2337,7 @@
# is consistent.
self.registry.insertDimensionData(element, *records, skip_existing=True)

n_imported = 0
for (datasetType, run), refs_to_import in progress.iter_item_chunks(
grouped_refs.items(), desc="Importing to registry by run and dataset type"
):
@@ -2370,12 +2352,6 @@
if registered:
log.log(VERBOSE, "Creating output run %s", run)

id_generation_mode = default_id_gen
if isinstance(refs_to_import[0].id, int):
# ID generation mode might need to be overridden when
# targetting UUID
id_generation_mode = id_gen_map.get(datasetType.name, default_id_gen)

n_refs = len(refs_to_import)
log.verbose(
"Importing %d ref%s of dataset type %s into run %s",
@@ -2385,40 +2361,20 @@
run,
)

# No way to know if this butler's registry uses UUID.
# We have to trust the caller on this. If it fails they will
# have to change their approach. We can't catch the exception
# and retry with unique because that will mess up the
# transaction handling. We aren't allowed to ask the registry
# manager what type of ID it is using.
imported_refs = self.registry._importDatasets(
refs_to_import, idGenerationMode=id_generation_mode, expand=False
)

# Map them into the correct slots to match the initial order
for i, ref in zip(grouped_indices[datasetType, run], imported_refs):
transferred_refs_tmp[i] = ref

# Mypy insists that we might have None in here so we have to make
# that explicit by assigning to a new variable and filtering out
# something that won't be there.
transferred_refs = [ref for ref in transferred_refs_tmp if ref is not None]

# Check consistency
assert len(source_refs) == len(transferred_refs), "Different number of refs imported than given"

log.verbose("Imported %d datasets into destination butler", len(transferred_refs))
# Assume we are using UUIDs and the source refs will match
# those imported.
imported_refs = self.registry._importDatasets(refs_to_import, expand=False)
assert set(imported_refs) == set(refs_to_import)
n_imported += len(imported_refs)

# The transferred refs need to be reordered to match the original
# ordering given by the caller. Without this the datastore transfer
# will be broken.
assert len(source_refs) == n_imported
log.verbose("Imported %d datasets into destination butler", n_imported)

# Ask the datastore to transfer. The datastore has to check that
# the source datastore is compatible with the target datastore.
accepted, rejected = self.datastore.transfer_from(
source_butler.datastore,
source_refs,
local_refs=transferred_refs,
transfer=transfer,
artifact_existence=artifact_existence,
)
@@ -2432,7 +2388,7 @@
run,
)

return transferred_refs
return source_refs

def validateConfiguration(
self,
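
The grouping of refs by dataset type and run earlier in this file no longer needs to track positional indices; as a standalone sketch of the simplified pattern, where the refs are assumed to be any objects exposing datasetType and run attributes.

# Standalone sketch of the simplified grouping pattern used in this file;
# the refs are assumed to be objects exposing .datasetType and .run.
from collections import defaultdict

def group_refs_by_type_and_run(source_refs):
    """Group refs by (dataset type, run) for chunked registry import."""
    grouped_refs = defaultdict(list)
    source_dataset_types = set()
    for ref in source_refs:
        grouped_refs[ref.datasetType, ref.run].append(ref)
        source_dataset_types.add(ref.datasetType)
    return grouped_refs, source_dataset_types
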
5 changes: 0 additions & 5 deletions python/lsst/daf/butler/core/datastore.py
@@ -739,7 +739,6 @@ def transfer_from(
self,
source_datastore: Datastore,
refs: Iterable[DatasetRef],
local_refs: Optional[Iterable[DatasetRef]] = None,
transfer: str = "auto",
artifact_existence: Optional[Dict[ResourcePath, bool]] = None,
) -> tuple[set[DatasetRef], set[DatasetRef]]:
@@ -752,10 +751,6 @@
must be compatible with this datastore receiving the artifacts.
refs : iterable of `DatasetRef`
The datasets to transfer from the source datastore.
local_refs : iterable of `DatasetRef`, optional
The dataset refs associated with the registry associated with
this datastore. Can be `None` if the source and target datastore
are using UUIDs.
transfer : `str`, optional
How (and whether) the dataset should be added to the datastore.
Choices include "move", "copy",
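
Continuing the earlier sketch, with local_refs removed the datastore-level transfer reduces to the call below; the butlers (and therefore their datastores) are assumed to already exist, and the return value is the (accepted, rejected) pair documented above.

# Sketch only: the simplified Datastore.transfer_from signature.  The
# butlers from the earlier sketch are assumed to already exist.
accepted, rejected = target_butler.datastore.transfer_from(
    source_butler.datastore,
    source_refs,
    transfer="copy",
)
print(f"{len(accepted)} accepted, {len(rejected)} rejected by constraints")
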
24 changes: 7 additions & 17 deletions python/lsst/daf/butler/datastores/chainedDatastore.py
@@ -1023,7 +1023,6 @@ def transfer_from(
self,
source_datastore: Datastore,
refs: Iterable[DatasetRef],
local_refs: Optional[Iterable[DatasetRef]] = None,
transfer: str = "auto",
artifact_existence: Optional[Dict[ResourcePath, bool]] = None,
) -> tuple[set[DatasetRef], set[DatasetRef]]:
@@ -1038,11 +1037,7 @@
source_datastores = tuple([source_datastore])

# Need to know the set of all possible refs that could be transferred.
if local_refs is None:
remaining_refs = set(refs)
local_refs = refs
else:
remaining_refs = set(local_refs)
remaining_refs = set(refs)

missing_from_source: set[DatasetRef] | None = None
all_accepted = set()
@@ -1078,11 +1073,9 @@
# Filter the initial list based on the datasets we have
# not yet transferred.
these_refs = []
these_local = []
for ref, local_ref in zip(refs, local_refs):
if local_ref in remaining_refs and known_to_source[ref]:
for ref in refs:
if ref in remaining_refs and known_to_source[ref]:
these_refs.append(ref)
these_local.append(local_ref)

if not these_refs:
# Already transferred all datasets known to this datastore.
@@ -1091,19 +1084,16 @@
for datastore, constraints in zip(self.datastores, self.datastoreConstraints):
if constraints is not None:
filtered_refs = []
filtered_local = []
for ref, local_ref in zip(these_refs, these_local):
if constraints.isAcceptable(local_ref):
for ref in these_refs:
if constraints.isAcceptable(ref):
filtered_refs.append(ref)
filtered_local.append(local_ref)
else:
log.debug("Rejecting ref by constraints: %s", local_ref)
log.debug("Rejecting ref by constraints: %s", ref)
else:
filtered_refs = [ref for ref in these_refs]
filtered_local = [ref for ref in these_local]
try:
accepted, _ = datastore.transfer_from(
source_child, filtered_refs, filtered_local, transfer, artifact_existence
source_child, filtered_refs, transfer, artifact_existence
)
except (TypeError, NotImplementedError):
# The datastores were incompatible.
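
The per-child constraint filtering in the chained datastore now works on a single ref list; a standalone sketch of that check, where constraints may be None for a child datastore that accepts everything.

# Standalone sketch of the per-datastore constraint check used above;
# `constraints` may be None when a child datastore accepts everything.
def filter_by_constraints(these_refs, constraints, log=None):
    if constraints is None:
        return list(these_refs)
    filtered_refs = []
    for ref in these_refs:
        if constraints.isAcceptable(ref):
            filtered_refs.append(ref)
        elif log is not None:
            log.debug("Rejecting ref by constraints: %s", ref)
    return filtered_refs
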
24 changes: 9 additions & 15 deletions python/lsst/daf/butler/datastores/fileDatastore.py
@@ -2479,7 +2479,6 @@ def transfer_from(
self,
source_datastore: Datastore,
refs: Iterable[DatasetRef],
local_refs: Optional[Iterable[DatasetRef]] = None,
transfer: str = "auto",
artifact_existence: Optional[Dict[ResourcePath, bool]] = None,
) -> tuple[set[DatasetRef], set[DatasetRef]]:
@@ -2515,11 +2514,6 @@
# generators to lists.
refs = list(refs)

if local_refs is None:
local_refs = refs
else:
local_refs = list(local_refs)

# In order to handle disassembled composites the code works
# at the records level since it can assume that internal APIs
# can be used.
@@ -2612,7 +2606,7 @@
source_records[missing].extend(dataset_records)

# See if we already have these records
target_records = self._get_stored_records_associated_with_refs(local_refs)
target_records = self._get_stored_records_associated_with_refs(refs)

# The artifacts to register
artifacts = []
@@ -2627,21 +2621,21 @@
accepted = set()

# Now can transfer the artifacts
for source_ref, target_ref in zip(refs, local_refs):
if not self.constraints.isAcceptable(target_ref):
for ref in refs:
if not self.constraints.isAcceptable(ref):
# This datastore should not be accepting this dataset.
rejected.add(target_ref)
rejected.add(ref)
continue

accepted.add(target_ref)
accepted.add(ref)

if target_ref.id in target_records:
if ref.id in target_records:
# Already have an artifact for this.
already_present.append(target_ref)
already_present.append(ref)
continue

# mypy needs to know these are always resolved refs
for info in source_records[source_ref.getCheckedId()]:
for info in source_records[ref.getCheckedId()]:
source_location = info.file_location(source_datastore.locationFactory)
target_location = info.file_location(self.locationFactory)
if source_location == target_location:
Expand All @@ -2668,7 +2662,7 @@ def transfer_from(
source_location.uri, transfer=transfer, overwrite=True, transaction=self._transaction
)

artifacts.append((target_ref, info))
artifacts.append((ref, info))

self._register_datasets(artifacts)

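
The accept/reject bookkeeping in the file-datastore transfer loop can be summarized as a small helper; target_records here is assumed to map dataset ID to existing stored-file records.

# Summary sketch of the per-ref bookkeeping in the transfer loop above;
# target_records is assumed to map dataset ID to existing file records.
def classify_refs(refs, constraints, target_records):
    accepted, rejected, already_present = set(), set(), []
    for ref in refs:
        if not constraints.isAcceptable(ref):
            # This datastore should not be accepting this dataset.
            rejected.add(ref)
            continue
        accepted.add(ref)
        if ref.id in target_records:
            # An artifact for this dataset is already present locally.
            already_present.append(ref)
    return accepted, rejected, already_present
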
