Skip to content

Commit

Permalink
Remove StorageInfo and use datastore name directly
Browse files Browse the repository at this point in the history
StorageInfo now only has a single item in it so
change all the APIs to be explicit about that item.
Rename the StorageInfo registry methods as Address
methods. getAddresses now returns all matching
datastores.
  • Loading branch information
timj committed Aug 23, 2018
1 parent 121159f commit b4a1399
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 187 deletions.
1 change: 0 additions & 1 deletion python/lsst/daf/butler/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from .run import *
from .schema import *
from .storageClass import *
from .storageInfo import *
from .storedFileInfo import *
from .dataUnit import *
from .databaseDict import *
31 changes: 13 additions & 18 deletions python/lsst/daf/butler/core/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,49 +383,44 @@ def disassociate(self, collection, refs, remove=True):

@abstractmethod
@transactional
def addStorageInfo(self, ref, storageInfo):
"""Add storage information for a given dataset.
def addDatasetLocation(self, ref, datastoreName):
"""Add datastore name locating a given dataset.
Typically used by `Datastore`.
Parameters
----------
ref : `DatasetRef`
A reference to the dataset for which to add storage information.
storageInfo : `StorageInfo`
Storage information about the dataset.
datastoreName : `str`
Name of the datastore holding this dataset.
"""
raise NotImplementedError("Must be implemented by subclass")

@abstractmethod
def getStorageInfo(self, ref, datastoreName):
"""Retrieve storage information for a given dataset.
def getDatasetLocations(self, ref):
"""Retrieve datastore locations for a given dataset.
Typically used by `Datastore`.
Parameters
----------
ref : `DatasetRef`
A reference to the dataset for which to add storage information.
datastoreName : `str`
What datastore association to update.
A reference to the dataset for which to retrieve storage
information.
Returns
-------
info : `StorageInfo`
Storage information about the dataset.
Raises
------
KeyError
The requested Dataset does not exist.
datastores : `set` of `str`
All the matching datastores holding this dataset. Empty set
if the dataset does not exist anywhere.
"""
raise NotImplementedError("Must be implemented by subclass")

@abstractmethod
@transactional
def removeStorageInfo(self, datastoreName, ref):
"""Remove storage information associated with this dataset.
def removeDatasetLocation(self, datastoreName, ref):
"""Remove datastore location associated with this dataset.
Typically used by `Datastore` when a dataset is removed.
Expand Down
53 changes: 0 additions & 53 deletions python/lsst/daf/butler/core/storageInfo.py

This file was deleted.

4 changes: 2 additions & 2 deletions python/lsst/daf/butler/core/storedFileInfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,5 +94,5 @@ def size(self):
return self._size

def __repr__(self):
return f"{type(self).__qualname__}(path=\"{self.path}\", formatter=\"{self.formatter}\"" \
f" size={self.size}, checksum=\"{self.checksum}\", storageClass=\"{self.storageClass.name}\")"
return f'{type(self).__qualname__}(path="{self.path}", formatter="{self.formatter}"' \
f' size={self.size}, checksum="{self.checksum}", storageClass="{self.storageClass.name}")'
10 changes: 4 additions & 6 deletions python/lsst/daf/butler/datastores/inMemoryDatastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

from lsst.daf.butler.core.datastore import Datastore
from lsst.daf.butler.core.storageClass import StorageClassFactory
from lsst.daf.butler.core.storageInfo import StorageInfo

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -316,8 +315,7 @@ def put(self, inMemoryDataset, ref):
# We have to register this content with registry.
# Currently this assumes we have a file so we need to use stub entries
# TODO: Add to ephemeral part of registry
info = StorageInfo(self.name)
self.registry.addStorageInfo(ref, info)
self.registry.addDatasetLocation(ref, self.name)

# Store time we received this content, to allow us to optionally
# expire it. Instead of storing a filename here, we include the
Expand All @@ -327,7 +325,7 @@ def put(self, inMemoryDataset, ref):

# Register all components with same information
for compRef in ref.components.values():
self.registry.addStorageInfo(compRef, info)
self.registry.addDatasetLocation(compRef, self.name)
self.addStoredItemInfo(compRef, itemInfo)

if self._transaction is not None:
Expand Down Expand Up @@ -395,9 +393,9 @@ def remove(self, ref):

# Remove rows from registries
self.removeStoredItemInfo(ref)
self.registry.removeStorageInfo(self.name, ref)
self.registry.removeDatasetLocation(self.name, ref)
for compRef in ref.components.values():
self.registry.removeStorageInfo(self.name, compRef)
self.registry.removeDatasetLocation(self.name, compRef)
self.removeStoredItemInfo(compRef)

