Rewrite S3 ingest to use ButlerURI
Removes a lot of code.
timj committed Jul 24, 2020
1 parent 8894565 commit 78eb395
Showing 1 changed file with 14 additions and 39 deletions.
python/lsst/daf/butler/datastores/s3Datastore.py: 53 changes (14 additions & 39 deletions)
@@ -295,24 +295,14 @@ def _standardizeIngestPath(self, path: str, *, transfer: Optional[str] = None) -
         # Schemeless URIs are assumed to obey os.path rules. Equivalent to
         # os.path.exists(fullPath) check in PosixDatastore.
         srcUri = ButlerURI(path)
-        if srcUri.scheme == 'file' or not srcUri.scheme:
-            if not os.path.exists(srcUri.ospath):
-                raise FileNotFoundError(f"File at '{srcUri}' does not exist.")
-        elif srcUri.scheme == 's3':
-            if not s3CheckFileExists(srcUri, client=self.client)[0]:
-                raise FileNotFoundError(f"File at '{srcUri}' does not exist.")
-        else:
-            raise NotImplementedError(f"Scheme type {srcUri.scheme} not supported.")
+        if not srcUri.exists():
+            raise FileNotFoundError(f"Resource at {srcUri} does not exist")

         if transfer is None:
             rootUri = ButlerURI(self.root)
-            if srcUri.scheme == "file":
-                raise RuntimeError(f"'{srcUri}' is not inside repository root '{rootUri}'. "
-                                   "Ingesting local data to S3Datastore without upload "
-                                   "to S3 is not allowed.")
-            elif srcUri.scheme == "s3":
-                if not srcUri.path.startswith(rootUri.path):
-                    raise RuntimeError(f"'{srcUri}' is not inside repository root '{rootUri}'.")
+            if not srcUri.relative_to(rootUri):
+                raise RuntimeError(f"Transfer is none but source file ({srcUri}) is not "
+                                   f"within datastore ({rootUri})")
         return path

     def _extractIngestInfo(self, path: str, ref: DatasetRef, *,
@@ -321,36 +311,21 @@ def _extractIngestInfo(self, path: str, ref: DatasetRef, *,
         # Docstring inherited from FileLikeDatastore._extractIngestInfo.
         srcUri = ButlerURI(path)
         if transfer is None:
-            rootUri = ButlerURI(self.root)
-            p = pathlib.PurePosixPath(srcUri.relativeToPathRoot)
-            pathInStore = str(p.relative_to(rootUri.relativeToPathRoot))
+            # The source file is already in the datastore but we have
+            # to work out the path relative to the root of the datastore
+            rootUri = ButlerURI(self.root, forceDirectory=True)
+            pathInStore = srcUri.relative_to(rootUri)
+            if pathInStore is None:
+                raise RuntimeError(f"Unexpectedly learned that {srcUri} is not within datastore {rootUri}")
             tgtLocation = self.locationFactory.fromPath(pathInStore)
         else:
             assert transfer == "move" or transfer == "copy", "Should be guaranteed by _standardizeIngestPath"

             # Work out the name we want this ingested file to have
             # inside the datastore
             tgtLocation = self._calculate_ingested_datastore_name(srcUri, ref, formatter)

-            if srcUri.scheme == "file":
-                # source is on local disk.
-                with open(srcUri.ospath, 'rb') as f:
-                    self.client.put_object(Bucket=tgtLocation.netloc,
-                                           Key=tgtLocation.relativeToPathRoot, Body=f)
-                if transfer == "move":
-                    os.remove(srcUri.ospath)
-            elif srcUri.scheme == "s3":
-                # source is another S3 Bucket
-                relpath = srcUri.relativeToPathRoot
-                copySrc = {"Bucket": srcUri.netloc, "Key": relpath}
-                self.client.copy(copySrc, self.locationFactory.netloc,
-                                 tgtLocation.relativeToPathRoot)
-                if transfer == "move":
-                    # https://github.com/boto/boto3/issues/507 - there is no
-                    # way of knowing if the file was actually deleted except
-                    # for checking all the keys again, reponse is HTTP 204 OK
-                    # response all the time
-                    self.client.delete(Bucket=srcUri.netloc, Key=relpath)
+            # Convert that to a ButlerURI and transfer the resource to S3
+            targetUri = ButlerURI(tgtLocation.uri)
+            targetUri.transfer_from(srcUri, transfer=transfer)

         # the file should exist on the bucket by now
         _, size = s3CheckFileExists(path=tgtLocation.relativeToPathRoot,
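The rewrite collapses the scheme-specific branching above (os.path checks for local files, boto3 put_object/copy/delete for S3) into three ButlerURI calls: exists(), relative_to(), and transfer_from(). The sketch below illustrates that flow outside the datastore; the bucket and file paths are made up for illustration, and it assumes ButlerURI is importable from lsst.daf.butler as it is elsewhere in the package.

    from lsst.daf.butler import ButlerURI

    # Hypothetical locations; a real ingest derives these from the datastore root and the request.
    srcUri = ButlerURI("/staging/calexp_01.fits")                  # schemeless, treated as a local file
    rootUri = ButlerURI("s3://example-bucket/repo/", forceDirectory=True)
    tgtUri = ButlerURI("s3://example-bucket/repo/calexp/calexp_01.fits")

    # The existence check no longer depends on the scheme.
    if not srcUri.exists():
        raise FileNotFoundError(f"Resource at {srcUri} does not exist")

    # relative_to() gives the path inside rootUri, or None when the source is outside it;
    # this single call now backs both the transfer=None guard and the pathInStore computation.
    pathInStore = srcUri.relative_to(rootUri)    # None here: a local file is not inside the S3 root

    # For transfer="copy" or "move", the upload (and, for "move", removal of the source) happens here.
    tgtUri.transfer_from(srcUri, transfer="copy")

Because ButlerURI dispatches on the scheme internally, the datastore no longer needs its own boto3 code paths, which is where most of the 39 deleted lines come from.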
