Skip to content

Commit

Permalink
Make ingest an optional Datastore API, add transfers to it.
Browse files Browse the repository at this point in the history
  • Loading branch information
TallJimbo committed Jul 24, 2018
1 parent 8fa4c58 commit 1e6c30e
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 2 deletions.
31 changes: 31 additions & 0 deletions python/lsst/daf/butler/core/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,37 @@ def put(self, inMemoryDataset, datasetRef):
"""
raise NotImplementedError("Must be implemented by subclass")

def ingest(self, path, ref, formatter=None, transfer=None):
    """Ingest an existing on-disk file into the store as the Dataset
    identified by ``ref``, optionally transferring the file first.

    Implementing this method is optional for a Datastore, but it is
    required in order to support direct raw data ingest.

    Parameters
    ----------
    path : `str`
        Path to the file, relative to the repository root.
    ref : `DatasetRef`
        Reference to the Dataset associated with the file.
    formatter : `Formatter`, optional
        Formatter that should be used to retrieve the Dataset.
    transfer : `str`, optional
        How to transfer the file: `None`, or one of 'move', 'copy',
        'hardlink', or 'symlink'.  A Datastore need not support every
        option, but must raise `NotImplementedError` for any option it
        does not support (`None` included).

    Raises
    ------
    NotImplementedError
        Always raised by this base implementation.  Subclasses raise it
        when the given transfer mode is not supported.
    """
    # The base class offers no ingest support at all; subclasses that can
    # ingest files directly override this method.
    message = "Datastore does not support direct file-based ingest."
    raise NotImplementedError(message)

@abstractmethod
def getUri(self, datasetRef):
"""URI to the Dataset.
Expand Down
33 changes: 31 additions & 2 deletions python/lsst/daf/butler/datastores/posixDatastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"""POSIX datastore."""

import os
import shutil
import hashlib
from collections import namedtuple

Expand Down Expand Up @@ -319,8 +320,9 @@ def put(self, inMemoryDataset, ref):
if self._transaction is not None:
self._transaction.registerUndo('put', self.remove, ref)

def ingest(self, path, ref, formatter=None):
"""Record that a Dataset with the given `DatasetRef` exists in the store.
def ingest(self, path, ref, formatter=None, transfer=None):
"""Add an on-disk file with the given `DatasetRef` to the store,
possibly transferring it.
Parameters
----------
Expand All @@ -330,10 +332,37 @@ def ingest(self, path, ref, formatter=None):
Reference to the associated Dataset.
formatter : `Formatter` (optional)
Formatter that should be used to retrieve the Dataset.
transfer : str (optional)
If not None, must be one of 'move', 'copy', 'hardlink', or
'symlink' indicating how to transfer the file. The new
filename and location will be determined via template substitution,
as with ``put``.
"""
with self.transaction() as transaction:
fullPath = os.path.join(self.root, path)

if transfer is not None:
template = self.templates.getTemplate(ref.datasetType.name)
location = self.locationFactory.fromPath(template.format(ref))
newPath = formatter.predictPath(location)
newFullPath = os.path.join(self.root, newPath)
if transfer == "move":
with transaction.undoAs("move", shutil.move, newFullPath, fullPath):
shutil.move(fullPath, newFullPath)
elif transfer == "copy":
with transaction.undoAs("copy", os.remove, newFullPath):
shutil.copy(fullPath, newFullPath)
elif transfer == "hardlink":
with transaction.undoAs("hardlink", os.unlink, newFullPath):
shutil.link(fullPath, newFullPath)
elif transfer == "symlink":
with transaction.undoAs("symlink", os.unlink, newFullPath):
shutil.symlink(fullPath, newFullPath)
else:
raise NotImplementedError("Transfer type '{}' not supported.".format(transfer))
path = newPath
fullPath = newFullPath

if formatter is None:
formatter = self.formatterFactory.getFormatter(ref.datasetType.storageClass,
ref.datasetType.name)
Expand Down

0 comments on commit 1e6c30e

Please sign in to comment.