Skip to content

Commit

Permalink
Move checksum and size from StorageInfo to StoredFileInfo
Browse files Browse the repository at this point in the history
Since StorageInfo now holds only the datastore name, there is no
reason for registry.updateStorageInfo to exist, so it has
been removed.
  • Loading branch information
timj committed Aug 22, 2018
1 parent 52e66b5 commit 121159f
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 120 deletions.
10 changes: 1 addition & 9 deletions config/schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1554,7 +1554,7 @@ schema:

DatasetStorage:
doc: >
A table that provides information on how a Dataset is stored in one
A table that provides information on whether a Dataset is stored in one
or more Datastores. The presence or absence of a record in this
table itself indicates whether the Dataset is present in that
Datastore.
Expand All @@ -1572,14 +1572,6 @@ schema:
nullable: false
doc: >
Name of the Datastore this entry corresponds to.
- name: checksum
type: string
doc: >
Checksum (e.g. md5) of the stored dataset, if applicable.
- name: size
type: int
doc: >
Total size of the stored dataset in bytes, if applicable.
foreignKeys:
-
src: dataset_id
Expand Down
18 changes: 0 additions & 18 deletions python/lsst/daf/butler/core/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,24 +397,6 @@ def addStorageInfo(self, ref, storageInfo):
"""
raise NotImplementedError("Must be implemented by subclass")

@abstractmethod
@transactional
def updateStorageInfo(self, ref, datastoreName, storageInfo):
"""Update storage information for a given dataset.
Typically used by `Datastore`.
Parameters
----------
ref : `DatasetRef`
A reference to the dataset for which to add storage information.
datastoreName : `str`
What datastore association to update.
storageInfo : `StorageInfo`
Storage information about the dataset.
"""
raise NotImplementedError("Must be implemented by subclass")

@abstractmethod
def getStorageInfo(self, ref, datastoreName):
"""Retrieve storage information for a given dataset.
Expand Down
23 changes: 3 additions & 20 deletions python/lsst/daf/butler/core/storageInfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,34 +37,17 @@ class StorageInfo:
Size of stored object in bytes.
"""
__eq__ = slotValuesAreEqual
__slots__ = ("_datastoreName", "_checksum", "_size")
__slots__ = ("_datastoreName",)

def __str__(self):
return "StorageInfo(checksum={}, size={}, datastoreName={})".format(self.checksum, self.size,
self.datastoreName)
return "StorageInfo(datastoreName={})".format(self.datastoreName)

def __init__(self, datastoreName, checksum=None, size=None):
def __init__(self, datastoreName):
assert isinstance(datastoreName, str)
self._datastoreName = datastoreName
assert checksum is None or isinstance(checksum, str)
self._checksum = checksum
assert size is None or isinstance(size, int)
self._size = size

@property
def datastoreName(self):
"""Name of datastore (`str`).
"""
return self._datastoreName

@property
def checksum(self):
"""Checksum (`str`).
"""
return self._checksum

@property
def size(self):
"""Size of stored object in bytes (`int`).
"""
return self._size
24 changes: 22 additions & 2 deletions python/lsst/daf/butler/core/storedFileInfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ class StoredFileInfo:
"""

__eq__ = slotValuesAreEqual
__slots__ = ("_formatter", "_path", "_storageClass")
__slots__ = ("_formatter", "_path", "_storageClass", "_checksum", "_size")

def __init__(self, formatter, path, storageClass):
def __init__(self, formatter, path, storageClass, checksum=None, size=None):
assert isinstance(formatter, str) or isinstance(formatter, Formatter)
if isinstance(formatter, Formatter):
formatter = formatter.name()
Expand All @@ -58,6 +58,10 @@ def __init__(self, formatter, path, storageClass):
self._path = path
assert isinstance(storageClass, StorageClass)
self._storageClass = storageClass
assert checksum is None or isinstance(checksum, str)
self._checksum = checksum
assert size is None or isinstance(size, int)
self._size = size

@property
def formatter(self):
Expand All @@ -76,3 +80,19 @@ def storageClass(self):
"""StorageClass used (`StorageClass`).
"""
return self._storageClass

@property
def checksum(self):
"""Checksum (`str`).
"""
return self._checksum

@property
def size(self):
"""Size of stored object in bytes (`int`).
"""
return self._size

def __repr__(self):
return f"{type(self).__qualname__}(path=\"{self.path}\", formatter=\"{self.formatter}\"" \
f" size={self.size}, checksum=\"{self.checksum}\", storageClass=\"{self.storageClass.name}\")"
4 changes: 1 addition & 3 deletions python/lsst/daf/butler/datastores/inMemoryDatastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,7 @@ def put(self, inMemoryDataset, ref):
# We have to register this content with registry.
# Currently this assumes we have a file so we need to use stub entries
# TODO: Add to ephemeral part of registry
checksum = str(id(inMemoryDataset))
size = 0
info = StorageInfo(self.name, checksum, size)
info = StorageInfo(self.name)
self.registry.addStorageInfo(ref, info)

