Merge pull request #705 from lsst/tickets/DM-35458
DM-35458: Always write local datastore files to temporaries
timj committed Jul 7, 2022
2 parents df4b34c + 08a51ac commit a69945d
Showing 3 changed files with 52 additions and 29 deletions.
2 changes: 2 additions & 0 deletions doc/changes/DM-35458.misc.rst
@@ -0,0 +1,2 @@
File datastore now always writes a temporary file and renames it even for local file system datastores.
This minimizes the risk of a corrupt file being written if the process writing the file is killed at the wrong time.
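The pattern described in this changelog entry is the standard write-to-a-temporary-then-rename idiom. As a minimal standalone sketch, using only the Python standard library rather than the butler's ResourcePath API (the helper name atomic_write is illustrative, not part of daf_butler):

    # Sketch of the write-temporary-then-rename idiom described above.
    # The temporary is created in the destination directory so that the
    # final os.replace() is a same-filesystem rename and therefore atomic.
    import os
    import tempfile


    def atomic_write(destination: str, data: bytes) -> None:
        dirname = os.path.dirname(os.path.abspath(destination))
        fd, tmp_path = tempfile.mkstemp(dir=dirname, suffix=".tmp")
        try:
            with os.fdopen(fd, "wb") as tmp_file:
                tmp_file.write(data)
                tmp_file.flush()
                os.fsync(tmp_file.fileno())
            # Atomic on POSIX: readers see either the old file or the new
            # one, never a partially written file.
            os.replace(tmp_path, destination)
        except BaseException:
            os.unlink(tmp_path)
            raise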
61 changes: 32 additions & 29 deletions python/lsst/daf/butler/datastores/fileDatastore.py
@@ -1122,22 +1122,13 @@ def _removeFileExists(uri: ResourcePath) -> None:
# something fails below
self._transaction.registerUndo("artifactWrite", _removeFileExists, uri)

# For a local file, simply use the formatter directly
if uri.isLocal:
try:
formatter.write(inMemoryDataset)
except Exception as e:
raise RuntimeError(
f"Failed to serialize dataset {ref} of type {type(inMemoryDataset)} to location {uri}"
) from e
log.debug("Successfully wrote python object to local file at %s", uri)
else:
data_written = False
if not uri.isLocal:
# This is a remote URI. Some datasets can be serialized directly
# to bytes and sent to the remote datastore without writing a
# file. If the dataset is intended to be saved to the cache
# a file is always written and direct write to the remote
# datastore is bypassed.
data_written = False
if not self.cacheManager.should_be_cached(ref):
try:
serializedDataset = formatter.toBytes(inMemoryDataset)
@@ -1154,28 +1145,40 @@ def _removeFileExists(uri: ResourcePath) -> None:
log.debug("Successfully wrote bytes directly to %s", uri)
data_written = True

if not data_written:
# Did not write the bytes directly to object store so instead
# write to temporary file.
with ResourcePath.temporary_uri(suffix=uri.getExtension()) as temporary_uri:
# Need to configure the formatter to write to a different
# location and that needs us to overwrite internals
log.debug("Writing dataset to temporary location at %s", temporary_uri)
with formatter._updateLocation(Location(None, temporary_uri)):
try:
formatter.write(inMemoryDataset)
except Exception as e:
raise RuntimeError(
f"Failed to serialize dataset {ref} of type"
f" {type(inMemoryDataset)} to "
f"temporary location {temporary_uri}"
) from e
uri.transfer_from(temporary_uri, transfer="copy", overwrite=True)
if not data_written:
# Did not write the bytes directly to object store so instead
# write to temporary file. Always write to a temporary even if
# using a local file system -- that gives us atomic writes.
# If a process is killed as the file is being written we do not
# want it to remain in the correct place but in corrupt state.
# For local files write to the output directory not temporary dir.
prefix = uri.dirname() if uri.isLocal else None
with ResourcePath.temporary_uri(suffix=uri.getExtension(), prefix=prefix) as temporary_uri:
# Need to configure the formatter to write to a different
# location and that needs us to overwrite internals
log.debug("Writing dataset to temporary location at %s", temporary_uri)
with formatter._updateLocation(Location(None, temporary_uri)):
try:
formatter.write(inMemoryDataset)
except Exception as e:
raise RuntimeError(
f"Failed to serialize dataset {ref} of type"
f" {type(inMemoryDataset)} to "
f"temporary location {temporary_uri}"
) from e

# Use move for a local file since that becomes an efficient
# os.rename. For remote resources we use copy to allow the
# file to be cached afterwards.
transfer = "move" if uri.isLocal else "copy"

uri.transfer_from(temporary_uri, transfer=transfer, overwrite=True)

if transfer == "copy":
# Cache if required
self.cacheManager.move_to_cache(temporary_uri, ref)

log.debug("Successfully wrote dataset to %s via a temporary file.", uri)
log.debug("Successfully wrote dataset to %s via a temporary file.", uri)

# URI is needed to resolve what ingest case are we dealing with
return self._extractIngestInfo(uri, ref, formatter=formatter)
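The transfer-mode choice in the hunk above (move for a local destination, copy for a remote one so the temporary can still be moved into the cache) can be illustrated with a small hypothetical helper; finalize_write and its arguments are illustrative names, not part of the butler API:

    # Hypothetical helper (not part of daf_butler) illustrating the
    # transfer-mode choice: a local destination gets an atomic rename,
    # while a remote destination is copied so the temporary file survives
    # and could be handed to a cache afterwards.
    import os
    import shutil


    def finalize_write(temporary: str, destination: str, destination_is_local: bool) -> str:
        if destination_is_local:
            # Same-filesystem rename: atomic and leaves no temporary behind.
            os.replace(temporary, destination)
            return "move"
        # Stand-in for an upload to an object store; shutil.copy2 keeps the
        # temporary in place so a cache manager could claim it next.
        shutil.copy2(temporary, destination)
        return "copy"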
18 changes: 18 additions & 0 deletions tests/test_datastore.py
@@ -918,6 +918,24 @@ def setUp(self):
self.root = tempfile.mkdtemp(dir=TESTDIR)
super().setUp()

def testAtomicWrite(self):
"""Test that we write to a temporary and then rename"""
datastore = self.makeDatastore()
storageClass = self.storageClassFactory.getStorageClass("StructuredData")
dimensions = self.universe.extract(("visit", "physical_filter"))
metrics = makeExampleMetrics()

dataId = {"instrument": "dummy", "visit": 0, "physical_filter": "V"}
ref = self.makeDatasetRef("metric", dimensions, storageClass, dataId, conform=False)

with self.assertLogs("lsst.resources", "DEBUG") as cm:
datastore.put(metrics, ref)
move_logs = [ll for ll in cm.output if "transfer=" in ll]
self.assertIn("transfer=move", move_logs[0])

# And the transfer should be file to file.
self.assertEqual(move_logs[0].count("file://"), 2)

def testCanNotDeterminePutFormatterLocation(self):
"""Verify that the expected exception is raised if the FileDatastore
can not determine the put formatter location."""