def transfer(self, inputDatastore, ref):
Expand Down
10 changes: 4 additions & 6 deletions python/lsst/daf/butler/datastores/posixDatastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
from lsst.daf.butler.core.fileDescriptor import FileDescriptor
from lsst.daf.butler.core.formatter import FormatterFactory
from lsst.daf.butler.core.fileTemplates import FileTemplates
from lsst.daf.butler.core.storageInfo import StorageInfo
from lsst.daf.butler.core.storedFileInfo import StoredFileInfo
from lsst.daf.butler.core.utils import getInstanceOf, transactional
from lsst.daf.butler.core.storageClass import StorageClassFactory
Expand Down Expand Up @@ -437,8 +436,7 @@ def ingest(self, path, ref, formatter=None, transfer=None):
checksum = self.computeChecksum(fullPath)
stat = os.stat(fullPath)
size = stat.st_size
info = StorageInfo(self.name)
self.registry.addStorageInfo(ref, info)
self.registry.addDatasetLocation(ref, self.name)

# Associate this dataset with the formatter for later read.
fileInfo = StoredFileInfo(formatter, path, ref.datasetType.storageClass,
Expand All @@ -450,7 +448,7 @@ def ingest(self, path, ref, formatter=None, transfer=None):

# Register all components with same information
for compRef in ref.components.values():
self.registry.addStorageInfo(compRef, info)
self.registry.addDatasetLocation(compRef, self.name)
self.addStoredFileInfo(compRef, fileInfo)

def getUri(self, ref, predict=False):
Expand Down Expand Up @@ -535,9 +533,9 @@ def remove(self, ref):

# Remove rows from registries
self.removeStoredFileInfo(ref)
self.registry.removeStorageInfo(self.name, ref)
self.registry.removeDatasetLocation(self.name, ref)
for compRef in ref.components.values():
self.registry.removeStorageInfo(self.name, compRef)
self.registry.removeDatasetLocation(self.name, compRef)
self.removeStoredFileInfo(compRef)

def transfer(self, inputDatastore, ref):
Expand Down
45 changes: 16 additions & 29 deletions python/lsst/daf/butler/registries/sqlRegistry.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
from ..core.execution import Execution
from ..core.run import Run
from ..core.quantum import Quantum
from ..core.storageInfo import StorageInfo
from ..core.storageClass import StorageClassFactory
from ..core.config import Config
from ..core.sqlRegistryDatabaseDict import SqlRegistryDatabaseDict
Expand Down Expand Up @@ -514,62 +513,50 @@ def disassociate(self, collection, refs, remove=True):
return []

@transactional
def addStorageInfo(self, ref, storageInfo):
"""Add storage information for a given dataset.
def addDatasetLocation(self, ref, datastoreName):
"""Add datastore name locating a given dataset.
Typically used by `Datastore`.
Parameters
----------
ref : `DatasetRef`
A reference to the dataset for which to add storage information.
storageInfo : `StorageInfo`
Storage information about the dataset.
datastoreName : `str`
Name of the datastore holding this dataset.
"""
datasetStorageTable = self._schema.tables["DatasetStorage"]
values = dict(dataset_id=ref.id,
datastore_name=storageInfo.datastoreName)
datastore_name=datastoreName)
self._connection.execute(datasetStorageTable.insert().values(**values))

def getStorageInfo(self, ref, datastoreName):
"""Retrieve storage information for a given dataset.
def getDatasetLocations(self, ref):
"""Retrieve datastore locations for a given dataset.
Typically used by `Datastore`.
Parameters
----------
ref : `DatasetRef`
A reference to the dataset for which to add storage information.
datastoreName : `str`
What datastore association to update.
A reference to the dataset for which to retrieve storage
information.
Returns
-------
info : `StorageInfo`
Storage information about the dataset.
Raises
------
KeyError
The requested Dataset does not exist.
datastores : `set` of `str`
All the matching datastores holding this dataset. Empty set
if the dataset does not exist anywhere.
"""
datasetStorageTable = self._schema.tables["DatasetStorage"]
storageInfo = None
result = self._connection.execute(
select([datasetStorageTable.c.datastore_name]).where(
and_(datasetStorageTable.c.dataset_id == ref.id,
datasetStorageTable.c.datastore_name == datastoreName))).fetchone()

if result is None:
raise KeyError("Unable to retrieve information associated with "
"Dataset {} in datastore {}".format(ref.id, datastoreName))
and_(datasetStorageTable.c.dataset_id == ref.id))).fetchall()