# Store time we received this content, to allow us to optionally
Expand Down
28 changes: 16 additions & 12 deletions python/lsst/daf/butler/datastores/posixDatastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ class PosixDatastore(Datastore):
absolute path. Can be None if no defaults specified.
"""

RecordTuple = namedtuple("PosixDatastoreRecord", ["formatter", "path", "storage_class"])
RecordTuple = namedtuple("PosixDatastoreRecord", ["formatter", "path", "storage_class",
"checksum", "size"])

@classmethod
def setConfigRoot(cls, root, config, full):
Expand Down Expand Up @@ -138,7 +139,8 @@ def __init__(self, config, registry):
self.name = "POSIXDatastore@{}".format(self.root)

# Storage of paths and formatters, keyed by dataset_id
types = {"path": str, "formatter": str, "storage_class": str, "dataset_id": int}
types = {"path": str, "formatter": str, "storage_class": str,
"size": int, "checksum": str, "dataset_id": int}
self.records = DatabaseDict.fromConfig(self.config["records"], types=types,
value=self.RecordTuple, key="dataset_id",
registry=registry)
Expand All @@ -147,7 +149,8 @@ def __str__(self):
return self.root

def addStoredFileInfo(self, ref, info):
"""Record formatter information associated with this `DatasetRef`
"""Record internal storage information associated with this
`DatasetRef`
Parameters
----------
Expand All @@ -157,7 +160,8 @@ def addStoredFileInfo(self, ref, info):
Metadata associated with the stored Dataset.
"""
self.records[ref.id] = self.RecordTuple(formatter=info.formatter, path=info.path,
storage_class=info.storageClass.name)
storage_class=info.storageClass.name,
checksum=info.checksum, size=info.size)

def removeStoredFileInfo(self, ref):
"""Remove information about the file associated with this dataset.
Expand All @@ -181,8 +185,7 @@ def getStoredFileInfo(self, ref):
Returns
-------
info : `StoredFileInfo`
Stored information about the internal location of this file
and its formatter.
Stored information about this file and its formatter.
Raises
------
Expand All @@ -194,7 +197,8 @@ def getStoredFileInfo(self, ref):
raise KeyError("Unable to retrieve formatter associated with Dataset {}".format(ref.id))
# Convert name of StorageClass to instance
storageClass = self.storageClassFactory.getStorageClass(record.storage_class)
return StoredFileInfo(record.formatter, record.path, storageClass)
return StoredFileInfo(record.formatter, record.path, storageClass,
checksum=record.checksum, size=record.size)

def exists(self, ref):
"""Check if the dataset exists in the datastore.
Expand Down Expand Up @@ -247,7 +251,6 @@ def get(self, ref, parameters=None):

# Get file metadata and internal metadata
try:
storageInfo = self.registry.getStorageInfo(ref, self.name)
storedFileInfo = self.getStoredFileInfo(ref)
except KeyError:
raise FileNotFoundError("Could not retrieve Dataset {}".format(ref))
Expand All @@ -262,9 +265,9 @@ def get(self, ref, parameters=None):
" expected location of {}".format(ref.id, location.path))
stat = os.stat(location.path)
size = stat.st_size
if size != storageInfo.size:
if size != storedFileInfo.size:
raise RuntimeError("Integrity failure in Datastore. Size of file {} ({}) does not"
" match recorded size of {}".format(location.path, size, storageInfo.size))
" match recorded size of {}".format(location.path, size, storedFileInfo.size))

# We have a write storage class and a read storage class and they
# can be different for concrete composites.
Expand Down Expand Up @@ -434,11 +437,12 @@ def ingest(self, path, ref, formatter=None, transfer=None):
checksum = self.computeChecksum(fullPath)
stat = os.stat(fullPath)
size = stat.st_size
info = StorageInfo(self.name, checksum, size)
info = StorageInfo(self.name)
self.registry.addStorageInfo(ref, info)

