Skip to content

Commit

Permalink
Use opaque tables instead of DatabaseDict in Datastores.
Browse files Browse the repository at this point in the history
  • Loading branch information
TallJimbo committed Oct 7, 2019
1 parent d2db5bd commit 797a3df
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 149 deletions.
116 changes: 49 additions & 67 deletions python/lsst/daf/butler/datastores/fileLikeDatastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@

import logging

from sqlalchemy import Integer, String

from dataclasses import dataclass
from typing import ClassVar, Type, Optional
from typing import Optional

from lsst.daf.butler import (
Config,
DatabaseDict,
DatabaseDictRecordBase,
DatasetTypeNotSupportedError,
DatastoreConfig,
DatastoreValidationError,
Expand All @@ -44,33 +44,18 @@
LocationFactory,
StorageClass,
StoredFileInfo,
TableSpec,
FieldSpec,
ForeignKeySpec,
)

from lsst.daf.butler.core.repoRelocation import replaceRoot
from lsst.daf.butler.core.utils import getInstanceOf
from lsst.daf.butler.core.utils import getInstanceOf, NamedValueSet
from .genericDatastore import GenericBaseDatastore

log = logging.getLogger(__name__)


@dataclass(frozen=True)
class DatastoreRecord(DatabaseDictRecordBase):
    """Describes the contents of a datastore record of a dataset in the
    registry.

    The record is usually populated by a `StoredFileInfo` object.
    """
    # NOTE(review): a set literal — slot ordering is not significant here,
    # but the names must match the annotated fields below.
    __slots__ = {"path", "formatter", "storage_class", "file_size", "checksum"}
    # One value per column of the datastore record (populated from
    # `StoredFileInfo`; see the matching table columns elsewhere in this
    # commit's `makeTableSpec`).
    path: str
    formatter: str
    storage_class: str
    file_size: int
    checksum: str

    lengths = {"path": 256, "formatter": 128, "storage_class": 64, "checksum": 128}
    """Lengths of string fields."""


@dataclass(frozen=True)
class DatastoreFileGetInformation:
"""Collection of useful parameters needed to retrieve a file from
Expand Down Expand Up @@ -118,9 +103,6 @@ class FileLikeDatastore(GenericBaseDatastore):
absolute path. Can be None if no defaults specified.
"""

Record: ClassVar[Type] = DatastoreRecord
"""Class to use to represent datastore records."""

root: str
"""Root directory or URI of this `Datastore`."""

Expand All @@ -133,9 +115,6 @@ class FileLikeDatastore(GenericBaseDatastore):
templates: FileTemplates
"""File templates that can be used by this `Datastore`."""

records: DatabaseDict
"""Place to store internal records about datasets."""

@classmethod
def setConfigRoot(cls, root, config, full, overwrite=True):
"""Set any filesystem-dependent config options for this Datastore to
Expand Down Expand Up @@ -171,6 +150,23 @@ def setConfigRoot(cls, root, config, full, overwrite=True):
toUpdate={"root": root},
toCopy=("cls", ("records", "table")), overwrite=overwrite)

@classmethod
def makeTableSpec(cls):
    """Construct the specification of the opaque table used to hold this
    datastore's internal records.

    Returns
    -------
    spec : `TableSpec`
        One row per stored dataset, keyed by ``dataset_id``, with rows
        removed automatically when the referenced dataset is deleted.
    """
    columns = NamedValueSet([
        FieldSpec(name="dataset_id", dtype=Integer, primaryKey=True),
        FieldSpec(name="path", dtype=String, length=256, nullable=False),
        FieldSpec(name="formatter", dtype=String, length=128, nullable=False),
        FieldSpec(name="storage_class", dtype=String, length=64, nullable=False),
        # TODO: should checksum be Base64Bytes instead?
        FieldSpec(name="checksum", dtype=String, length=128, nullable=True),
        FieldSpec(name="file_size", dtype=Integer, nullable=True),
    ])
    datasetKey = ForeignKeySpec(table="dataset", source=("dataset_id",),
                                target=("dataset_id",), onDelete="CASCADE")
    return TableSpec(fields=columns, unique=frozenset(), foreignKeys=[datasetKey])

def __init__(self, config, registry, butlerRoot=None):
super().__init__(config, registry)
if "root" not in self.config:
Expand Down Expand Up @@ -202,51 +198,37 @@ def __init__(self, config, registry, butlerRoot=None):
universe=self.registry.dimensions)

# Storage of paths and formatters, keyed by dataset_id
self.records = DatabaseDict.fromConfig(self.config["records"],
value=self.Record, key="dataset_id",
registry=registry)
self._tableName = self.config["records", "table"]
registry.registerOpaqueTable(self._tableName, self.makeTableSpec())

def __str__(self):
    """Render this datastore as its root path/URI."""
    location = self.root
    return location

def _info_to_record(self, info):
"""Convert a `StoredFileInfo` to a suitable database record.
Parameters
----------
info : `StoredFileInfo`
Metadata associated with the stored Dataset.
Returns
-------
record : `DatastoreRecord`
Record to be stored.
"""
return self.Record(formatter=info.formatter, path=info.path,
storage_class=info.storageClass.name,
checksum=info.checksum, file_size=info.file_size)

def _record_to_info(self, record):
"""Convert a record associated with this dataset to a `StoredItemInfo`
Parameters
----------
record : `DatastoreRecord`
Object stored in the record table.
Returns
-------
info : `StoredFileInfo`
The information associated with this dataset record as a Python
class.
"""
def addStoredItemInfo(self, ref, info):
    # Docstring inherited from GenericBaseDatastore
    row = {
        "dataset_id": ref.id,
        "formatter": info.formatter,
        "path": info.path,
        "storage_class": info.storageClass.name,
        "checksum": info.checksum,
        "file_size": info.file_size,
    }
    self.registry.insertOpaqueData(self._tableName, row)

def getStoredItemInfo(self, ref):
    # Docstring inherited from GenericBaseDatastore
    #
    # NOTE: this span was corrupted in the scraped diff — the old
    # attribute-access lines (record.storage_class, record.formatter, ...)
    # were interleaved with the new mapping-access lines, yielding duplicate
    # assignments and returns.  Reconstructed here as the post-commit,
    # opaque-table (mapping-access) version.
    records = list(self.registry.fetchOpaqueData(self._tableName, dataset_id=ref.id))
    if len(records) == 0:
        raise KeyError("Unable to retrieve formatter associated with Dataset {}".format(ref.id))
    assert len(records) == 1, "Primary key constraint should make more than one result impossible."
    record = records[0]
    # Convert name of StorageClass to instance
    storageClass = self.storageClassFactory.getStorageClass(record["storage_class"])
    return StoredFileInfo(formatter=record["formatter"],
                          path=record["path"],
                          storageClass=storageClass,
                          checksum=record["checksum"],
                          file_size=record["file_size"])

def removeStoredItemInfo(self, ref):
    # Docstring inherited from GenericBaseDatastore
    table = self._tableName
    self.registry.deleteOpaqueData(table, dataset_id=ref.id)

def _get_dataset_location_info(self, ref):
"""Find the `Location` of the requested dataset in the
Expand Down
57 changes: 7 additions & 50 deletions python/lsst/daf/butler/datastores/genericDatastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
__all__ = ("GenericBaseDatastore", )

