DM-42947: Implement RemoteButler.retrieveArtifacts #964

Merged: 1 commit, Feb 22, 2024
49 changes: 29 additions & 20 deletions python/lsst/daf/butler/datastores/fileDatastore.py
@@ -78,6 +78,9 @@
generate_datastore_get_information,
get_dataset_as_python_object_from_get_info,
)
from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import (
determine_destination_for_retrieved_artifact,
)
from lsst.daf.butler.datastores.fileDatastoreClient import (
FileDatastoreGetPayload,
FileDatastoreGetPayloadFileInfo,
@@ -1976,16 +1979,9 @@
locations = self._get_dataset_locations_info(ref)
for location, _ in locations:
source_uri = location.uri
target_path: ResourcePathExpression
if preserve_path:
target_path = location.pathInStore
if target_path.isabs():
# This is an absolute path to an external file.
# Use the full path.
target_path = target_path.relativeToPathRoot
else:
target_path = source_uri.basename()
target_uri = destination.join(target_path)
target_uri = determine_destination_for_retrieved_artifact(
destination, location.pathInStore, preserve_path
)
to_transfer[source_uri] = target_uri

# In theory can now parallelize the transfer
@@ -2055,22 +2051,13 @@
# URLs for people without access rights to download.
url_expiration_time_seconds = 1 * 60 * 60

def to_file_info_payload(info: DatasetLocationInformation) -> FileDatastoreGetPayloadFileInfo:
location, file_info = info
return FileDatastoreGetPayloadFileInfo(
url=location.uri.generate_presigned_get_url(
expiration_time_seconds=url_expiration_time_seconds
),
datastoreRecords=file_info.to_simple(),
)

locations = self._get_dataset_locations_info(ref)
if len(locations) == 0:
return None

return FileDatastoreGetPayload(
datastore_type="file",
file_info=[to_file_info_payload(info) for info in locations],
file_info=[_to_file_info_payload(info, url_expiration_time_seconds) for info in locations],
)

@transactional
@@ -2849,3 +2836,25 @@
def get_opaque_table_definitions(self) -> Mapping[str, DatastoreOpaqueTable]:
# Docstring inherited from the base class.
return {self._opaque_table_name: DatastoreOpaqueTable(self.makeTableSpec(ddl.GUID), StoredFileInfo)}


def _to_file_info_payload(
info: DatasetLocationInformation, url_expiration_time_seconds: int
) -> FileDatastoreGetPayloadFileInfo:
location, file_info = info

# Make sure that we send only relative paths, to avoid leaking
# details of our configuration to the client.
path = location.pathInStore
if path.isabs():
relative_path = path.relativeToPathRoot

(Codecov / codecov/patch warning: added line #L2850 in python/lsst/daf/butler/datastores/fileDatastore.py was not covered by tests.)
else:
relative_path = str(path)

datastoreRecords = file_info.to_simple()
datastoreRecords.path = relative_path

return FileDatastoreGetPayloadFileInfo(
url=location.uri.generate_presigned_get_url(expiration_time_seconds=url_expiration_time_seconds),
datastoreRecords=datastoreRecords,
)
72 changes: 72 additions & 0 deletions python/lsst/daf/butler/datastores/file_datastore/retrieve_artifacts.py
@@ -0,0 +1,72 @@
# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ("determine_destination_for_retrieved_artifact",)

from lsst.resources import ResourcePath, ResourcePathExpression


def determine_destination_for_retrieved_artifact(
destination_directory: ResourcePath, source_path: ResourcePath, preserve_path: bool
) -> ResourcePath:
"""Determine destination path for an artifact retrieved from a datastore.

Parameters
----------
destination_directory : `ResourcePath`
Member:

Suggested change:

    destination_directory : `ResourcePath`
    destination_directory : `~lsst.resources.ResourcePath`

I don't think Sphinx resolves these otherwise, because ResourcePath is not part of daf_butler. It only really matters if this docstring turns up in the Sphinx docs, and in the longer term we keep dreaming that Sphinx will pick up the type annotations so that we can remove the type from here...

Contributor Author:

FYI, I asked Jonathan about this at JTM. The latest version of the documentation tooling does do that; Science Pipelines just hasn't upgraded to use it yet.

Path to the output directory where file will be stored.
source_path : `ResourcePath`
Path to the source file to be transferred. This may be relative to the
datastore root, or an absolute path.
preserve_path : `bool`, optional
If `True` the full path of the artifact within the datastore
is preserved. If `False` the final file component of the path
is used.

Returns
-------
destination_uri : `~lsst.resources.ResourcePath`
Absolute path to the output destination.
"""
destination_directory = destination_directory.abspath()

target_path: ResourcePathExpression
if preserve_path:
target_path = source_path
Member:

I know this was copied from above, but looking at it again it might be clearer from a typing perspective as:

    target_path: str
    if preserve_path:
        if source_path.isabs():
            target_path = source_path.relativeToPathRoot
        else:
            target_path = source_path.path
    else:
        target_path = source_path.basename()

then target_path is a simple string and doesn't have to be a ResourcePath in one branch. Thoughts?

Contributor Author:

Seems reasonable.

Contributor Author:

Actually, should it be unquoted_path instead? I'm not sure about the vagaries of ResourcePath quoting and unquoting. I might just leave it as it was, since otherwise it's going to get immediately converted back into a ResourcePath in join() anyway.

Member:

Yes, join uses unquoted_path (so it does convert the resource path to a string in there):

https://github.com/lsst/resources/blob/main/python/lsst/resources/_resourcePath.py#L742

Leaving it is fine; it's just a bit more convoluted with the typing as it currently stands.
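To make that concrete, here is a minimal illustration (not taken from the PR; it assumes only that lsst.resources is importable) showing that join() accepts either a plain string or a ResourcePath, so the code below can keep passing the ResourcePath straight through:

    from lsst.resources import ResourcePath

    destination = ResourcePath("/tmp/output_directory/").abspath()
    relative = ResourcePath("subdir/file.txt", forceAbsolute=False)

    # Per the review comment above, join() converts a ResourcePath argument to a
    # string internally (via unquoted_path), so both calls should resolve to the
    # same target URI.
    print(destination.join(relative))
    print(destination.join(relative.unquoted_path))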

if target_path.isabs():
# This is an absolute path to an external file.
# Use the full path.
target_path = target_path.relativeToPathRoot
else:
target_path = source_path.basename()

target_uri = destination_directory.join(target_path).abspath()
if target_uri.relative_to(destination_directory) is None:
raise ValueError(f"File path attempts to escape destination directory: '{source_path}'")
return target_uri
37 changes: 31 additions & 6 deletions python/lsst/daf/butler/remote_butler/_remote_butler.py
@@ -40,6 +40,9 @@

import httpx
from lsst.daf.butler import __version__
from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import (
determine_destination_for_retrieved_artifact,
)
from lsst.daf.butler.datastores.fileDatastoreClient import (
FileDatastoreGetPayload,
get_dataset_as_python_object,
@@ -298,10 +301,7 @@ def _get_file_info(
if isinstance(datasetRefOrType, DatasetRef):
if dataId is not None:
raise ValueError("DatasetRef given, cannot use dataId as well")
dataset_id = datasetRefOrType.id
response = self._get(f"get_file/{dataset_id}", expected_errors=(404,))
if response.status_code == 404:
raise FileNotFoundError(f"Dataset not found: {datasetRefOrType}")
return self._get_file_info_for_ref(datasetRefOrType)
else:
request = GetFileByDataIdRequestModel(
dataset_type_name=self._normalize_dataset_type_name(datasetRefOrType),
@@ -314,7 +314,12 @@
f"Dataset not found with DataId: {dataId} DatasetType: {datasetRefOrType}"
f" collections: {collections}"
)
return self._parse_model(response, GetFileResponseModel)

def _get_file_info_for_ref(self, ref: DatasetRef) -> GetFileResponseModel:
response = self._get(f"get_file/{ref.id}", expected_errors=(404,))
if response.status_code == 404:
raise FileNotFoundError(f"Dataset not found: {ref.id}")
return self._parse_model(response, GetFileResponseModel)

def getURIs(
@@ -428,8 +433,28 @@ def retrieveArtifacts(
preserve_path: bool = True,
overwrite: bool = False,
) -> list[ResourcePath]:
# Docstring inherited.
raise NotImplementedError()
destination = ResourcePath(destination).abspath()
if not destination.isdir():
raise ValueError(f"Destination location must refer to a directory. Given {destination}.")

if transfer not in ("auto", "copy"):
raise ValueError("Only 'copy' and 'auto' transfer modes are supported.")

output_uris: list[ResourcePath] = []
for ref in refs:
file_info = _to_file_payload(self._get_file_info_for_ref(ref)).file_info
for file in file_info:
source_uri = ResourcePath(str(file.url))
relative_path = ResourcePath(file.datastoreRecords.path, forceAbsolute=False)
target_uri = determine_destination_for_retrieved_artifact(
destination, relative_path, preserve_path
)
# Because signed URLs expire, we want to do the transfer soon
# after retrieving the URL.
target_uri.transfer_from(source_uri, transfer="copy", overwrite=overwrite)
Member:

This loop is organized slightly differently to FileDatastore, in that it does the transfer in the main loop and can't report how many files it's about to transfer. The debug message was there before the transfer for cases where people have mistakenly asked for 10,000 datasets and are wondering why it's taking so long. We have had discussions in the past about refusing to do the transfers if there are too many refs, and also about implementing parallelism in the transfers (which is why it was written initially to do the transfers after all the database queries, so that it could be modified to use futures).

I realize looking at the original now that it was never upgraded to use the bulk query interface _get_stored_records_associated_with_refs and instead uses the much slower per-ref API that we had originally. This is another slowdown -- we have had people wondering what is going on even with a few hundred datasets, so a reorganization of this implementation to use a bulk endpoint ("give me the payloads of N refs") plus futures to download with 10 threads will likely be important at some point.

Contributor Author:

This is intentional here to some extent... because of URL expiration, we don't want to fetch a huge number of URLs upfront and then download all of them at the end. If the files are large, the URLs will expire before we start downloading the later ones.

I think a bulk/parallel version of this would probably request something like 10 URLs at a time from the server, instead of all of them.

Member:

Can you add a comment here to say that the reason is because of expiration? Maybe also add a verbose log message after the loop to report how many files were transferred.
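For reference, a rough sketch of the batched/parallel variant discussed above (not part of this PR). It reuses the PR's _get_file_info_for_ref and _to_file_payload helpers via a butler argument; the chunk size, the standalone function name, and the use of an info-level log (standing in for the verbose level requested above) are placeholders only:

    import logging
    from concurrent.futures import ThreadPoolExecutor
    from itertools import islice

    from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import (
        determine_destination_for_retrieved_artifact,
    )
    from lsst.resources import ResourcePath

    _LOG = logging.getLogger(__name__)


    def _retrieve_artifacts_in_batches(butler, refs, destination, preserve_path, overwrite, chunk_size=10):
        """Sketch: fetch signed URLs for a small chunk of refs at a time, so they
        are used before they expire, then download the chunk with a thread pool.
        """
        output_uris = []
        refs_iter = iter(refs)
        while chunk := list(islice(refs_iter, chunk_size)):
            # Request signed URLs just before use so they do not expire mid-run.
            transfers = []
            for ref in chunk:
                for file in _to_file_payload(butler._get_file_info_for_ref(ref)).file_info:
                    source_uri = ResourcePath(str(file.url))
                    relative_path = ResourcePath(file.datastoreRecords.path, forceAbsolute=False)
                    target_uri = determine_destination_for_retrieved_artifact(
                        destination, relative_path, preserve_path
                    )
                    transfers.append((source_uri, target_uri))
            # Download this chunk in parallel.
            with ThreadPoolExecutor(max_workers=chunk_size) as pool:
                futures = [
                    pool.submit(target.transfer_from, source, transfer="copy", overwrite=overwrite)
                    for source, target in transfers
                ]
                for future in futures:
                    future.result()
            output_uris.extend(target for _, target in transfers)
        _LOG.info("Retrieved %d artifact file(s) into %s", len(output_uris), destination)
        return output_uris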

output_uris.append(target_uri)

return output_uris

def exists(
self,
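As an aside, a hypothetical end-to-end usage sketch of the new code path. The server URI, dataset type, and collection below are invented, and the exact construction of a RemoteButler client is outside this diff; the sketch simply assumes the repository resolves to one:

    from lsst.daf.butler import Butler

    # Placeholder server URI; assumed to resolve to a RemoteButler client.
    butler = Butler("https://data.example.org/api/butler/repo/main")

    # Illustrative query; any iterable of resolved DatasetRefs would do.
    refs = list(butler.registry.queryDatasets("calexp", collections="HSC/runs/demo"))

    paths = butler.retrieveArtifacts(
        refs,
        destination="retrieved/",  # must refer to a directory
        transfer="copy",  # only "auto" and "copy" are accepted
        preserve_path=True,
        overwrite=False,
    )
    print(f"Retrieved {len(paths)} artifact file(s)")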
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/tests/hybrid_butler.py
@@ -207,7 +207,7 @@ def retrieveArtifacts(
preserve_path: bool = True,
overwrite: bool = False,
) -> list[ResourcePath]:
return self._direct_butler.retrieveArtifacts(refs, destination, transfer, preserve_path, overwrite)
return self._remote_butler.retrieveArtifacts(refs, destination, transfer, preserve_path, overwrite)

def exists(
self,
7 changes: 7 additions & 0 deletions tests/test_butler.py
@@ -387,6 +387,13 @@ def runPutGetTest(self, storageClass: StorageClass, datasetTypeName: str) -> But
with self.assertRaises(ValueError):
butler.retrieveArtifacts([ref], destination, transfer="move")

with self.assertRaisesRegex(
ValueError, "^Destination location must refer to a directory"
):
butler.retrieveArtifacts(
[ref], ResourcePath("/some/file.txt", forceDirectory=False)
)

with self.assertRaises(FileExistsError):
butler.retrieveArtifacts([ref], destination)

30 changes: 30 additions & 0 deletions tests/test_remote_butler.py
@@ -29,6 +29,10 @@
from unittest.mock import patch

from lsst.daf.butler import Butler
from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import (
determine_destination_for_retrieved_artifact,
)
from lsst.resources import ResourcePath
from pydantic import ValidationError

try:
@@ -63,5 +67,31 @@ def test_internal_server_error(self):
butler.get_dataset_type("int")


class RemoteButlerMiscTests(unittest.TestCase):
"""Test miscellaneous RemoteButler functionality."""

def test_retrieve_artifacts_security(self):
# Make sure that the function used to determine output file paths for
# retrieveArtifacts throws if a malicious server tries to escape its
# destination directory.
with self.assertRaisesRegex(ValueError, "^File path attempts to escape destination directory"):
determine_destination_for_retrieved_artifact(
ResourcePath("output_directory/"),
ResourcePath("../something.txt", forceAbsolute=False),
preserve_path=True,
)

# Make sure all paths are forced to relative paths, even if the server
# sends an absolute path.
self.assertEqual(
determine_destination_for_retrieved_artifact(
ResourcePath("/tmp/output_directory/"),
ResourcePath("file:///not/relative.txt"),
preserve_path=True,
),
ResourcePath("/tmp/output_directory/not/relative.txt"),
)


if __name__ == "__main__":
unittest.main()