DM-28650: Add Butler.transfer_from #523

Merged: 5 commits, May 25, 2021
3 changes: 3 additions & 0 deletions doc/changes/DM-28650.feature.rst
@@ -0,0 +1,3 @@
Add ``butler transfer-datasets`` command-line tool and associated ``Butler.transfer_from()`` API.

This can be used to transfer datasets between different butlers, with the caveat that dimensions and dataset types must be pre-defined in the receiving butler repository.
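
A minimal usage sketch of the new Python API follows (repository paths, collection names, and dataset type names are illustrative assumptions, not taken from this PR):

```python
# Minimal sketch: transfer datasets between two existing repositories.
# Both repositories must already exist and the destination must already
# define the relevant dimension records and dataset types.
from lsst.daf.butler import Butler

source = Butler("/path/to/source/repo")
dest = Butler("/path/to/dest/repo", writeable=True)

# Select the datasets to transfer; queryDatasets yields DatasetRef objects,
# which is what transfer_from() expects.
refs = source.registry.queryDatasets("calexp", collections="HSC/runs/RC2")

transferred = dest.transfer_from(source, refs, transfer="copy")
print(f"Transferred {len(transferred)} datasets")
```
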
116 changes: 115 additions & 1 deletion python/lsst/daf/butler/_butler.py
@@ -33,7 +33,7 @@
"PurgeUnsupportedPruneCollectionsError",
)


import collections.abc
from collections import defaultdict
import contextlib
import logging
@@ -1673,6 +1673,120 @@ def doImport(importStream: TextIO) -> None:
else:
doImport(filename)

@transactional
def transfer_from(self, source_butler: Butler, source_refs: Iterable[DatasetRef],
transfer: str = "auto",
id_gen_map: Optional[Dict[str, DatasetIdGenEnum]] = None) -> List[DatasetRef]:
"""Transfer datasets to this Butler from a run in another Butler.

Parameters
----------
source_butler : `Butler`
Butler from which the datasets are to be transferred.
source_refs : iterable of `DatasetRef`
Datasets defined in the source butler that should be transferred to
this butler.
transfer : `str`, optional
Transfer mode passed to `~lsst.daf.butler.Datastore.transfer_from`.
id_gen_map : `dict` of [`str`, `DatasetIdGenEnum`], optional
A mapping of dataset type name to ID generation mode. Only used if
the source butler is using integer IDs. Should not be used
if this receiving butler uses integer IDs. If not given, dataset
import always uses `DatasetIdGenEnum.UNIQUE`.

Returns
-------
refs : `list` of `DatasetRef`
The refs added to this Butler.

Notes
-----
Requires that any dimension definitions are already present in the
receiving Butler. The datastore artifact has to exist for a transfer
to be made but non-existence is not an error.

Datasets that already exist in this run will be skipped.
"""
if not self.isWriteable():
raise TypeError("Butler is read-only.")
progress = Progress("lsst.daf.butler.Butler.transfer_from", level=VERBOSE)

# Will iterate through the refs multiple times so need to convert
# to a list if this isn't a collection.
if not isinstance(source_refs, collections.abc.Collection):
source_refs = list(source_refs)

log.info("Transferring %d datasets into %s", len(source_refs), str(self))

if id_gen_map is None:
id_gen_map = {}

# Importing requires that we group the refs by dataset type and run
# before doing the import.
grouped_refs = defaultdict(list)
grouped_indices = defaultdict(list)
for i, ref in enumerate(source_refs):
grouped_refs[ref.datasetType, ref.run].append(ref)
grouped_indices[ref.datasetType, ref.run].append(i)

# The returned refs should be identical for UUIDs.
# For now must also support integers and so need to retain the
# newly-created refs from this registry.
# Pre-size it so we can assign refs into the correct slots
transferred_refs_tmp: List[Optional[DatasetRef]] = [None] * len(source_refs)
default_id_gen = DatasetIdGenEnum.UNIQUE

for (datasetType, run), refs_to_import in progress.iter_item_chunks(grouped_refs.items(),
desc="Importing to registry by "
"run and dataset type"):
run_doc = source_butler.registry.getCollectionDocumentation(run)
self.registry.registerCollection(run, CollectionType.RUN, doc=run_doc)

id_generation_mode = default_id_gen
if isinstance(refs_to_import[0].id, int):
# ID generation mode might need to be overridden when
# targeting UUID
id_generation_mode = id_gen_map.get(datasetType.name, default_id_gen)

n_refs = len(refs_to_import)
log.log(VERBOSE, "Importing %d ref%s of dataset type %s into run %s",
n_refs, "" if n_refs == 1 else "s", datasetType.name, run)

# No way to know if this butler's registry uses UUID.
Member:
Not sure it's a job for this ticket, but I do think we want a public API for this. It's not just an implementation detail; it's a behavioral change that's important to a lot of higher-level code.

Member Author:
It's a transitory problem to a certain extent. Things break if you try to use UUID in an old registry and if I knew that I could strip the ids before calling the import. The question is whether we actually care about that kind of migration. One question I have is whether you are thinking of this as a long-term API or a quick hack. I could easily see a "private" attribute on the dataset manager that we retire when we retire integer IDs. Do you envisage a more public API on Registry that will return the dataset_id_type even though in the long term that would presumably be the type of DatasetRef.id?

Member:
I think totally public - things in pipe_base and maybe obs_base do care (thinking especially about Quantum and QuantumGraph because I'm working on those now).
I imagine we would probably deprecate this API when we deprecate int support, but that depends on how much we want to hedge against changing again in the future.

If we wanted to keep it around forever, we might want to think about whether there should be some other flags, in addition to the type; those might last longer:

  • has_globally_unique_ids
  • supports_deterministic_ids

Of course, if we expect both of those to always be true forever after int is gone, then maybe just the int vs uuid type flag is enough.

Member Author:
Okay. I'll do this on another ticket because it will also lead to a cleanup of obs_base ingest-raws.

# We have to trust the caller on this. If it fails they will have
# to change their approach. We can't catch the exception and
# retry with unique because that will mess up the transaction
# handling. We aren't allowed to ask the registry manager what
# type of ID it is using.
imported_refs = self.registry._importDatasets(refs_to_import,
idGenerationMode=id_generation_mode,
expand=False)

# Map them into the correct slots to match the initial order
for i, ref in zip(grouped_indices[datasetType, run], imported_refs):
transferred_refs_tmp[i] = ref

# Mypy insists that we might have None in here so we have to make
# that explicit by assigning to a new variable and filtering out
# something that won't be there.
transferred_refs = [ref for ref in transferred_refs_tmp if ref is not None]

# Check consistency
assert len(source_refs) == len(transferred_refs), "Different number of refs imported than given"

log.log(VERBOSE, "Imported %d datasets into destination butler", len(transferred_refs))

# The transferred refs need to be reordered to match the original
# ordering given by the caller. Without this the datastore transfer
# will be broken.

# Ask the datastore to transfer. The datastore has to check that
# the source datastore is compatible with the target datastore.
self.datastore.transfer_from(source_butler.datastore, source_refs,
local_refs=transferred_refs, transfer=transfer)

return transferred_refs

def validateConfiguration(self, logFailures: bool = False,
datasetTypeNames: Optional[Iterable[str]] = None,
ignore: Iterable[str] = None) -> None:
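
As an illustration of the ``id_gen_map`` parameter documented above, here is a hedged sketch. It assumes the source repository uses integer dataset IDs and the destination uses UUIDs; the enum member and the import path for ``DatasetIdGenEnum`` are assumptions and should be checked against your daf_butler version.

```python
# Sketch only: choose a deterministic ID generation mode for "raw" datasets
# coming from an integer-ID registry; all other dataset types fall back to
# the default UNIQUE mode.  Paths, collection names, the import path, and the
# enum member are assumptions for illustration.
from lsst.daf.butler import Butler
from lsst.daf.butler.registry import DatasetIdGenEnum

source = Butler("/path/to/int-id/repo")
dest = Butler("/path/to/uuid/repo", writeable=True)

refs = list(source.registry.queryDatasets("raw", collections="DECam/raw/all"))

dest.transfer_from(
    source,
    refs,
    transfer="symlink",
    id_gen_map={"raw": DatasetIdGenEnum.DATAID_TYPE_RUN},
)
```
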
2 changes: 2 additions & 0 deletions python/lsst/daf/butler/cli/cmd/__init__.py
@@ -34,6 +34,7 @@
"query_dimension_records",
"retrieve_artifacts",
"remove_dataset_type",
"transfer_datasets",
)


@@ -52,4 +53,5 @@
query_dimension_records,
retrieve_artifacts,
remove_dataset_type,
transfer_datasets,
)
18 changes: 18 additions & 0 deletions python/lsst/daf/butler/cli/cmd/commands.py
@@ -479,3 +479,21 @@ def retrieve_artifacts(**kwargs):
print(uri)
print()
print(f"Number of artifacts retrieved into destination {kwargs['destination']}: {len(transferred)}")


@click.command(cls=ButlerCommand)
@click.argument("source", required=True)
@click.argument("dest", required=True)
@query_datasets_options(showUri=False, useArguments=False, repo=False)
@transfer_option()
@options_file_option()
def transfer_datasets(**kwargs):
"""Transfer datasets from a source butler to a destination butler.

SOURCE is a URI to the Butler repository containing the RUN dataset.

DEST is a URI to the Butler repository that will receive copies of the
datasets.
"""
number = script.transferDatasets(**kwargs)
print(f"Number of datasets transferred: {number}")
40 changes: 40 additions & 0 deletions python/lsst/daf/butler/core/datastore.py
@@ -584,6 +584,46 @@ def ingest(self, *datasets: FileDataset, transfer: Optional[str] = None) -> None
)
self._finishIngest(prepData, transfer=transfer)

def transfer_from(self, source_datastore: Datastore, refs: Iterable[DatasetRef],
local_refs: Optional[Iterable[DatasetRef]] = None,
transfer: str = "auto") -> None:
"""Transfer dataset artifacts from another datastore to this one.

Parameters
----------
source_datastore : `Datastore`
The datastore from which to transfer artifacts. That datastore
must be compatible with this datastore receiving the artifacts.
refs : iterable of `DatasetRef`
The datasets to transfer from the source datastore.
local_refs : iterable of `DatasetRef`, optional
The dataset refs as known to the registry associated with this
datastore. Can be `None` if the source and target datastores
are both using UUIDs.
transfer : `str`, optional
How (and whether) the dataset should be added to the datastore.
Choices include "move", "copy",
"link", "symlink", "relsymlink", and "hardlink". "link" is a
special transfer mode that will first try to make a hardlink and
if that fails a symlink will be used instead. "relsymlink" creates
a relative symlink rather than an absolute one.
Most datastores do not support all transfer modes.
"auto" (the default) is a special option that will let the
datastore choose the most natural option for itself.
If the source location and transfer location are identical the
transfer mode will be ignored.

Raises
------
TypeError
Raised if the two datastores are not compatible.
"""
if type(self) is not type(source_datastore):
raise TypeError(f"Datastore mismatch between this datastore ({type(self)}) and the "
f"source datastore ({type(source_datastore)}).")

raise NotImplementedError(f"Datastore {type(self)} must implement a transfer_from method.")

@abstractmethod
def getURIs(self, datasetRef: DatasetRef,
predict: bool = False) -> Tuple[Optional[ButlerURI], Dict[str, ButlerURI]]:
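
The "link" mode described in the docstring above (hard link first, symlink as a fallback) can be illustrated with a small self-contained sketch; this is not code from the PR, just the documented semantics spelled out with the standard library:

```python
# Illustrative sketch of "link" transfer semantics: attempt a hard link,
# and fall back to a symbolic link if the hard link fails (for example
# when source and destination are on different filesystems).
import os

def link_with_fallback(src: str, dst: str) -> None:
    try:
        os.link(src, dst)        # hard link
    except OSError:
        os.symlink(src, dst)     # fall back to a symlink
```
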
154 changes: 152 additions & 2 deletions python/lsst/daf/butler/datastores/fileDatastore.py
@@ -364,9 +364,34 @@ def getStoredItemsInfo(self, ref: DatasetIdRef) -> List[StoredFileInfo]:

# Look for the dataset_id -- there might be multiple matches
# if we have disassembled the dataset.
records = list(self._table.fetch(dataset_id=ref.id))
records = self._table.fetch(dataset_id=ref.id)
return [StoredFileInfo.from_record(record) for record in records]

def _get_stored_records_associated_with_refs(self,
refs: Iterable[DatasetIdRef]
) -> Dict[DatasetId, List[StoredFileInfo]]:
"""Retrieve all records associated with the provided refs.

Parameters
----------
refs : iterable of `DatasetIdRef`
The refs for which records are to be retrieved.

Returns
-------
records : `dict` of [`DatasetId`, `list` of `StoredFileInfo`]
The matching records indexed by the ref ID. The number of entries
in the dict can be smaller than the number of requested refs.
"""
records = self._table.fetch(dataset_id=[ref.id for ref in refs])

# Uniqueness is dataset_id + component so can have multiple records
# per ref.
records_by_ref = defaultdict(list)
for record in records:
records_by_ref[record["dataset_id"]].append(StoredFileInfo.from_record(record))
return records_by_ref

def _refs_associated_with_artifacts(self, paths: List[Union[str, ButlerURI]]) -> Dict[str,
Set[DatasetId]]:
"""Return paths and associated dataset refs.
@@ -381,7 +406,7 @@ def _refs_associated_with_artifacts(self, paths: List[Union[str, ButlerURI]]) ->
mapping : `dict` of [`str`, `set` [`DatasetId`]]
Mapping of each path to a set of associated database IDs.
"""
records = list(self._table.fetch(path=[str(path) for path in paths]))
records = self._table.fetch(path=[str(path) for path in paths])
result = defaultdict(set)
for row in records:
result[row["path"]].add(row["dataset_id"])
@@ -1723,6 +1748,131 @@ def emptyTrash(self, ignore_errors: bool = True) -> None:
else:
raise

@transactional
def transfer_from(self, source_datastore: Datastore, refs: Iterable[DatasetRef],
local_refs: Optional[Iterable[DatasetRef]] = None,
transfer: str = "auto") -> None:
# Docstring inherited
if type(self) is not type(source_datastore):
raise TypeError(f"Datastore mismatch between this datastore ({type(self)}) and the "
f"source datastore ({type(source_datastore)}).")

# Be explicit for mypy
if not isinstance(source_datastore, FileDatastore):
raise TypeError("Can only transfer to a FileDatastore from another FileDatastore, not"
f" {type(source_datastore)}")

# Stop early if "direct" transfer mode is requested. That would
# require that the URI inside the source datastore should be stored
# directly in the target datastore, which seems unlikely to be useful
# since at any moment the source datastore could delete the file.
if transfer == "direct":
raise ValueError("Can not transfer from a source datastore using direct mode since"
" those files are controlled by the other datastore.")

# We will go through the list multiple times so must convert
# generators to lists.
refs = list(refs)

if local_refs is None:
local_refs = refs
else:
local_refs = list(local_refs)

# In order to handle disassembled composites the code works
# at the records level since it can assume that internal APIs
# can be used.
# - If the record already exists in the destination this is assumed
# to be okay.
# - If there is no record but the source and destination URIs are
# identical no transfer is done but the record is added.
# - If the source record refers to an absolute URI currently assume
# that that URI should remain absolute and will be visible to the
# destination butler. May need to have a flag to indicate whether
# the dataset should be transferred. This will only happen if
# the detached Butler has had a local ingest.

# What we really want is all the records in the source datastore
# associated with these refs. Or derived ones if they don't exist
# in the source.
source_records = source_datastore._get_stored_records_associated_with_refs(refs)

# The source dataset_ids are the keys in these records
source_ids = set(source_records)
log.debug("Number of datastore records found in source: %d", len(source_ids))

# The not None check is to appease mypy
requested_ids = set(ref.id for ref in refs if ref.id is not None)
missing_ids = requested_ids - source_ids

# Missing IDs can be okay if that datastore has allowed
# gets based on file existence. Should we transfer what we can
# or complain about it and warn?
if missing_ids and not source_datastore.trustGetRequest:
raise ValueError(f"Some datasets are missing from source datastore {source_datastore}:"
f" {missing_ids}")

# Need to map these missing IDs to a DatasetRef so we can guess
# the details.
if missing_ids:
log.info("Number of expected datasets missing from source datastore records: %d",
len(missing_ids))
id_to_ref = {ref.id: ref for ref in refs if ref.id in missing_ids}

for missing in missing_ids:
expected = self._get_expected_dataset_locations_info(id_to_ref[missing])
source_records[missing].extend(info for _, info in expected)

# See if we already have these records
target_records = self._get_stored_records_associated_with_refs(local_refs)

# The artifacts to register
artifacts = []

# Refs that already exist
already_present = []

# Now can transfer the artifacts
for source_ref, target_ref in zip(refs, local_refs):
if target_ref.id in target_records:
# Already have an artifact for this.
already_present.append(target_ref)
continue

# mypy needs to know these are always resolved refs
for info in source_records[source_ref.getCheckedId()]:
source_location = info.file_location(source_datastore.locationFactory)
target_location = info.file_location(self.locationFactory)
if source_location == target_location:
# Either the dataset is already in the target datastore
# (which is how execution butler currently runs) or
# it is an absolute URI.
if source_location.pathInStore.isabs():
# Just because we can see the artifact when running
# the transfer doesn't mean it will be generally
# accessible to a user of this butler. For now warn
# but assume it will be accessible.
Member:
I'm not sure whether this should be an exception or silent, but warning seems like it's still going to be a problem if it's user error, while being annoying if the user knows what they are doing. Do we want a flag so the user can say, "trust me on this"?

Member Author:
In my head this scenario was extremely unlikely because it only happens with ingested datasets and not those created by processing. Since the transfer does not (yet?) copy dimension records over and the receiving registry won't ever have seen these raw files, it seems like we aren't really going to get this far in the code. If we turn this into the default mode for transferring content from one butler to another (and only use export/import for cases where you can't see both butlers from your one python process) then yes, we need the code higher up to insert missing dimension records and then ask the user whether absolute URIs should be transferred or left as-is. I put in a warning there as a stop-gap for a scenario I'm not expecting to happen, but I don't want to raise an exception immediately -- though a warning will generally prompt people to ask us what the warning means.

log.warning("Transfer request for an outside-datastore artifact has been found at %s",
source_location)
else:
# Need to transfer it to the new location.
# Assume we should always overwrite. If the artifact
# is there this might indicate that a previous transfer
# was interrupted but was not able to be rolled back
# completely (e.g. pre-emption) so follow Datastore default
# and overwrite.
target_location.uri.transfer_from(source_location.uri, transfer=transfer,
overwrite=True, transaction=self._transaction)

artifacts.append((target_ref, info))

self._register_datasets(artifacts)

if already_present:
n_skipped = len(already_present)
log.info("Skipped transfer of %d dataset%s already present in datastore", n_skipped,
"" if n_skipped == 1 else "s")

@transactional
def forget(self, refs: Iterable[DatasetRef]) -> None:
# Docstring inherited.
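
Finally, since ``transfer_from`` does not copy dimension records or dataset type definitions (see the changelog caveat and the review discussion above), the receiving repository has to be prepared first. A hedged sketch of that preparation step follows; dimension element names, dataset type, and collection names are illustrative, and which elements you need depends on your data.

```python
# Sketch only: register the dataset type and copy the dimension records the
# datasets depend on before calling transfer_from().
from lsst.daf.butler import Butler

source = Butler("/path/to/source/repo")
dest = Butler("/path/to/dest/repo", writeable=True)

# Register the dataset type definition in the destination.
dest.registry.registerDatasetType(source.registry.getDatasetType("calexp"))

# Copy dimension records; the elements listed here are illustrative and
# must be inserted in dependency order.
for element in ("instrument", "physical_filter", "detector"):
    records = list(source.registry.queryDimensionRecords(element, instrument="HSC"))
    dest.registry.insertDimensionData(element, *records)

refs = source.registry.queryDatasets("calexp", collections="HSC/runs/RC2")
dest.transfer_from(source, refs, transfer="copy")
```
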