import logging
from typing import MutableMapping
from abc import abstractmethod

from lsst.daf.butler import Datastore, DatasetTypeNotSupportedError

Expand All @@ -37,41 +37,7 @@ class GenericBaseDatastore(Datastore):
Should always be sub-classed since key abstract methods are missing.
"""

records: MutableMapping
"""Place to store internal records about datasets."""

def _info_to_record(self, info):
"""Convert a `StoredDatastoreItemInfo` to a suitable database record.
Parameters
----------
info : `StoredDatastoreItemInfo`
Metadata associated with the stored Dataset.
Returns
-------
record : `MutableMapping`
Record to be stored.
"""
raise NotImplementedError("Must be implemented by subclass")

def _record_to_info(self, record):
"""Convert a record associated with this dataset to a
`StoredDatastoreItemInfo`
Parameters
----------
record : `MutableMapping`
Object stored in the record table.
Returns
-------
info : `StoredDatastoreItemInfo`
The information associated with this dataset record as a Python
class.
"""
raise NotImplementedError("Must be implemented by subclass")

@abstractmethod
def addStoredItemInfo(self, ref, info):
    """Record internal storage information associated with this
    `DatasetRef`.

    Parameters
    ----------
    ref : `DatasetRef`
        The Dataset that has been stored.
    info : `StoredDatastoreItemInfo`
        Metadata associated with the stored Dataset.
    """
    # Scraped diff interleaved the deleted dict-backed body with the new
    # abstract one; reconstructed as the post-commit abstract method.
    raise NotImplementedError()

@abstractmethod
def getStoredItemInfo(self, ref):
    """Retrieve information associated with file stored in this
    `Datastore`.

    Parameters
    ----------
    ref : `DatasetRef`
        The Dataset that is to be queried.

    Returns
    -------
    info : `StoredDatastoreItemInfo`
        Stored information about this dataset.

    Raises
    ------
    KeyError
        Dataset with that id can not be found.
    """
    # Scraped diff interleaved the deleted dict-backed body with the new
    # abstract one; reconstructed as the post-commit abstract method.
    raise NotImplementedError()

@abstractmethod
def removeStoredItemInfo(self, ref):
    """Remove information about the file associated with this dataset.

    Parameters
    ----------
    ref : `DatasetRef`
        The Dataset that has been removed.
    """
    # Scraped diff interleaved the deleted dict-backed body with the new
    # abstract one; reconstructed as the post-commit abstract method.
    raise NotImplementedError()

def _register_dataset(self, ref, itemInfo):
"""Update registry to indicate that this dataset has been stored.
Expand All @@ -134,10 +95,6 @@ def _register_dataset(self, ref, itemInfo):
Internal datastore metadata associated with this dataset.
"""
self.registry.addDatasetLocation(ref, self.name)

# TODO: this is only transactional if the DatabaseDict uses
# self.registry internally. Probably need to add
# transactions to DatabaseDict to do better than that.
self.addStoredItemInfo(ref, itemInfo)

# Register all components with same information
Expand Down
41 changes: 9 additions & 32 deletions python/lsst/daf/butler/datastores/inMemoryDatastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,40 +136,17 @@ def setConfigRoot(cls, root, config, full, overwrite=True):
"""
return

# NOTE: this span was corrupted in the scraped diff — fragments of the
# deleted _info_to_record/_record_to_info docstrings were interleaved with
# the three new methods.  Reconstructed below as the post-commit trio of
# trivial dict-backed implementations (self.records maps dataset id to the
# StoredItemInfo object itself, with no conversion).

def addStoredItemInfo(self, ref, info):
    # Docstring inherited from GenericBaseDatastore.
    self.records[ref.id] = info

def getStoredItemInfo(self, ref):
    # Docstring inherited from GenericBaseDatastore.
    return self.records[ref.id]

def removeStoredItemInfo(self, ref):
    # Docstring inherited from GenericBaseDatastore.
    del self.records[ref.id]

def exists(self, ref):
"""Check if the dataset exists in the datastore.
Expand Down

0 comments on commit 797a3df

Please sign in to comment.