Skip to content

Commit

Permalink
Unify local and remote file writing to always use temporary
Browse files Browse the repository at this point in the history
  • Loading branch information
timj committed Jul 6, 2022
1 parent 6f71188 commit 3c600f1
Showing 1 changed file with 32 additions and 43 deletions.
75 changes: 32 additions & 43 deletions python/lsst/daf/butler/datastores/fileDatastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import hashlib
import logging
import os
from collections import defaultdict
from dataclasses import dataclass
from typing import (
Expand Down Expand Up @@ -1123,35 +1122,13 @@ def _removeFileExists(uri: ResourcePath) -> None:
# something fails below
self._transaction.registerUndo("artifactWrite", _removeFileExists, uri)

# For a local file, simply use the formatter directly
if uri.isLocal:
# To allow atomic writes, write to a temporary location
# in the same working directory.
with ResourcePath.temporary_uri(suffix=uri.getExtension(), prefix=uri.dirname()) as temporary_uri:
# Need to configure the formatter to write to a different
# location and that needs us to overwrite internals
log.debug("Writing dataset to temporary location at %s", temporary_uri)
with formatter._updateLocation(Location(None, temporary_uri)):
try:
formatter.write(inMemoryDataset)
except Exception as e:
raise RuntimeError(
f"Failed to serialize dataset {ref} of type"
f" {type(inMemoryDataset)} to "
f"to {uri} via temporary location {temporary_uri}."
) from e
# We know we have a local file and we must rename it to the
# final destination. There is no ResourcePath support for this.
os.rename(temporary_uri.ospath, uri.ospath)

log.debug("Successfully wrote python object to local file at %s", uri)
else:
data_written = False
if not uri.isLocal:
# This is a remote URI. Some datasets can be serialized directly
# to bytes and sent to the remote datastore without writing a
# file. If the dataset is intended to be saved to the cache
# a file is always written and direct write to the remote
# datastore is bypassed.
data_written = False
if not self.cacheManager.should_be_cached(ref):
try:
serializedDataset = formatter.toBytes(inMemoryDataset)
Expand All @@ -1168,28 +1145,40 @@ def _removeFileExists(uri: ResourcePath) -> None:
log.debug("Successfully wrote bytes directly to %s", uri)
data_written = True

if not data_written:
# Did not write the bytes directly to object store so instead
# write to temporary file.
with ResourcePath.temporary_uri(suffix=uri.getExtension()) as temporary_uri:
# Need to configure the formatter to write to a different
# location and that needs us to overwrite internals
log.debug("Writing dataset to temporary location at %s", temporary_uri)
with formatter._updateLocation(Location(None, temporary_uri)):
try:
formatter.write(inMemoryDataset)
except Exception as e:
raise RuntimeError(
f"Failed to serialize dataset {ref} of type"
f" {type(inMemoryDataset)} to "
f"temporary location {temporary_uri}"
) from e
uri.transfer_from(temporary_uri, transfer="copy", overwrite=True)
if not data_written:
# Did not write the bytes directly to object store so instead
# write to temporary file. Always write to a temporary even if
# using a local file system -- that gives us atomic writes.
# If a process is killed as the file is being written we do not
# want it to remain in the correct place but in corrupt state.
# For local files write to the output directory not temporary dir.
prefix = uri.dirname() if uri.isLocal else None
with ResourcePath.temporary_uri(suffix=uri.getExtension(), prefix=prefix) as temporary_uri:
# Need to configure the formatter to write to a different
# location and that needs us to overwrite internals
log.debug("Writing dataset to temporary location at %s", temporary_uri)
with formatter._updateLocation(Location(None, temporary_uri)):
try:
formatter.write(inMemoryDataset)
except Exception as e:
raise RuntimeError(
f"Failed to serialize dataset {ref} of type"
f" {type(inMemoryDataset)} to "
f"temporary location {temporary_uri}"
) from e

# Use move for a local file since that becomes an efficient
# os.rename. For remote resources we use copy to allow the
# file to be cached afterwards.
transfer = "move" if uri.isLocal else "copy"

uri.transfer_from(temporary_uri, transfer=transfer, overwrite=True)

if transfer == "copy":
# Cache if required
self.cacheManager.move_to_cache(temporary_uri, ref)

log.debug("Successfully wrote dataset to %s via a temporary file.", uri)
log.debug("Successfully wrote dataset to %s via a temporary file.", uri)

# URI is needed to resolve what ingest case are we dealing with
return self._extractIngestInfo(uri, ref, formatter=formatter)
Expand Down

0 comments on commit 3c600f1

Please sign in to comment.