storageInfo = StorageInfo(datastoreName=result["datastore_name"])
return storageInfo
return {r["datastore_name"] for r in result}

@transactional
def removeStorageInfo(self, datastoreName, ref):
"""Remove storage information associated with this dataset.
def removeDatasetLocation(self, datastoreName, ref):
"""Remove datastore location associated with this dataset.
Typically used by `Datastore` when a dataset is removed.
Expand Down
14 changes: 8 additions & 6 deletions tests/dummyRegistry.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,23 @@ def __init__(self):
self._counter = 0
self._entries = {}

def addStorageInfo(self, ref, storageInfo):
def addDatasetLocation(self, ref, datastoreName):
# Only set ID if ID is 0 or None
incrementCounter = True
if ref.id is None or ref.id == 0:
ref._id = self._counter
incrementCounter = False
self._entries[(storageInfo.datastoreName, ref.id)] = storageInfo
if ref.id not in self._entries:
self._entries[ref.id] = set()
self._entries[ref.id].add(datastoreName)
if incrementCounter:
self._counter += 1

def getStorageInfo(self, ref, datastoreName):
return self._entries[(datastoreName, ref.id)]
def getDatasetLocations(self, ref):
return self._entries[ref.id].copy()

def removeStorageInfo(self, datastoreName, ref):
del self._entries[(datastoreName, ref.id)]
def removeDatasetLocation(self, datastoreName, ref):
self._entries[ref.id].remove(datastoreName)

def makeDatabaseDict(self, table, types, key, value):
return dict()
Expand Down
33 changes: 27 additions & 6 deletions tests/test_sqlRegistry.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import lsst.sphgeom

from sqlalchemy.exc import OperationalError
from lsst.daf.butler.core.storageInfo import StorageInfo
from lsst.daf.butler.core.execution import Execution
from lsst.daf.butler.core.quantum import Quantum
from lsst.daf.butler.core.run import Run
Expand Down Expand Up @@ -198,21 +197,43 @@ def testQuantum(self):
outQuantum = registry.getQuantum(quantum.id)
self.assertEqual(outQuantum, quantum)

def testStorageInfo(self):
def testDatasetLocations(self):
registry = Registry.fromConfig(self.butlerConfig, create=True)
storageClass = StorageClass("testStorageInfo")
registry.storageClasses.registerStorageClass(storageClass)
datasetType = DatasetType(name="test", dataUnits=("Camera",), storageClass=storageClass)
datasetType2 = DatasetType(name="test2", dataUnits=("Camera",), storageClass=storageClass)
registry.registerDatasetType(datasetType)
registry.registerDatasetType(datasetType2)
registry.addDataUnitEntry("Camera", {"camera": "DummyCam"})
run = registry.makeRun(collection="test")
ref = registry.addDataset(datasetType, dataId={"camera": "DummyCam"}, run=run)
ref2 = registry.addDataset(datasetType2, dataId={"camera": "DummyCam"}, run=run)
datastoreName = "dummystore"
storageInfo = StorageInfo(datastoreName)
datastoreName2 = "dummystore2"
# Test adding information about a new dataset
registry.addStorageInfo(ref, storageInfo)
outStorageInfo = registry.getStorageInfo(ref, datastoreName)
self.assertEqual(outStorageInfo, storageInfo)
registry.addDatasetLocation(ref, datastoreName)
addresses = registry.getDatasetLocations(ref)
self.assertIn(datastoreName, addresses)
self.assertEqual(len(addresses), 1)
registry.addDatasetLocation(ref, datastoreName2)
registry.addDatasetLocation(ref2, datastoreName2)
addresses = registry.getDatasetLocations(ref)
self.assertEqual(len(addresses), 2)
self.assertIn(datastoreName, addresses)
self.assertIn(datastoreName2, addresses)
registry.removeDatasetLocation(datastoreName, ref)
addresses = registry.getDatasetLocations(ref)
self.assertEqual(len(addresses), 1)
self.assertNotIn(datastoreName, addresses)
self.assertIn(datastoreName2, addresses)
registry.removeDatasetLocation(datastoreName2, ref)
addresses = registry.getDatasetLocations(ref)
self.assertEqual(len(addresses), 0)
self.assertNotIn(datastoreName2, addresses)
addresses = registry.getDatasetLocations(ref2)
self.assertEqual(len(addresses), 1)
self.assertIn(datastoreName2, addresses)

def testFind(self):
registry = Registry.fromConfig(self.butlerConfig, create=True)
Expand Down

0 comments on commit b4a1399

Please sign in to comment.