Skip to content

Commit

Permalink
Merge pull request #342 from lsst/tickets/DM-25818
Browse files Browse the repository at this point in the history
DM-25818: Allow datastore.put to overwrite the stored file
  • Loading branch information
timj committed Aug 11, 2020
2 parents a3e9947 + 581e80b commit ed85d3c
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 15 deletions.
25 changes: 23 additions & 2 deletions python/lsst/daf/butler/core/_butlerUri.py
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ def _fixupPathUri(parsed: urllib.parse.ParseResult, root: Optional[str] = None,
return parsed, dirLike

def transfer_from(self, src: ButlerURI, transfer: str,
overwrite: bool = False,
transaction: Optional[Union[DatastoreTransaction, NoTransaction]] = None) -> None:
"""Transfer the resource at ``src`` to the location of this URI.
Expand All @@ -671,6 +672,8 @@ def transfer_from(self, src: ButlerURI, transfer: str,
Mode to use for transferring the resource. Generically there are
many standard options: copy, link, symlink, hardlink, relsymlink.
Not all URIs support all modes.
overwrite : `bool`, optional
Allow an existing file to be overwritten. Defaults to `False`.
transaction : `DatastoreTransaction`, optional
A transaction object that can (depending on implementation)
rollback transfers on error. Not guaranteed to be implemented.
Expand Down Expand Up @@ -828,6 +831,7 @@ def mkdir(self) -> None:
raise FileExistsError(f"URI {self} exists but is not a directory!")

def transfer_from(self, src: ButlerURI, transfer: str,
overwrite: bool = False,
transaction: Optional[Union[DatastoreTransaction, NoTransaction]] = None) -> None:
"""Transfer the resource at ``src`` to this local file.
Expand All @@ -838,6 +842,8 @@ def transfer_from(self, src: ButlerURI, transfer: str,
transfer : `str`
Mode to use for transferring the resource. Supports the following
options: copy, link, symlink, hardlink, relsymlink.
overwrite : `bool`, optional
Allow an existing file to be overwritten. Defaults to `False`.
transaction : `DatastoreTransaction`, optional
If a transaction is provided, undo actions will be registered.
"""
Expand Down Expand Up @@ -874,7 +880,8 @@ def transfer_from(self, src: ButlerURI, transfer: str,
transfer = "move"

# The output location should not exist unless overwrite is allowed
if self.exists():
dest_exists = self.exists()
if not overwrite and dest_exists:
raise FileExistsError(f"Destination path '{self}' already exists. Transfer "
f"from {src} cannot be completed.")

Expand All @@ -893,6 +900,17 @@ def transfer_from(self, src: ButlerURI, transfer: str,
# Use a no-op transaction to reduce code duplication
transaction = NoTransaction()

# For links the OS doesn't let us overwrite so if something does
# exist we have to remove it before we do the actual "transfer" below
if "link" in transfer and overwrite and dest_exists:
try:
self.remove()
except Exception:
# If this fails we ignore it since it's a problem
# that will manifest immediately below with a more relevant
# error message
pass

if transfer == "move":
with transaction.undoWith(f"move from {local_src}", shutil.move, newFullPath, local_src):
shutil.move(local_src, newFullPath)
Expand Down Expand Up @@ -1098,6 +1116,7 @@ def as_local(self) -> Tuple[str, bool]:
return tmpFile.name, True

def transfer_from(self, src: ButlerURI, transfer: str = "copy",
overwrite: bool = False,
transaction: Optional[Union[DatastoreTransaction, NoTransaction]] = None) -> None:
"""Transfer the resource at ``src`` to this S3 location.
Expand All @@ -1108,6 +1127,8 @@ def transfer_from(self, src: ButlerURI, transfer: str = "copy",
transfer : `str`
Mode to use for transferring the resource. Supports the following
options: copy.
overwrite : `bool`, optional
Allow an existing file to be overwritten. Defaults to `False`.
transaction : `DatastoreTransaction`, optional
Currently unused.
"""
Expand All @@ -1118,7 +1139,7 @@ def transfer_from(self, src: ButlerURI, transfer: str = "copy",
log.debug(f"Transferring {src} [exists: {src.exists()}] -> "
f"{self} [exists: {self.exists()}] (transfer={transfer})")

if self.exists():
if not overwrite and self.exists():
raise FileExistsError(f"Destination path '{self}' already exists.")

if transfer == "auto":
Expand Down
6 changes: 5 additions & 1 deletion python/lsst/daf/butler/datastores/fileLikeDatastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,10 @@ def _prepare_for_put(self, inMemoryDataset: Any, ref: DatasetRef) -> Tuple[Locat
except KeyError as e:
raise DatasetTypeNotSupportedError(f"Unable to find template for {ref}") from e

# Validate the template to protect against different dataIds
# resolving to the same filename and overwriting one another.
template.validateTemplate(ref)

location = self.locationFactory.fromPath(template.format(ref))

# Get the formatter based on the storage class
Expand Down Expand Up @@ -1172,7 +1176,7 @@ def emptyTrash(self, ignore_errors: bool = True) -> None:
"""
log.debug("Emptying trash in datastore %s", self.name)
# Context manager will empty trash iff we finish it without raising.
with self._bridge.emptyTrash() as trashed:
with self.bridge.emptyTrash() as trashed:
for ref in trashed:
fileLocations = self._get_dataset_locations_info(ref)

Expand Down
9 changes: 7 additions & 2 deletions python/lsst/daf/butler/datastores/posixDatastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,13 @@ def _write_in_memory_to_artifact(self, inMemoryDataset: Any, ref: DatasetRef) ->
predictedFullPath = os.path.join(self.root, formatter.predictPath())

if os.path.exists(predictedFullPath):
raise FileExistsError(f"Cannot write file for ref {ref} as "
f"output file {predictedFullPath} already exists")
# Assume that if the registry thinks the file should not exist by
# this point, then any file found here is a leftover and can be
# overwritten. This can happen if an earlier put was interrupted
# externally. The only time this could be problematic is
# if the file template is incomplete and multiple dataset refs
# result in identical filenames.
log.warning("Object %s exists in datastore for ref %s", location.uri, ref)

def _removeFileExists(path: str) -> None:
"""Remove a file and do not complain if it is not there.
Expand Down
11 changes: 9 additions & 2 deletions python/lsst/daf/butler/datastores/s3Datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,15 @@ def _write_in_memory_to_artifact(self, inMemoryDataset: Any, ref: DatasetRef) ->
# an *exact* full key already exists before writing instead. The insert
# key operation is equivalent to creating the dir and the file.
if s3CheckFileExists(location, client=self.client,)[0]:
raise FileExistsError(f"Cannot write file for ref {ref} as "
f"output file {location.uri} exists.")
# Assume that if the registry thinks the file should not exist by
# this point, then any object found here is a leftover and can be
# overwritten. This can happen if an earlier put was interrupted
# externally. The only time this could be problematic is
# if the file template is incomplete and multiple dataset refs
# result in identical filenames.
# Eventually we should remove the check completely (the network
# request takes non-zero time).
log.warning("Object %s exists in datastore for ref %s", location.uri, ref)

# upload the file directly from bytes or by using a temporary file if
# _toBytes is not implemented
Expand Down
9 changes: 1 addition & 8 deletions tests/test_butler.py
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,6 @@ def testPutTemplates(self):

dataId1 = {"instrument": "DummyCamComp", "visit": 423}
dataId2 = {"instrument": "DummyCamComp", "visit": 423, "physical_filter": "d-r"}
dataId3 = {"instrument": "DummyCamComp", "visit": 425}

# Put with exactly the data ID keys needed
ref = butler.put(metric, "metric1", dataId1)
Expand All @@ -899,14 +898,8 @@ def testPutTemplates(self):
butler.datastore.templates.validateTemplates([ref])

# Now use a file template that will not result in unique filenames
ref = butler.put(metric, "metric3", dataId1)

# Check the template based on dimensions. This one is a bad template
with self.assertRaises(FileTemplateValidationError):
butler.datastore.templates.validateTemplates([ref])

with self.assertRaises(FileExistsError):
butler.put(metric, "metric3", dataId3)
butler.put(metric, "metric3", dataId1)

def testImportExport(self):
# Run put/get tests just to create and populate a repo.
Expand Down
10 changes: 10 additions & 0 deletions tests/test_uri.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ def testTransfer(self):
if mode in ("symlink", "relsymlink"):
self.assertTrue(os.path.islink(dest.ospath), f"Check that {dest} is symlink")

with self.assertRaises(FileExistsError):
dest.transfer_from(src, transfer=mode)

dest.transfer_from(src, transfer=mode, overwrite=True)

os.remove(dest.ospath)

b = src.read()
Expand Down Expand Up @@ -312,6 +317,11 @@ def testTransfer(self):
self.assertEqual(len(subset), nbytes) # Extra byte comes back
self.assertEqual(subset.decode(), content[:nbytes])

with self.assertRaises(FileExistsError):
dest.transfer_from(src, transfer="copy")

dest.transfer_from(src, transfer="copy", overwrite=True)

def testWrite(self):
s3write = ButlerURI(self.makeS3Uri("created.txt"))
content = "abcdefghijklmnopqrstuv\n"
Expand Down

0 comments on commit ed85d3c

Please sign in to comment.