# Associate this dataset with the formatter for later read.
fileInfo = StoredFileInfo(formatter, path, ref.datasetType.storageClass)
fileInfo = StoredFileInfo(formatter, path, ref.datasetType.storageClass,
size=size, checksum=checksum)
# TODO: this is only transactional if the DatabaseDict uses
# self.registry internally. Probably need to add
# transactions to DatabaseDict to do better than that.
Expand Down
39 changes: 5 additions & 34 deletions python/lsst/daf/butler/registries/sqlRegistry.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,34 +528,9 @@ def addStorageInfo(self, ref, storageInfo):
"""
datasetStorageTable = self._schema.tables["DatasetStorage"]
values = dict(dataset_id=ref.id,
datastore_name=storageInfo.datastoreName,
checksum=storageInfo.checksum,
size=storageInfo.size)
datastore_name=storageInfo.datastoreName)
self._connection.execute(datasetStorageTable.insert().values(**values))

@transactional
def updateStorageInfo(self, ref, datastoreName, storageInfo):
"""Update storage information for a given dataset.
Typically used by `Datastore`.
Parameters
----------
ref : `DatasetRef`
A reference to the dataset for which to add storage information.
datastoreName : `str`
What datastore association to update.
storageInfo : `StorageInfo`
Storage information about the dataset.
"""
datasetStorageTable = self._schema.tables["DatasetStorage"]
self._connection.execute(datasetStorageTable.update().where(and_(
datasetStorageTable.c.dataset_id == ref.id,
datasetStorageTable.c.datastore_name == datastoreName)).values(
datastore_name=storageInfo.datastoreName,
checksum=storageInfo.checksum,
size=storageInfo.size))

def getStorageInfo(self, ref, datastoreName):
"""Retrieve storage information for a given dataset.
Expand All @@ -581,19 +556,15 @@ def getStorageInfo(self, ref, datastoreName):
datasetStorageTable = self._schema.tables["DatasetStorage"]
storageInfo = None
result = self._connection.execute(
select([datasetStorageTable.c.datastore_name,
datasetStorageTable.c.checksum,
datasetStorageTable.c.size]).where(
and_(datasetStorageTable.c.dataset_id == ref.id,
datasetStorageTable.c.datastore_name == datastoreName))).fetchone()
select([datasetStorageTable.c.datastore_name]).where(
and_(datasetStorageTable.c.dataset_id == ref.id,
datasetStorageTable.c.datastore_name == datastoreName))).fetchone()

if result is None:
raise KeyError("Unable to retrieve information associated with "
"Dataset {} in datastore {}".format(ref.id, datastoreName))

storageInfo = StorageInfo(datastoreName=result["datastore_name"],
checksum=result["checksum"],
size=result["size"])
storageInfo = StorageInfo(datastoreName=result["datastore_name"])
return storageInfo

@transactional
Expand Down
10 changes: 1 addition & 9 deletions tests/test_sqlRegistry.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,19 +208,11 @@ def testStorageInfo(self):
run = registry.makeRun(collection="test")
ref = registry.addDataset(datasetType, dataId={"camera": "DummyCam"}, run=run)
datastoreName = "dummystore"
checksum = "d6fb1c0c8f338044b2faaf328f91f707"
size = 512
storageInfo = StorageInfo(datastoreName, checksum, size)
storageInfo = StorageInfo(datastoreName)
# Test adding information about a new dataset
registry.addStorageInfo(ref, storageInfo)
outStorageInfo = registry.getStorageInfo(ref, datastoreName)
self.assertEqual(outStorageInfo, storageInfo)
# Test updating storage information for an existing dataset
updatedStorageInfo = StorageInfo(datastoreName, "20a38163c50f4aa3aa0f4047674f8ca7", size+1)
registry.updateStorageInfo(ref, datastoreName, updatedStorageInfo)
outStorageInfo = registry.getStorageInfo(ref, datastoreName)
self.assertNotEqual(outStorageInfo, storageInfo)
self.assertEqual(outStorageInfo, updatedStorageInfo)

def testFind(self):
registry = Registry.fromConfig(self.butlerConfig, create=True)
Expand Down
18 changes: 5 additions & 13 deletions tests/test_storageInfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,14 @@ def testConstructor(self):
"""Test of constructor.
"""
datastoreName = "dummy"
checksum = "d6fb1c0c8f338044b2faaf328f91f707"
size = 512
storageInfo = StorageInfo(datastoreName, checksum, size)
storageInfo = StorageInfo(datastoreName)
self.assertEqual(storageInfo.datastoreName, datastoreName)
self.assertEqual(storageInfo.checksum, checksum)
self.assertEqual(storageInfo.size, size)

def testEquality(self):
self.assertEqual(StorageInfo("a", "d6fb1c0c8f338044b2faaf328f91f707", 2),
StorageInfo("a", "d6fb1c0c8f338044b2faaf328f91f707", 2))
self.assertNotEqual(StorageInfo("a", "d6fb1c0c8f338044b2faaf328f91f707", 2),
StorageInfo("b", "d6fb1c0c8f338044b2faaf328f91f707", 2))
self.assertNotEqual(StorageInfo("a", "d6fb1c0c8f338044b2faaf328f91f707", 2),
StorageInfo("a", "20a38163c50f4aa3aa0f4047674f8ca7", 2))
self.assertNotEqual(StorageInfo("a", "d6fb1c0c8f338044b2faaf328f91f707", 2),
StorageInfo("a", "d6fb1c0c8f338044b2faaf328f91f707", 3))
self.assertEqual(StorageInfo("a"),
StorageInfo("a",))
self.assertNotEqual(StorageInfo("a"),
StorageInfo("b"))


class MemoryTester(lsst.utils.tests.MemoryTestCase):
Expand Down

0 comments on commit 121159f

Please sign in to comment.